百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

趣头条基于Flink+ClickHouse的实时数据分析平台

liuian 2025-01-08 15:16 18 浏览

趣头条一直致力于使用大数据分析指导业务发展。目前在实时化领域主要使用 Flink+ClickHouse 解决方案,覆盖场景包括实时数据报表、Adhoc 即时查询、事件分析、漏斗分析、留存分析等精细化运营策略,整体响应 80% 在 1 秒内完成,大大提升了用户实时取数体验,推动业务更快迭代发展。

本次分享主要内容:

  • 业务场景与现状分析
  • Flink to Hive 的小时级场景
  • Flink to ClickHouse 的秒级场景
  • 未来规划

趣头条的查询页面,分为离线查询和实时查询。离线查询有 presto,spark,hive 等,实时查询则引入了 ClickHouse 计算引擎。

上图为实时数据报表,左边为数据指标的曲线图,右边为详细数据指标,目前数据指标的采集和计算,每五分钟一个时间窗口,当然也会有三分钟或者一分钟的特殊情况。数据都是从 Kafka 实时导入 ClickHouse 进行计算的。

1. 小时级实现架构图

Flink-to-Hive 小时级实现架构图如图所示,架构实现的思路如下:

Database 中的 Binlog 抽数据到 Kafka,同时 Log server 数据也会上报到 Kafka,所有的实时数据落地到 Kafka 之后,通过 Flink 抽取到 HDFS 上。HDFS 到 Hive 之间有条虚线,即 Flink 落地到 HDFS 后,通过程序监控,Flink 在消费完成时,数据落地到 Hive 中可能是小时级的或者是半小时级的,甚至是分钟级的,此时需要知道数据的 Event time 已经到了什么时间,然后再去触发比如 alert table、add partition、 add location 等,把分区写进 Hive 中。这时还需要看一下当前的 Flink 任务的数据时间消费到了什么时间,如9点的数据要落地时,需要看一下 Kafka 里 Flink 数据消费是否到了9点,然后在 Hive 中触发分区写入。

2. 实现原理

这块的实现原理主要是使用 Flink 高阶版本的特性 StreamingFileSink。

StreamingFileSink 的主要功能如下:

  • forBulkFormat 支持 avro、parquet 格式,也就是支持链式的存储格式
  • withBucketAssigner 自定义按数据时间分桶,支持数据时间的分桶,上图用到该功能的地方定义了一个 EventtimeBucket,按照数据的时间落地到离线中
  • OnCheckpointRollingPolicy,会根据 CheckPoint 时间来进行数据的落地,此处可以理解为按照数据的时间,比如按照一定的 CheckPoint 时间内进行数据落地、回滚,数据落地策略还可以按照数据大小落地
  • Exactly-Once 语义实现,Flink 中自带的 StreamingFileSink 是用 Exactly-Once 语义来实现的。Flink 中有两个 Exactly-Once 的实现,第一个是 Kafka 的 Exactly-Once,第二个是 StreamingFileSink 实现了 Exactly-Once 语义,像上图中 CheckpointRollingPolicy 设置的是十分钟落地一次到 HDFS 文件中

下面来具体说一下 Exactly-Once 是如何实现的。

① Exactly-Once

具体实现 Exactly-Once 的方式,如上图所示,左侧是一个二阶段的模型,Coordinator 发一个 perpare,所有的参与者或者执行者开始触发 ack 动作,Coordinator 收到所有人的 ack 动作后,就开始执行 commit,所有的执行者就把左右的数据进行落地。到了 Flink 这块,Source 收到了 checkpoint barrier 流的时候,开始触发 snapshorState 发送到 Job Manager,Job Manager 把所有的 CheckPoint 都完成以后,会发送一个 notifyCheckpointComplete,Flink 这块跟上图左边的二阶段提交协议是一致的,Flink 也是可以实现二阶段提交协议的。

② 如何使用 Flink 实现二阶段提交协议

首先 StramingFileSink 实现了两个接口,分别是 CheckpointedFunction 和 CheckpointListener。

  • CheckpointedFunction 实现了 initialzeState 和 snaoshotState 这两个函数;
  • CheckpointListener 是 notifyCheckPoint Complete 的方法实现。

所以这两个接口可以实现二阶段提交的语义,initialzeState 算子刚启动的时候,它会启动三个动作 commitpendingFile、restoreInProgressFile、truncate。

第一步 commitpedingFile,也就是实时的数据落地到 HDFS 的时候,有三个状态,第一个状态是 in-progress,即正在进行中的一个状态,第二个状态是 pending 的状态,第三个状态是 finish 的状态。

