百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Kafka 事件流式处理 AI 和自动化_kafka流程图

liuian 2025-02-20 16:44 15 浏览

探索如何使用 ChatGPT 创建物联网 Kafka 事件消费者,并使用 API 逻辑服务器来生成定义范围之外的温度读取事件。

Apache Kafka 已成为从静态数据(数据库事务)迁移到事件流的企业架构的明显领导者。有许多演示文稿解释了 Kafka 的工作原理以及如何扩展此技术堆栈(本地或云)。使用 ChatGPT 构建一个微服务来消费消息并丰富、转换和持久化是该项目的下一阶段。在此示例中,我们将使用来自 IoT 设备 (RaspberryPi) 的输入,该设备每隔几秒钟发送一次 JSON 温度读数。

使用消息

生成(并记录)每条 Kafka 事件消息时,Kafka 微服务使用者已准备好处理每条消息。我让 ChatGPT 生成一些 Python 代码,它为我提供了从命名的“主题”中轮询和读取的基础知识。我得到的是一个非常好的开始,可以消耗主题、键和 JSON 有效负载。ChatGPT 创建了代码,使用 SQLAlchemy 将其持久化到数据库中。然后,我想转换 JSON 有效负载,并使用 API Logic Server(ALS - GitHub 上的一个开源项目)规则来取消 JSON,验证、计算并根据给定范围之外的源温度生成一组新的消息有效负载。

ChatGPT: “design a Python Event Streaming Kafka Consumer interface”

注意:ChatGPT 选择了 Confluent Kafka 库(并使用其 Docker Kafka 容器)- 您可以修改代码以使用其他 Python Kafka 库。

SQLAlchemy模型

使用 API Logic Server(ALS:Python 开源平台),我们连接到 MySQL 数据库。ALS 将读取这些表,并为每个 ORM 端点创建一个 SQLAlchemy ORM 模型、一个 react-admin 用户界面、safrs-JSON Open API (Swagger) 和一个正在运行的 REST Web 服务。新的温度表将包含时间戳、IoT 设备 ID 和温度读数。在这里,我们使用 ALS 命令行实用程序来创建 ORM 模型:

ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot

API Logic Server 生成的类用于保存我们的值。Temperature

class Temperature(SAFRSBase, Base):br
    __tablename__ = 'Temperature'br
    _s_collection_name = 'Temperature' # type: ignorebr
    __bind_key__ = 'None'br
br
    Id = Column(Integer, primary_key=True)br
    DeviceId = Column(Integer, nullable=False)br
    TempReading = Column(Integer, nullable=False)br
    CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)br
    KafkaMessageSent = Column(Booelan, default=text("False"))

变化

因此,我们不是将 Kafka JSON 使用者消息再次保存在 SQL 数据库中(并触发规则来执行工作),而是解包 JSON 有效负载 () 并将其插入 Temperature 表,而不是保存 JSON 有效负载。我们让声明性规则处理每个温度读数。util.row_to_entity

    entity = models.Temperature()br
    util.row_to_entity(message_data, entity) br
    session.add(entity)

当消费者收到消息时,它会将其添加到会话中,从而触发规则(如下)。commit_event

声明性逻辑:生成消息

使用 API Logic Server(使用 SQLAlchemy、Flask 和类似 LogicBank 电子表格的规则引擎构建的自动化框架:公式、总和、计数、复制、约束、事件等),我们在 ORM 实体上添加一个声明性规则。当每条消息都保存到 Temperature 表中时,将调用该规则。如果温度读数超过或小于 ,我们将发送有关该主题的 Kafka 消息。我们还添加了一个约束,以确保我们在正常范围 (-) 内接收数据。我们将让另一个事件使用者处理警报消息。
commit_eventTemperaturecommit_eventMAX_TEMPMIN_TEMP“TempRangeAlert”32132
TDD 行为测试

使用 TDD(Test Driven Development),我们可以编写一个 Behave 测试,将记录直接插入到 Temperature 表中,然后检查返回值。行为以 /(.feature 文件)开头。对于每个场景,我们使用装饰器编写相应的 Python 类。
KafkaMessageSentFeatureScenarioBehave

功能定义

Feature: TDD Temperature Examplebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Normal (Temperature)br
  When Transactions normal temperature is submittedbr
  Then Check KafkaMessageSent Flag is Falsebr
br
Scenario: Temperature Processingbr
  Given A Kafka Message Abnormal (Temperature)br
  When Transactions abnormal temperature is submittedbr
  Then Check KafkaMessageSent Flag is True

TDD Python 类

from behave import *br
import safrsbr
br
db = safrs.DB br
session = db.sessionbr
br
def insertTemperature(temp:int) -> bool:br
    entity = model.Temperature()br
    entity.TempReading = tempbr
    entity.DeviceId = 'local_behave_test'br
    session.add(entity) br
    return entity.KafkaMessageSent br
br
@given('A Kafka Message Normal (Temperature)')br
def step_impl(context):br
    context.temp = 76br
    assert Truebr
br
@when('Transactions normal temperature is submitted')br
def step_impl(context):br
    context.response_text = insertTemperature(context.temp)br
br
@then('Check KafkaMessageSent Flag is False')br
def step_impl(context):br
    assert context.response_text  == False


总结

使用 ChatGPT 为 Consumer 和 Producer 生成 Kafka 消息代码似乎是一个很好的起点。安装 Confluent Docker for Kafka。将 API Logic Server 用于声明性逻辑规则,使我们能够将公式、约束和事件添加到正常的事务流中,并将其添加到我们的 SQL 数据库中,并生成(和转换)新的 Kafka 消息,这是一个很好的组合。ChatGPT 和声明式逻辑是“配对编程”的下一个层次。

