广告取链场景Kafka消费者最佳实践

广告取链场景-Kafka消费者最佳实践
1. 背景
我们正在开发一个广告取链服务,其中包含一个消息生产者控量服务。这个服务需要监控Kafka消息的堆积量,当堆积量达到一定水平(目前配置为10万条消息)时,服务会停止生产新的消息。为了实现这个功能,我们需要一个可靠的方法来获取Kafka topic的未消费消息数量。
2. 问题描述
初始实现中,我们遇到了两个主要问题:
- 并发访问异常:使用同一个KafkaConsumer实例进行消费和查询操作,导致"java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"错误。
- 查询范围不精确:仅通过consumer group ID查询,可能会包含其他topic的堆积量,导致结果不准确。
2.1 初始代码实现
以下是初始的代码实现,存在上述问题:
private Map<TopicPartition, Long> lagOf(String groupID) {
readWriteLock.readLock().lock();
try {
ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupID);
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
} catch (Exception e) {
log.warn("获取每个分区未消费的数量 lag() error, Exception-->", e);
return Collections.emptyMap();
} finally {
readWriteLock.readLock().unlock();
}
}
public long getKafkaCount(String topicName) {
readWriteLock.readLock().lock();
long count = 0;
try {
Map<TopicPartition, Long> map = lagOf(groupId);
for (Long i : map.values()) {
count = count + i;
}
} catch (Exception e) {
log.error("getKafkaCount kafka topicName: {}, 返回count: {}, Exception-->", topicName, count, e);
} finally {
readWriteLock.readLock().unlock();
log.info("kafka message count for topic {}: {}", topicName, 0);
}
log.info("kafka message count: {}", count);
return count;
}
KafkaConsumer在代码设计上不允许多线程来进行访问的,通过阅读源码发现初始化时候会设置默认currentThread,并不允许修改,在多线程使用同一kafkaConsumer,即时通过加锁也还是会报出"java.util.ConcurrentModificationException"异常。
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
} else {
this.refcount.incrementAndGet();
}
}
3. 优化过程
3.1 第一版优化
为解决上述问题,我们进行了第一次优化:
- 创建新的KafkaConsumer实例:每次查询时创建一个新的KafkaConsumer,避免并发访问问题。
- 指定topic查询:通过订阅特定topic,确保只查询目标topic的堆积量。
优化后的代码如下:
public synchronized long getTopicUnconsumedCount(String topicName) {
long totalLag = 0;
try {
KafkaConsumer<String,String> kafkaConsumer = createKafkaConsumer(topicName);
List<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
for (TopicPartition partition : topicPartitions) {
Long endOffset = endOffsets.get(partition);
OffsetAndMetadata consumedOffset = consumedOffsets.get(partition);
if (endOffset != null && consumedOffset != null) {
long lag = endOffset - consumedOffset.offset();
totalLag += lag;
}
}
} catch (Exception e) {
log.error("查询 topic {} 未消费消息数量时发生错误: {}", topicName, e.getMessage(), e);
}
log.info("Topic [{}] 未消费的消息总数: {}", topicName, totalLag);
return totalLag;
}
public KafkaConsumer<String, String> createKafkaConsumer(String topic) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfiguration.consumerConfigs());
consumer.subscribe(Collections.singleton(topic));
return consumer;
}
然而,这个版本仍然存在一些问题:
- 潜在的内存泄漏:每次查询都创建新的KafkaConsumer实例,但没有及时释放。
- 可能触发再均衡:新创建的消费者subscribe操作可能影响现有的消费者,引起不必要的再均衡。



3.2 第二版优化
针对第一版的问题,我们进行了进一步优化:
- 使用try-with-resources语句:确保KafkaConsumer实例在使用后被正确关闭,避免内存泄漏。
- 移除consumer.subscribe调用:在查询时不订阅topic,减少触发再均衡的可能性。
- 使用独立的consumer group:为查询操作创建一个单独的consumer group,避免影响现有的消费者。
最终优化后的代码如下:
public long getTopicUnconsumedCount(String topicName) {
long totalLag = 0;
try (KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer()) {
List<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = adminClient.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get(10, TimeUnit.SECONDS);
for (TopicPartition partition : topicPartitions) {
Long endOffset = endOffsets.get(partition);
OffsetAndMetadata consumedOffset = consumedOffsets.get(partition);
if (endOffset != null && consumedOffset != null) {
long lag = endOffset - consumedOffset.offset();
totalLag += lag;
}
}
} catch (Exception e) {
log.error("查询 topic {} 未消费消息数量时发生错误: {}", topicName, e.getMessage(), e);
}
log.info("Topic [{}] 堆积消费的消息总数: {}", topicName, totalLag);
return totalLag;
}
public KafkaConsumer<String, String> createKafkaConsumer() {
Properties props = new Properties();
props.putAll(kafkaConfiguration.consumerConfigs());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-lag-checker");
return new KafkaConsumer<>(props);
}


