Kafka使用手册
安装
执行如下命令,直接下载kafka
最新版本:
官网下载地址:官网下载。有的时候由于一些原因,导致链接失效,请至官网下载。
cd /usr/local
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz & tar -zxvf kafka_2.11-2.2.0.tgz
配置config/service.properties
:
zookeeper.connect=192.168.218.225:2181,192.168.218.231:2181,192.168.218.237:2181
listeners=PLAINTEXT://192.168.218.198:9092
advertised.listeners=PLAINTEXT://192.168.218.198:9092
注意:listeners
与advertised.listeners
建议加上。因为默认情况下,kafka
是用hostname
去查找服务器的IP。如果查找不到,或者查找到localhost
等,则会导致外网连接不上。
如果没有安装zookeeper
,可以使用 kafka 自带的启动即可
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
如果是自定义安装的java
,那可能需要重新配置一下java
执行环境。
配置kafka
脚本执行的java
环境:
vim bin/kafka-run-class.sh
#在首行添加如下代码
export JAVA_HOME=/usr/local/jdk1.8.0_151
export JRE_HOME=/usr/local/jdk1.8.0_151/jre
配置服务
为kafka
服务创建systemd service
文件:
vim /etc/systemd/system/kafka.service
输入如下代码到该文件中:
[Unit]
Description=kafka
After=network.target remote-fs.target nss-lookup.target
[Service]
Type=forking
User=root
Group=root
ExecStart=/bin/sh -c '/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.2.0/config/server.properties'
ExecStop=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
启动kafka
服务:
systemctl start kafka
设置kafka
服务为开机自启动
systemctl enable kafka
日志清理
在 kafka 的配置中 conf/server.properties
中设置 log.retention.byte
值。kafka 的硬盘使用量,建议设置为硬盘的 70%。根据不同的分区大小,进行区分。
# 设置日志保留大小为 7G,单位为:字节
log.retention.byte=7516192768
默认的日志保存天数为 168小时。即一周时间。此处可根据实际情况定义。
# 设置日志保留时间为1天
log.retention.hours=24
Kafka的日志清理-LogCleaner
kafka + how to calculate the value of log.retention.byte
副本配置
- offsets.topic.replication.factor 用于配置offset记录的topic的partition的副本个数
- transaction.state.log.replication.factor 事务主题的复制因子
- transaction.state.log.min.isr 覆盖事务主题的min.insync.replicas配置
- num.partitions 新建Topic时默认的分区数
default.replication.factor
自动创建topic时的默认副本的个数
注意:这些参数,设置得更高以确保高可用性!
其中 default.replication.factor 是真正决定,topi的副本数量的
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
num.partitions=1
default.replication.factor=3
关于kafka配置文件的更多解释,请参考链接:
https://blog.csdn.net/memoordit/article/details/78850086
参考:https://www.cnblogs.com/xiao987334176/p/10315176.html
命令
分区
分区一般是初始化的时候创建好的。所以在初始化之时,就必须规则好分区使用数量。
默认情况的分区配置在conf/service.properties
下配置:
num.partitions=1
手工分配分区
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic foo
Topic
创建topic
./bin/kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 3 --partitions 3 --topic gilbert
--partitions
指定分区数量--replication-factor
指定备份数量,默认值为1,不进行备份
在创建时间如果出现replication factor: 3 larger than available brokers: 0
,则需要注意zookeeper
的地址是否有引用对应的路径。例如--zookeeper master:2181/kafka
https://blog.csdn.net/qq_38976805/article/details/90577556
查看当前topic列表
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
清空分区或队列
将缓存时间设置1秒,等待1分钟
kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw.beacon --config retention.ms=1000
1分钟后,将缓存时间设回原值。注意可以先查看原值是什么?
kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw.beacon --config retention.ms=86400000
kafka集群及与springboot集成
changing kafka retention period during runtime
查看分区信息
可查看分区数量等。
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic com.junbo.dw.beacon
结果:
Topic:com.junbo.dw.beacon PartitionCount:10 ReplicationFactor:1 Configs:
Topic: com.junbo.dw.beacon Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 5 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 6 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 7 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 8 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 9 Leader: 0 Replicas: 0 Isr: 0
查看分区消费信息
sh kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:6667 --new-consumer --group mojing --describe
自动均衡
在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也j均分在不同的broker上。
每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"中的第一个replicas叫"preferred replica",刚创建的topic一般"preferred replica"是leader。leader replica负责所有的读写。
但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader。
./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
一般情况下,在生产环境可开启配置打开自动均衡。
修改副本信息
编辑文件replication-factor-junbo-dw2-5.json
:
{"version":1, "partitions":[{"topic":"com.junbo.dw2","partition":5,"replicas":[1001,1002,1003]}] }
执行命令:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication-factor-junbo-dw2-5.json --execute
验证结果:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication-factor-junbo-dw2-5.json --verify
重新分配分区
重分配步骤:
- 确认brokers数
- 使用generate生成迁移计划
- 使用execute执行迁移计划
- 使用verify检查是否迁移完成
查看 brokers
在kafka的bin目录使用zookeeper查看所有的kafka brokers id。
./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
获取borders id对应的机器:
./bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/1003
生成迁移计划
手动创建一个json文件:topic.json
{
"topics": [
{"topic": "com.junbo.dw2"},
{"topic": "flink-user-action"},
{"topic": "flink-user-info"}
],
"version":1
}
使用--generate
生成计划:
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --topics-to-move-json-file ./topic.json --broker-list "1001,1002,1003" --generate
生成结果:
Current partition replica assignment
{"version":1,
"partitions":[....]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[.....]
}
Proposed partition reassignment configuration
为迁移后的计划。将该计划保存到一个reassign.json
文件。
执行计划
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --execute
如果自定义计划,需要注意log_dirs
列表的长度必须和replicas
的长度一致。
partitions-reassignment-is-failing-in-kafka
确认计划
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --verify
中断正在执行的计划
// 中止正在进行的重分配任务
// 登录zookeeper
./zkCli.sh
get /admin/reassign_partitions
delete /admin/reassign_partitions
迁移优化
如果数据量较多,可以先删除指定分区3个小时前的数据:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw2 --config retention.ms=86400000
迁移完成后,再改回来,清除3天前的数据:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw2 --config retention.ms=259200000
或者使用kafka-configs.sh
进行删除该配置。
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
--entity-type topics --entity-name com.junbo.dw2 --delete-config retention.ms
在修改完成后,可以使用describe
命令来查看配置是否生效。
changing-kafka-retention-period-during-runtime
重新指定分区leader
有的时候节点down了,但是新选的leader并不合适,于是需要重新指定。
手动编辑文件topicPartitionList.json
:
{"partitions":[{"topic":"com.junbo.dw2","partition":5}]}
执行命令:
$ ./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 -path-to-json-file ~/kafka/topicPartitionList.json
如果指定的leader本身不存在该分区的副本,则需要手动创建副本文件夹。一般在logs.dir
文件夹下。
手动指定leader
查看指定分区的状态:
在zookeeper的zkCli.sh执行:
get /brokers/topics/com.junbo.dw2/partitions/5/state
修改返回的json,并执行:
set /brokers/topics/com.junbo.dw2/partitions/5/state {"controller_epoch":66,"leader":1003,"version":1,"leader_epoch":31,"isr":[1002]}
kafka partition offline
导出指定主题的数据
使用以下命令导出最近的100W条数据:
./bin/kafka-console-consumer.sh --bootstrap-server bigdata1:6667 --topic com.junbo.dw2 --max-messages 1000000 > /opt/data/com-junbo-dw2.json
也可以指定分区,以使用偏移量。
./bin/kafka-console-consumer.sh --bootstrap-server bigdata1:6667 --topic com.junbo.dw2 --offset 1 --partition 0 > /opt/data/com-junbo-dw2.json
如果需要从开始导出则加上参数:--from-beginning
。
将数据导入主题
./bin/kafka-console-producer.sh --bootstrap-server bigdata1:6667 --topic com.kafka.test1 < test.txt
优化篇
jstat 查看 gc 信息
参数说明:
S0C:第一个幸存区的大小
S1C:第二个幸存区的大小
S0U:第一个幸存区的使用大小
S1U:第二个幸存区的使用大小
EC:伊甸园区的大小
EU:伊甸园区的使用大小
OC:老年代大小
OU:老年代使用大小
MC:方法区大小
MU:方法区使用大小
CCSC:压缩类空间大小
CCSU:压缩类空间使用大小
YGC:年轻代垃圾回收次数
YGCT:年轻代垃圾回收消耗时间
FGC:老年代垃圾回收次数
FGCT:老年代垃圾回收消耗时间
GCT:垃圾回收消耗总时间
- 使用 jps 查看 kafka 的进程ID
- 使用
jstat -gc <pid> 1s 30
查看当前broker在kafka集群的gc频率 使用 jmap 查看 kafka 当前的堆内存信息
- jmap -heap
观察指标
- Survivor Space 使用情况
- G1 Old Generation 使用情况
- jmap -heap
优化内存
修改kafka配置,在kafka-server-start.sh
修改export KAFKA_HEAP_OPTS="-Xmx1G -Xms 1G"
改为对应的内存。
重启后再次查看内存gc频率。
https://www.cnblogs.com/yinzhengjie/p/9884552.html
优化记录
说明
PID:kafka进程ID
TIME:kafka运行时长(单位1/100秒)
YGC:年轻代垃圾回收次数
YGCT:年轻代垃圾回收消耗时间
2020-11-09
机器 | PID | TIME | YGC | YGCT | 回收速度 |
---|---|---|---|---|---|
bigdata1 | 29661 | 114962:14 | 4543762 | 102034.857 | 0.395次/秒 |
bigdata2 | 12089 | 102601:30 | 3433346 | 73693.926 | 0.334次/秒 |
bigdata4 | 18244 | 56538.15 | 1864546 | 39092.438 | 0.329次/秒 |
与 Spring Boot 的结合
引用
在build.gradle
中加下kafka
的依赖:
dependencies {
compile("org.springframework.kafka:spring-kafka:2.2.0.RELEASE")
}
兼容
- Apache Kafka Clients 2.0.0
- Spring Framework 5.1.x
- Java 8+
配置
java:
kafka:
bootstrap-servers: 192.168.218.198:9092
consumer:
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
#key-deserializer: #默认情况下是字符串序列化
#value-deserializer:
properties:
session.timeout.ms: 15000
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
#value-serializer: com.junbo.database.serializer.JsonSerializer #建议使用自定义的JsonSerializer
properties:
session.timeout.ms: 15000
监听 Topic
Kafka
直接多种类型的序列化与反序列化。此处建议均使用默认的字符串序列化方式。所以在producer
的配置处将value-serizlizer
配置为JsonSerizlizer
。由于Kafka
自带库的Json
序列化方式存在问题,所以使用自定义的序列化:
public class JsonSerializer<T> implements Serializer<T> {
private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
@Override
public void configure(Map configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, Object data) {
try {
return JsonMapperUtil.MAPPER.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
logger.warn("Json序列化失败", e);
}
return new byte[0];
}
@Override
public void close() {}
}
监听Topic
,代码如下:
@Component
public class KafkaReceiver {
private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
@KafkaListener(topics = "kafkaTopic}")
public void listen(ConsumerRecord<String, String> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("listen record: {}", record);
logger.info("listen key:{}, message: {}", record.key(), message);
}
}
}
我们可以根据发送的key
名,去做数据类型的区分。
发送 Topic
注入KafkaTemplate
即可执行消息的发送:
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
private KafkaTemplate<String, UserEntity> kafkaTemplate;
发送一条测试的userEntity
:
UserEntity userEntity = new UserEntity();
userEntity.setId(1L);
userEntity.setBirthDay(new Date());
userEntity.setDataSource("test");
userEntity.setGender(0);
userEntity.setEmail("a@a.com");
this.kafkaTemplate.send(this.topic, "key-name", userEntity);