在实时的写入时,如果 CheckPoint 还没有在这之间成功的时候,程序出问题了,那接下来启动的时候就会触发 initialzeState,会把曾经 pending 的 file 进行 commit,然后把写了一半的文件比如 in-progress 文件重置或者截断,进行重置或者截断是使用的是 Hadoop 的2.7版本的 turncate 方式。也就是数据在一直写入,但是写入没有达到一个 CheckPoint 周期,也就是说中间数据断开了,下一次启动的时候,要么把之前没有写完整的数据截断掉,之前 CheckPoint 触发已经写好的数据直接 commit。

第二步 invoke 就是数据实时的写入

第三步 snapshotState 在触发 CheckPoint 的时候会把 in-progress 文件转成 pending state 文件,也就是开始提交文件,同时记录 length 长度。记录长度是因为前边的步骤需要 truncate 来截断多长,snapshot 时,是没有真正的写入到 HDFS,其实是写入到 ListState,等所有的 CheckPoint 算子都完成了,就把 ListState 中的数据都刷到 HDFS 中,只要数据存在 Flink 自带的 state 中,不断把数据成功的刷到 HDFS 中就行了。

第四步 notifyCheckPoint Complete 会触发 pending 动作到 finished 状态的数据写入,实现的方式直接使用 rename,Streaming 会不断的写入 HDFS 中的临时文件,等到 notifyCheckPoint 结束之后,直接做一个 rename 动作,写成正式文件。

3. 跨集群多 nameservices

趣头条的实时集群跟离线集群是独立的,实时集群目前是一套,离线集群是有多套。通过实时集群要写入到离线集群,这样就会遇到一个问题,HDFS nameservices 问题,如果在实时集群中把所有的离线集群的 nameservice 用 namenode HA 的方式全部打入到实时集群,是不太合适的。所以使用 Flink 任务中 resource 下边把 HDFS 中的 xml 文件中间加 final 标签,设置为 true。此处的 value 标签中,stream 是一个实时集群,date 是一个离线集群,这样把两个 HA 配置在 value 标签,从而达到实时集群是实时集群,离线集群是离线集群,中间的 HDFS 中 set 不需要相互修改,直接在客户端时间就行了。

4. 多用户写入权限

针对多用户权限写入的问题,实时写入离线 HDFS 中的时候,会涉及到用户权限。遇到用户权限时,也会有一个问题,Flink 实时提交的用户,是定义好的,所有的程序里用户是同一个,但是离线是多个用户,Flink 目前对于这块用户的权限做的还不够好,所以我们自己改造了一下,在 API 中添加了 withBucketUser,上边已经配置好了 nameServices,然后通过该参数来配置具体是那个用户来写入 HDFS 中,这是 API 层级的。

API 层级的好处是一个 Flink 程序可以写多个,可以指定不同的 HDFS 的不同的用户就可以。具体实现就是在 Hadoop file system 中加一个 ugi.do as,代理用户。以上是趣头条用 Flink 在实时数据同步到 Hive 做的一些工作。其中会有一些小文件的问题,针对小文件,我们通过后台程序定期的 merge,如果 CheckPoint 的时间很短,就会出现大量的小文件的问题。

1. 秒级实现架构图

首先来解释一下趣头条使用 Flink+ClickHouse 的场景,最开始展示的很多实时指标,可能是每五分钟计算一次,也可能是每三分钟计算一次。如果每一个实时指标用一个 Flink 任务,即使是 FlinkSQL 来写,比如消费一个 Kafka Topic,计算它的日活、新增、流程等,当用户提出一个新的需求,那这个 Flink 任务是需要修改还是再启动一个 Flink 任务来消费这个 Topic,这样的话就会出现 Flink 任务在不断的修改或者不断的启动新的 Flink 新的任务。为了解决这个问题,就让 Flink 后边接一个套 ClickHouse 实现整体的 OLAP。

上图为秒级实现架构图,从 Kafka 到 Flink 到 Hive 然后再到 ClickHouse 集群,对接外部 Horizon ( 实时报表 )、QE ( 实时 adhoc 查询 )、千寻 ( 数据分析 )、用户画像 ( 实时的用户画像 )。

2. Why Flink+ClickHouse

