百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Kafka 事件流式处理 AI 和自动化_kafka流程图

liuian 2025-02-20 16:44 12 浏览

探索如何使用 ChatGPT 创建物联网 Kafka 事件消费者,并使用 API 逻辑服务器来生成定义范围之外的温度读取事件。

Apache Kafka 已成为从静态数据(数据库事务)迁移到事件流的企业架构的明显领导者。有许多演示文稿解释了 Kafka 的工作原理以及如何扩展此技术堆栈(本地或云)。使用 ChatGPT 构建一个微服务来消费消息并丰富、转换和持久化是该项目的下一阶段。在此示例中,我们将使用来自 IoT 设备 (RaspberryPi) 的输入,该设备每隔几秒钟发送一次 JSON 温度读数。

使用消息

生成(并记录)每条 Kafka 事件消息时,Kafka 微服务使用者已准备好处理每条消息。我让 ChatGPT 生成一些 Python 代码,它为我提供了从命名的“主题”中轮询和读取的基础知识。我得到的是一个非常好的开始,可以消耗主题、键和 JSON 有效负载。ChatGPT 创建了代码,使用 SQLAlchemy 将其持久化到数据库中。然后,我想转换 JSON 有效负载,并使用 API Logic Server(ALS - GitHub 上的一个开源项目)规则来取消 JSON,验证、计算并根据给定范围之外的源温度生成一组新的消息有效负载。

ChatGPT: “design a Python Event Streaming Kafka Consumer interface”

注意:ChatGPT 选择了 Confluent Kafka 库(并使用其 Docker Kafka 容器)- 您可以修改代码以使用其他 Python Kafka 库。

SQLAlchemy模型

使用 API Logic Server(ALS:Python 开源平台),我们连接到 MySQL 数据库。ALS 将读取这些表,并为每个 ORM 端点创建一个 SQLAlchemy ORM 模型、一个 react-admin 用户界面、safrs-JSON Open API (Swagger) 和一个正在运行的 REST Web 服务。新的温度表将包含时间戳、IoT 设备 ID 和温度读数。在这里,我们使用 ALS 命令行实用程序来创建 ORM 模型:

ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot

API Logic Server 生成的类用于保存我们的值。Temperature

class Temperature(SAFRSBase, Base):br
    __tablename__ = 'Temperature'br
    _s_collection_name = 'Temperature' # type: ignorebr
    __bind_key__ = 'None'br
br
    Id = Column(Integer, primary_key=True)br
    DeviceId = Column(Integer, nullable=False)br
    TempReading = Column(Integer, nullable=False)br
    CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)br
    KafkaMessageSent = Column(Booelan, default=text("False"))

变化

因此,我们不是将 Kafka JSON 使用者消息再次保存在 SQL 数据库中(并触发规则来执行工作),而是解包 JSON 有效负载 () 并将其插入 Temperature 表,而不是保存 JSON 有效负载。我们让声明性规则处理每个温度读数。util.row_to_entity

    entity = models.Temperature()br
    util.row_to_entity(message_data, entity) br
    session.add(entity)

当消费者收到消息时,它会将其添加到会话中,从而触发规则(如下)。commit_event

声明性逻辑:生成消息

使用 API Logic Server(使用 SQLAlchemy、Flask 和类似 LogicBank 电子表格的规则引擎构建的自动化框架:公式、总和、计数、复制、约束、事件等),我们在 ORM 实体上添加一个声明性规则。当每条消息都保存到 Temperature 表中时,将调用该规则。如果温度读数超过或小于 ,我们将发送有关该主题的 Kafka 消息。我们还添加了一个约束,以确保我们在正常范围 (-) 内接收数据。我们将让另一个事件使用者处理警报消息。
commit_eventTemperaturecommit_eventMAX_TEMPMIN_TEMP“TempRangeAlert”32132
TDD 行为测试

使用 TDD(Test Driven Development),我们可以编写一个 Behave 测试,将记录直接插入到 Temperature 表中,然后检查返回值。行为以 /(.feature 文件)开头。对于每个场景,我们使用装饰器编写相应的 Python 类。
KafkaMessageSentFeatureScenarioBehave

功能定义

Feature: TDD Temperature Examplebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Normal (Temperature)br
  When Transactions normal temperature is submittedbr
  Then Check KafkaMessageSent Flag is Falsebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Abnormal (Temperature)br
  When Transactions abnormal temperature is submittedbr
  Then Check KafkaMessageSent Flag is True

TDD Python 类

from behave import *br
import safrsbr
br
db = safrs.DB br
session = db.sessionbr
br
def insertTemperature(temp:int) -> bool:br
    entity = model.Temperature()br
    entity.TempReading = tempbr
    entity.DeviceId = 'local_behave_test'br
    session.add(entity) br
    return entity.KafkaMessageSent br
br
@given('A Kafka Message Normal (Temperature)')br
def step_impl(context):br
    context.temp = 76br
    assert Truebr
br
@when('Transactions normal temperature is submitted')br
def step_impl(context):br
    context.response_text = insertTemperature(context.temp)br
br
@then('Check KafkaMessageSent Flag is False')br
def step_impl(context):br
    assert context.response_text  == False


总结

使用 ChatGPT 为 Consumer 和 Producer 生成 Kafka 消息代码似乎是一个很好的起点。安装 Confluent Docker for Kafka。将 API Logic Server 用于声明性逻辑规则,使我们能够将公式、约束和事件添加到正常的事务流中,并将其添加到我们的 SQL 数据库中,并生成(和转换)新的 Kafka 消息,这是一个很好的组合。ChatGPT 和声明式逻辑是“配对编程”的下一个层次。