from confluent_kafka import Producerbr
conf = {'bootstrap.servers': 'localhostd:9092'}br
producer = Producer(conf)br
MAX_TEMP = arg.MAX_TEMP or 102br
MIN_TEMP = arg.MIN_TTEMP or 78br
    br
def produce_message(br
    row: models.KafkaMessage, br
    old_row: models.KafkaMessage, br
    logic_row: LogicRow):br
  br
    if logic_row.isInserted() and row.TempReading > MAX_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")br
    row.KafkaMessageSent = Truebr
br
  if logic_row.isInserted() and row.TempReading < MIN_TEMP:br
        produce(topic="TempRangeAlert", br
               key=row.Id,br
               value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")br
br
    row.KafkaMessageSent = Truebr
    br
   Rules.constraint(models.Temperature, br
                    as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, br
                    error_message= "Temperature {row.TempReading} is out of range"br
   Rules.commit_event(models.Temperature, calling=produce_message)

仅当温度读数大于或小于时才会生成警报消息。Constraint 将在调用 commit 事件之前检查温度范围(请注意,规则始终是无序的,可以随着规范的变化而引入)。MAX_TEMPMIN_TEMP


原文标题:Kafka Event Streaming AI and Automation

原文链接:
https://dzone.com/articles/event-streaming-ai-amp-automation

作者:Tyler Band

编译:LCR

相关推荐

RazorSQL Mac版(SQL数据库查询工具)

RazorSQLMac特别版是一款看似简单实则功能非常出色的SQL数据库查询、编辑、浏览和管理工具。RazorSQLformac特别版可以帮你管理多个数据库,支持主流的30多种数据库,包括Ca...

史上最强!开源数据库管理工具DBeaver 24.2发布

DBeaverCommunity是一个免费的跨平台数据库工具,面向开发人员、数据库管理员、分析师和所有使用数据的人员。它支持所有流行的SQL数据库,如MySQL、MariaDB、PostgreSQL...

10个优秀的MySQL管理工具,都是大佬们的珍藏

Mysql开源、体积小、速度快、成本低、安全性高,目前在全球中小型网站中被广泛应用。今天给大家介绍10个优秀的MySQL管理工具,都是大佬们的珍藏,对你有用的话,可以收藏转发。1、Induction...

Mac电脑如何安装向量数据库Milvus

Milvus是一个高性能、高度可扩展的矢量数据库,可在从笔记本电脑到大规模分布式系统的各种环境中高效运行。Milvus提供强大的数据建模功能,使您能够将非结构化或多模态数据组织成结构化集合。Mil...

干掉 PowerDesigner!这款国人开源的数据库设计工具真香

当我们在项目开发初期时,往往需要设计大量的表,此时使用数据库设计工具就会比较高效!今天给大家推荐一款国人开源的数据库设计工具chiner,界面漂亮,功能强大,希望对大家有所帮助!聊聊PowerDesi...

数据库管理工具推荐!SQL Studio:免费、高效,歪...

随着国际环境的变化,越来越多的企业基于供应链安全的需求。信息技术的飞速发展,数据库管理工具的需求也越来越迫切。然而,在众多软件中,要找到一款得心应手的数据库管理工具并不容易。今天,我向大家推荐一款功能...

Mac密码安全管理工具----Enpass(mac密码管理在哪里)

Enpassmac版是一款适用于macOS用户的密码安全管理工具,使用Enpass,你无需再为记住太多的密码和其他重要凭据而头疼了。Enpass把你的密码存放在一个安全的地方,然后通过一个主密码随时...

超实用的14款MySQL数据库管理工具

MySQL是当前流行的数据库引擎之一,具有成本低、速度快、体积小且开放源代码的优点。今天就给大家分享14款MySQL数据库管理工具。1.MySQLDumper这款软件的应用,有效解决使用PHP进行大数...

神器收藏:macOS最强工具清单,16.6k+星 awesome-macOS

神器收藏:macOS最强工具清单,16.6k+星标必看引言在macOS生态中,有一个备受瞩目的神仓库,汇集了最全面、最实用的macOS应用和工具清单。这个项目在GitHub上已获得超过16.6k的...

JetBrains DataGrip Mac中文破解版V2025.1下载安装教程

DataGripforMac是由JetBrains开发的数据库集成开发环境(IDE),专为数据库管理员和开发人员设计。它支持多种数据库(如MySQL、PostgreSQL、Oracle、SQ...

GIS坐标参考系统:EPSG、WKT和PROJ

在之前的教程中,我们介绍了什么是坐标参考系统(CRS)、坐标参考系统的组成部分以及投影坐标参考系统和地理坐标参考系统之间的一般差异。在这个教程中,我们将介绍CRS信息的不同存储方式。推荐:用...

【地理信息可视化】basemap(cartopy)+geopandas显示地图-03

importwarningswarnings.filterwarnings('ignore')importosimportnumpyasnpfromscipy....

字符识别之PaddleOcr介绍、安装与应用

paddleocr介绍paddleocr是一款轻量型字符识别工具库,支持多语言识别,支持pip安装与自定义训练。详细信息如下表所示。名称许可证当前版本下载地址(github地址)支持语言运行方式pi...

111.Python——基于pipenv打包PaddlePaddle的GUI项目

飞桨PaddlePaddle是百度的深度学习框架,用来做一些项目还是非常不错。但是打包就是一件非常麻烦的过程。在文中有讲过打包问题。29.Python程序打包成可执行文件——常见疑难问题解决办法。本文...

Shamos算法:一种在平面上找到最远点的方法

旋转卡尺算法简介Shamos算法,也叫旋转卡尺(Rotatingcalipers)算法,是一种用于解决计算几何问题的优化算法。它可以用来解决许多几何问题,包括计算点集的宽度或直径。算法的名称来源于其...