百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Kafka 事件流式处理 AI 和自动化_kafka流程图

liuian 2025-02-20 16:44 31 浏览

探索如何使用 ChatGPT 创建物联网 Kafka 事件消费者,并使用 API 逻辑服务器来生成定义范围之外的温度读取事件。

Apache Kafka 已成为从静态数据(数据库事务)迁移到事件流的企业架构的明显领导者。有许多演示文稿解释了 Kafka 的工作原理以及如何扩展此技术堆栈(本地或云)。使用 ChatGPT 构建一个微服务来消费消息并丰富、转换和持久化是该项目的下一阶段。在此示例中,我们将使用来自 IoT 设备 (RaspberryPi) 的输入,该设备每隔几秒钟发送一次 JSON 温度读数。

使用消息

生成(并记录)每条 Kafka 事件消息时,Kafka 微服务使用者已准备好处理每条消息。我让 ChatGPT 生成一些 Python 代码,它为我提供了从命名的“主题”中轮询和读取的基础知识。我得到的是一个非常好的开始,可以消耗主题、键和 JSON 有效负载。ChatGPT 创建了代码,使用 SQLAlchemy 将其持久化到数据库中。然后,我想转换 JSON 有效负载,并使用 API Logic Server(ALS - GitHub 上的一个开源项目)规则来取消 JSON,验证、计算并根据给定范围之外的源温度生成一组新的消息有效负载。

ChatGPT: “design a Python Event Streaming Kafka Consumer interface”

注意:ChatGPT 选择了 Confluent Kafka 库(并使用其 Docker Kafka 容器)- 您可以修改代码以使用其他 Python Kafka 库。

SQLAlchemy模型

使用 API Logic Server(ALS:Python 开源平台),我们连接到 MySQL 数据库。ALS 将读取这些表,并为每个 ORM 端点创建一个 SQLAlchemy ORM 模型、一个 react-admin 用户界面、safrs-JSON Open API (Swagger) 和一个正在运行的 REST Web 服务。新的温度表将包含时间戳、IoT 设备 ID 和温度读数。在这里,我们使用 ALS 命令行实用程序来创建 ORM 模型:

ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot

API Logic Server 生成的类用于保存我们的值。Temperature

class Temperature(SAFRSBase, Base):br
    __tablename__ = 'Temperature'br
    _s_collection_name = 'Temperature' # type: ignorebr
    __bind_key__ = 'None'br
br
    Id = Column(Integer, primary_key=True)br
    DeviceId = Column(Integer, nullable=False)br
    TempReading = Column(Integer, nullable=False)br
    CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)br
    KafkaMessageSent = Column(Booelan, default=text("False"))

变化

因此,我们不是将 Kafka JSON 使用者消息再次保存在 SQL 数据库中(并触发规则来执行工作),而是解包 JSON 有效负载 () 并将其插入 Temperature 表,而不是保存 JSON 有效负载。我们让声明性规则处理每个温度读数。util.row_to_entity

    entity = models.Temperature()br
    util.row_to_entity(message_data, entity) br
    session.add(entity)

当消费者收到消息时,它会将其添加到会话中,从而触发规则(如下)。commit_event

声明性逻辑:生成消息

使用 API Logic Server(使用 SQLAlchemy、Flask 和类似 LogicBank 电子表格的规则引擎构建的自动化框架:公式、总和、计数、复制、约束、事件等),我们在 ORM 实体上添加一个声明性规则。当每条消息都保存到 Temperature 表中时,将调用该规则。如果温度读数超过或小于 ,我们将发送有关该主题的 Kafka 消息。我们还添加了一个约束,以确保我们在正常范围 (-) 内接收数据。我们将让另一个事件使用者处理警报消息。
commit_eventTemperaturecommit_eventMAX_TEMPMIN_TEMP“TempRangeAlert”32132
TDD 行为测试