from confluent_kafka import Producerbr
conf = {'bootstrap.servers': 'localhostd:9092'}br
producer = Producer(conf)br
MAX_TEMP = arg.MAX_TEMP or 102br
MIN_TEMP = arg.MIN_TTEMP or 78br
    br
def produce_message(br
    row: models.KafkaMessage, br
    old_row: models.KafkaMessage, br
    logic_row: LogicRow):br
  br
    if logic_row.isInserted() and row.TempReading > MAX_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")br
    row.KafkaMessageSent = Truebr
br
  if logic_row.isInserted() and row.TempReading < MIN_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")br
br
    row.KafkaMessageSent = Truebr
    br
   Rules.constraint(models.Temperature, br
                    as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, br
                    error_message= "Temperature {row.TempReading} is out of range"br
   Rules.commit_event(models.Temperature, calling=produce_message)

仅当温度读数大于或小于时才会生成警报消息。Constraint 将在调用 commit 事件之前检查温度范围(请注意,规则始终是无序的,可以随着规范的变化而引入)。MAX_TEMPMIN_TEMP


原文标题:Kafka Event Streaming AI and Automation

原文链接:
https://dzone.com/articles/event-streaming-ai-amp-automation

作者:Tyler Band

编译:LCR

相关推荐

打开新世界,教你用RooCode+Copliot+Mcp打造一个自己的Manus

本文耗时两天打造,想要一遍走通需要花点时间,建议找个专注的时间开搞!这不仅是个免费使用claude3.5的方案,也是一个超级智能体方案,绝对值得一试!最近Manus真是赚足了眼球,然而我还是没有邀请码...

Git仓库(git仓库有哪些)

#Git仓库使用方法流程详解##一、环境搭建与基础配置###1.1安装与初始化-**安装Git**:官网下载安装包,默认配置安装-**配置全局信息**:```bashgitconfig...

idea版的cursor:Windsurf Wave 7(ideawalk)

在企业环境中,VisualStudioCode和JetBrains系列是最常用的开发工具,覆盖了全球绝大多数开发者。这两类IDE各有优势,但JetBrains系列凭借其针对特定语言和企业场景的深度...

Ai 编辑器 Cursor 零基础教程:推箱子小游戏实战演练

最近Ai火的同时,Ai编辑器Cursor同样火了一把。今天我们就白漂一下Cursor,使用免费版本搞一个零基础教程,并实战演练一个“网页版的推箱子小游戏”。通过这篇文章,让你真正了解cursor是什么...

ChatGPT深度集成于苹果Mac软件 编码能力得到提升

【CNMO科技消息】近日,OpenAI发布了针对MacOS的桌面应用程序,并宣布了一系列与各类应用程序的互操作性功能,标志着ChatGPT正在从聊天机器人向AI智能体工具进化。此次发布的MacOS桌面...

日常开发中常用的git操作命令和使用技巧

日常开发中常用的git操作命令,从配置、初始化本地仓库到提交代码的常用git操作命令使用git前的配置刚使用git,先要在电脑上安装好git,接着我们需要配置一下帐户信息:用户名和邮箱。#设置用户名...

Trae IDE 如何与 GitHub 无缝对接?

TraeIDE内置了GitHub集成功能,让开发者可以直接在IDE里管理代码仓库和版本控制。1.直接从GitHub克隆项目如果你想把GitHub上的代码拉到本地,Trae提供了...

China&#39;s diplomacy to further provide strong support for country&#39;s modernization: FM

BEIJING,March7(Xinhua)--ChineseForeignMinisterWangYisaidFridaythatChina'sdiplomacywil...

三十分钟入门基础Go(Java小子版)(java入门级教程)

前言Go语言定义Go(又称Golang)是Google的RobertGriesemer,RobPike及KenThompson开发的一种静态、强类型、编译型语言。Go语言语法与...

China will definitely take countermeasures in response to arbitrary pressure: FM

BEIJING,March7(Xinhua)--Chinawilldefinitelytakecountermeasuresinresponsetoarbitrarypre...

Go操作etcd(go操作docker实现沙箱)

Go语言操作etcd,这里推荐官方包etcd/clientv3。文档:https://pkg.go.dev/go.etcd.io/etcd/clientv3etcdv3使用gRPC进行远程过程调...

腾讯 Go 性能优化实战(腾讯游戏优化软件)

作者:trumanyan,腾讯CSIG后台开发工程师项目背景网关服务作为统一接入服务,是大部分服务的统一入口。为了避免成功瓶颈,需要对其进行尽可能地优化。因此,特别总结一下golang后台服务...

golang 之JWT实现(golang gin jwt)

什么是JSONWebToken?JSONWebToken(JWT)是一个开放标准(RFC7519),它定义了一种紧凑且自包含的方式,用于在各方之间以JSON方式安全地传输信息。由于此信息是经...

一文看懂 session 和 cookie(session cookie的区别)

-----------cookie大家应该都熟悉,比如说登录某些网站一段时间后,就要求你重新登录;再比如有的同学很喜欢玩爬虫技术,有时候网站就是可以拦截住你的爬虫,这些都和cookie有关。如果...

有望取代 java?GO 语言项目了解一下

GO语言在编程界一直让人又爱又恨,有人说“GO将统治下一个十年”,“几乎所有新的、有趣的东西都是用Go写的”;也有人说它过于死板,使用感太差。国外有Google、AWS、Cloudflar...