百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Kafka 事件流式处理 AI 和自动化_kafka流程图

liuian 2025-02-20 16:44 42 浏览

探索如何使用 ChatGPT 创建物联网 Kafka 事件消费者,并使用 API 逻辑服务器来生成定义范围之外的温度读取事件。

Apache Kafka 已成为从静态数据(数据库事务)迁移到事件流的企业架构的明显领导者。有许多演示文稿解释了 Kafka 的工作原理以及如何扩展此技术堆栈(本地或云)。使用 ChatGPT 构建一个微服务来消费消息并丰富、转换和持久化是该项目的下一阶段。在此示例中,我们将使用来自 IoT 设备 (RaspberryPi) 的输入,该设备每隔几秒钟发送一次 JSON 温度读数。

使用消息

生成(并记录)每条 Kafka 事件消息时,Kafka 微服务使用者已准备好处理每条消息。我让 ChatGPT 生成一些 Python 代码,它为我提供了从命名的“主题”中轮询和读取的基础知识。我得到的是一个非常好的开始,可以消耗主题、键和 JSON 有效负载。ChatGPT 创建了代码,使用 SQLAlchemy 将其持久化到数据库中。然后,我想转换 JSON 有效负载,并使用 API Logic Server(ALS - GitHub 上的一个开源项目)规则来取消 JSON,验证、计算并根据给定范围之外的源温度生成一组新的消息有效负载。

ChatGPT: “design a Python Event Streaming Kafka Consumer interface”

注意:ChatGPT 选择了 Confluent Kafka 库(并使用其 Docker Kafka 容器)- 您可以修改代码以使用其他 Python Kafka 库。

SQLAlchemy模型

使用 API Logic Server(ALS:Python 开源平台),我们连接到 MySQL 数据库。ALS 将读取这些表,并为每个 ORM 端点创建一个 SQLAlchemy ORM 模型、一个 react-admin 用户界面、safrs-JSON Open API (Swagger) 和一个正在运行的 REST Web 服务。新的温度表将包含时间戳、IoT 设备 ID 和温度读数。在这里,我们使用 ALS 命令行实用程序来创建 ORM 模型:

ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot

API Logic Server 生成的类用于保存我们的值。Temperature

class Temperature(SAFRSBase, Base):br
    __tablename__ = 'Temperature'br
    _s_collection_name = 'Temperature' # type: ignorebr
    __bind_key__ = 'None'br
br
    Id = Column(Integer, primary_key=True)br
    DeviceId = Column(Integer, nullable=False)br
    TempReading = Column(Integer, nullable=False)br
    CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)br
    KafkaMessageSent = Column(Booelan, default=text("False"))

变化

因此,我们不是将 Kafka JSON 使用者消息再次保存在 SQL 数据库中(并触发规则来执行工作),而是解包 JSON 有效负载 () 并将其插入 Temperature 表,而不是保存 JSON 有效负载。我们让声明性规则处理每个温度读数。util.row_to_entity

    entity = models.Temperature()br
    util.row_to_entity(message_data, entity) br
    session.add(entity)

当消费者收到消息时,它会将其添加到会话中,从而触发规则(如下)。commit_event

声明性逻辑:生成消息

使用 API Logic Server(使用 SQLAlchemy、Flask 和类似 LogicBank 电子表格的规则引擎构建的自动化框架:公式、总和、计数、复制、约束、事件等),我们在 ORM 实体上添加一个声明性规则。当每条消息都保存到 Temperature 表中时,将调用该规则。如果温度读数超过或小于 ,我们将发送有关该主题的 Kafka 消息。我们还添加了一个约束,以确保我们在正常范围 (-) 内接收数据。我们将让另一个事件使用者处理警报消息。
commit_eventTemperaturecommit_eventMAX_TEMPMIN_TEMP“TempRangeAlert”32132
TDD 行为测试

使用 TDD(Test Driven Development),我们可以编写一个 Behave 测试,将记录直接插入到 Temperature 表中,然后检查返回值。行为以 /(.feature 文件)开头。对于每个场景,我们使用装饰器编写相应的 Python 类。
KafkaMessageSentFeatureScenarioBehave

