项目简介
Canal是一个基于数据库增量日志解析的开源项目,其核心功能是实现数据库的增量数据订阅和消费。该系统通过解析MySQL的二进制日志(binlog),提供实时的数据同步能力,支持插入、更新和删除等多种数据库操作。Canal可广泛应用于数据库镜像、实时备份、多级索引、搜索构建、业务缓存刷新和重要业务消息推送等场景。
项目的主要特性和功能
- 增量数据订阅与消费:解析MySQL的binlog,实时获取数据库增量变更数据并同步到其他系统。
- 多种数据库操作支持:支持插入、更新、删除等操作的同步。
- 高可用性:借助ZooKeeper实现高可用,保障数据同步的稳定可靠。
- 灵活的过滤机制:支持基于正则表达式的数据过滤,可按需定制同步数据范围。
- 多种部署模式:支持单机、集群和混合模式,适应不同规模和复杂度的应用场景。
- 丰富的API和扩展性:提供多种API和扩展点,便于开发者根据业务需求定制扩展。
安装使用步骤
前提条件
- 已安装Java环境(JDK 8或更高版本)。
- 已安装MySQL数据库,并启用了binlog。
- 若使用集群模式,需已安装ZooKeeper。
安装步骤
- 下载Canal:
bash cd canal
- 配置Canal:
- 编辑
conf/canal.properties
文件,配置Canal基本参数,如ZooKeeper地址、MySQL连接信息等。 - 编辑
conf/instance.properties
文件,配置具体实例参数,如数据库连接信息、过滤规则等。 - 启动Canal:
bash ./bin/startup.sh
- 配置客户端:
- 根据业务需求,配置Canal客户端,订阅指定数据库和表的变更数据。
- 使用Canal提供的API,编写客户端代码处理接收到的增量数据。
- 运行客户端: 启动客户端程序,开始接收和处理Canal同步的增量数据。
使用示例
以下是一个简单的Canal客户端示例,用于订阅MySQL数据库的增量数据: ```java import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message;
public class CanalClientExample { public static void main(String[] args) { String destination = "example"; String ip = "127.0.0.1"; int port = 11111;
CanalConnector connector = CanalConnectors.newSingleConnector(ip, port, destination, "", "");
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(message.getEntries());
}
connector.ack(batchId);
}
}
private static void printEntry(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
} ```
注意事项
- 确保MySQL的binlog格式设置为ROW模式,以便Canal能够正确解析。
- 在集群模式下,确保ZooKeeper集群正常运行,并正确配置Canal的ZooKeeper地址。
- 根据业务需求,合理配置Canal的过滤规则,避免不必要的性能开销。
下载地址
点击下载 【提取码: 4003】【解压密码: www.makuang.net】