具体来说为什么要用 Flink+ClickHouse,主要有以下几点:

  • 指标实现支持 sql 描述,以前的方案使用是 storm 的程序,通过 stormsql 实现,包括 flinksql,这些内容对于 UDF 支持相对有限,但是现在这套 Flink+ClickHouse 基本上可以把分析师提的指标通过 sql 实现。
  • 指标的上下线互不影响,这个主要是解决上边提到的关于 Flink 任务消费了 topic 以后,假如用户提出新的指标的时候,是启动新任务还是要不断修改的问题。
  • 数据可回溯,方便异常排查,这个就类似上边提到的假如我的日活掉了,需要知道哪些指标的口径的逻辑掉了、哪个上报的数据掉了,如 cmd 掉了还是数据流 kafka 掉了还是用户上报的时候指标没有上报导致的日活掉了。假如单纯的 flink 的话,只是会计算出那个指标掉了,是没办法回溯的。
  • 计算快,一个周期内完成所有的指标计算,现在的 horizon 曲线可能是几百上千,需要在五分钟之内或者十分钟之内,把所有分时、累时、以及维度下降的指标全部计算出来。
  • 支持实时流,分部署部署,运维简单。

目前趣头条 Flink 集群有 100+ 台 32 核 128 G 3.5T SSD,日数据量 2000+ 亿,日查询量 21w+ 次,80% 查询在 1s 内完成。

上图为单表测试结果。ClickHouse 单表测试速度快。但受制于架构,ClickHouse 的 Join 较弱。

上图是处理相对较为复杂的 SQL,count+group by+order by,ClickHouse 在 3.6s内完成 26 亿数据计算。

3. Why ClickHouse so Fast

接下来说一下为什么 ClickHouse 这么快,主要是有以下几点:

  • 列式存储+LZ4、ZSTD 数据压缩:列式存储基本是通用的。
  • 计算存储本地化+向量化执行:计算存储本地化,ClickHouse 跟 presto 不一样,presto 数据可能存在 Hadoop 集群里边或者 HDFS 中,需要把数据拉过来,然后进行实时的计算;而 ClickHouse 是每一台计算机器需要的数据存储在本地的 ssd 盘,只要计算本地的数据就可以了,比如求 count 之类的,计算完成后把其他的节点进行合并就可以了。
  • LSM merge tree+Index:LSM merge tree,他会不断的使用 batch 的形式把数据写入到 ClickHouse 之后,在后台做了一个线程把数据进行 merge,做一个 index 索引,也就是给这张数据表建立很多索引,类如常见的 DT 的时间索引、小时级的数据索引来提高查询性能或者速度。
  • SIMD+LLVM 优化:SIMD 就是一个单指令多数据集,LLVM 是一个 C++ 的编译器
  • SQL 语法、UDF 完善:在这块有很大的需求,比如数据分析以及维度下坠,常规的 horizon 数据报表可能就是 count、sum、以及 group by、order by 等,但是在一些维度下坠或者是数据分析领域,可能会有一个窗口期的概念,在一段窗口期内的留存,所以要用到一些更高的特性,类如时间窗口的功能。

上图是 MergeTree 的运行原理图解,最上边的第一层是数据一个 batch 一个 batch 的实时写入,后台会做每一个层级的数据 merge,这块跟 HBase 差不多的实现,merge 的时候会进行数据的排序,然后做一个数据索引。

上图是 ClickHouse Connector,ClickHouse 有两个概念,local table 和 distribute table。local table 是用来写的,当然 distribute table 也可以写入,但是会出现很大的 io 问题,所以尽量不要写 distribute table。但是可以读 distribute table。5-10w 一个 batch 进行数据写入,正常的情况下,是5秒一个周期。

RoundRobinClickHouse DataSource 这块是趣头条自己实现的;

ClickHouse 官方 API 使用:

BalancedClickHouseDataSource 实现的。

上图是 ClickHouse 官方 API 使用:

BalancedClickHouseDataSource

里边有一个问题,比如 mysql 配置一个 ip 和端口号就可以把数据写入了,但是这块要写入 local table 的,所以必须要知道这一个集群到底有多少 local table,每一个 local table 的 ip 和端口号,假如有100台机器,就必须要把这100台机器的 ip 和端口号配置好,然后进行写入。

官方的 api 中有两个 schedule:

  • 一个是 scheduleActualization
  • 另一个是 scheduleConnectionsCleaning

第一个是指100台机器配置了100个 ip 或者端口号,可能会有一些机器出现 ping 不通或者服务无响应,这块是定时的做一个 Actualiza 来发现这些机器哪些无法连接,触发一个下限来把这些 ip 删除掉。