功能定义

Feature: TDD Temperature Examplebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Normal (Temperature)br
  When Transactions normal temperature is submittedbr
  Then Check KafkaMessageSent Flag is Falsebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Abnormal (Temperature)br
  When Transactions abnormal temperature is submittedbr
  Then Check KafkaMessageSent Flag is True

TDD Python 类

from behave import *br
import safrsbr
br
db = safrs.DB br
session = db.sessionbr
br
def insertTemperature(temp:int) -> bool:br
    entity = model.Temperature()br
    entity.TempReading = tempbr
    entity.DeviceId = 'local_behave_test'br
    session.add(entity) br
    return entity.KafkaMessageSent br
br
@given('A Kafka Message Normal (Temperature)')br
def step_impl(context):br
    context.temp = 76br
    assert Truebr
br
@when('Transactions normal temperature is submitted')br
def step_impl(context):br
    context.response_text = insertTemperature(context.temp)br
br
@then('Check KafkaMessageSent Flag is False')br
def step_impl(context):br
    assert context.response_text  == False


总结

使用 ChatGPT 为 Consumer 和 Producer 生成 Kafka 消息代码似乎是一个很好的起点。安装 Confluent Docker for Kafka。将 API Logic Server 用于声明性逻辑规则,使我们能够将公式、约束和事件添加到正常的事务流中,并将其添加到我们的 SQL 数据库中,并生成(和转换)新的 Kafka 消息,这是一个很好的组合。ChatGPT 和声明式逻辑是“配对编程”的下一个层次。

from confluent_kafka import Producerbr
conf = {'bootstrap.servers': 'localhostd:9092'}br
producer = Producer(conf)br
MAX_TEMP = arg.MAX_TEMP or 102br
MIN_TEMP = arg.MIN_TTEMP or 78br
    br
def produce_message(br
    row: models.KafkaMessage, br
    old_row: models.KafkaMessage, br
    logic_row: LogicRow):br
  br
    if logic_row.isInserted() and row.TempReading > MAX_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")br
    row.KafkaMessageSent = Truebr
br
  if logic_row.isInserted() and row.TempReading < MIN_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")br
