littlebot
Published on 2025-04-03 / 0 Visits
0

【源码】基于Java的Canal数据库同步系统

项目简介

Canal是一个基于数据库增量日志解析的开源项目,其核心功能是实现数据库的增量数据订阅和消费。该系统通过解析MySQL的二进制日志(binlog),提供实时的数据同步能力,支持插入、更新和删除等多种数据库操作。Canal可广泛应用于数据库镜像、实时备份、多级索引、搜索构建、业务缓存刷新和重要业务消息推送等场景。

项目的主要特性和功能

  1. 增量数据订阅与消费:解析MySQL的binlog,实时获取数据库增量变更数据并同步到其他系统。
  2. 多种数据库操作支持:支持插入、更新、删除等操作的同步。
  3. 高可用性:借助ZooKeeper实现高可用,保障数据同步的稳定可靠。
  4. 灵活的过滤机制:支持基于正则表达式的数据过滤,可按需定制同步数据范围。
  5. 多种部署模式:支持单机、集群和混合模式,适应不同规模和复杂度的应用场景。
  6. 丰富的API和扩展性:提供多种API和扩展点,便于开发者根据业务需求定制扩展。

安装使用步骤

前提条件

  • 已安装Java环境(JDK 8或更高版本)。
  • 已安装MySQL数据库,并启用了binlog。
  • 若使用集群模式,需已安装ZooKeeper。

安装步骤

  1. 下载Canalbash cd canal
  2. 配置Canal
  3. 编辑conf/canal.properties文件,配置Canal基本参数,如ZooKeeper地址、MySQL连接信息等。
  4. 编辑conf/instance.properties文件,配置具体实例参数,如数据库连接信息、过滤规则等。
  5. 启动Canalbash ./bin/startup.sh
  6. 配置客户端
  7. 根据业务需求,配置Canal客户端,订阅指定数据库和表的变更数据。
  8. 使用Canal提供的API,编写客户端代码处理接收到的增量数据。
  9. 运行客户端: 启动客户端程序,开始接收和处理Canal同步的增量数据。

使用示例

以下是一个简单的Canal客户端示例,用于订阅MySQL数据库的增量数据: ```java import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message;

public class CanalClientExample { public static void main(String[] args) { String destination = "example"; String ip = "127.0.0.1"; int port = 11111;

    CanalConnector connector = CanalConnectors.newSingleConnector(ip, port, destination, "", "");
    connector.connect();
    connector.subscribe(".*\\..*");

    while (true) {
        Message message = connector.getWithoutAck(100);
        long batchId = message.getId();
        int size = message.getEntries().size();

        if (batchId == -1 || size == 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            printEntry(message.getEntries());
        }

        connector.ack(batchId);
    }
}

private static void printEntry(List<CanalEntry.Entry> entries) {
    for (CanalEntry.Entry entry : entries) {
        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
}

private static void printColumn(List<CanalEntry.Column> columns) {
    for (CanalEntry.Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

} ```

注意事项

  • 确保MySQL的binlog格式设置为ROW模式,以便Canal能够正确解析。
  • 在集群模式下,确保ZooKeeper集群正常运行,并正确配置Canal的ZooKeeper地址。
  • 根据业务需求,合理配置Canal的过滤规则,避免不必要的性能开销。

下载地址

点击下载 【提取码: 4003】【解压密码: www.makuang.net】