第二个 scheduleConnectionsCleaning,因为 ClickHouse 是 http 的方式,定期的会把一些没用的 http 的请求清理掉。

针对于官方提供的 API,趣头条对这方面做了一个加强,开发了一个 RoundRobinClickHouseDataSource,实现了三个语义,分别是 testOnBorrow、testOnReturn、testWhileldle。

第一个 testOnBorrow 取链接的时候,设置 为true,然后去 ping 一下这个链接能不能拿到,ClickHouse 写入的时候,使用的 batch,所以尽量就是拿链接的时候要拿到成功的链接;第二个 testOnReturn 设置为 false,testWhileldle 设置为 true,把上边官方的两个 schedule 功能集成进去了。为什么要实现 RoundRobin,主要是因为假如有100台机器,ClickHouse 相对于 Hadoop 来说,还是需要好好维护一下,如果是 insert 的话,后台是不断 merge 的过程,insert 速度大于 merge 速度时候,会导致 merge 速度永远跟不上,所以就写完这台机器接下来写别的机器,以及5秒一个间隔的写,使 merge 的速度尽量跟上 insert 的速度,这块是整个部分最需要注意的地方。

4. Backfill

趣头条针对集群容错做了一些优化,主要包括两点:

  • 第一点是 Flink 任务小时级容错
  • 第二点是 ClickHouse 集群小时级容错

Flink 导入数据到 ClickHouse,来实现数据的查询、报表展示,会遇到一些问题。如 Flink 任务出现故障、报错、数据反压、network 的一些问题;或者 ClickHouse 集群出现了一些不可响应、ZK 跟不上等 ZK 问题;或者集群的负载问题;或者是上边提到的 insert 太快的问题;会导致整个任务都有问题。如果数据量突然暴涨,把 Flink 启动,就会出现一段时间内不停的追数据,可能就需要调大它的并行度之类的,让 Flink 任务把数据追上。但是数据已经积压了,Flink 又要加大它的并发度来处理数据,但是 ClickHouse 那块又限制了 insert 速度不能太快,所以就做了另外一个机制,也就是 Flink 故障了或者 ClickHouse 故障了,等到 ClickHouse 集群恢复之后,Flink 任务还是从最新的开始消费,过去的一段数据不再去追了,通过 Hive 来把数据导入到 ClickHouse。

用 Hive 是因为数据通过 Kafka 已经实时落地到 Hive,通过 waterdrop 把数据写入到 ClickHouse,ClickHouse 是有分区的,只要把上一个小时的数据删除,再把 Hive 一个小时的数据导入进来,这样就可以继续提供数据查询操作了。

最后是对未来的发展与思考。

1. Connector SQL

对于未来的发展,首先是 Connectors SQL,也就是把 Connector 进行 SQL 化,现在是 Flink-to-Hive 以及 Flink-to-ClickHouse,相对来讲,都是比较固化的一些场景,所以是可以进行 sql 化,除了把 HDFS 的路径指定以及用户指定,其他的一些过程都是可以 SQL 化描述出来的。

2. Delta lake

Flink 是流批一体计算引擎,但是没有流批一体的存储。趣头条会用 HBase、Kudu、Redis 等能够与 Flink 实时交互的 KV 存储进行数据计算。如计算新增问题,目前趣头条的方案是需要将 Hive 历史用户刷到 Redis 或 HBase 中,与 Flink 进行实时交互判断用户是否新增。但因为 Hive 中的数据和 Redis 中的数据是存储为两份数据。其次 Binlog 抽取数据会涉及 delete 动作,Hbase,Kudu 支持数据修改,定期回到 Hive 中。带来的问题是 HBase,Kudu 中存在数据,Hive 又保存了一份数据,多出一份或多份数据。如果有流批一体的存储支持上述场景,当 Flink 任务过来,可以与离线数据进行实时交互,包括实时查询 Hive 数据等,可以实时判断用户是否新增,对数据进行实时修改、更新或 delete,也能支持 Hive 的批的动作存储。未来,趣头条考虑对 Flink 做流批的存储,使 Flink 生态统一为流批结合。

作者:王金海 趣头条数据平台负责人,10 年互联网历练,先后在唯品会负责用户画像系统,提供人群的个性化营销服务;饿了么担任架构师,负责大数据任务调度、元数据开发、任务画像等工作;现为趣头条数据中心平台负责人,负责大数据基础计算层 ( spark、presto、flink、clickhouse )、平台服务层 ( libra 实时计算、kepler 离线调度 )、数据产品层 ( qe即时查询、horizon 数据报表、metadata 元数据、数据权限等 )、以及团队建设。

