Canal User Guide
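canal is Alibaba's component for incremental subscription to and consumption of the MySQL binlog. Alibaba Cloud DRDS, as well as Alibaba TDDL secondary indexes and small-table replication, are powered by canal.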
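How it works
The principle is fairly simple:
- canal emulates the MySQL slave interaction protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master
- the MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
- canal parses the binary log objects (originally a byte stream)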
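To make the last step more concrete, here is a minimal sketch of decoding the fixed 19-byte header that starts each event in a MySQL v4 binlog byte stream (timestamp, type code, server id, event length, next position, and flags, all little-endian). This is not canal's parser; the class `BinlogHeaderSketch` and its field names are hypothetical and only illustrate the idea of turning raw bytes into an object.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class BinlogHeaderSketch {

    // A v4 binlog event header is 19 bytes long.
    static final int HEADER_LENGTH = 19;

    /** Plain holder for the decoded header fields (hypothetical type, for illustration only). */
    static final class EventHeader {
        final long timestamp;     // seconds since the epoch
        final int typeCode;       // event type
        final long serverId;      // server_id of the server that wrote the event
        final long eventLength;   // total event size, header included
        final long nextPosition;  // offset of the next event in the binlog file
        final int flags;

        EventHeader(long timestamp, int typeCode, long serverId,
                    long eventLength, long nextPosition, int flags) {
            this.timestamp = timestamp;
            this.typeCode = typeCode;
            this.serverId = serverId;
            this.eventLength = eventLength;
            this.nextPosition = nextPosition;
            this.flags = flags;
        }
    }

    /** Decodes the header from the first 19 bytes of an event. */
    static EventHeader parse(byte[] bytes) {
        if (bytes.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need at least " + HEADER_LENGTH + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        // Mask to read the fields as unsigned values.
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int typeCode = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long eventLength = buf.getInt() & 0xFFFFFFFFL;
        long nextPosition = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new EventHeader(timestamp, typeCode, serverId, eventLength, nextPosition, flags);
    }

    public static void main(String[] args) {
        // Build a sample header by hand, then decode it again.
        ByteBuffer sample = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        sample.putInt(1545027720).put((byte) 30).putInt(1).putInt(52).putInt(356).putShort((short) 0);
        EventHeader header = parse(sample.array());
        System.out.println("type=" + header.typeCode + " serverId=" + header.serverId
                + " nextPosition=" + header.nextPosition);
    }
}
```

In practice canal does this decoding for you and hands the client structured entries, so a sketch like this is only useful for understanding what travels over the wire.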
Quick start
MySQL configuration
Because canal is built on the MySQL binlog, MySQL must have binlog writing enabled. The row binlog format is recommended.
Check whether the binlog is already enabled:
show variables like 'log_%';
show variables like 'bin%';
Configuration:
[mysqld]
log-bin=mysql-bin
binlog-format=row
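server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
If canal replicates from a server that is itself a slave (cascaded replication), also add: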
[mysqld]
log_slave_updates=1
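The requirements above (binlog enabled, row format, and a `server_id` that differs from canal's `slaveId`) can be checked mechanically. The following is a minimal sketch that inspects my.cnf-style text; the class `BinlogConfigCheck` and its methods are hypothetical helpers, not part of canal or MySQL, and the sketch only reads the `[mysqld]` section of a single file.

```java
import java.util.HashMap;
import java.util.Map;

public class BinlogConfigCheck {

    /** Parses the [mysqld] section of my.cnf-style text into key/value pairs. */
    static Map<String, String> parseMysqld(String cnf) {
        Map<String, String> settings = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            // Drop trailing comments and surrounding whitespace.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            String[] kv = line.split("=", 2);
            // MySQL accepts '-' and '_' interchangeably in option names.
            String key = kv[0].trim().replace('-', '_').toLowerCase();
            settings.put(key, kv.length > 1 ? kv[1].trim() : "");
        }
        return settings;
    }

    /** Returns true when the settings satisfy the requirements described above. */
    static boolean isCanalReady(String cnf, long canalSlaveId) {
        Map<String, String> settings = parseMysqld(cnf);
        boolean binlogOn = settings.containsKey("log_bin");
        boolean rowFormat = "row".equalsIgnoreCase(settings.get("binlog_format"));
        String serverId = settings.get("server_id");
        boolean idOk = serverId != null && Long.parseLong(serverId) != canalSlaveId;
        return binlogOn && rowFormat && idOk;
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "[mysqld]",
                "log-bin=mysql-bin",
                "binlog-format=row",
                "server_id=1 # must differ from canal's slaveId");
        System.out.println(isCanalReady(cnf, 1234));
    }
}
```

The `show variables` statements remain the authoritative check, since they report what the running server actually uses rather than what a file says.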
canal works by presenting itself as a MySQL slave, so the account it uses must be granted the privileges a MySQL slave needs.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'; -- the SHOW VIEW privilege is required
For MySQL master-slave replication, see: MySQL master-slave replication configuration
Startup steps
Download and install
Go to https://github.com/alibaba/canal/releases, which lists all downloadable packages. This guide uses v1.1.2 as the example:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz
Extract:
mkdir /tmp/canal
tar zxvf canal.deployer-1.1.2.tar.gz -C /tmp/canal
Configuration changes
Application parameters:
vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to your own database settings
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# all tables in the binlog schema:
# binlog\\..*
# one specific table in the binlog schema:
# binlog\\.cal
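The filter patterns above are regular expressions matched against `schema.table` names. As a minimal sketch of how the three example patterns behave, the following approximates the matching with `java.util.regex`; canal uses its own filter implementation, so treat `FilterRegexDemo` and `accepts` as hypothetical names and the comma-splitting and full-match behavior as assumptions.

```java
import java.util.regex.Pattern;

public class FilterRegexDemo {

    /**
     * Returns true when "schema.table" fully matches at least one of the
     * comma-separated patterns in the filter.
     */
    static boolean accepts(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In Java source "\\." is the regex \. (a literal dot). A properties file
        // unescapes \\ to \ as well, so it yields the same pattern.
        String allTables = ".*\\..*";
        String binlogSchema = "binlog\\..*";
        String singleTable = "binlog\\.cal";

        System.out.println(accepts(allTables, "mytest", "person"));
        System.out.println(accepts(binlogSchema, "mytest", "person"));
        System.out.println(accepts(singleTable, "binlog", "cal"));
    }
}
```

Escaping the dot matters: with an unescaped `.` the pattern `binlog.cal` would also match a name such as `binlogXcal`.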
Kafka configuration:
# mq config
canal.mq.topic=canal.example
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=10
#canal.mq.partitionHash=.*\\..*
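# Hash rule format: schema.table : unique primary key, e.g. mytest.person: id. Version 1.1.3 supports a new syntax, see below.
# With .*\\..* the regex carries no primary-key information, so every matching table is hashed by its table name: all rows of one table go to the same partition and different tables are spread across partitions (a hot table can make its partition too large).
Notes: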
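To illustrate the table-level hashing described in the comment above, here is a minimal sketch that maps a table name to a partition index. It is not canal's actual hash function; `TableHashPartitioner` and `partitionFor` are hypothetical names, and `String.hashCode` is used only as a stand-in.

```java
public class TableHashPartitioner {

    /**
     * Maps a table name to a partition in [0, partitionsNum).
     * Every row of the same table lands in the same partition.
     */
    static int partitionFor(String tableName, int partitionsNum) {
        if (partitionsNum <= 0) {
            throw new IllegalArgumentException("partitionsNum must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(tableName.hashCode(), partitionsNum);
    }

    public static void main(String[] args) {
        int partitions = 10; // mirrors canal.mq.partitionsNum=10
        for (String table : new String[] {"person", "orders", "person"}) {
            System.out.println(table + " -> partition " + partitionFor(table, partitions));
        }
    }
}
```

Because the key is the table name alone, a single very busy table still sends all of its traffic to one partition, which is the hot-partition problem the comment warns about. Hashing on a primary key instead spreads one table's rows across partitions.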
Start
sh bin/startup.sh
View the logs
vim logs/canal/canal.log
Instance log:
vim logs/example/example.log
Stop the service
sh bin/stop.sh
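Adapters
Documentation: Client Adapter
Since version 1.1.1, canal includes adapters and a launcher for landing client data. Currently supported:
- REST interface of the client launcher for managing synchronization
- Logger adapter, provided as a demo
- Relational database synchronization (table to table), with ETL
- HBase synchronization (table to table), with ETL (to be supported later)
- ElasticSearch multi-table synchronization, with ETL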
Using canal with Spring Boot
Project setup
Add the canal dependency
dependencies {
    // https://mvnrepository.com/artifact/com.alibaba.otter/canal.client
    // 'implementation' replaces the 'compile' configuration, which Gradle 7 removed
    implementation('com.alibaba.otter:canal.client:1.1.2')
}
canal client:
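// CanalService, DatabaseProperties, CleanServiceException and ExceptionCode are project classes;
// import them from your own packages. StringUtils is assumed to be Apache Commons Lang 3.
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author tangfan 2018/12/17 14:22
 */
@Component
public class CanalClient {
    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);
    private final CanalService canalService;
    private final DatabaseProperties.CanalConfig canalConfig;
    private CanalConnector canalConnector;
    private boolean started;

    @Autowired
    public CanalClient(CanalService canalService, DatabaseProperties databaseProperties) {
        this.canalService = canalService;
        this.canalConfig = databaseProperties.getCanal();
    }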

    @PostConstruct
    public void start() {
        if (canalConfig != null && StringUtils.isNotEmpty(canalConfig.getIp())) {
            // Create the connection
            this.canalConnector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
                    canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
            logger.info("Sync task initialized. Canal config: {}", canalConfig);
            if (!this.started) {
                canalConnector.connect();
                canalConnector.subscribe(this.canalConfig.getSubscribe());
                // Roll back to the last acknowledged position so unacknowledged batches are delivered again
                canalConnector.rollback();
                this.started = true;
            }
        }
    }

    @PreDestroy
    public void stop() {
        if (this.started) {
            try {
                canalConnector.disconnect();
            } catch (CanalClientException exp) {
                logger.warn("Failed to stop the canal client", exp);
            }
            this.started = false;
        }
    }

    /**
     * Runs one fetch, process, and acknowledge cycle.
     */
    public void handle() {
        if (!this.started) {
            // Not connected (for example, no canal address configured), so there is nothing to fetch
            return;
        }
        int batchSize = canalConfig.getBatchSize();
        if (batchSize < 0 || batchSize >= 9999) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "Sync batch size setting is out of range");
        }
        if (this.canalConfig.getInterval() == null || this.canalConfig.getInterval().toMillis() <= 0) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "Sync batch interval setting is out of range");
        }
        long batchId;
        Message message = null;
        try {
            // Fetch a batch without acknowledging it yet
            message = canalConnector.getWithoutAck(batchSize);
        } catch (Exception exp) {
            logger.error("Failed to fetch data from canal", exp);
        }
        if (message != null) {
            batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId != -1 && size != 0) {
                try {
                    saveEntry(batchId, message.getEntries());
                    // Acknowledge the batch
                    canalConnector.ack(batchId);
                } catch (Exception exp) {
                    logger.error("canal service failed to process the batch, batchId: {}", batchId, exp);
                    // Processing failed, so roll the batch back
                    if (batchId > 0) {
                        canalConnector.rollback(batchId);
                        logger.info("Rolled back batch {}, {} entries in total.", batchId, size);
                    }
                }
            }
        }
    }

    /**
     * Applies the synchronized entries.
     *
     * @param batchId id of the batch being processed
     * @param entries entries contained in the batch
     */
    private void saveEntry(long batchId, List<CanalEntry.Entry> entries) {
        if (entries != null) {
            logger.info("Batch: {}, entries to confirm: {}", batchId, entries.size());
            for (CanalEntry.Entry entry : entries) {
                // Skip transaction boundary markers; only row changes are of interest
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                if (rowChange != null) {
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (rowChange.getRowDatasList() != null) {
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            String schemaName = entry.getHeader().getSchemaName();
                            String tableName = entry.getHeader().getTableName();
                            switch (eventType) {
                                case INSERT:
                                    canalService.insert(afterColumnsList, schemaName, tableName);
                                    break;
                                case UPDATE:
                                    canalService.update(afterColumnsList, schemaName, tableName);
                                    break;
                                case DELETE:
                                    // A deleted row has no "after" image, so pass the "before" columns
                                    canalService.delete(rowData.getBeforeColumnsList(), schemaName, tableName);
                                    break;
                                default:
                                    logger.info("Unknown operation type: {}", eventType);
                                    break;
                            }
                        }
                    }
                }
            }
        }
    }
}
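`handle()` runs a single cycle and validates a configured interval, which suggests it is meant to be called periodically, but the scheduler itself is not shown here. Below is a minimal sketch of one way to drive it with a `ScheduledExecutorService`. The class `CanalPoller` is hypothetical, and the `Runnable` stands in for `canalClient::handle`; in a Spring Boot application a `@Scheduled` method may be a more natural fit.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CanalPoller implements AutoCloseable {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable handleOnce;
    private final Duration interval;

    /**
     * @param handleOnce one fetch/ack cycle, e.g. canalClient::handle
     * @param interval   pause between the end of one cycle and the start of the next
     */
    public CanalPoller(Runnable handleOnce, Duration interval) {
        this.handleOnce = handleOnce;
        this.interval = interval;
    }

    public void start() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                handleOnce.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so log and carry on.
                System.err.println("poll cycle failed: " + e);
            }
        }, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in task; replace with canalClient::handle in a real application.
        try (CanalPoller poller = new CanalPoller(
                () -> System.out.println("polling canal..."), Duration.ofMillis(200))) {
            poller.start();
            Thread.sleep(1000);
        }
    }
}
```

A fixed delay (rather than a fixed rate) keeps cycles from overlapping when one batch takes longer than the interval, and the single-threaded executor means `handle()` never runs concurrently with itself.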