问题

生产环境下发现某个数据库中的某张表同步。

由于我们使用的是canal进行数据库的同步。

canal中针对kafka的主要配置如下:

canal.mq.partitionsNum=10
canal.mq.partitionHash=.*\\..*

该配置定义了根据库及表名,发送数据到指定的分区。最大分区数为10。

经过研究发现,未同步的表会将数据发送至分区5。至此发现主要的队列的分区5已不存在。

排查

使用命令查看kafka的分区信息:

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic

分区信息如结果如下:

Topic:topic     PartitionCount:10       ReplicationFactor:1     Configs:
Topic: topic    Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: topic    Partition: 1    Leader: 1003    Replicas: 1006,1003     Isr: 1003
Topic: topic    Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: topic    Partition: 3    Leader: 1002    Replicas: 1006,1002     Isr: 1002
Topic: topic    Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: topic    Partition: 5    Leader: -1    Replicas: 1006  Isr: 1006
Topic: topic    Partition: 6    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: topic    Partition: 7    Leader: 1003    Replicas: 1006,1003     Isr: 1003
Topic: topic    Partition: 8    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: topic    Partition: 9    Leader: 1002    Replicas: 1006,1002     Isr: 1002

5号分区的Leader未选举出来。原因是brokers为1006的机器,已禁用kafka服务导致。

解决

  1. 分区信息修改:在zookeeper中查看分区信息,并修改分区至指定的broker,例如1002。
  2. 数据手工迁移:查看1002的机器,并将原1006机器下的分区文件夹拷贝至1002机器下
  3. 重启kafka服务
  4. 重分配分区

分区信息修改

使用zookeeper的命令行zkCli.sh查看分区信息:

get /brokers/topics/topic/partitions/5/state 

结果如下:

{"controller_epoch":66,"leader":-1,"version":1,"leader_epoch":31,"isr":[1006]}

修改其中的leader1003,并将leader_epoch+1。再设置回zookeeper:

set /brokers/topics/topic/partitions/5/state {"controller_epoch":66,"leader":1003,"version":1,"leader_epoch":32,"isr":[1002]}

数据手工迁移

在原1006机器上面的kafka配置中存储的日志目录,查找该分区的5号分区,并使用scp命令拷贝至1003机器上指定的位置。

一般数据的存储都在配置log.dirs配置。

至此重启服务。

重分配分区

编辑文件reassign.json

{"version":1,"partitions":[{"topic":"topic","partition":9,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"topic","partition":6,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"topic","partition":3,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"topic","partition":8,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"topic","partition":5,"replicas":[1003,1001],"log_dirs":["any","any"]},{"topic":"topic","partition":2,"replicas":[1003,1001],"log_dirs":["any","any"]},{"topic":"topic","partition":7,"replicas":[1002,1003],"log_dirs":["any","any"]},{"topic":"topic","partition":1,"replicas":[1002,1003],"log_dirs":["any","any"]},{"topic":"topic","partition":4,"replicas":[1002,1003],"log_dirs":["any","any"]}]}

使用以下命令进行分区重分配:

./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --execute

使用以下命令查看进度:

./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --verify

成功后结果如下:

Topic:     PartitionCount:10       ReplicationFactor:2     Configs:
Topic:     Partition: 0    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
Topic:     Partition: 1    Leader: 1003    Replicas: 1002,1003     Isr: 1003,1002
Topic:     Partition: 2    Leader: 1001    Replicas: 1003,1001     Isr: 1001,1003
Topic:     Partition: 3    Leader: 1002    Replicas: 1001,1002     Isr: 1002,1001
Topic:     Partition: 4    Leader: 1002    Replicas: 1002,1003     Isr: 1003,1002
Topic:     Partition: 5    Leader: 1003    Replicas: 1003,1001     Isr: 1001,1003
Topic:     Partition: 6    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
Topic:     Partition: 7    Leader: 1003    Replicas: 1002,1003     Isr: 1003,1002
Topic:     Partition: 8    Leader: 1003    Replicas: 1003,1002     Isr: 1003,1002
Topic:     Partition: 9    Leader: 1002    Replicas: 1001,1002     Isr: 1002,1001

如果数据较多,同步时间过长,可以通过以下命令清除一定时间范围以前的数据,以下命令指示清除1天前的数据:

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic --config retention.ms=86400000

在增加该配置后,可以使用kafka-topics.sh --describe命令查看topic时,会展示在Configs处。

在迁移完成后,再使用以下命令,清除上述命令:

./bin/kafka-configs.sh --zookeeper localhost:2181 --alter \ --entity-type topics --entity-name topic --delete-config retention.ms

retention.ms该配置是topic级别的配置。在默认情况下,主题的数据有效期由kafka配置中的log.retention.hours控制。

标签: 大数据

添加新评论