Spring Boot3中使用阿里Canal实现MySQL与ElasticSearch的数据同步
liuian 2025-08-03 06:02 31 浏览
在当今数据驱动的互联网时代,数据的高效处理和实时同步至关重要。对于互联网软件开发人员而言,如何在不同的数据存储系统之间实现精准、高效的数据同步是一个常见且具有挑战性的任务。本文将深入探讨如何在 Spring Boot3 框架下,借助阿里的 Canal 工具,实现 MySQL 数据库与 ElasticSearch 之间的数据同步,为大家提供一套完整且实用的解决方案。
Canal 简介
Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的中间件,其核心作用是将 MySQL 的 binlog(二进制日志)解析为结构化的数据,并提供给下游系统进行消费。Canal 通过模拟 MySQL 从库的交互协议,伪装成 MySQL 从库向主库发送 dump 协议请求,MySQL 主库在收到请求后,会推送 binary log 给 Canal。随后,Canal 将这些原始的二进制日志解析为易于理解和处理的结构化数据,例如 JSON 格式。这一过程实现了对 MySQL 数据库增量变更数据(包括插入、更新、删除等操作)的高效抓取,为数据同步和其他相关应用场景奠定了基础。
Canal 具备诸多显著优势,使其在数据同步领域脱颖而出。首先,它拥有出色的实时性,基于 MySQL 的 binlog 机制,能够在毫秒级内完成数据同步,确保数据的及时性和一致性。其次,Canal 支持批量获取数据库变更数据,大大减少了网络开销和处理时间,提高了数据处理效率。此外,它还具备多线程处理能力,可以配置多个线程来并行处理不同的数据变更事件,进一步提升整体吞吐量。
同时,Canal 支持断点续传功能,在数据同步过程中,若因各种原因导致中断,它能够从断点处继续消费数据,有效避免数据丢失。另外,Canal 还可以将消费进度持久化到 ZooKeeper 中,即使在出现故障后恢复,也能依据持久化的进度信息继续正常工作,内置的多种容错机制,如重试策略和自动恢复功能,极大地提高了系统的可靠性。
此外,Canal 使用标准化的 binlog 协议,方便与其他系统进行集成,并且支持灵活的过滤规则,可以根据实际需求选择性地订阅特定的数据库和表,还允许动态配置,方便开发者根据业务场景的变化随时调整监控范围和处理逻辑。不仅如此,Canal 还为开发者提供了自定义处理器的接口,开发者可以编写自己的代码来实现复杂的数据处理逻辑,满足多样化的业务需求。最后,Canal 能够精确地捕获和同步数据库的每一行变更,确保数据的一致性,并且能够处理复杂的事务场景,保证事务的原子性和完整性,同时还提供了多种冲突解决策略,有效避免数据同步过程中的冲突问题。
环境准备
(一)MySQL 配置
开启 Binlog 并设置模式:要使用 Canal 实现 MySQL 与 ElasticSearch 的数据同步,首先需要在 MySQL 中开启 Binlog 功能,并将其格式设置为 ROW 模式。这可以通过修改 MySQL 的配置文件(在 Linux 系统中通常是 my.cnf,在 Windows 系统中是 my.ini)来实现。在配置文件中添加或修改以下配置项:
log-bin=mysql-bin
binlog-format=ROW
server-id=1添加或修改完成后,需要重启 MySQL 服务,使配置生效。开启 Binlog 并设置为 ROW 模式是确保 Canal 能够准确解析·数据变更的关键步骤,因为 ROW 模式会记录每一行数据的具体变更情况,为 Canal 提供详细的同步依据。
创建 Canal 用户并授权:为了让 Canal 能够访问 MySQL 数据库,需要创建一个专门的 Canal 用户,并为其赋予相应的权限。在 MySQL 的命令行或客户端工具中执行以下 SQL 语句:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';上述语句创建了一个名为 “canal” 的用户,并设置其密码为 “canal”,同时赋予该用户在所有数据库和表上的 SELECT、REPLICATION SLAVE 以及 REPLICATION CLIENT 权限。这些权限是 Canal 能够正常连接 MySQL 并获取 binlog 日志所必需的。执行完上述语句后,记得使用FLUSH PRIVILEGES;语句刷新权限,使新的权限设置生效。
(二)Spring Boot 项目配置
添加依赖:在 Spring Boot 项目的 pom.xml 文件中,需要添加 Canal 客户端依赖以及 Elasticsearch 相关依赖。确保 Canal 版本与本地或服务器上启动的 Canal 版本一致,以避免出现兼容性问题。以下是相关依赖的示例代码:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.2</version>
</dependency>通过添加这些依赖,项目可以引入 Canal 客户端来连接 Canal 服务器并获取数据变更信息,同时引入 Elasticsearch 的高级 REST 客户端,用于将同步的数据写入到 Elasticsearch 中。
新建监听类:创建一个专门的监听类,用于监听 Canal 通道中的 binlog 日志信息,实时捕捉数据库的数据变化。以下是一个示例代码:
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
public class CanalListener {
public void listen() {
// 直接连接Canal
com.alibaba.otter.canal.client.CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("localhost", 11111), "example", "", "");
try {
connector.connect();
connector.subscribe(".*\\..*");// 订阅所有数据库表,可根据需求修改
while (true) {
Message message = connector.get(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
processEntries(message.getEntries());
}
connector.ack(batchId); // 确认消息,小于等于此batchId的Message都会被确认
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void processEntries(java.util.List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry.toString(), e);
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 在这里处理数据变更,将数据同步到Elasticsearch
if (rowData.getIsDdl()) {
// DDL操作处理
} else {
// DML操作处理,如INSERT、UPDATE、DELETE
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
// 获取列名和值,用于构建Elasticsearch文档
}
}
}
}
}
}在上述代码中,首先通过
CanalConnectors.newSingleConnector方法创建一个 Canal 连接器,连接到本地的 Canal 服务器(地址为 “localhost”,端口为 “11111”),并订阅所有数据库表的变更信息。然后,在一个无限循环中,通过connector.get(100)方法获取一批(最多 100 条)数据变更消息。
如果获取到的消息 ID 为 - 1 或消息列表为空,表示没有新的变更数据,程序会暂停 1 秒后再次尝试获取。如果有新的变更数据,则调用processEntries方法对这些数据进行处理。在processEntries方法中,会遍历每一个数据变更条目,对于非事务开始和结束的条目,解析其RowChange对象,根据是否为 DDL 操作或 DML 操作分别进行处理。
对于 DML 操作,进一步遍历变更后的列数据,获取列名和值,为后续同步到 Elasticsearch 做准备。最后,通过connector.ack(batchId)方法确认已处理的消息批次,确保 Canal 服务器知道哪些消息已经被成功处理。
启动类集成接口:在 Spring Boot 的启动类上集成CommandLineRunner接口,并重写run方法,在run方法中启动 Canal 监听。这样,当 Spring Boot 项目启动时,就会自动开始监听 Canal 通道中的数据变更。以下是启动类的示例代码:
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class YourApplication implements CommandLineRunner {
private final CanalListener canalListener;
public YourApplication(CanalListener canalListener) {
this.canalListener = canalListener;
}
public static void main(String[] args) {
SpringApplication.run(YourApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
canalListener.listen();
}
}在上述代码中,启动类YourApplication实现了CommandLineRunner接口,在构造函数中注入了CanalListener实例。在run方法中,调用canalListener.listen()方法启动 Canal 监听,开始捕获 MySQL 数据库的变更数据。
同步到 Elasticsearch 的关键要点
(一)异常处理
在数据同步过程中,由于网络波动、系统故障等原因,可能会出现各种异常情况。为了确保数据的一致性,需要增加重试机制或记录详细的错误日志。例如,可以使用 Spring 的RetryTemplate来实现重试逻辑。在捕获到异常时,RetryTemplate会按照预设的重试策略进行多次重试,直到操作成功或达到最大重试次数。同时,记录详细的错误日志,包括异常信息、发生时间、涉及的数据等,便于后续排查问题。这样,即使在同步过程中遇到临时的异常,也能最大程度保证数据的完整性和一致性。
(二)性能优化
为了提高数据同步的性能,减少对系统资源的消耗,可以采用批量处理 Canal 消息的方式,减少对 Redis/ES 的频繁写入。可以设置一个合适的批量处理大小,当收集到一定数量的变更消息后,一次性将这些消息写入到 Elasticsearch 中。这样可以减少网络请求次数,提高写入效率。同时,合理配置 Elasticsearch 的索引设置,如分片数量、副本数量等,根据数据量和查询负载进行优化,以提升整体的读写性能。
(三)数据结构
确保 Elasticsearch 的索引 Mapping 与 MySQL 表结构兼容是实现数据准确同步的重要环节。在创建 Elasticsearch 索引时,需要根据 MySQL 表的字段类型、索引情况等,定义合适的 Mapping。例如,对于 MySQL 中的字符串类型字段,在 Elasticsearch 中可能需要根据字段用途选择合适的文本类型或关键字类型,并设置是否进行分词等。对于数字类型、日期类型等字段,也需要在 Elasticsearch 中进行正确的映射配置,以保证数据在同步后能够正确存储和检索。
(四)事务管理
如果业务对数据一致性有较高要求,需要强一致性保证,可以结合本地事务表或消息队列(如 RocketMQ)做可靠投递。当 MySQL 中发生数据变更时,首先将变更信息记录到本地事务表中,并发送一条包含变更信息的消息到消息队列。Canal 在处理数据变更时,从消息队列中获取消息,并将数据同步到 Elasticsearch。同时,在本地事务表中标记该变更已处理。如果在同步过程中出现异常,可以通过查询本地事务表,重新发送未成功同步的消息,确保数据最终一致性。通过这种方式,能够在复杂的分布式环境下,保障 MySQL 与 Elasticsearch 之间的数据一致性。
总结
通过上述步骤,我们详细介绍了在 Spring Boot3 中使用阿里 Canal 实现 MySQL 数据库与 ElasticSearch 数据同步的方法和关键要点。从 Canal 的原理和优势,到 MySQL 和 Spring Boot 项目的配置,再到同步过程中的异常处理、性能优化、数据结构适配以及事务管理等方面,为互联网软件开发人员提供了一套完整的技术解决方案。在实际应用中,开发者可以根据具体的业务需求和场景,对这些技术进行灵活调整和优化,以构建高效、可靠的数据同步系统,满足不断变化的业务需求。希望本文能够对大家在相关技术领域的实践和探索有所帮助,助力大家在互联网软件开发的道路上取得更好的成果。
相关推荐
- 戴尔笔记本电脑一开机就蓝屏
-
笔记本蓝屏可能是电脑硬盘故障,可以更换一个硬盘尝试。也可能是更新了驱动与修复漏洞补丁,可以进入安全模式将更新的驱动删除。有可能是内存条故障,可以把内存条取下来,用橡皮擦轻轻擦拭金手指,然后用毛刷将内存...
- 优酷路由宝怎么设置(优酷路由宝怎么设置网络)
-
无线连接如果准备用手机、笔记本电脑来设置优酷路由宝,需要先把WAN口,连接宽带网线(宽带猫、光猫);然后手机/笔记本电脑搜索连接到优酷路由宝的WiFi。优酷路由宝的默认WiFi名称是:Youku_开...
- 一键装机软件大全(一键装机下载)
-
1一键装机工具是一种自动化安装计算机操作系统以及常用软件的工具。2使用一键装机工具,需要先准备好需要安装的操作系统镜像和需要安装的软件列表,然后将它们放在一键装机工具所指定的位置。接下来,打开一键...
- home键是什么意思苹果手机(home键是苹果手机哪个键)
-
就是手机屏幕正下方的那个圆形的按钮,就是苹果手机的home键,home键的作用比较大,可以用来设置指纹解锁,单机home键可以返回主屏幕界面,双击home键可以弹出后台应用程序可以进行清楚,还可以通过...
- tplink说明书图片(tp-link路由器说明书步骤图)
-
第一步连接路由器WIFI在手机获取IP地址里找到路由器网关地址,第二步在浏览器地址栏输入路由器网关地址,之后会跳转到路由器管理员登录界面,输入账号密码就可以进入路由后台管理路由,如果提示路由器密码错误...
- 如何不安装flash玩4399(现在4399不提供flash如何玩游戏)
-
没有flash是玩不了的,需要开启flash才可以。1、首先打开浏览器,进入4399的游戏页面。2、进入游戏页面后,点击【已被屏蔽】文字。3、然后右上角会出现窗口,点击【管理】按钮。4、进入管理页面后...
- chrome download apk(chromedownloadapk in english)
-
手机下载安装的第三方应用出现问题,无法正常使用,建议按照以下方法操作:1.关闭重新启动该应用。2.建议将此软件卸载重新安装尝试。3.更换其他版本尝试。4.更新下手机系统版本后安装尝试5.备份手机数据(...
-
- qq空间官网手机登录网页版(qq空间官网登陆入口)
-
z.qq.com可以通过以下方式登录手机QQ空间:1、使用手机登录手机腾讯网3g.qq.com,点击“空间”,根据提示QQ号码和QQ密码就可以登录;2、通过手机直接输入手机QQ空间网址z.qq.com,根据提示操作即可登录;3、下载手机Q...
-
2025-12-22 13:55 liuian
- windows11我的电脑在哪里打开
-
1/6通过“开始”进入“设置”-“时间和语言”。2/6在“时间和语言”界面选择“区域”3/6这里我们将区域更改位“新加披”,退出。4/6打开微软自带的市场,搜索“你的手机”获取并下载。5/6安装完成后...
- win10怎么取消开机自启动(win10如何关闭开机自动启动)
-
要关闭Windows10的开机自动启动程序,你可以按下Win+R键,输入"msconfig"并按回车键打开系统配置工具。在"启动"选项卡中,你可以看到所有开机自动...
- 手机cpu排名2025(手机cpu排名榜)
-
一、2022手机CPU性能综合排名前八名手机CPU:1、型号:苹果A16---综合分数:暂无2、型号:骁龙8gen1---综合分数:42333、联发科天玑9000---综合分数:38724、...
- 论坛系统(论坛系统数据流图)
-
BBS是电子布告栏系统的简称,一种网站系统,也是目前流行网络论坛的前身。它允许用户使用终端程序通过调制解调器拨接或者因特网来进行连接,BBS站台提供布告栏、分类讨论区、新闻阅读、软件下载与上传、游戏、...
- hp1020plus打印机无法打印(惠普1020plus打印机突然不能打印了)
-
删除惠普打印机驱动和软件:1.如果你的打印机已通过USB连接到电脑,断开USB连接;2.打开控制面板—程序和功能(卸载或更改应用程序);3.在软件列表中找到惠普打印机,将其卸载;4.重启电脑...
- wifi密码破解器电脑版(wifi密码破解工具电脑版)
-
肯定不是万能钥匙这种“破解”wifi的东西。不是一两次见到把万能钥匙当做破解wifi用的人了,但实际上那玩意就是个分享wifi的软件。你连上一个wifi,密码就会被分享到云端(可以不分享),别...
- 手机临时文件夹在哪个位置(手机临时文件夹在哪个位置找)
-
1.手机文件临时文件是指在手机使用过程中产生的临时文件。2.手机应用程序在运行时需要产生一些临时文件,如缓存文件、日志文件、临时下载文件等,这些文件可以提高应用程序的运行效率和用户体验。但是,这些...
- 一周热门
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)