相关推荐

快速上手maven

Maven的作用在开发过程中需要用到各种各样的jar包,查找和下载这些jar包是件费时费力的事,特别是英文官方网站,可以将Maven看成一个整合了所有开源jar包的合集,我们需要jar包只需要从Mav...

Windows系统——配置java环境变量

怎么配置java环境变量呢?首先是安装好jdk然后我的电脑右键选择属性然后选择左侧高级系统设置高级然后点环境变量然后在用户变量或系统变量中配置,用户变量指的是只有当前用户可用,系统变量指的是系统中...

ollama本地部署更改默认C盘,Windows配置环境变量方法

ollama是一个大语言模型(LLM——LargeLanguageModel),本地电脑安装网上也要很多教程,看上去非常简单,一直下一步,然后直接就可以使用了。但是我在实操的时候并不是这样,安装完...

# Windows 环境变量 Path 显示样式更改

#怎样学习Java##Windows环境变量Path显示样式更改##1、传统Path环境变量显示:```---》键盘上按【WIN+I】打开系统【设置】---》依次点击---》【系统...

如何在Windows中创建用户和系统环境变量

在Windows中创建环境变量之前您应该了解的事情在按照本指南中所示的任何步骤创建指向文件夹、文件或其他任何内容的用户和系统变量之前,您应该了解两件事。第一个也是最重要的一个是了解什么是环境变量。...

Windows 中的环境变量是什么?

Windows中的环境变量是什么?那么,Windows中的环境变量是什么?简而言之,环境变量是描述应用程序和程序运行环境的变量。所有类型的程序都使用环境变量来回答以下问题:我安装的计算机的名称是什么...

【Python程序开发系列】谈一谈Windows环境变量:系统和用户变量

这是我的第350篇原创文章。一、引言环境变量(environmentvariables)一般是指在操作系统中用来指定操作系统运行环境的一些参数,如:临时文件夹位置和系统文件夹位置等。环境变量是在操作...

系统小技巧:还原Windows10路径环境变量

有时,我们在Windows10的“运行”窗口中执行一些命令或运行一些程序,这时即便没有指定程序的具体路径,只输入程序的名称(如notepad.exe),便可以迅速调用成功。这是因为Windows默认...

Windows10系统的“环境变量”在哪里呢?

当我们在操作系统是Windows10的电脑里安装了一些软件,要通过配置环境变量才能使用软件时,在哪里能找到“环境变量”窗口呢?可以按照下面的步骤找到“环境变量”。说明:下面的步骤和截图是在Window...

系统小技巧:彻底弄懂Windows 10环境变量

每当我们进行系统清理时,清理软件总能自动找到Windows的临时文件夹之所在,然后加以清理,即便是我们重定向了TEMP目录也是如此。究其原因,是因为清理软件会根据TEMP环境变量来判断现有临时文件夹的...

MySQL 5.7 新特性大全和未来展望

本文转自微信公众号:高可用架构作者:杨尚刚引用美图公司数据库高级DBA,负责美图后端数据存储平台建设和架构设计。前新浪高级数据库工程师,负责新浪微博核心数据库架构改造优化,以及数据库相关的服务器存...

MySQL系列-源码编译安装(v8.0.25)

一、前言生产环境建议使用二进制安装法,其优点是部署简单、快速、方便,并且相对"yum/rpm安装"方法能更方便地自定义文件存放的目录结构,方便用脚本批量部署,方便日后运维管理。在生产...

MySQL如何实时同步数据到ES?试试这款阿里开源的神器!

前几天在网上冲浪的时候发现了一个比较成熟的开源中间件——Canal。在了解了它的工作原理和使用场景后,顿时产生了浓厚的兴趣。今天,就让我们跟随我的脚步,一起来揭开它神秘的面纱吧。简介canal翻译为...

技术老兵十年专攻MySQL:编写了763页核心总结,90%MySQL问题全解

MySQL是开放源码的关系数据库管理系统,由于性能高、成本低、可靠性好,成为现在最流行的开源数据库。MySQL学习指南笔记领取方式:关注、转发后私信小编【111】即可免费获得《MySQL进阶笔记》的...

Mysql和Hive之间通过Sqoop进行数据同步

文章回顾理论大数据框架原理简介大数据发展历程及技术选型实践搭建大数据运行环境之一搭建大数据运行环境之二本地MAC环境配置CPU数和内存大小查看CPU数sysctl machdep.cpu...