Kafka 笔记
什么是 MQ¶
很多人都说:MQ 通过讲消息的发送和接收分离来实现应用程序的异步和解耦,这个给人的直觉是——MQ 就是异步的,是用来解耦的,但这个只是 MQ 的效果而不是目的。MQ 真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的更加简单的通讯协议。
一个分布式系统中两个模块之间通讯要么是 HTTP,要么是自己开发的 TCP,但是这两种协议其实都是原始的协议。HTTP 协议很难实现两端通讯——模块 A 可以调用 B,B 也可以主动调用 A,如果要做到这个,两端都要背上 WebServer,而且还不长连接(HTTP 2.0 的库根本找不到)。TCP 就更加原始了,粘包、心跳、私有的协议,想一想就头皮发麻。
MQ 所要做的就是在这些协议之上构建一个简单的“协议”——生产者 / 消费者模型。MQ 带给我们的“协议”不是具体的通讯协议,而是更高层次的通讯模型。
消息队列的流派¶
有 Broker 的 MQ¶
这个流派通常有一台服务器作为 Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker 则把消息主动推给消费者(或者消费者主动轮询)
重 Topic (Kafka、ActiveMQ、JMS)¶
生产者会发送 Key 和数据到 Broker,由 Broker 比较 Key 之后决定给哪个消费者。这种模式是我们最常见的模式,是我们对 MQ 最多的印象。在这种模式下,一个 Topic 往往是一个比较大的概念,甚至一个系统中就可能只有一个 Topic,Topic 某种意义上就是 Queue,生产者发送 Key 相当于说:“Hi,把数据放到 Key 的队列中”。
虽然架构一样但是 Kafka 的性能要比 JMS 的性能不知道高多少倍,所以基本上这种类型的 MQ 只有 Kafka 一种备选方案。
轻 Topic(RabbitMQ 或者 AMQP 协议)¶
生产者发送 Key 和数据,消费者定义订阅的队列,Broker 收到数据之后会通过一定的逻辑计算出 Key 对应的队列,然后把数据交给队列。
这种模式下解耦了 Key 和 Queue,在这种架构中 Queue 是非常轻量级的(在 RabbitMQ 中它的上限取决于你的内存),消费者关心的知识自己的 Queue,生产者不必关心数据最终给谁,只要指定 Key 就行了,中间的那层映射在 AMQP 中叫做 Exchange。
AMQP 中有四种 Exchange:
Direct Exchange | Key 就等于 Queue |
Fanout Exchange | 无视 Key,给所有的 Queue 都来一份 |
Topic Exchange | Key 可以用“宽字符”模糊匹配 Queue |
Header Exchange | 无视 Key,通过查看消息的头部元数据来决定发给哪个 Queue(AMQP 头部元数据非常丰富并且可以自定义) |
这种结构的架构给通讯带来了很大的灵活性,我们能想到的通讯方式都可以用这四种 Exchange 表达出来。
无 Broker 的 MQ¶
ZeroMQ¶
无 Broker 的 MQ 的代表是 ZeroMQ。该作者非常睿智,他非常敏锐的意识到——MQ 是更高级的 Socket,它是解决通讯问题的,所以 ZeroMQ 被设计成了一个“库”而不是一个中间件。
节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ 做的事情就是封装出一套类似于 Socket 的 API,可以完成发送数据、读取数据。
ZeroMQ 其实就是一个跨语言的、重量级的 Actor 模型邮箱库。你可以把自己的程序想象成一个 Actor,ZeroMQ 就是提供邮箱功能的库。ZeroMQ 可以实现同一台机器的 RPC 通讯也可以实现不同机器上的 TCP、UDP 通讯。
如果你需要一个强大的、灵活、野蛮的通讯能力,别犹豫 ZeroMQ。
Kafka 介绍¶
Kafka 最初由 LinkedIn 公司开发,是一个分布式的、支持分区的(Partition)、多副本的(Replica)、基于 Zookeeper 协调的消息系统,它最大的特性就是可以实时的处理大量数据以满足各种需求场景。由 Scala 语言编写,LinkedIn 与 2010 年贡献给了 Apache 基金会并成为顶级开源项目。
Kafka 的使用场景¶
-
日志收集
一个公司可以用 Kafka 收集各种服务的 Log,通过 Kafka 以统一接口服务的方式开放给各种 Consumer,例如 Hadoop、HBase、Solr 等。
-
消息系统:
解耦生产者和消费者、缓存消息等。
-
用户活动跟踪
Kafka 经常被用来记录 Web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发吧到 Kafka 的 Topic 中,然后订阅者通过订阅这些 Topic 来做实时的监控分析,后者装载到 Hadoop、数据仓库中做离线分析和挖掘。
-
运营指标
Kafka 也经常被用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
Kafka 的基本概念¶
Kafka 是一个分布式的,分区的消息服务(官方称之为 Commit Log),它提供一个消息系统应该具备的功能,但是却有着独特的设计。
可以这样来说,Kafka 借鉴了 JMS 规范的思想,但是确并没有完全遵循 JMS 规范。
基础的消息相关术语:
Broker | 消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群 |
Topic | Kafka 根据 Topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定一个 Topic |
Producer | 消息生产者,向 Broker 发送消息的客户端 |
Consumer | 消息消费者,从 Broker 读取消息的客户端 |
Consumer Group | 每个 Consumer 属于一个特定的 Consumer Group,一条消息可以被多个不同的 Consumer Group 消费,但是一个 Consumer Group 中只能有一个 Consumer 能够消费该消息 |
Partition | 物理上的概念,一个 Topic 可以分为多个 Partition,每个 Partition 内部消息是有序的 |
服务端(Broker)和客户端(Producer、Consumer)之间通信通过 TCP 协议来完成。
Kafka 安装¶
环境准备¶
- jdk
- zookeeper
配置文件¶
-
config/server.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=6 # Hostname and port the broker will advertise to producers and consumers. advertised.listeners=PLAINTEXT://192.192.192.6:9092 # A comma separated list of directories under which to store log files. log.dirs=/opt/kafka/data # Zookeeperi connection string. zookeeper.connect=192.192.192.6:2181
启动 Kafka 服务器¶
- 启动 Zookeeper
- 启动 Kafka
- 验证是否启动成功(进入 ZK 查看指定 ID 的 Broker 是否存在)
创建主题、发送消息、消费消息¶
# 创建主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 查看 Kafka 有哪些 Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# 消费最新消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
# 从头开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
关于消息的细节¶
- 生产者将消息发送给 Broker,Broker 会将消息保存在本地的日志文件中
- 消息的保存是有序的,通过 Offset 偏移量来描述消息的有序性
- 消费者消费消息时也是通过 Offset 来描述当前要消费的那条消息的位置
单播消息和多播消息¶
单播消息¶
在一个 Kafka 的 Topic 中,启动两个消费者和一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?
如果两个消费者在同一个消费组,那么只有一个消费者可以收到订阅的 Topic 中的消息。
换言之,同一个消费组中只能有一个消费者收到一个 Topic 中的消息。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-group
多播消息¶
不同的消费组订阅同一个 Topic,那么不同的消费组中只有一个消费者能收到消息。
实际上也是多个消费组中的多个消费者收到了同一个消息。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-group-1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-group-2
查看消费组的详细信息¶
重点关注以下几个信息:
CURRENT-OFFSET | 最后被消费的消息的偏移量 |
LOG-END-OFFSET | 最后一条消息的偏移量(消息总量) |
LAG | 积压了多少条消息 |
Kafka 中主题和分区的概念¶
主题¶
Topic 在 Kafka 中是一个逻辑的概念,Kafka 通过 Topic 将消息进行分类,不同的 Topic 会被订阅该 Topic 的消费者消费。
但是有一个问题,如果说这个 Topic 中的消息非常非常多,因为消息是会被保存到 Log 日志文件中的,为了解决这个文件过大的问题, Kafka 提出了 Partition 的概念。
分区¶
通过 Partition 将一个 Topic 中的消息分区来存储。这样的好处有多个:
- 分区存储,可以解决存储文件过大的问题
- 提供了读写的吞吐量,读和写可以同时在多个分区中进行
Kafka 中消息日志文件中保存的内容¶
-
00000000000000000000.log
这个文件中保存的就是消息。文件中保存的消息,默认保存 7 天,7 天后消息会被删除。
-
__consumer_offsets-49
Kafka 内部创建了
__consumer_offsets
主题,包含了 50 个分区,这个主题用来存放消费者消费某个主题的偏移量。每个消费者会把消费的主题的偏移量自主上报给 Kafka 的默认主题__consumer_offsets
,因此 Kafka 为了提升这个主题的并发性,默认设置了 50 个分区。-
提交到哪个分区
通过 Hash 函数
hash(<Consumer Group ID>) % <__consumer_offsets 主题的分区数>
-
提交到该主题的内容
Key 为
<Consumer Group ID>
+<Topic>
+<分区号>
,Value 就是当前 Offset 的值
-
Kafka 集群操作¶
副本的概念¶
副本是为了主题中的分区创建多个备份,在 Kafka 集群的多个 Broker 中,会有一个副本作为 Leader,其它是 Follower。
查看 Topic 情况:
Leader | Kafka 的写和读的操作,都发生在 Leader 上。Leader 负责把数据同步给 Follower。当 Leader 挂了,经过主从选举,从多个 Follower 中选举一个新的 Leader |
Follower | 接收 Leader 的同步的数据 |
Isr | 可以同步和已同步的节点会被放入到 Isr 集群。如果 Isr 中的节点性能较差,会被踢出 Isr 集合 |
集群中有多个 Broker,创建主题时可以指明主题有多少个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的 Broker 里。
集群消费中分区分消费组的细节¶
- 一个 Parittion 只能被一个消费组中的一个消费者消费,目的是为了保证消息的顺序性,但是多个 Partition 的多个消费者消费的总顺序是得不到保证的。
- Partition 的数量决定了消费组中消费者的数量,建议同一个消费者的数量不要超过 Partition 的数量,否则多的消费者消费不到消息。
- 如果消费者挂了,那么就会触发 Rebalance 机制,会让其它消费者来消费该分区。
客户端 API¶
依赖¶
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
生产者¶
public class MyProducer {
public static void main(String[] args) throws Exception {
// 设置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.192.192.6:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 创建消息
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "my-key", "my-value");
// 发送消息
Future<RecordMetadata> future = producer.send(record);
// 获取消息发送的元数据
RecordMetadata metadata = future.get();
System.out.printf("topic=%s partition=%d offset=%d\n", metadata.topic(), metadata.partition(), metadata.offset());
}
}
生产者的同步发送消息¶
如果生产者发送消息没有收到 Ack,生产者会阻塞,阻塞到 3s 的时间,如果还没有收到消息,会进行重试,重试的次数为 3 次。
生产者的异步发送消息¶
异步发送,生产者发送完消息后就可以执行之后的业务,Broker 在收到 消息后异步调用生产者提供的 Callback 回调方法。
producer.send(record, (RecordMetadata metadata, Exception e) -> {
if (e != null) {
System.out.println("消息发送失败");
}
});
生产者中 Ack 配置¶
在同步发送的前提下,生产者在获得集群返回的 Ack 之前会一直阻塞,那么集群什么时候返回 Ack 呢?此时 Ack 有 3 个配置:
-
ack=0
Kafka 集群不需要任何的 Broker 收到消息,就立刻返回 Ack 给生产者。最容易丢消息,效率最高。
-
ack=1
(默认)多副本之间的 Leader 已经收到消息,并把消息写入到本地的 Log 中,才会返回 Ack 给生产者。性能和安全性是最均衡的。
-
ack=-1
/ack=all
有配置
min.insync.replicas=2
(默认为 1,推荐配置大于等于 2),此时就需要 Leader 和一个 Follower 同步完后,才会返回 Ack 给生产者,此时集群中有 2 个 Broker 已完成数据的接收。这种方式最安全,但性能最差。
下面是关于 Ack 和重试的配置(如果没有收到 Ack 就开启重试):
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 重试间隔设置(发送失败会重试,默认间隔 100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要再接收者那边做好消息接收的幂等性处理)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
关于消息发送的缓冲区¶
- Kafka 默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是 32m
- Kafka 本地线程会去缓冲区中一次拉 16k 的数据发送给 Broker
- 如果线程拉不到 16k 的数据,间隔 10ms 也会将已拉到的数据发送给 Broker
消费者¶
public class MyConsumer {
public static void main(String[] args) {
// 设置参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.192.192.6:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// 创建消费者客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅主题列表
consumer.subscribe(List.of("test"));
while (true) {
// poll() 是拉取消息的长轮询
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partitoin=%d offset=%d key=%s value=%s\n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
关于消费者自动提交和手动提交 Offset¶
提交的内容¶
消费者无论是自动提交还是手动提交,都需要把所属的消费组 + 消费的主题 + 消费的分区 + 消费的偏移量这样的信息提交到集群的 __consumer_offsets
主题里面。
自动提交¶
消费者 poll
消息下来后就会自动提交 Offset。自动提交会丢消息,因为消费者在消费前提交 Offset,有可能提交完后还没消费就挂了。
// 是否自动提交 Offset(默认为 true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交 Offset 的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
手动提交¶
手动提交又分成了两种:
- 手动同步提交
> 在消费完消息后调用同步提交的方法,当集群返回 Ack 前一直阻塞,返回 Ack 后表示提交成功,执行之后的逻辑。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("partitoin=%d offset=%d key=%s value=%s\n", record.partition(), record.offset(), record.key(), record.value()); } if (records.count() > 0) { // 手动同步提交 Offset,当前线程会阻塞,直到 Offset 提交成功 consumer.commitAsync(); } }
- 手动异步提交
> 在消息消费完后提交,不需要等集群 Ack,直接执行之后的逻辑,可以设置一个回调方法供集群调用。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("partitoin=%d offset=%d key=%s value=%s\n", record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) -> { if (e != null) { System.out.println("提交失败"); } }); }
长轮询 poll
消息¶
默认情况下,消费者一次会 poll
500 条消息。代码中设置了长轮询的时间为 1000 毫秒,意味着:
- 如果一次
poll
到 500 条,就直接执行for
循环 - 如果这一次没有
poll
到 500 条,且时间在 1 秒内,那么长轮询继续poll
,要么到 500 条,要么到 1 秒 - 如果两次
poll
都没到 500 条,且 1 秒时间到了,那么直接执行for
循环
如果两次 poll
的间隔超过 30 秒,集群会认为该消费者的消费能力过弱,该消费者会被踢出消费组,触发 Rebalance 机制,Rebalance 机制会造成性能开销。可以通过设置这个参数,让一次 poll
的消息少一点:
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
消费者的健康状态检查¶
消费者每隔 1 秒向 Kafka 集群发送心跳,集群发现如果有超过 10 秒没有续约的消费者,将踢出消费组,触发该消费组的 Rebalance 机制,将分区交给消费组里的其它消费者进行消费。
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
指定分区、偏移量、时间消费¶
- 指定分区消费
- 指定 Offset 消费
- 指定时间消费
> 根据时间,去所有的 Partition 中确定该时间对应的 Offset,然后去所有的 Partition 中找到该 Offset 之后的消息开始消费。
// long timestamp = new Date().getTime() - 1000 * 60 * 60; long timestamp = LocalDateTime.now().plusHours(-1).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); Map<TopicPartition, Long> timestampsToSearch = consumer .partitionsFor(topic) .stream() .collect(Collectors.toMap( (PartitionInfo it) -> new TopicPartition(topic, it.partition()), (PartitionInfo it) -> timestamp, (v1, v2) -> v2 )); consumer .offsetsForTimes(timestampsToSearch) .forEach((TopicPartition key, OffsetAndTimestamp value) -> { // 如果查询偏移量的时间点大于最大的索引记录时间那么 value 就会为空 if (value != null) { // 根据 timestamp 确定 offset long offset = value.offset(); consumer.assign(List.of(key)); consumer.seek(key, offset); } });
新消费组的消费 Offset 规则¶
新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的 offset + 1
开始消费(消费新消息)。通过以下的设置,让新的消费者第一次从头开始消费,之后开始消费新消息。
-
latest
(默认)消费新消息。
-
earliest
第一次从头开始消费,之后开始消费新消息。这个需要区别于
consumer.seekToBeginning()
(每次都从头开始消费)。
Spring Boot 中使用 Kafka¶
依赖¶
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
配置文件¶
-
application.properties
spring.kafka.bootstrap-servers=192.192.192.6:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.acks=1 spring.kafka.consumer.group-id=test-group spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
RECORD
当每一条记录被消费者监听器处理之后提交 BATCH
当每一批 poll
的数据被消费者监听器处理之后提交TIME
当每一批 poll
的数据被消费者监听器处理之后,距离上次提交时间大于ack-time
时提交COUNT
当每一批 poll
的数据被消费者监听器处理之后,被处理的 Record 数量大于等于ack-count
时提交COUNT_TIME
TIME
和COUNT
有一个条件满足时提交MANUAL
当每一批 poll
的数据被消费者监听器处理之后,手动调用Acknowledgment.acknowledge()
后提交MANUAL_IMMEDIATE
手动调用 Acknowledgment.acknowledge()
后立即提交
生产者¶
@RestController
public class MyKafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/send")
public void send() {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", 0, "key", "消息");
}
}
消费者¶
@Component
public class MyConsumer {
@KafkaListener(topics = {"test"}, groupId = "test-group")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println(record);
// 手动提交 Offset
ack.acknowledge();
}
}
消费者中配置主题、分区、偏移量¶
@KafkaListener(
groupId = "test-group",
topicPartitions = {
@TopicPartition(
topic = "topic-1",
partitions = {"0", "1"},
partitionOffsets = {@PartitionOffset(partition = "1", initialOffset = "100")}
),
@TopicPartition(topic = "topic-2")
},
// 同组下的消费者个数,即并发消费数,建议小于等于分区总数
concurrency = "3"
)
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println(record);
// 手动提交 Offset
ack.acknowledge();
}
Kafka 集群中的 Controller、Rebalance、HW¶
Controller¶
每个 Broker 启东时会向 ZK 创建一个临时序号节点,获得的序号最小的那个 Broker 将会作为集群中的 Controller,负责:
- 当集群中有一个副本的 Leader 挂掉,需要在集群中选举出一个新的 Leader,选举的规则是从 Isr 集合中最左边获得。
- 当集群中有 Broker 新增或减少,Controller 会同步信息给其它 Broker。
- 当集群中有分区新增或减少,Controller 会同步信息给其它 Broker。
Rebalance 机制¶
前提¶
消费组中的消费者没有指明分区来消费
触发的条件¶
当消费组中的消费者和分区的关系发生变化的时候
分区分配的策略¶
-
Range
根据公司计算得到每个消费者消费哪几个分区。前面的消费者是
<分区总数> / (<消费者数量> + 1)
,之后的消费者是<分区总数> / <消费者数量>
。 -
轮询
-
Sticky
粘合策略,如果需要 Rebalance,会在之前已分配的基础上调至,不会改变之前的分配情况。如果这个没有策略没有开启,那么就要进行全部重新分配。建议开启。
HW 和 LEO¶
HW(High Watermark)俗称高水位,取一个 Partition 对应的 Isr 集合中最小的 LEO(LOG-END-OFFSET)作为 HW,Consumer 最多只能消费到 HW 所在的位置。另外每个 Replica 都有 HW,Leader 和 Follower 各自负责更新自己的 HW 状态。
对于 Leader 新写入的消息,Consumer 不能立刻消费,Leader 会等待该消息被所有 Isr 集合中的 Reolicas 同步后更新 HW,此时消息才能被 Consumer 消费。
这样就保证了如果 Leader 所在的 Broker 失效,但该消息仍然可以从新选举的 Leader 中获取,这样的目的是防止消息的丢失。
Kafka 的优化问题¶
如何防止消息丢失¶
生产者¶
- 使用同步发送。
- 把 Ack 设为
1
或者all
,并且设置同步的分区数(min.insync.replicas
)为分区备份数。
消费者¶
- 把自动提交改成手动提交
如何防止重复消费¶
在防止消息的方案中,如果生产者发送完消息后,因为网络抖动,没有收到 Ack,但实际上 Broker 已经收到消息了,此时生产者就会重试,于是 Broker 就会收到多条相同的消息,而造成消费者的重复消费。
解决方案¶
-
生产者关闭重试
会造成消息丢失,不建议。
-
消费者解决非幂等性消费问题
业务 ID 唯一或分布式锁。
如何做到消息的顺序消费¶
- 生产者保证消息按顺序消费,且消息不丢失,使用同步发送,Ack 设置成非 0 的值。
- 主题只能设置一个分区,消费组中只能有一个消费者。
如何解决消息积压问题¶
消息的消费速度远赶不上生产速度,导致 Kafka 中有大量的数据没有被消费。随着没有消费的数据堆积越多,消费者的寻址性能会越来越差,最后导致整个 Kafka 对外提供的服务的性能很差,从而导致其它服务也访问速度变慢,造成服务雪崩。
解决方案¶
- 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。
- 通过业务的架构设计,提高业务层面消费的性能。
- 创建多个消费组,多个消费者,部署到其它机器上,一起消费,提高消费者的消费速度
- 创建一个消费者,该消费者在 Kafka 另建一个主题,配上多个分区,再配上多个消费者。该消费者将
poll
下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。
如何实现延时队列¶
应用场景¶
订单创建后,超过 30 分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现。
解决方案¶
- Kafka 中创建多个 Topic,每个 Topic 表示延时的间隔,如
topic_30m
。 - 消息发送者发送消息到相应的 Topic,并带上消息的发送时间。
- 消费者订阅相应的 Topic,消费时长轮询消费整个 Topic 中的消息。
- 判断消费的发送时间和当前比较当前时间是否超过预设的值,如 30 分钟。
- 如果超过,去数据库中修改订单状态为已取消。
- 如果未超过,记录当前消息的 Offset 并不再继续消费之后的消息,下次消费 Offset 之后的消息继续进行判断。