某天,深夜突然收到线上报警,某个业务的kafka消息消费发生堆积了。
【阿里云】尊敬的用户xxx , 22:34 您的消息队列 Kafka实例instanceId=xxxxx,consumerGroup=xxxxx 消息堆积量当前值超过10000Count, 请登录云监控关注
由于发生在深夜,很快确定对应时间点没有进行任何线上变更,流量也并没有突增,consumer业务服务也正常,那是怎么回事呢?
之后在查看kafka的监控信息时发现了异常,rebalance 次数明显增多,大概一分钟一次。
rebalance是什么
kafka有几个关键术语,
- topic
- partition
- producer
- consumer
- consumer group
- broker
这些术语都是kafka的基础知识,略过不再解释。
我们都知道,一个partition只能被分配给一个consumer消费。
比如现在有1、2、3、4四个partition,有ab两个consumer,那么分配关系可以是a消费1和2,b消费3和4。
consumer | partition 1 | partition 2 | partition 3 | partition 4 |
---|---|---|---|---|
consumer a | ✅ | ✅ | ||
consumer b | ✅ | ✅ |
当然也可以是a消费1和3,b消费2和4。
这个分配关系不是一成不变的,某些情况下会被打破。
比如b挂了,那b消费的partition 3和partition 4就需要分配给consumer a。
比如新加入了consumer c,那就需要从四个partition里分些给c消费。
当之前的分配规则被打破,所有consumer要重新确认自己需要消费哪些partition,这个重新达成共识的过程就是rebalance。
哪些情况会发生rebalance
分配关系里有两方,一方是consumer,一方是需要被消费的partition。任意一方的数量发生变化时,都会导致rebalance。
一个consumer group可以订阅多个topic,订阅时支持模糊匹配。topic数量发生变化时,需要被消费的partition也会发生变化,所以也会发生rebalance。
即rebalance会发生在:
- consumer数量变化时
- consumer group订阅的topic数量变化时
- topic内partition数量变化时
consumer是怎么被判定为活跃/不活跃的?
这里引入一个新角色,consumer group coordinator。
coordinator其实就是一个broker,consumer group里的每个consumer会向它发心跳。如果超过一定时间没发心跳,就会被判定为不活跃,coordinator会触发rebalance。
不通版本的kafka心跳行为有点不一样。在0.10.1之前,consumer是单线程的,拉取消息(poll)或者提交偏移量时发送心跳。
/**
* Do one round of polling. In addition to checking for new data, this does any needed
* heart-beating, auto-commits, and offset updates.
* @param timeout The maximum time to block in the underlying poll
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
//......省略.......
long now = time.milliseconds();
// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
client.executeDelayedTasks(now);
//......省略.......
}
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
//......省略.......
// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
}
/**
* Initialize the coordination manager.
*/
public AbstractCoordinator(ConsumerNetworkClient client,
String groupId,
int sessionTimeoutMs,
int heartbeatIntervalMs,
Metrics metrics,
String metricGrpPrefix,
Time time,
long retryBackoffMs) {
//......省略.......
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
this.heartbeatTask = new HeartbeatTask();
//......省略.......
}
public Heartbeat(long timeout,
long interval,
long now) {
if (interval >= timeout)
throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
this.timeout = timeout;
this.interval = interval;
this.lastSessionReset = now;
}
以上代码为kafka-client 0.10.0.0版,可以看到heartbeat主要有两个参数:session.timeout.ms、heartbeat.interval.ms。
在0.10.1版本后,心跳由单独的HeartbeatThread线程执行。
private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
}
}
public Heartbeat(long sessionTimeout,
long heartbeatInterval,
long maxPollInterval,
long retryBackoffMs) {
if (heartbeatInterval >= sessionTimeout)
throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
this.sessionTimeout = sessionTimeout;
this.heartbeatInterval = heartbeatInterval;
this.maxPollInterval = maxPollInterval;
this.retryBackoffMs = retryBackoffMs;
}
以上代码为kafka-client 0.10.1.1版,可以看到heartbeat新增了一个参数:max.poll.interval.ms。
The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default).
根据官方说明,如果两次poll间隔超过了max.poll.interval.ms,consumer就会主动离开组,coordinator会发起rebalance。
本次故障到底是怎么发生的
我们使用的kafka-client版本是0.10.0.0,producer由于某些原因提交了一些格外大的数据,consumer消费的时长陡增,大于session.timeout.ms,发生了频繁rebalance。
解决这个问题,可以调大session.timeout.ms,可以升级kafka-client到0.10.1以上。
其他遗留问题
- rebalance期间会丢消息吗,会重复消费吗
参考:
- 一篇文章带你快速搞定Kafka术语
- 消费者组重平衡能避免吗?
- What is the difference in Kafka between a Consumer Group Coordinator and a Consumer Group Leader?
- 使用消息队列Kafka版时消费客户端频繁出现Rebalance
- Notable changes in 0.10.1.0