canal1.1.5 创建工程、获取增删改数据、打印到控制台

模式

本案例是 tcp 模式,编写代码(即:客户端),连接cancel服务器端,获取MySQL binlog中的数据,转换成Json格式,打印到控制台

配置方式

打开 conf/canal.properties,修改成下面配置:

canal.serverMode = tcp

创建 Maven 工程

pom.xml

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.5</version>
    </dependency>
</dependencies>

java代码

package std;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;

public class CannalClient  {

    private final static int BATCH_SIZE = 1000;
    private final static String HOSTNAME="hadoop1";
    private final static int PORT=11111;
    private final static String SUBSCRIBE="scott.*";
    private final static String DESTINATION="example";

    public static void main(String[] args) {
        CannalClient client = new CannalClient();
        client.run();
    }
    public void run(){
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(HOSTNAME, PORT),
                DESTINATION,
                "",
                "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
//            connector.subscribe(".*\\..*");
            connector.subscribe(SUBSCRIBE);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
//            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    //线程休眠2秒
                    Thread.sleep(2000);
                } else {
                    //如果有数据,处理数据
                    handle(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private void handle(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            EventType eventType = rowChage.getEventType();
            //打印Header信息
//            System.out.println(String.format("================》binlog[%s:%s]  ",
//                    entry.getHeader().getLogfileName(),
//                    entry.getHeader().getLogfileOffset()));

            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》ddl sql:" + rowChage.getSql());
            }
            handleRow(rowChage.getRowDatasList(),eventType,entry.getHeader());
        }
    }
    private void handleRow( List<RowData> list,EventType eventType,Header header){

        String schemaName = header.getSchemaName();
        String tableName = header.getTableName();

        Map beforeRowData = new HashMap();
        Map afterRowData = new HashMap();
        //获取RowChange对象里的每一行数据,打印出来
        for (RowData rowData : list) {
            //如果是删除语句
            if (eventType == EventType.DELETE) {
                handleColumns(rowData.getBeforeColumnsList(),beforeRowData);
                //如果是新增语句
            } else if (eventType == EventType.INSERT) {
                handleColumns(rowData.getAfterColumnsList(),afterRowData);
                //如果是更新的语句
            } else {
                //变更前的数据
                handleColumns(rowData.getBeforeColumnsList(),beforeRowData);
                //变更后的数据
                handleColumns(rowData.getAfterColumnsList(),afterRowData);
            }

            String str = String.format("changeData==>tableName:%s.%s--eventType:%s--before:%s--after:%s",
                    schemaName,tableName,eventType,beforeRowData,afterRowData);
            System.out.println(str);
        }
    }
    private void handleColumns(List<Column> columns, Map map) {
        for (Column column : columns) {
            map.put(column.getName(),column.getValue());
        }
    }
}

测试

增加

执行下面sql:

INSERT INTO `scott`.`dept`(`deptno`, `dname`, `loc`) VALUES (60, '测试部', 'NEW YORK');

控制台打印如下:

changeData==>tableName:scott.dept--eventType:INSERT--before:{}--after:{"loc":"NEW YORK","dname":"测试部","deptno":"60"}

修改

执行下面sql:

update `scott`.`dept` set loc='伦敦' where deptno = 60

控制台打印如下:

changeData==>tableName:scott.dept--eventType:UPDATE--before:{"loc":"NEW YORK","dname":"测试部","deptno":"60"}--after:{"loc":"伦敦","dname":"测试部","deptno":"60"}

删除

执行下面sql:

delete from scott.dept where deptno = 60

控制台打印如下:

changeData==>tableName:scott.dept--eventType:DELETE--before:{"loc":"伦敦","dname":"测试部","deptno":"60"}--after:{}

一条sql影响多行

执行下面sql:

INSERT INTO `scott`.`dept`(`deptno`, `dname`, `loc`) VALUES (61, '人力部', '广州'),(62, '开发部', '天津');

控制台打印如下:

changeData==>tableName:scott.dept--eventType:INSERT--before:{}--after:{"loc":"广州","dname":"人力部","deptno":"61"}
changeData==>tableName:scott.dept--eventType:INSERT--before:{}--after:{"loc":"天津","dname":"开发部","deptno":"62"}

原文出处:https://malaoshi.top/show_1IX3Y1Q3p7ta.html