canal1.1.5 Sending MySQL change data to Kafka (standalone mode)

Author: 马育民 • 2022-06-23 21:07 • Views: 10067

# Mode

Canal must be switched to `kafka` mode.

[![](/upload/0/0/1IX4Zwjeo8SM.png)](/upload/0/0/1IX4Zwjeo8SM.png)

# Modify the configuration files

```
cd /program/canal.deployer-1.1.5
```

### canal.properties

```
vim conf/canal.properties
```

Change the settings to the following:

```
# kafka mode
canal.serverMode = kafka

# Kafka broker ip and port
kafka.bootstrap.servers = hadoop1:9092
```

### instance.properties

```
vim conf/example/instance.properties
```

Change the settings to the following:

```
# unique id
canal.instance.mysql.slaveId=100

# MySQL ip and port
canal.instance.master.address=hadoop1:3306

# MySQL username
canal.instance.dbUsername=canal

# MySQL password
canal.instance.dbPassword=canal

# Kafka topic
canal.mq.topic=canal
```

# Restart canal

```
cd /program/canal.deployer-1.1.5
```

```
bin/stop.sh
```

```
bin/startup.sh
```

# Start zookeeper

See [zookeeper-3.4.x standalone: start, stop, check service status, jps](https://www.malaoshi.top/show_1IX3Utf2nunB.html "zookeeper-3.4.x standalone: start, stop, check service status, jps")

# Start kafka

See [kafka_2.12-2.4.x start and stop (standalone)](https://www.malaoshi.top/show_1IX3UtnlefFt.html "kafka_2.12-2.4.x start and stop (standalone)")

# Start a kafka client

### Offset Explorer (recommended)

See [Kafka GUI tool Offset Explorer](https://www.malaoshi.top/show_1IX3UtZDOooc.html "Kafka GUI tool Offset Explorer")

**Note:** Since no data has changed in MySQL yet, the `canal` topic does not exist yet.

### Built-in console consumer

```
cd /program/kafka_2.12-2.4.1/
```

```
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic canal
```

**Note:** Since no data has changed in MySQL yet, the `canal` topic does not exist yet.

# Test

### Insert

Execute the following SQL:

```
INSERT INTO `scott`.`dept`(`deptno`, `dname`, `loc`) VALUES (60, '测试部', 'NEW YORK');
```

Kafka receives the following message:

```
{
    "data": [               # changed rows (array; one entry per affected row)
        {
            "deptno": "60",
            "dname": "测试部",
            "loc": "NEW YORK"
        }
    ],
    "database": "scott",    # database name
    "es": 1656028167000,
    "id": 3,
    "isDdl": false,
    "mysqlType": {
        "deptno": "int(4)",
        "dname": "varchar(14)",
        "loc": "varchar(13)"
    },
    "old": null,
    "pkNames": [
        "deptno"
    ],
    "sql": "",
    "sqlType": {
        "deptno": 4,
        "dname": 12,
        "loc": 12
    },
    "table": "dept",        # table name
    "ts": 1656028168060,
    "type": "INSERT"        # operation type
}
```

### Update

Execute the following SQL:

```
update `scott`.`dept` set loc='伦敦' where deptno = 60
```

Kafka receives the following message:

```
{
    "data": [               # new row values (array)
        {
            "deptno": "60",
            "dname": "测试部",
            "loc": "伦敦"
        }
    ],
    "database": "scott",    # database name
    "es": 1656028259000,
    "id": 4,
    "isDdl": false,
    "mysqlType": {
        "deptno": "int(4)",
        "dname": "varchar(14)",
        "loc": "varchar(13)"
    },
    "old": [                # old values of the changed columns
        {
            "loc": "NEW YORK"
        }
    ],
    "pkNames": [
        "deptno"
    ],
    "sql": "",
    "sqlType": {
        "deptno": 4,
        "dname": 12,
        "loc": 12
    },
    "table": "dept",        # table name
    "ts": 1656028259585,
    "type": "UPDATE"        # operation type
}
```

### Delete

Execute the following SQL:

```
delete from scott.dept where deptno = 60
```

Kafka receives the following message:

```
{
    "data": [               # deleted rows (array)
        {
            "deptno": "60",
            "dname": "测试部",
            "loc": "伦敦"
        }
    ],
    "database": "scott",    # database name
    "es": 1656028325000,
    "id": 5,
    "isDdl": false,
    "mysqlType": {
        "deptno": "int(4)",
        "dname": "varchar(14)",
        "loc": "varchar(13)"
    },
    "old": null,
    "pkNames": [
        "deptno"
    ],
    "sql": "",
    "sqlType": {
        "deptno": 4,
        "dname": 12,
        "loc": 12
    },
    "table": "dept",        # table name
    "ts": 1656028325542,
    "type": "DELETE"        # operation type
}
```

### One SQL statement affecting multiple rows

Execute the following SQL:

```
INSERT INTO `scott`.`dept`(`deptno`, `dname`, `loc`) VALUES (61, '人力部', '广州'),(62, '开发部', '天津');
```

Kafka receives the following message:

```
{
    "data": [               # changed rows (array; here it contains multiple entries)
        {
            "deptno": "61",
            "dname": "人力部",
            "loc": "广州"
        },
        {
            "deptno": "62",
            "dname": "开发部",
            "loc": "天津"
        }
    ],
    "database": "scott",    # database name
    "es": 1656028504000,
    "id": 6,
    "isDdl": false,
    "mysqlType": {
        "deptno": "int(4)",
        "dname": "varchar(14)",
        "loc": "varchar(13)"
    },
    "old": null,
    "pkNames": [
        "deptno"
    ],
    "sql": "",
    "sqlType": {
        "deptno": 4,
        "dname": 12,
        "loc": 12
    },
    "table": "dept",        # table name
    "ts": 1656028505186,
    "type": "INSERT"        # operation type
}
```

Original source: http://malaoshi.top/show_1IX3YOJAyBHT.html
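Beyond inspecting the topic in a console consumer, a downstream program usually parses these canal JSON messages. Below is a minimal Python sketch: `parse_canal_message` is a hypothetical helper (not part of canal) that flattens one message into `(operation, table, row)` tuples based on the `type`, `database`, `table`, and `data` fields shown in the examples above; the commented-out consumer loop assumes the third-party `kafka-python` package and the broker address `hadoop1:9092` from the configuration.

```python
import json

def parse_canal_message(raw: str):
    """Flatten one canal JSON message into (operation, table, row) tuples.

    `data` holds one dict per affected row (it may be null for DDL events),
    so a multi-row statement yields multiple tuples.
    """
    msg = json.loads(raw)
    table = f"{msg['database']}.{msg['table']}"
    return [(msg["type"], table, row) for row in (msg["data"] or [])]

# Reading the `canal` topic requires a Kafka client, e.g. kafka-python
# (assumed installed: pip install kafka-python):
#
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("canal", bootstrap_servers="hadoop1:9092")
#   for record in consumer:
#       for op, table, row in parse_canal_message(record.value.decode("utf-8")):
#           print(op, table, row)

if __name__ == "__main__":
    sample = ('{"data":[{"deptno":"60","loc":"NEW YORK"}],'
              '"database":"scott","table":"dept","type":"INSERT"}')
    print(parse_canal_message(sample))
```

Note that `data` is always an array: a single `INSERT ... VALUES (...),(...)` produces one message whose `data` contains several rows, which is why the helper returns a list rather than a single row.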