br
    row.KafkaMessageSent = Truebr
    br
   Rules.constraint(models.Temperature, br
                    as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, br
                    error_message= "Temperature {row.TempReading} is out of range"br
   Rules.commit_event(models.Temperature, calling=produce_message)

仅当温度读数大于或小于时才会生成警报消息。Constraint 将在调用 commit 事件之前检查温度范围(请注意,规则始终是无序的,可以随着规范的变化而引入)。MAX_TEMPMIN_TEMP


原文标题:Kafka Event Streaming AI and Automation

原文链接:
https://dzone.com/articles/event-streaming-ai-amp-automation

作者:Tyler Band

编译:LCR

相关推荐

最新款手机vivo(vivo最新款手机及价格表)

vivoX60Pro+、vivoX70Pro、vivoX60、vivoX70、vivoX70Pro+、vivoiQOO7、vivoiQOO7Pro、vivoIQOO8、viv...

oppo万能密码6位密码(oppo手机6个数万能密码)

oppo手机6位数万能密码是:67766776,但是不要轻易尝试,因为当输入这个密码之后没你的手机就会自动关机,之后会自动重启开机,然后你的手机密码都会变成预设的状态,当手机变成预设密码状态之后,我们...

系统重装win10专业版官网(重装系统win10专业版步骤和详细教程)

win10重装系统后也需要安装硬件驱动的,不过win10自带的系统更新会主动扫描硬件并下载适配的硬件驱动程序,再自动安装,所以win10在装驱动这方面比之前的Windows系统方便得多。win10安装...

u盘文件恢复工具破解版免费(u盘文件恢复工具软件)
u盘文件恢复工具破解版免费(u盘文件恢复工具软件)

迷你兔数据恢复提供了免费版供大家试用,这个免费版只是有恢复额度上的限制,其他方面基本上与正版是一样的,你可以用来感受一下步骤/方式1免费恢复u盘数据的方法如下:第1步:关闭软件的注册页面,选择免费试用。在扫描页面,选择要扫描的所有文件类型,...

2026-01-11 06:55 liuian

win10密钥在哪里买(win10系统密钥在哪儿买)

Windows密钥的正规获取渠道有以下几种:1.购买正版Windows操作系统。在微软官方网站或授权的经销商处购买正版Windows操作系统,可以获得正规的密钥。2.从计算机制造商处获取。一些品牌...

万能声卡驱动器官方下载win10

重新安装声卡驱动的方法如下:1.首先需要确定你的声卡品牌和型号,在设备管理器中搜索声卡,展开此项,可以看到声卡的品牌和型号。2.在浏览器中搜索所需的驱动程序,例如你的声卡型号是RealtekHi...

usb驱动程序在哪里(usb驱动叫什么名字)
usb驱动程序在哪里(usb驱动叫什么名字)

U盘添加驱动号或路径的方法如下在我的电脑上按右键,在快捷菜单里,选择“管理”,打开“计算机管理”窗口。在计算机管理窗口里,选择“存储”下面的“磁盘管理”,如果看得到没有盘符的U盘,那么在这个U盘上按鼠标右键,选择“更改驱动器名称和路径”选项...

2026-01-11 05:05 liuian

windows 10专业版怎么激活(windows十专业版怎么激活)
  • windows 10专业版怎么激活(windows十专业版怎么激活)
  • windows 10专业版怎么激活(windows十专业版怎么激活)
  • windows 10专业版怎么激活(windows十专业版怎么激活)
  • windows 10专业版怎么激活(windows十专业版怎么激活)
美德少年事迹材料(美德少年事迹材料500字左右)

就写平时做了什么好事就可以了。他们分别是许昌市文化街小学六(1)班学生谭天、许昌市第一中学七(12)班学生安家宝。现年12岁的谭天是一个阳光男孩儿,他性格活泼,热情开朗,富有爱心,品学兼优,有较强的集...

手机改无线路由器密码(手机改路由器密码怎么改教程)
  • 手机改无线路由器密码(手机改路由器密码怎么改教程)
  • 手机改无线路由器密码(手机改路由器密码怎么改教程)
  • 手机改无线路由器密码(手机改路由器密码怎么改教程)
  • 手机改无线路由器密码(手机改路由器密码怎么改教程)
win7万能网卡驱动离线版安装包

要使用Win7网卡驱动离线包,首先将离线包下载到计算机上。然后,打开设备管理器,找到你的网卡设备。右键点击该设备,选择“更新驱动程序软件”。在弹出的对话框中,选择“浏览计算机以查找驱动程序软件”。然后...

音频驱动器怎么安装(音频驱动程序怎么安装)
音频驱动器怎么安装(音频驱动程序怎么安装)

1、在浏览器中输入并搜索,然后下载并安装。2、安装完成后打开360驱动大师,它就会自动检测你的电脑需要安装或升级的驱动。3、检测完毕后,我们可以看到我们的声卡驱动需要安装或升级,点击安装或升级,就会开始自动安装或升级声卡了。4、升级过程中会...

2026-01-11 02:55 liuian

硬盘分区win10(硬盘分区win7)
  • 硬盘分区win10(硬盘分区win7)
  • 硬盘分区win10(硬盘分区win7)
  • 硬盘分区win10(硬盘分区win7)
  • 硬盘分区win10(硬盘分区win7)
win11要不要升级

答案是:不必强更,稍安勿躁。  没错,Windows11系统的确是微软的最新力作,其中安卓APP可以在桌面系统中直接使用的“噱头”也极有吸引力,但是,按照win10更新后bug层出不穷的情况来看,正...

windows7联想旗舰版(联想win7旗舰版配置)

你好!联想Windows7旗舰版并不是一个显示设备,而是一个操作系统。因此,无法用英寸来描述其大小。旗舰版是指Windows7操作系统的最高版本,具有更多的功能和特性与其他版本不同。Windows...