3.3 第三版优化(最终版)
针对第二版性能进行进一步优化:
- 虽然没错创建使用完再释放KafkaConsumer能实现相关功能,也不会造成内存泄漏,但young GC过于频繁,修改会使用newSingleThreadExecutor线程池来进行绑定KafkaConsumer;
- 将拉取时间间隔max.poll.interval.ms 从1天调整为1小时,更为合理避免一些异常情况可以及时的Rebalance;
//使用单线程线程池,这样可以对KafkaConsumer使用都会统一线程ID
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
//内存缓存记录当前消息堆积情况,60秒更新一次
private AtomicLong nowTotalLag = new AtomicLong(-1);
/**
* 项目启动完毕以及类实例都创建完毕后执行
*/
private void initializeKafkaConsumer() {
executorService.submit(() -> {
try {
KafkaConsumer<String, String> consumer = createKafkaConsumer();
kafkaConsumerRef.set(consumer);
while (true) {
long totalLag = fetchTopicUnconsumedCount(kafkaConfiguration.getTopicName());
nowTotalLag.set(totalLag);
TimeUnit.SECONDS.sleep(60L);
}
} catch (WakeupException e) {
// Ignore exception if closing
log.warn("KafkaConsumer woken up", e);
} catch (Exception e) {
log.error("Error in KafkaConsumer. Will recreate after 5 seconds", e);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} finally {
KafkaConsumer<String, String> consumer = kafkaConsumerRef.getAndSet(null);
if (consumer != null) {
consumer.close();
}
}
});
}



4. 原理分析
4.1 KafkaConsumer线程安全性
KafkaConsumer不是线程安全的。Kafka的设计假设每个消费者实例由单个线程使用。当多个线程尝试同时访问同一个KafkaConsumer实例时,会导致并发修改异常。
4.2 Consumer Group和Offset管理
Kafka使用Consumer Group来管理消费者和它们的offset。每个Consumer Group独立维护自己的offset,这允许不同的应用程序以不同的速率消费同一个topic而不相互影响。
4.3 Kafka再均衡机制
当Consumer Group中的消费者数量发生变化时(如新消费者加入或现有消费者离开),Kafka会触发再均衡过程。这个过程会重新分配分区给消费者,可能会暂时中断消息处理。
5. 结论
通过三次优化,我们解决了初始实现中的主要问题:
- 并发访问问题:通过为每次查询创建新的KafkaConsumer实例解决。
- 查询精确性:通过指定topic和使用独立的consumer group确保只查询目标topic的堆积量。
- 内存管理:使用try-with-resources确保及时释放资源。
- 减少再均衡影响:避免不必要的subscribe操作,减少对现有消费者的影响。
- 资源利用:增加内存缓存,减少进行查询的次数。
6. 最终方案
最终的实现方案具有以下特点:
- 使用独立线程对的KafkaConsumer实例进行访问操作,比如:newSingleThreadExecutor。(最佳实践)
- 使用专用的consumer group进行查询,避免影响生产环境的消费者。(最佳实践)
- 直接查询分区信息和offset,而不进行subscribe操作。(最佳实践)
- 通过比较每个分区的end offset和consumed offset来精确计算未消费的消息数量。
总结
经过三轮修改,优化了Kafka消息堆积查询服务,提高了系统的稳定性和查询准确性,第一版虽解决了一部分问题,但引入了新的内存管理问题。第二版通过合理的资源管理和消费者组控制,达到了预期效果。第三版通过使用newSingleThreadExecutor减少KafkaConsumer频繁创建与释放以及通过缓存减少操作的频率。
在使用KafkaConsumer时,我们应更注重类的相关特性,以及类的并发安全和资源管理的,此案例梳理了几个最佳实践,同时我们第三版这个方案保证了查询的准确性,又避免了对现有Kafka消费者的影响,个人理解为 Kafka消费者深入研发的一次最佳实践。
