1 集群模式

  mysql 查看分区_分区查看为0_分区查看软件

  图中 server 对应一个 canal 运行实例 ,对应一个 JVM 。

  server 中包含 1..n 个 instance , 我们可以将 instance 理解为配置任务。

  instance 包含如下模块 :

  真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。

  实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。

  顺序消费:

  对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

  分区查看为0_mysql 查看分区_分区查看软件

  2 MySQL配置

  1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

  

[mysqld]    log-bin=mysql-bin # 开启 binlog    binlog-format=ROW # 选择 ROW 模式    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复    

  注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。

  2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。

  

CREATE USER canal IDENTIFIED BY 'canal';      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    FLUSH PRIVILEGES;    

  3、创建数据库商品表 t_product 。

  

CREATE TABLE t_product (     id BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,     name VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,     price DECIMAL ( 10, 2 ) NOT NULL,     status TINYINT ( 4 ) NOT NULL,     create_time datetime NOT NULL,     update_time datetime NOT NULL,       PRIMARY KEY ( id )     ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin    

  3 Elasticsearch配置

  使用 Kibana 创建商品索引 。

  

PUT /t_product    {        "settings": {            "number_of_shards": 2,            "number_of_replicas": 1        },        "mappings": {                "properties": {                   "id": {                        "type":"keyword"                    },                    "name": {                        "type":"text"                    },                    "price": {                        "type":"double"                    },                    "status": {                        "type":"integer"                    },                    "createTime": {                        "type": "date",                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"                    },                    "updateTime": {                        "type": "date",                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"                    }            }        }    }    

  执行完成,如图所示 :

  mysql 查看分区_分区查看软件_分区查看为0

  4 RocketMQ 配置

  创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。

  分区查看软件_mysql 查看分区_分区查看为0

  分区查看为0_分区查看软件_mysql 查看分区

  5 canal 配置

  我们选取 canal 版本 1.1.6 ,进入 conf 目录。

  1、配置 canal.properties

  

#集群模式 zk地址    canal.zkServers = localhost:2181    #本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ    canal.serverMode = rocketMQ    #instance 列表    canal.destinations = product-syn    #conf root dir    canal.conf.dir = ../conf    #全局的spring配置方式的组件文件 生产环境,集群化部署    canal.instance.global.spring.xml = classpath:spring/default-instance.xml        ######  以下部分是默认值 展示出来     # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)    canal.mq.canalBatchSize = 50    # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时    canal.mq.canalGetTimeout = 100    # 是否为 flat json格式对象    canal.mq.flatMessage = true    

  2、instance 配置文件

  在 conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties。

  

#  按需修改成自己的数据库信息    #################################################    ...    canal.instance.master.address=192.168.1.20:3306    # username/password,数据库的用户名和密码    ...    canal.instance.dbUsername = canal    canal.instance.dbPassword = canal    ...        # table regex     canal.instance.filter.regex=mytest.t_product        # mq config    canal.mq.topic=product-syn-topic    # 针对库名或者表名发送动态topic    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*    canal.mq.partition=0    # hash partition config    #canal.mq.partitionsNum=3    #库名.表名: 唯一主键,多个表之间用逗号分隔    #canal.mq.partitionHash=mytest.person:id,mytest.role:id    #################################################    

  3、服务启动

  启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。

  分区查看软件_分区查看为0_mysql 查看分区

  修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。

  分区查看为0_mysql 查看分区_分区查看软件

  6 消费者

  1、产品索引操作服务

  分区查看为0_mysql 查看分区_分区查看软件

  2、消费监听器

  分区查看为0_分区查看软件_mysql 查看分区

  消费者逻辑重点有两点:

  7 写到最后

  canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。

  推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。

  这篇文章涉及到的代码已收录到下面的工程中,有兴趣的同学可以一看。

  mysql 查看分区_分区查看软件_分区查看为0

  END

  Java项目训练营

  我开通了,已经有不少消息推送平台项目股东拿了阿里/vivo等大厂offer了。我是没找到网上有跟我提供相同的服务,价格还比我低的。

  一对一周到的服务:有很多人的自学能力和基础确实不太行,不知道怎么开始学习,从哪开始看起,学习项目的过程中会走很多弯路,很容易就迷茫了。付费最跟自学最主要的区别就是我的服务会更周到。我会告诉你怎么开始学这个开源项目,哪些是重点需要掌握的,如何利用最短的时间把握整个系统架构和编码的设计,把时间节省下来去做其他事情。学习经验/路线/简历编写/面试经验知无不言

  本地直连远程服务:生产环境的应用系统肯定会依赖各种中间件,我专门买了两台服务器已经搭建好必要的环境,在本地就可以直接启动运行体验和学习,无须花额外的时间自行搭建调试。

  细致的文档&视频:巨细致的语雀文档11W+字,共106个文档,项目视频还在持续制作更新中(20个),不怕你学不会。

  付费社群:优质的社群里需筛选过滤,学习氛围是很重要的,多跟同辈或前辈聊聊,会少走很多弯路

  清爽干练commit:专属股东仓库,一步一步从零复现austin,每个commit都带着文档&视频学习。

  如果想获取上面的权益,可以看看

最后修改:2024 年 07 月 18 日
如果觉得我的文章对你有用,请随意赞赏