百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Kafka 事件流式处理 AI 和自动化_kafka流程图

liuian 2025-02-20 16:44 18 浏览

探索如何使用 ChatGPT 创建物联网 Kafka 事件消费者,并使用 API 逻辑服务器来生成定义范围之外的温度读取事件。

Apache Kafka 已成为从静态数据(数据库事务)迁移到事件流的企业架构的明显领导者。有许多演示文稿解释了 Kafka 的工作原理以及如何扩展此技术堆栈(本地或云)。使用 ChatGPT 构建一个微服务来消费消息并丰富、转换和持久化是该项目的下一阶段。在此示例中,我们将使用来自 IoT 设备 (RaspberryPi) 的输入,该设备每隔几秒钟发送一次 JSON 温度读数。

使用消息

生成(并记录)每条 Kafka 事件消息时,Kafka 微服务使用者已准备好处理每条消息。我让 ChatGPT 生成一些 Python 代码,它为我提供了从命名的“主题”中轮询和读取的基础知识。我得到的是一个非常好的开始,可以消耗主题、键和 JSON 有效负载。ChatGPT 创建了代码,使用 SQLAlchemy 将其持久化到数据库中。然后,我想转换 JSON 有效负载,并使用 API Logic Server(ALS - GitHub 上的一个开源项目)规则来取消 JSON,验证、计算并根据给定范围之外的源温度生成一组新的消息有效负载。

ChatGPT: “design a Python Event Streaming Kafka Consumer interface”

注意:ChatGPT 选择了 Confluent Kafka 库(并使用其 Docker Kafka 容器)- 您可以修改代码以使用其他 Python Kafka 库。

SQLAlchemy模型

使用 API Logic Server(ALS:Python 开源平台),我们连接到 MySQL 数据库。ALS 将读取这些表,并为每个 ORM 端点创建一个 SQLAlchemy ORM 模型、一个 react-admin 用户界面、safrs-JSON Open API (Swagger) 和一个正在运行的 REST Web 服务。新的温度表将包含时间戳、IoT 设备 ID 和温度读数。在这里,我们使用 ALS 命令行实用程序来创建 ORM 模型:

ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot

API Logic Server 生成的类用于保存我们的值。Temperature

class Temperature(SAFRSBase, Base):br
    __tablename__ = 'Temperature'br
    _s_collection_name = 'Temperature' # type: ignorebr
    __bind_key__ = 'None'br
br
    Id = Column(Integer, primary_key=True)br
    DeviceId = Column(Integer, nullable=False)br
    TempReading = Column(Integer, nullable=False)br
    CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)br
    KafkaMessageSent = Column(Booelan, default=text("False"))

变化

因此,我们不是将 Kafka JSON 使用者消息再次保存在 SQL 数据库中(并触发规则来执行工作),而是解包 JSON 有效负载 () 并将其插入 Temperature 表,而不是保存 JSON 有效负载。我们让声明性规则处理每个温度读数。util.row_to_entity

    entity = models.Temperature()br
    util.row_to_entity(message_data, entity) br
    session.add(entity)

当消费者收到消息时,它会将其添加到会话中,从而触发规则(如下)。commit_event

声明性逻辑:生成消息

使用 API Logic Server(使用 SQLAlchemy、Flask 和类似 LogicBank 电子表格的规则引擎构建的自动化框架:公式、总和、计数、复制、约束、事件等),我们在 ORM 实体上添加一个声明性规则。当每条消息都保存到 Temperature 表中时,将调用该规则。如果温度读数超过或小于 ,我们将发送有关该主题的 Kafka 消息。我们还添加了一个约束,以确保我们在正常范围 (-) 内接收数据。我们将让另一个事件使用者处理警报消息。
commit_eventTemperaturecommit_eventMAX_TEMPMIN_TEMP“TempRangeAlert”32132
TDD 行为测试

使用 TDD(Test Driven Development),我们可以编写一个 Behave 测试,将记录直接插入到 Temperature 表中,然后检查返回值。行为以 /(.feature 文件)开头。对于每个场景,我们使用装饰器编写相应的 Python 类。
KafkaMessageSentFeatureScenarioBehave

功能定义

Feature: TDD Temperature Examplebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Normal (Temperature)br
  When Transactions normal temperature is submittedbr
  Then Check KafkaMessageSent Flag is Falsebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Abnormal (Temperature)br
  When Transactions abnormal temperature is submittedbr
  Then Check KafkaMessageSent Flag is True

TDD Python 类

from behave import *br
import safrsbr
br
db = safrs.DB br
session = db.sessionbr
br
def insertTemperature(temp:int) -> bool:br
    entity = model.Temperature()br
    entity.TempReading = tempbr
    entity.DeviceId = 'local_behave_test'br
    session.add(entity) br
    return entity.KafkaMessageSent br
br
@given('A Kafka Message Normal (Temperature)')br
def step_impl(context):br
    context.temp = 76br
    assert Truebr
br
@when('Transactions normal temperature is submitted')br
def step_impl(context):br
    context.response_text = insertTemperature(context.temp)br
br
@then('Check KafkaMessageSent Flag is False')br
def step_impl(context):br
    assert context.response_text  == False


总结

使用 ChatGPT 为 Consumer 和 Producer 生成 Kafka 消息代码似乎是一个很好的起点。安装 Confluent Docker for Kafka。将 API Logic Server 用于声明性逻辑规则,使我们能够将公式、约束和事件添加到正常的事务流中,并将其添加到我们的 SQL 数据库中,并生成(和转换)新的 Kafka 消息,这是一个很好的组合。ChatGPT 和声明式逻辑是“配对编程”的下一个层次。