使用 TDD(Test Driven Development),我们可以编写一个 Behave 测试,将记录直接插入到 Temperature 表中,然后检查返回值。行为以 /(.feature 文件)开头。对于每个场景,我们使用装饰器编写相应的 Python 类。
KafkaMessageSentFeatureScenarioBehave

功能定义

Feature: TDD Temperature Examplebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Normal (Temperature)br
  When Transactions normal temperature is submittedbr
  Then Check KafkaMessageSent Flag is Falsebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Abnormal (Temperature)br
  When Transactions abnormal temperature is submittedbr
  Then Check KafkaMessageSent Flag is True

TDD Python 类

from behave import *br
import safrsbr
br
db = safrs.DB br
session = db.sessionbr
br
def insertTemperature(temp:int) -> bool:br
    entity = model.Temperature()br
    entity.TempReading = tempbr
    entity.DeviceId = 'local_behave_test'br
    session.add(entity) br
    return entity.KafkaMessageSent br
br
@given('A Kafka Message Normal (Temperature)')br
def step_impl(context):br
    context.temp = 76br
    assert Truebr
br
@when('Transactions normal temperature is submitted')br
def step_impl(context):br
    context.response_text = insertTemperature(context.temp)br
br
@then('Check KafkaMessageSent Flag is False')br
def step_impl(context):br
    assert context.response_text  == False


总结

使用 ChatGPT 为 Consumer 和 Producer 生成 Kafka 消息代码似乎是一个很好的起点。安装 Confluent Docker for Kafka。将 API Logic Server 用于声明性逻辑规则,使我们能够将公式、约束和事件添加到正常的事务流中,并将其添加到我们的 SQL 数据库中,并生成(和转换)新的 Kafka 消息,这是一个很好的组合。ChatGPT 和声明式逻辑是“配对编程”的下一个层次。

from confluent_kafka import Producerbr
conf = {'bootstrap.servers': 'localhostd:9092'}br
producer = Producer(conf)br
MAX_TEMP = arg.MAX_TEMP or 102br
MIN_TEMP = arg.MIN_TTEMP or 78br
    br
def produce_message(br
    row: models.KafkaMessage, br
    old_row: models.KafkaMessage, br
    logic_row: LogicRow):br
  br
    if logic_row.isInserted() and row.TempReading > MAX_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")br
    row.KafkaMessageSent = Truebr
br
  if logic_row.isInserted() and row.TempReading < MIN_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")br
br
    row.KafkaMessageSent = Truebr
    br
   Rules.constraint(models.Temperature, br
                    as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, br
                    error_message= "Temperature {row.TempReading} is out of range"br
   Rules.commit_event(models.Temperature, calling=produce_message)

仅当温度读数大于或小于时才会生成警报消息。Constraint 将在调用 commit 事件之前检查温度范围(请注意,规则始终是无序的,可以随着规范的变化而引入)。MAX_TEMPMIN_TEMP


原文标题:Kafka Event Streaming AI and Automation

原文链接:
https://dzone.com/articles/event-streaming-ai-amp-automation

作者:Tyler Band

编译:LCR

相关推荐

搭建一个20人的办公网络(适用于20多人的小型办公网络环境)

楼主有5台机上网,则需要一个8口路由器,组网方法如下:设备:1、8口路由器一台,其中8口为LAN(局域网)端口,一个WAN(广域网)端口,价格100--400元2、网线N米,这个你自己会看了:)...

笔记本电脑各种参数介绍(笔记本电脑各项参数新手普及知识)

1、CPU:这个主要取决于频率和二级缓存,频率越高、二级缓存越大,速度越快,现在的CPU有三级缓存、四级缓存等,都影响相应速度。2、内存:内存的存取速度取决于接口、颗粒数量多少与储存大小,一般来说,内...

汉字上面带拼音输入法下载(字上面带拼音的输入法是哪个)

使用手机上的拼音输入法打成汉字的方法如下:1.打开手机上的拼音输入法,在输入框中输入汉字的拼音,例如“nihao”。2.根据输入法提示的候选词,选择正确的汉字。例如,如果输入“nihao”,输...

