Kafka使用手册
安装
执行如下命令,直接下载kafka最新版本:
官网下载地址:官网下载。有的时候由于一些原因,导致链接失效,请至官网下载。
cd /usr/local
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz & tar -zxvf kafka_2.11-2.2.0.tgz配置config/service.properties:
zookeeper.connect=192.168.218.225:2181,192.168.218.231:2181,192.168.218.237:2181
listeners=PLAINTEXT://192.168.218.198:9092
advertised.listeners=PLAINTEXT://192.168.218.198:9092注意:listeners与advertised.listeners建议加上。因为默认情况下,kafka是用hostname去查找服务器的IP。如果查找不到,或者查找到localhost等,则会导致外网连接不上。
如果没有安装zookeeper,可以使用 kafka 自带的启动即可
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties如果是自定义安装的java,那可能需要重新配置一下java执行环境。
配置kafka脚本执行的java环境:
vim bin/kafka-run-class.sh
#在首行添加如下代码
export JAVA_HOME=/usr/local/jdk1.8.0_151
export JRE_HOME=/usr/local/jdk1.8.0_151/jre配置服务
为kafka服务创建systemd service文件:
vim /etc/systemd/system/kafka.service输入如下代码到该文件中:
[Unit]
Description=kafka
After=network.target remote-fs.target nss-lookup.target
[Service]
Type=forking
User=root
Group=root
ExecStart=/bin/sh -c '/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.2.0/config/server.properties'
ExecStop=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target启动kafka服务:
systemctl start kafka设置kafka服务为开机自启动
systemctl enable kafka日志清理
在 kafka 的配置中 conf/server.properties 中设置 log.retention.byte 值。kafka 的硬盘使用量,建议设置为硬盘的 70%。根据不同的分区大小,进行区分。
# 设置日志保留大小为 7G,单位为:字节
log.retention.byte=7516192768默认的日志保存天数为 168小时。即一周时间。此处可根据实际情况定义。
# 设置日志保留时间为1天
log.retention.hours=24Kafka的日志清理-LogCleaner
kafka + how to calculate the value of log.retention.byte
副本配置
- offsets.topic.replication.factor 用于配置offset记录的topic的partition的副本个数
- transaction.state.log.replication.factor 事务主题的复制因子
- transaction.state.log.min.isr 覆盖事务主题的min.insync.replicas配置
- num.partitions 新建Topic时默认的分区数
default.replication.factor自动创建topic时的默认副本的个数
注意:这些参数,设置得更高以确保高可用性!
其中 default.replication.factor 是真正决定,topi的副本数量的
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
num.partitions=1
default.replication.factor=3 关于kafka配置文件的更多解释,请参考链接:
https://blog.csdn.net/memoordit/article/details/78850086
参考:https://www.cnblogs.com/xiao987334176/p/10315176.html
命令
分区
分区一般是初始化的时候创建好的。所以在初始化之时,就必须规则好分区使用数量。
默认情况的分区配置在conf/service.properties下配置:
num.partitions=1手工分配分区
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic fooTopic
创建topic
./bin/kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 3 --partitions 3 --topic gilbert--partitions 指定分区数量--replication-factor 指定备份数量,默认值为1,不进行备份
在创建时间如果出现replication factor: 3 larger than available brokers: 0,则需要注意zookeeper的地址是否有引用对应的路径。例如--zookeeper master:2181/kafka
https://blog.csdn.net/qq_38976805/article/details/90577556
查看当前topic列表
./bin/kafka-topics.sh --zookeeper localhost:2181 --list清空分区或队列
将缓存时间设置1秒,等待1分钟
kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw.beacon --config retention.ms=10001分钟后,将缓存时间设回原值。注意可以先查看原值是什么?
kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw.beacon --config retention.ms=86400000kafka集群及与springboot集成
changing kafka retention period during runtime
查看分区信息
可查看分区数量等。
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic com.junbo.dw.beacon结果:
Topic:com.junbo.dw.beacon PartitionCount:10 ReplicationFactor:1 Configs:
Topic: com.junbo.dw.beacon Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 5 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 6 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 7 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 8 Leader: 0 Replicas: 0 Isr: 0
Topic: com.junbo.dw.beacon Partition: 9 Leader: 0 Replicas: 0 Isr: 0查看分区消费信息
sh kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:6667 --new-consumer --group mojing --describe自动均衡
在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也j均分在不同的broker上。
每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"中的第一个replicas叫"preferred replica",刚创建的topic一般"preferred replica"是leader。leader replica负责所有的读写。
但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader。
./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181一般情况下,在生产环境可开启配置打开自动均衡。
修改副本信息
编辑文件replication-factor-junbo-dw2-5.json:
{"version":1, "partitions":[{"topic":"com.junbo.dw2","partition":5,"replicas":[1001,1002,1003]}] }执行命令:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication-factor-junbo-dw2-5.json --execute验证结果:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication-factor-junbo-dw2-5.json --verify重新分配分区
重分配步骤:
- 确认brokers数
- 使用generate生成迁移计划
- 使用execute执行迁移计划
- 使用verify检查是否迁移完成
查看 brokers
在kafka的bin目录使用zookeeper查看所有的kafka brokers id。
./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids获取borders id对应的机器:
./bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/1003生成迁移计划
手动创建一个json文件:topic.json
{
"topics": [
{"topic": "com.junbo.dw2"},
{"topic": "flink-user-action"},
{"topic": "flink-user-info"}
],
"version":1
}使用--generate生成计划:
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --topics-to-move-json-file ./topic.json --broker-list "1001,1002,1003" --generate生成结果:
Current partition replica assignment
{"version":1,
"partitions":[....]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[.....]
}Proposed partition reassignment configuration为迁移后的计划。将该计划保存到一个reassign.json文件。
执行计划
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --execute如果自定义计划,需要注意log_dirs列表的长度必须和replicas的长度一致。
partitions-reassignment-is-failing-in-kafka
确认计划
./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --verify中断正在执行的计划
// 中止正在进行的重分配任务
// 登录zookeeper
./zkCli.sh
get /admin/reassign_partitions
delete /admin/reassign_partitions迁移优化
如果数据量较多,可以先删除指定分区3个小时前的数据:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw2 --config retention.ms=86400000迁移完成后,再改回来,清除3天前的数据:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw2 --config retention.ms=259200000或者使用kafka-configs.sh进行删除该配置。
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
--entity-type topics --entity-name com.junbo.dw2 --delete-config retention.ms在修改完成后,可以使用describe命令来查看配置是否生效。
changing-kafka-retention-period-during-runtime
重新指定分区leader
有的时候节点down了,但是新选的leader并不合适,于是需要重新指定。
手动编辑文件topicPartitionList.json:
{"partitions":[{"topic":"com.junbo.dw2","partition":5}]}执行命令:
$ ./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 -path-to-json-file ~/kafka/topicPartitionList.json如果指定的leader本身不存在该分区的副本,则需要手动创建副本文件夹。一般在logs.dir文件夹下。手动指定leader
查看指定分区的状态:
在zookeeper的zkCli.sh执行:
get /brokers/topics/com.junbo.dw2/partitions/5/state修改返回的json,并执行:
set /brokers/topics/com.junbo.dw2/partitions/5/state {"controller_epoch":66,"leader":1003,"version":1,"leader_epoch":31,"isr":[1002]}kafka partition offline
导出指定主题的数据
使用以下命令导出最近的100W条数据:
./bin/kafka-console-consumer.sh --bootstrap-server bigdata1:6667 --topic com.junbo.dw2 --max-messages 1000000 > /opt/data/com-junbo-dw2.json也可以指定分区,以使用偏移量。
./bin/kafka-console-consumer.sh --bootstrap-server bigdata1:6667 --topic com.junbo.dw2 --offset 1 --partition 0 > /opt/data/com-junbo-dw2.json如果需要从开始导出则加上参数:--from-beginning。
将数据导入主题
./bin/kafka-console-producer.sh --bootstrap-server bigdata1:6667 --topic com.kafka.test1 < test.txt优化篇
jstat 查看 gc 信息
参数说明:
S0C:第一个幸存区的大小
S1C:第二个幸存区的大小
S0U:第一个幸存区的使用大小
S1U:第二个幸存区的使用大小
EC:伊甸园区的大小
EU:伊甸园区的使用大小
OC:老年代大小
OU:老年代使用大小
MC:方法区大小
MU:方法区使用大小
CCSC:压缩类空间大小
CCSU:压缩类空间使用大小
YGC:年轻代垃圾回收次数
YGCT:年轻代垃圾回收消耗时间
FGC:老年代垃圾回收次数
FGCT:老年代垃圾回收消耗时间
GCT:垃圾回收消耗总时间
- 使用 jps 查看 kafka 的进程ID
- 使用
jstat -gc <pid> 1s 30查看当前broker在kafka集群的gc频率 使用 jmap 查看 kafka 当前的堆内存信息
- jmap -heap
观察指标
- Survivor Space 使用情况
- G1 Old Generation 使用情况
- jmap -heap
优化内存
修改kafka配置,在kafka-server-start.sh修改export KAFKA_HEAP_OPTS="-Xmx1G -Xms 1G"改为对应的内存。
重启后再次查看内存gc频率。
https://www.cnblogs.com/yinzhengjie/p/9884552.html
优化记录
说明
PID:kafka进程ID
TIME:kafka运行时长(单位1/100秒)
YGC:年轻代垃圾回收次数
YGCT:年轻代垃圾回收消耗时间
2020-11-09
| 机器 | PID | TIME | YGC | YGCT | 回收速度 |
|---|---|---|---|---|---|
| bigdata1 | 29661 | 114962:14 | 4543762 | 102034.857 | 0.395次/秒 |
| bigdata2 | 12089 | 102601:30 | 3433346 | 73693.926 | 0.334次/秒 |
| bigdata4 | 18244 | 56538.15 | 1864546 | 39092.438 | 0.329次/秒 |
与 Spring Boot 的结合
引用
在build.gradle中加下kafka的依赖:
dependencies {
compile("org.springframework.kafka:spring-kafka:2.2.0.RELEASE")
}兼容
- Apache Kafka Clients 2.0.0
- Spring Framework 5.1.x
- Java 8+
配置
java:
kafka:
bootstrap-servers: 192.168.218.198:9092
consumer:
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
#key-deserializer: #默认情况下是字符串序列化
#value-deserializer:
properties:
session.timeout.ms: 15000
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
#value-serializer: com.junbo.database.serializer.JsonSerializer #建议使用自定义的JsonSerializer
properties:
session.timeout.ms: 15000监听 Topic
Kafka直接多种类型的序列化与反序列化。此处建议均使用默认的字符串序列化方式。所以在producer的配置处将value-serizlizer配置为JsonSerizlizer。由于Kafka自带库的Json序列化方式存在问题,所以使用自定义的序列化:
public class JsonSerializer<T> implements Serializer<T> {
private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
@Override
public void configure(Map configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, Object data) {
try {
return JsonMapperUtil.MAPPER.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
logger.warn("Json序列化失败", e);
}
return new byte[0];
}
@Override
public void close() {}
}监听Topic,代码如下:
@Component
public class KafkaReceiver {
private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
@KafkaListener(topics = "kafkaTopic}")
public void listen(ConsumerRecord<String, String> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("listen record: {}", record);
logger.info("listen key:{}, message: {}", record.key(), message);
}
}
}我们可以根据发送的key名,去做数据类型的区分。
发送 Topic
注入KafkaTemplate即可执行消息的发送:
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
private KafkaTemplate<String, UserEntity> kafkaTemplate;发送一条测试的userEntity:
UserEntity userEntity = new UserEntity();
userEntity.setId(1L);
userEntity.setBirthDay(new Date());
userEntity.setDataSource("test");
userEntity.setGender(0);
userEntity.setEmail("a@a.com");
this.kafkaTemplate.send(this.topic, "key-name", userEntity);