Skip to content

使用jar包配置启动

以Kafka为源端同步至数据库,仅能解析由Binlog的变更组成的Kafka消息队列。除了cfg.properties基础配置需要修改之外,还需要修改Kafka的消费者和生产者,对应的配置文件分别为consumer.properties 和 producer.properties。还需要配置目标端数据库信息dbcfg.properties。

配置cfg.properties

配置参数说明
#type in (db,Kafka,file)-
source.type=db源端类型
aim.type=db目标端类型
#binlog source config源端数据库配置信息
source.log.db=db待同步库名
source.db.usr=usr_sod源端用户名
source.db.pwd=password源端用户名密码(通过Encryption.jar加密后的密码)
source.db.port=12345源端端口号
source.log.suber.name=sync2同步使用的订阅名
source.log.filter.T=null同步过滤表达式,过滤指定表。格式:模式名:表名,多个表之间以&符号分割。例:s1:t1&s2:t2。(均使用英文字符)
source.log.filter.V=null同步过滤表达式,过滤指定视图
source.log.filter.P=null同步过滤表达式,过滤指定存储过程
source.log.filter.F=null同步过滤表达式,过滤指定存储函数
source.log.forward=true与source.log.filter配合使用,若为true,则同步source.log.filter中所配置的表。若为false,则不同步source.log.filter中所配置的表。默认为true
source.log.partion=8数据库日志分区
source.log.fetch.size=20同步每次拉取日志大小
source.log.ips=127.0.0.1单事务最大行数
source.commit_rows=1000同步过滤表达式,过滤指定存储函数
ddl.support=false是否进行 DDL 同步
redo.offset.server=true是否启用数据库端订阅者偏移量
reconnect.num=-1数据库重连次数,为-1则为无限重连
disable.binlog=falsetrue目标端binlog不记录同步的数据,false为记录
isMax.binlog=falsefalse从binlog原始位置开始读取,true从最新的文件号的0偏移量开始读取
# Kafka source config源端Kafka配置
source.Kafka.parath=8每个topic的分区数
source.topic=test.d2.tclob\\test.u2.t1需要同步的topic名,以”\\”分隔
#to Kafka写入 Kafka 消息中间件配置
zk.connect=192.168.2.225:2181zookeeper ip
topic.partition=8Kafka 每个 topic 的分区数
repeat.pos.Kafka=true是否依赖 Kafka 数据进行断点续传过滤重复
case.sensitive=true用于同步至Kafka时创建topic的大小写控制。若为true,则创建的topic名大小写按照源端库实际大小写创建。若为false,则创建的topic名全为小写
#to db写入数据库
writer.thd.num=8同步时写入线程数(建议和source.log.partion 相同)
writer.db=xugu目标库类型(xugu,oracle,mysql)
writer.schema=SYSDBA指定目标端模式名(严格区分大小写)
writer.parathd.num=1入库并发数
# to HttpEI信息
url.ei=针对天境EI,可不填
task.id=针对天境EI,可不填
task.name=针对天境EI,可不填

说明:

以Kafka为源端同步至数据库需要修改cfg.properties中的目的端数据库信息和Kafka中间件信息。配置aim.type参数为db,source.type为Kafka。源端数据库信息 (# binlog source config) 无需填写或者修改。然后,Kafka 根据生产环境的配置,将zk.connect修改为zookeeper的IP端口。Kafka源端配置信息(# Kafka source config),topic.partition填写kafak分区数,source.topic填写需要同步的topic名。

配置producer.properties

配置参数说明
bootstrapServers=ip1:port1,ip2:port2,ip3:port3Kafka 连接信息按需配置
compressionType=none建议保持默认
lingerMs=10建议保持默认
acks=all建议保持默认
retries=8建议保持默认
batchSize=163840建议保持默认
bufferMemory=33554432建议保持默认
enableIdempotence=true建议保持默认
key.serializer=org.apache.Kafka.common.serialization.StringSerializer必须保持默认
value.serializer=org.apache.Kafka.common.serialization.ByteArraySerializer必须保持默认
partitioner.class=com.sync.util.BinlogPartitioner必须保持默认

说明:

按照实际部署情况,将bootstrap.servers修改为Kafka集群的IP和端口,其余参数保持默认即可。

配置consumer.properties

配置参数说明
bootstrapServers=ip1:port1,ip2:port2,ip3:port3Kafka 连接信息按需配置
groupId=getendpos若同步工具配置页面的正向同步过滤选项为是,则是选择需要同步的,为否则是选择不需要同步的
autoOffsetReset=earliest建议保持默认
enableAutoCommit=false建议保持默认
autoCommitIntervalMs=1000建议保持默认
sessionTimeoutMs=30000建议保持默认
maxPartitionFetchBytes=10485760建议保持默认
maxPollRecords=10建议保持默认
maxPollIntervalMs=300000建议保持默认
key.deserializer=org.apache.Kafka.common.serialization.StringDeserializer必须保持默认
value.deserializer=org.apache.Kafka.common.serialization.ByteArrayDeserializer必须保持默认

配置dbcfg.properties

  • 同步至虚谷数据库

    库信息配置参数说明
    目标库连接信息target-db.user=SYSDBA-
    target-db.password=SYSDBA通过 Encryption.jar 加密后的密码
    target-db.driverClass=com.xugu.cloudjdbc.Driver-
    target-db.jdbcUrl=jdbc:xugu://192.168.2.222:9930/dcc-
  • 同步至MySQL数据库

    库信息配置参数说明
    目标库连接信息target-db.user=SYSDBA-
    target-db.password=SYSDBA通过 Encryption.jar 加密后的密码
    target-db.driverClass=com.mysql.cj.jdbc.Driver-
    target-db.jdbcUrl=jdbc:mysql://192.168.2.85:3306/d2?useUnicode=true&characterEncoding=utf-8-
  • 同步至Oracle数据库

    库信息配置参数说明
    目标库连接信息target-db.user=u2-
    target-db.password=MTIzNDU2通过 Encryption.jar 加密后的密码
    target-db.driverClass=oracle.jdbc.driver.OracleDriver-
    target-db.jdbcUrl=jdbc:oracle:thin:@//192.168.2.17:1521/orcl-

启动程序

待所有配置修改完成,检查无误后启动即可。

  • 启动命令:

    ./xugusyn.sh start
  • 查看是否启动同步软件命令:

    ./xugusyn.sh status
  • 停止同步软件命令:

    ./xugusyn.sh stop