xpsp3安装版系统下载(windowsxpsp3安装教程)

xpsp3纯净版在采用微软封装部署技术的基础上,结合作者的实际工作经验,融合了许多实用的功能。它通过一键分区、一键装系统、自动装驱动、一键设定分辨率,一键填IP,一键Ghost备份(恢复)等一系列...

没有备份的手机数据怎么恢复

手机没有备份恢复数据方法如下1、使用数据线将手机与电脑连接好,在“我的电脑”中可以看到手机的盘符。  2、将手机开启USB调试模式。在手机设置中找到开发者选项,然后点击“开启USB调试模式”。  3、...

电脑怎么激活windows11专业版

win11专业版激活方法有多种,以下提供两种常用的激活方式:方法一:使用激活密钥激活。在win11桌面上右键点击“此电脑”,选择“属性”选项。进入属性页面后,点击“更改产品密钥或升级windows”。...

华为手机助手下载官网(华为手机助手app下载专区)

华为手机助手策略调整,已不支持从应用市场下载手机助手,目前华为手机助手是需要在电脑上下载或更新手机助手到最新版本,https://consumer.huawei.com/cn/support/his...

光纤线断了怎么接(宽带光纤线断了怎么接)

宽带光纤线断了可以重接,具体操作方法如下:1、光纤连接的时候要根据束管内,同色相连,同芯相连,按顺序进行连接,由大到小。一般有三种连接方法,分别是熔接、活动连接和机械连接。2、连接的时候要开剥光缆,抛...

深度操作系统安装教程(深度操作系统安装教程图解)
  • 深度操作系统安装教程(深度操作系统安装教程图解)
  • 深度操作系统安装教程(深度操作系统安装教程图解)
  • 深度操作系统安装教程(深度操作系统安装教程图解)
  • 深度操作系统安装教程(深度操作系统安装教程图解)
win7旗舰版和专业版区别(win7旗舰版跟专业版)

1、功能区别:Win7旗舰版比专业版多了三个功能,分别是Bitlocker、BitlockerToGo和多语言界面; 2、用途区别:旗舰版的功能是所有版本中最全最强大的,占用的系统资源,...

万能连接钥匙(万能wifi连接钥匙下载)

1、首先打开wifi万能钥匙软件,若手机没有开启WLAN,就根据软件提示打开WLAN开关;2、打开WLAN开关后,会显示附近的WiFi,如果知道密码,可点击相应WiFi后点击‘输入密码’连接;3、若不...

雨林木风音乐叫什么(雨林木风是啥)

雨林木风的创始人是陈年鑫先生。陈年鑫先生于1999年创立了雨林木风公司,其初衷是为满足中国市场对高品质、高性能电脑的需求。在陈年鑫先生的领导下,雨林木风以技术创新、产品质量和客户服务为核心价值,不断推...

aics6序列号永久序列号(aics6破解序列号)

关于AICS6这个版本,虽然是比较久远的版本,但是在功能上也是十分全面和强大的,作为一名平面设计师的话,AICS6的现有的功能已经能够应付几乎所有的设计工作了……到底AICC2019的功能是不是...

win7正在启动windows 卡住(win7正在启动windows卡住了 进入安全模式)
  • win7正在启动windows 卡住(win7正在启动windows卡住了 进入安全模式)
  • win7正在启动windows 卡住(win7正在启动windows卡住了 进入安全模式)
  • win7正在启动windows 卡住(win7正在启动windows卡住了 进入安全模式)
  • win7正在启动windows 卡住(win7正在启动windows卡住了 进入安全模式)
手机可以装电脑系统吗(手机可以装电脑系统吗怎么装)

答题公式1:手机可以通过数据线或无线连接的方式给电脑装系统。手机安装系统需要一定的技巧和软件支持,一般需要通过数据线或无线连接的方式与电脑连接,并下载相应的软件和系统文件进行安装。对于大部分手机用户来...