Troubleshooting a Kafka Partition Issue
Problem
In production we found that a table in one of our databases had stopped synchronizing. We replicate database changes with canal, and canal's main Kafka-related configuration is:
canal.mq.partitionsNum=10
canal.mq.partitionHash=.*\\..*
This configuration routes each record to a partition determined by hashing the database and table name, across at most 10 partitions. Investigation showed that data from the unsynchronized table was being routed to partition 5, and that partition 5 of the topic was no longer available.
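The routing can be pictured with a small sketch. This is an illustration only: canal applies its own hash per the partitionHash rule, and the concrete hash function below is an assumption.

```python
# Illustrative sketch of hash-based partition routing, NOT canal's exact
# algorithm: canal hashes the "schema.table" key per its partitionHash rule;
# the hash function here is a stand-in.
def route_to_partition(database, table, partitions_num=10):
    key = f"{database}.{table}"
    # Simple deterministic string hash, so every event from the same
    # table always lands on the same partition.
    h = 0
    for ch in key:
        h = (h * 31 + ord(ch)) & 0x7FFFFFFF
    return h % partitions_num

# All rows from one table map to a single, fixed partition:
p = route_to_partition("mydb", "orders")
print(p)
```

Because the mapping is deterministic, a table whose key hashes to a broken partition will stop synchronizing entirely, which is exactly the symptom observed.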
Investigation
Inspect the topic's partition state with:
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic
The partition information was as follows:
Topic:topic PartitionCount:10 ReplicationFactor:1 Configs:
Topic: topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic Partition: 1 Leader: 1003 Replicas: 1006,1003 Isr: 1003
Topic: topic Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic Partition: 3 Leader: 1002 Replicas: 1006,1002 Isr: 1002
Topic: topic Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic Partition: 5 Leader: -1 Replicas: 1006 Isr: 1006
Topic: topic Partition: 6 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic Partition: 7 Leader: 1003 Replicas: 1006,1003 Isr: 1003
Topic: topic Partition: 8 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic Partition: 9 Leader: 1002 Replicas: 1006,1002 Isr: 1002
Partition 5 has no elected leader (Leader: -1). The cause: its only replica lives on broker 1006, where the Kafka service had been disabled.
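Leaderless partitions like this can be spotted by scanning the `--describe` output for `Leader: -1`. A small helper (hypothetical, parsing a captured sample of the output above) might look like:

```python
# Find partitions with no elected leader in `kafka-topics.sh --describe` output.
def leaderless_partitions(describe_output):
    bad = []
    for line in describe_output.splitlines():
        if "Leader: -1" in line:
            # The token after the "Partition:" label is the partition id.
            fields = line.split()
            bad.append(int(fields[fields.index("Partition:") + 1]))
    return bad

sample = """Topic: topic\tPartition: 4\tLeader: 1001\tReplicas: 1001\tIsr: 1001
Topic: topic\tPartition: 5\tLeader: -1\tReplicas: 1006\tIsr: 1006"""
print(leaderless_partitions(sample))  # partition 5 has no leader
```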
Resolution
- Fix the partition metadata: inspect the partition state in ZooKeeper and point the partition at an available broker, e.g. 1003.
- Migrate the data by hand: copy the partition directory from the old broker 1006 onto broker 1003.
- Restart the Kafka service.
- Reassign the partitions.
Fixing the partition metadata
Using ZooKeeper's command-line client zkCli.sh, view the partition state:
get /brokers/topics/topic/partitions/5/state
The result:
{"controller_epoch":66,"leader":-1,"version":1,"leader_epoch":31,"isr":[1006]}
Change leader to 1003, increment leader_epoch by 1, and write the state back to ZooKeeper:
set /brokers/topics/topic/partitions/5/state {"controller_epoch":66,"leader":1003,"version":1,"leader_epoch":32,"isr":[1003]}
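The edit above amounts to taking the old state JSON, pointing leader at a live broker, and bumping leader_epoch; sketched in Python:

```python
import json

# Derive the new partition state from the old one before writing it back
# with zkCli's `set` (broker 1003 as in the steps above).
old = {"controller_epoch": 66, "leader": -1, "version": 1,
       "leader_epoch": 31, "isr": [1006]}

new = dict(old)
new["leader"] = 1003       # point the partition at a live broker
new["leader_epoch"] += 1   # the epoch must move forward
new["isr"] = [1003]        # the in-sync replica set now holds the new leader

print(json.dumps(new))
```

Editing ZooKeeper znodes by hand bypasses the controller, so this should be a last resort for a partition that cannot recover on its own.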
Manually migrating the data
On the original broker 1006, go to the log directory configured by log.dirs in the Kafka configuration, find the directory for partition 5 of the topic, and copy it with scp to the corresponding location on broker 1003.
Then restart the Kafka service.
Reassigning partitions
Create a file reassign.json:
{"version":1,"partitions":[
  {"topic":"topic","partition":9,"replicas":[1001,1002],"log_dirs":["any","any"]},
  {"topic":"topic","partition":6,"replicas":[1001,1002],"log_dirs":["any","any"]},
  {"topic":"topic","partition":3,"replicas":[1001,1002],"log_dirs":["any","any"]},
  {"topic":"topic","partition":8,"replicas":[1003,1002],"log_dirs":["any","any"]},
  {"topic":"topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},
  {"topic":"topic","partition":5,"replicas":[1003,1001],"log_dirs":["any","any"]},
  {"topic":"topic","partition":2,"replicas":[1003,1001],"log_dirs":["any","any"]},
  {"topic":"topic","partition":7,"replicas":[1002,1003],"log_dirs":["any","any"]},
  {"topic":"topic","partition":1,"replicas":[1002,1003],"log_dirs":["any","any"]},
  {"topic":"topic","partition":4,"replicas":[1002,1003],"log_dirs":["any","any"]}
]}
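For many partitions, a file in this shape can be generated rather than written by hand. A sketch (the round-robin placement below is an assumption; adapt it to your broker layout):

```python
import json
from itertools import cycle

# Hypothetical helper: spread partitions over the live brokers with
# replication factor 2, producing JSON in the reassign.json shape above.
def build_reassignment(topic, num_partitions, brokers, rf=2):
    ring = cycle(brokers)
    partitions = []
    for p in range(num_partitions):
        replicas = [next(ring) for _ in range(rf)]
        partitions.append({"topic": topic, "partition": p,
                           "replicas": replicas,
                           "log_dirs": ["any"] * rf})
    return {"version": 1, "partitions": partitions}

plan = build_reassignment("topic", 10, [1001, 1002, 1003])
print(json.dumps(plan))  # save this output as reassign.json
```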
Run the reassignment with:
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --execute
Check progress with:
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --verify
After it succeeds, the topic looks like this:
Topic:topic PartitionCount:10 ReplicationFactor:2 Configs:
Topic: topic Partition: 0 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic: topic Partition: 1 Leader: 1003 Replicas: 1002,1003 Isr: 1003,1002
Topic: topic Partition: 2 Leader: 1001 Replicas: 1003,1001 Isr: 1001,1003
Topic: topic Partition: 3 Leader: 1002 Replicas: 1001,1002 Isr: 1002,1001
Topic: topic Partition: 4 Leader: 1002 Replicas: 1002,1003 Isr: 1003,1002
Topic: topic Partition: 5 Leader: 1003 Replicas: 1003,1001 Isr: 1001,1003
Topic: topic Partition: 6 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic: topic Partition: 7 Leader: 1003 Replicas: 1002,1003 Isr: 1003,1002
Topic: topic Partition: 8 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002
Topic: topic Partition: 9 Leader: 1002 Replicas: 1001,1002 Isr: 1002,1001
If the topic holds a lot of data and the migration would take too long, you can first purge data beyond a given age. The following command deletes data older than one day by setting the retention to 86400000 ms:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic --config retention.ms=86400000
Once this override is added, it appears under Configs when you inspect the topic with kafka-topics.sh --describe.
After the migration completes, remove the override with:
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic --delete-config retention.ms
retention.ms is a topic-level configuration. By default, how long a topic's data is kept is governed by log.retention.hours in the broker configuration.
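The relationship between the two settings is simple unit arithmetic (168 hours, i.e. 7 days, is Kafka's shipped default for log.retention.hours):

```python
# retention.ms is in milliseconds; log.retention.hours is in hours.
one_day_ms = 24 * 60 * 60 * 1000
print(one_day_ms)  # the value passed to --config retention.ms above

# Broker default log.retention.hours=168 expressed in milliseconds:
default_retention_ms = 168 * 60 * 60 * 1000
print(default_retention_ms)
```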