Spring Boot3中使用阿里Canal实现MySQL与ElasticSearch的数据同步
liuian 2025-08-03 06:02 2 浏览
在当今数据驱动的互联网时代,数据的高效处理和实时同步至关重要。对于互联网软件开发人员而言,如何在不同的数据存储系统之间实现精准、高效的数据同步是一个常见且具有挑战性的任务。本文将深入探讨如何在 Spring Boot3 框架下,借助阿里的 Canal 工具,实现 MySQL 数据库与 ElasticSearch 之间的数据同步,为大家提供一套完整且实用的解决方案。
Canal 简介
Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的中间件,其核心作用是将 MySQL 的 binlog(二进制日志)解析为结构化的数据,并提供给下游系统进行消费。Canal 通过模拟 MySQL 从库的交互协议,伪装成 MySQL 从库向主库发送 dump 协议请求,MySQL 主库在收到请求后,会推送 binary log 给 Canal。随后,Canal 将这些原始的二进制日志解析为易于理解和处理的结构化数据,例如 JSON 格式。这一过程实现了对 MySQL 数据库增量变更数据(包括插入、更新、删除等操作)的高效抓取,为数据同步和其他相关应用场景奠定了基础。
Canal 具备诸多显著优势,使其在数据同步领域脱颖而出。首先,它拥有出色的实时性,基于 MySQL 的 binlog 机制,能够在毫秒级内完成数据同步,确保数据的及时性和一致性。其次,Canal 支持批量获取数据库变更数据,大大减少了网络开销和处理时间,提高了数据处理效率。此外,它还具备多线程处理能力,可以配置多个线程来并行处理不同的数据变更事件,进一步提升整体吞吐量。
同时,Canal 支持断点续传功能,在数据同步过程中,若因各种原因导致中断,它能够从断点处继续消费数据,有效避免数据丢失。另外,Canal 还可以将消费进度持久化到 ZooKeeper 中,即使在出现故障后恢复,也能依据持久化的进度信息继续正常工作,内置的多种容错机制,如重试策略和自动恢复功能,极大地提高了系统的可靠性。
此外,Canal 使用标准化的 binlog 协议,方便与其他系统进行集成,并且支持灵活的过滤规则,可以根据实际需求选择性地订阅特定的数据库和表,还允许动态配置,方便开发者根据业务场景的变化随时调整监控范围和处理逻辑。不仅如此,Canal 还为开发者提供了自定义处理器的接口,开发者可以编写自己的代码来实现复杂的数据处理逻辑,满足多样化的业务需求。最后,Canal 能够精确地捕获和同步数据库的每一行变更,确保数据的一致性,并且能够处理复杂的事务场景,保证事务的原子性和完整性,同时还提供了多种冲突解决策略,有效避免数据同步过程中的冲突问题。
环境准备
(一)MySQL 配置
开启 Binlog 并设置模式:要使用 Canal 实现 MySQL 与 ElasticSearch 的数据同步,首先需要在 MySQL 中开启 Binlog 功能,并将其格式设置为 ROW 模式。这可以通过修改 MySQL 的配置文件(在 Linux 系统中通常是 my.cnf,在 Windows 系统中是 my.ini)来实现。在配置文件中添加或修改以下配置项:
log-bin=mysql-bin
binlog-format=ROW
server-id=1
添加或修改完成后,需要重启 MySQL 服务,使配置生效。开启 Binlog 并设置为 ROW 模式是确保 Canal 能够准确解析·数据变更的关键步骤,因为 ROW 模式会记录每一行数据的具体变更情况,为 Canal 提供详细的同步依据。
创建 Canal 用户并授权:为了让 Canal 能够访问 MySQL 数据库,需要创建一个专门的 Canal 用户,并为其赋予相应的权限。在 MySQL 的命令行或客户端工具中执行以下 SQL 语句:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
上述语句创建了一个名为 “canal” 的用户,并设置其密码为 “canal”,同时赋予该用户在所有数据库和表上的 SELECT、REPLICATION SLAVE 以及 REPLICATION CLIENT 权限。这些权限是 Canal 能够正常连接 MySQL 并获取 binlog 日志所必需的。执行完上述语句后,记得使用FLUSH PRIVILEGES;语句刷新权限,使新的权限设置生效。
(二)Spring Boot 项目配置
添加依赖:在 Spring Boot 项目的 pom.xml 文件中,需要添加 Canal 客户端依赖以及 Elasticsearch 相关依赖。确保 Canal 版本与本地或服务器上启动的 Canal 版本一致,以避免出现兼容性问题。以下是相关依赖的示例代码:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.2</version>
</dependency>
通过添加这些依赖,项目可以引入 Canal 客户端来连接 Canal 服务器并获取数据变更信息,同时引入 Elasticsearch 的高级 REST 客户端,用于将同步的数据写入到 Elasticsearch 中。
新建监听类:创建一个专门的监听类,用于监听 Canal 通道中的 binlog 日志信息,实时捕捉数据库的数据变化。以下是一个示例代码:
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
public class CanalListener {
public void listen() {
// 直接连接Canal
com.alibaba.otter.canal.client.CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("localhost", 11111), "example", "", "");
try {
connector.connect();
connector.subscribe(".*\\..*");// 订阅所有数据库表,可根据需求修改
while (true) {
Message message = connector.get(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
processEntries(message.getEntries());
}
connector.ack(batchId); // 确认消息,小于等于此batchId的Message都会被确认
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void processEntries(java.util.List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry.toString(), e);
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 在这里处理数据变更,将数据同步到Elasticsearch
if (rowData.getIsDdl()) {
// DDL操作处理
} else {
// DML操作处理,如INSERT、UPDATE、DELETE
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
// 获取列名和值,用于构建Elasticsearch文档
}
}
}
}
}
}
在上述代码中,首先通过
CanalConnectors.newSingleConnector方法创建一个 Canal 连接器,连接到本地的 Canal 服务器(地址为 “localhost”,端口为 “11111”),并订阅所有数据库表的变更信息。然后,在一个无限循环中,通过connector.get(100)方法获取一批(最多 100 条)数据变更消息。
如果获取到的消息 ID 为 - 1 或消息列表为空,表示没有新的变更数据,程序会暂停 1 秒后再次尝试获取。如果有新的变更数据,则调用processEntries方法对这些数据进行处理。在processEntries方法中,会遍历每一个数据变更条目,对于非事务开始和结束的条目,解析其RowChange对象,根据是否为 DDL 操作或 DML 操作分别进行处理。
对于 DML 操作,进一步遍历变更后的列数据,获取列名和值,为后续同步到 Elasticsearch 做准备。最后,通过connector.ack(batchId)方法确认已处理的消息批次,确保 Canal 服务器知道哪些消息已经被成功处理。
启动类集成接口:在 Spring Boot 的启动类上集成CommandLineRunner接口,并重写run方法,在run方法中启动 Canal 监听。这样,当 Spring Boot 项目启动时,就会自动开始监听 Canal 通道中的数据变更。以下是启动类的示例代码:
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class YourApplication implements CommandLineRunner {
private final CanalListener canalListener;
public YourApplication(CanalListener canalListener) {
this.canalListener = canalListener;
}
public static void main(String[] args) {
SpringApplication.run(YourApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
canalListener.listen();
}
}
在上述代码中,启动类YourApplication实现了CommandLineRunner接口,在构造函数中注入了CanalListener实例。在run方法中,调用canalListener.listen()方法启动 Canal 监听,开始捕获 MySQL 数据库的变更数据。
同步到 Elasticsearch 的关键要点
(一)异常处理
在数据同步过程中,由于网络波动、系统故障等原因,可能会出现各种异常情况。为了确保数据的一致性,需要增加重试机制或记录详细的错误日志。例如,可以使用 Spring 的RetryTemplate来实现重试逻辑。在捕获到异常时,RetryTemplate会按照预设的重试策略进行多次重试,直到操作成功或达到最大重试次数。同时,记录详细的错误日志,包括异常信息、发生时间、涉及的数据等,便于后续排查问题。这样,即使在同步过程中遇到临时的异常,也能最大程度保证数据的完整性和一致性。
(二)性能优化
为了提高数据同步的性能,减少对系统资源的消耗,可以采用批量处理 Canal 消息的方式,减少对 Redis/ES 的频繁写入。可以设置一个合适的批量处理大小,当收集到一定数量的变更消息后,一次性将这些消息写入到 Elasticsearch 中。这样可以减少网络请求次数,提高写入效率。同时,合理配置 Elasticsearch 的索引设置,如分片数量、副本数量等,根据数据量和查询负载进行优化,以提升整体的读写性能。
(三)数据结构
确保 Elasticsearch 的索引 Mapping 与 MySQL 表结构兼容是实现数据准确同步的重要环节。在创建 Elasticsearch 索引时,需要根据 MySQL 表的字段类型、索引情况等,定义合适的 Mapping。例如,对于 MySQL 中的字符串类型字段,在 Elasticsearch 中可能需要根据字段用途选择合适的文本类型或关键字类型,并设置是否进行分词等。对于数字类型、日期类型等字段,也需要在 Elasticsearch 中进行正确的映射配置,以保证数据在同步后能够正确存储和检索。
(四)事务管理
如果业务对数据一致性有较高要求,需要强一致性保证,可以结合本地事务表或消息队列(如 RocketMQ)做可靠投递。当 MySQL 中发生数据变更时,首先将变更信息记录到本地事务表中,并发送一条包含变更信息的消息到消息队列。Canal 在处理数据变更时,从消息队列中获取消息,并将数据同步到 Elasticsearch。同时,在本地事务表中标记该变更已处理。如果在同步过程中出现异常,可以通过查询本地事务表,重新发送未成功同步的消息,确保数据最终一致性。通过这种方式,能够在复杂的分布式环境下,保障 MySQL 与 Elasticsearch 之间的数据一致性。
总结
通过上述步骤,我们详细介绍了在 Spring Boot3 中使用阿里 Canal 实现 MySQL 数据库与 ElasticSearch 数据同步的方法和关键要点。从 Canal 的原理和优势,到 MySQL 和 Spring Boot 项目的配置,再到同步过程中的异常处理、性能优化、数据结构适配以及事务管理等方面,为互联网软件开发人员提供了一套完整的技术解决方案。在实际应用中,开发者可以根据具体的业务需求和场景,对这些技术进行灵活调整和优化,以构建高效、可靠的数据同步系统,满足不断变化的业务需求。希望本文能够对大家在相关技术领域的实践和探索有所帮助,助力大家在互联网软件开发的道路上取得更好的成果。
相关推荐
- 快速上手maven
-
Maven的作用在开发过程中需要用到各种各样的jar包,查找和下载这些jar包是件费时费力的事,特别是英文官方网站,可以将Maven看成一个整合了所有开源jar包的合集,我们需要jar包只需要从Mav...
- Windows系统——配置java环境变量
-
怎么配置java环境变量呢?首先是安装好jdk然后我的电脑右键选择属性然后选择左侧高级系统设置高级然后点环境变量然后在用户变量或系统变量中配置,用户变量指的是只有当前用户可用,系统变量指的是系统中...
- ollama本地部署更改默认C盘,Windows配置环境变量方法
-
ollama是一个大语言模型(LLM——LargeLanguageModel),本地电脑安装网上也要很多教程,看上去非常简单,一直下一步,然后直接就可以使用了。但是我在实操的时候并不是这样,安装完...
- # Windows 环境变量 Path 显示样式更改
-
#怎样学习Java##Windows环境变量Path显示样式更改##1、传统Path环境变量显示:```---》键盘上按【WIN+I】打开系统【设置】---》依次点击---》【系统...
- 如何在Windows中创建用户和系统环境变量
-
在Windows中创建环境变量之前您应该了解的事情在按照本指南中所示的任何步骤创建指向文件夹、文件或其他任何内容的用户和系统变量之前,您应该了解两件事。第一个也是最重要的一个是了解什么是环境变量。...
- Windows 中的环境变量是什么?
-
Windows中的环境变量是什么?那么,Windows中的环境变量是什么?简而言之,环境变量是描述应用程序和程序运行环境的变量。所有类型的程序都使用环境变量来回答以下问题:我安装的计算机的名称是什么...
- 【Python程序开发系列】谈一谈Windows环境变量:系统和用户变量
-
这是我的第350篇原创文章。一、引言环境变量(environmentvariables)一般是指在操作系统中用来指定操作系统运行环境的一些参数,如:临时文件夹位置和系统文件夹位置等。环境变量是在操作...
- 系统小技巧:还原Windows10路径环境变量
-
有时,我们在Windows10的“运行”窗口中执行一些命令或运行一些程序,这时即便没有指定程序的具体路径,只输入程序的名称(如notepad.exe),便可以迅速调用成功。这是因为Windows默认...
- Windows10系统的“环境变量”在哪里呢?
-
当我们在操作系统是Windows10的电脑里安装了一些软件,要通过配置环境变量才能使用软件时,在哪里能找到“环境变量”窗口呢?可以按照下面的步骤找到“环境变量”。说明:下面的步骤和截图是在Window...
- 系统小技巧:彻底弄懂Windows 10环境变量
-
每当我们进行系统清理时,清理软件总能自动找到Windows的临时文件夹之所在,然后加以清理,即便是我们重定向了TEMP目录也是如此。究其原因,是因为清理软件会根据TEMP环境变量来判断现有临时文件夹的...
- MySQL 5.7 新特性大全和未来展望
-
本文转自微信公众号:高可用架构作者:杨尚刚引用美图公司数据库高级DBA,负责美图后端数据存储平台建设和架构设计。前新浪高级数据库工程师,负责新浪微博核心数据库架构改造优化,以及数据库相关的服务器存...
- MySQL系列-源码编译安装(v8.0.25)
-
一、前言生产环境建议使用二进制安装法,其优点是部署简单、快速、方便,并且相对"yum/rpm安装"方法能更方便地自定义文件存放的目录结构,方便用脚本批量部署,方便日后运维管理。在生产...
- MySQL如何实时同步数据到ES?试试这款阿里开源的神器!
-
前几天在网上冲浪的时候发现了一个比较成熟的开源中间件——Canal。在了解了它的工作原理和使用场景后,顿时产生了浓厚的兴趣。今天,就让我们跟随我的脚步,一起来揭开它神秘的面纱吧。简介canal翻译为...
- 技术老兵十年专攻MySQL:编写了763页核心总结,90%MySQL问题全解
-
MySQL是开放源码的关系数据库管理系统,由于性能高、成本低、可靠性好,成为现在最流行的开源数据库。MySQL学习指南笔记领取方式:关注、转发后私信小编【111】即可免费获得《MySQL进阶笔记》的...
- Mysql和Hive之间通过Sqoop进行数据同步
-
文章回顾理论大数据框架原理简介大数据发展历程及技术选型实践搭建大数据运行环境之一搭建大数据运行环境之二本地MAC环境配置CPU数和内存大小查看CPU数sysctl machdep.cpu...
- 一周热门
-
-
Python实现人事自动打卡,再也不会被批评
-
【验证码逆向专栏】vaptcha 手势验证码逆向分析
-
Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
-
一个解决支持HTML/CSS/JS网页转PDF(高质量)的终极解决方案
-
再见Swagger UI 国人开源了一款超好用的 API 文档生成框架,真香
-
网页转成pdf文件的经验分享 网页转成pdf文件的经验分享怎么弄
-
C++ std::vector 简介
-
系统C盘清理:微信PC端文件清理,扩大C盘可用空间步骤
-
飞牛OS入门安装遇到问题,如何解决?
-
10款高性能NAS丨双十一必看,轻松搞定虚拟机、Docker、软路由
-
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)