from confluent_kafka import Producerbr
conf = {'bootstrap.servers': 'localhostd:9092'}br
producer = Producer(conf)br
MAX_TEMP = arg.MAX_TEMP or 102br
MIN_TEMP = arg.MIN_TTEMP or 78br
    br
def produce_message(br
    row: models.KafkaMessage, br
    old_row: models.KafkaMessage, br
    logic_row: LogicRow):br
  br
    if logic_row.isInserted() and row.TempReading > MAX_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")br
    row.KafkaMessageSent = Truebr
br
  if logic_row.isInserted() and row.TempReading < MIN_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")br
br
    row.KafkaMessageSent = Truebr
    br
   Rules.constraint(models.Temperature, br
                    as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, br
                    error_message= "Temperature {row.TempReading} is out of range"br
   Rules.commit_event(models.Temperature, calling=produce_message)

仅当温度读数大于或小于时才会生成警报消息。Constraint 将在调用 commit 事件之前检查温度范围(请注意,规则始终是无序的,可以随着规范的变化而引入)。MAX_TEMPMIN_TEMP


原文标题:Kafka Event Streaming AI and Automation

原文链接:
https://dzone.com/articles/event-streaming-ai-amp-automation

作者:Tyler Band

编译:LCR

相关推荐

总结下SpringData JPA 的常用语法

SpringDataJPA常用有两种写法,一个是用Jpa自带方法进行CRUD,适合简单查询场景、例如查询全部数据、根据某个字段查询,根据某字段排序等等。另一种是使用注解方式,@Query、@Modi...

解决JPA在多线程中事务无法生效的问题

在使用SpringBoot2.x和JPA的过程中,如果在多线程环境下发现查询方法(如@Query或findAll)以及事务(如@Transactional)无法生效,通常是由于S...

PostgreSQL系列(一):数据类型和基本类型转换

自从厂子里出来后,数据库的主力就从Oracle变成MySQL了。有一说一哈,贵确实是有贵的道理,不是开源能比的。后面的工作里面基本上就是主MySQL,辅MongoDB、ES等NoSQL。最近想写一点跟...

基于MCP实现text2sql

目的:基于MCP实现text2sql能力参考:https://blog.csdn.net/hacker_Lees/article/details/146426392服务端#选用开源的MySQLMCP...

ORACLE 错误代码及解决办法

ORA-00001:违反唯一约束条件(.)错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。ORA-00017:请求会话以设置跟踪事件ORA-00018:超出最大会话数ORA-00...

从 SQLite 到 DuckDB:查询快 5 倍,存储减少 80%

作者丨Trace译者丨明知山策划丨李冬梅Trace从一开始就使用SQLite将所有数据存储在用户设备上。这是一个非常不错的选择——SQLite高度可靠,并且多种编程语言都提供了广泛支持...

010:通过 MCP PostgreSQL 安全访问数据

项目简介提供对PostgreSQL数据库的只读访问功能。该服务器允许大型语言模型(LLMs)检查数据库的模式结构,并执行只读查询操作。核心功能提供对PostgreSQL数据库的只读访问允许L...

发现了一个好用且免费的SQL数据库工具(DBeaver)

缘起最近Ai不是大火么,想着自己也弄一些开源的框架来捣腾一下。手上用着Mac,但Mac都没有显卡的,对于学习Ai训练模型不方便,所以最近新购入了一台4090的拯救者,打算用来好好学习一下Ai(呸,以上...

微软发布.NET 10首个预览版:JIT编译器再进化、跨平台开发更流畅

IT之家2月26日消息,微软.NET团队昨日(2月25日)发布博文,宣布推出.NET10首个预览版更新,重点改进.NETRuntime、SDK、libraries、C#、AS...

数据库管理工具Navicat Premium最新版发布啦

管理多个数据库要么需要使用多个客户端应用程序,要么找到一个可以容纳你使用的所有数据库的应用程序。其中一个工具是NavicatPremium。它不仅支持大多数主要的数据库管理系统(DBMS),而且它...

50+AI新品齐发,微软Build放大招:拥抱Agent胜算几何?

北京时间5月20日凌晨,如果你打开微软Build2025开发者大会的直播,最先吸引你的可能不是一场原本属于AI和开发者的技术盛会,而是开场不久后的尴尬一幕:一边是几位微软员工在台下大...

揭秘:一条SQL语句的执行过程是怎么样的?

数据库系统能够接受SQL语句,并返回数据查询的结果,或者对数据库中的数据进行修改,可以说几乎每个程序员都使用过它。而MySQL又是目前使用最广泛的数据库。所以,解析一下MySQL编译并执行...

各家sql工具,都闹过哪些乐子?

相信这些sql工具,大家都不陌生吧,它们在业内绝对算得上第一梯队的产品了,但是你知道,他们都闹过什么乐子吗?首先登场的是Navicat,这款强大的数据库管理工具,曾经让一位程序员朋友“火”了一把。Na...

详解PG数据库管理工具--pgadmin工具、安装部署及相关功能

概述今天主要介绍一下PG数据库管理工具--pgadmin,一起来看看吧~一、介绍pgAdmin4是一款为PostgreSQL设计的可靠和全面的数据库设计和管理软件,它允许连接到特定的数据库,创建表和...

Enpass for Mac(跨平台密码管理软件)

还在寻找密码管理软件吗?密码管理软件有很多,但是综合素质相当优秀且完全免费的密码管理软件却并不常见,EnpassMac版是一款免费跨平台密码管理软件,可以通过这款软件高效安全的保护密码文件,而且可以...