广告取链场景Kafka消费者最佳实践

广告取链场景-Kafka消费者最佳实践

1. 背景

我们正在开发一个广告取链服务,其中包含一个消息生产者控量服务。这个服务需要监控Kafka消息的堆积量,当堆积量达到一定水平(目前配置为10万条消息)时,服务会停止生产新的消息。为了实现这个功能,我们需要一个可靠的方法来获取Kafka topic的未消费消息数量。

2. 问题描述

初始实现中,我们遇到了两个主要问题:

  1. 并发访问异常:使用同一个KafkaConsumer实例进行消费和查询操作,导致"java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"错误。
  2. 查询范围不精确:仅通过consumer group ID查询,可能会包含其他topic的堆积量,导致结果不准确。

2.1 初始代码实现

以下是初始的代码实现,存在上述问题:

private Map<TopicPartition, Long> lagOf(String groupID) {
    readWriteLock.readLock().lock();
    try {
        ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupID);
        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(consumedOffsets.keySet());
        return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
    } catch (Exception e) {
        log.warn("获取每个分区未消费的数量 lag() error, Exception-->", e);
        return Collections.emptyMap();
    } finally {
        readWriteLock.readLock().unlock();
    }
}

public long getKafkaCount(String topicName) {
    readWriteLock.readLock().lock();
    long count = 0;
    try {
        Map<TopicPartition, Long> map = lagOf(groupId);
        for (Long i : map.values()) {
            count = count + i;
        }
    } catch (Exception e) {
        log.error("getKafkaCount kafka topicName: {}, 返回count: {}, Exception-->", topicName, count, e);
    } finally {
        readWriteLock.readLock().unlock();
        log.info("kafka message count for topic {}: {}", topicName, 0);
    }
    log.info("kafka message count: {}", count);
    return count;
}

KafkaConsumer在代码设计上不允许多线程来进行访问的,通过阅读源码发现初始化时候会设置默认currentThread,并不允许修改,在多线程使用同一kafkaConsumer,即时通过加锁也还是会报出"java.util.ConcurrentModificationException"异常。

private void acquire() {
  long threadId = Thread.currentThread().getId();
  if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
    throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
  } else {
    this.refcount.incrementAndGet();
  }
}

3. 优化过程

3.1 第一版优化

为解决上述问题,我们进行了第一次优化:

  1. 创建新的KafkaConsumer实例:每次查询时创建一个新的KafkaConsumer,避免并发访问问题。
  2. 指定topic查询:通过订阅特定topic,确保只查询目标topic的堆积量。

优化后的代码如下:

public synchronized long getTopicUnconsumedCount(String topicName) {
    long totalLag = 0;
    try {
        KafkaConsumer<String,String> kafkaConsumer = createKafkaConsumer(topicName);
        List<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                .collect(Collectors.toList());

        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);

        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);

        for (TopicPartition partition : topicPartitions) {
            Long endOffset = endOffsets.get(partition);
            OffsetAndMetadata consumedOffset = consumedOffsets.get(partition);
            if (endOffset != null && consumedOffset != null) {
                long lag = endOffset - consumedOffset.offset();
                totalLag += lag;
            }
        }
    } catch (Exception e) {
        log.error("查询 topic {} 未消费消息数量时发生错误: {}", topicName, e.getMessage(), e);
    }
    log.info("Topic [{}] 未消费的消息总数: {}", topicName, totalLag);
    return totalLag;
}

public KafkaConsumer<String, String> createKafkaConsumer(String topic) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfiguration.consumerConfigs());
    consumer.subscribe(Collections.singleton(topic));
    return consumer;
}

然而,这个版本仍然存在一些问题:

  • 潜在的内存泄漏:每次查询都创建新的KafkaConsumer实例,但没有及时释放。
  • 可能触发再均衡:新创建的消费者subscribe操作可能影响现有的消费者,引起不必要的再均衡。

image-20240929101049141

image-20240929095616258

image-20240929095901457

3.2 第二版优化

针对第一版的问题,我们进行了进一步优化:

  1. 使用try-with-resources语句:确保KafkaConsumer实例在使用后被正确关闭,避免内存泄漏。
  2. 移除consumer.subscribe调用:在查询时不订阅topic,减少触发再均衡的可能性。
  3. 使用独立的consumer group:为查询操作创建一个单独的consumer group,避免影响现有的消费者。

最终优化后的代码如下:

public long getTopicUnconsumedCount(String topicName) {
    long totalLag = 0;
    try (KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer()) {
        List<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                .collect(Collectors.toList());

        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);

        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = adminClient.listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get(10, TimeUnit.SECONDS);

        for (TopicPartition partition : topicPartitions) {
            Long endOffset = endOffsets.get(partition);
            OffsetAndMetadata consumedOffset = consumedOffsets.get(partition);
            if (endOffset != null && consumedOffset != null) {
                long lag = endOffset - consumedOffset.offset();
                totalLag += lag;
            }
        }
    } catch (Exception e) {
        log.error("查询 topic {} 未消费消息数量时发生错误: {}", topicName, e.getMessage(), e);
    }
    log.info("Topic [{}] 堆积消费的消息总数: {}", topicName, totalLag);
    return totalLag;
}

public KafkaConsumer<String, String> createKafkaConsumer() {
    Properties props = new Properties();
    props.putAll(kafkaConfiguration.consumerConfigs());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-lag-checker");
    return new KafkaConsumer<>(props);
}

image-20240929095748425

image-20240929095823434

3.3 第三版优化(最终版)

针对第二版性能进行进一步优化:

  1. 虽然没错创建使用完再释放KafkaConsumer能实现相关功能,也不会造成内存泄漏,但young GC过于频繁,修改会使用newSingleThreadExecutor线程池来进行绑定KafkaConsumer;
  2. 将拉取时间间隔max.poll.interval.ms 从1天调整为1小时,更为合理避免一些异常情况可以及时的Rebalance;
//使用单线程线程池,这样可以对KafkaConsumer使用都会统一线程ID
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
//内存缓存记录当前消息堆积情况,60秒更新一次
private AtomicLong nowTotalLag = new AtomicLong(-1);

/**
  * 项目启动完毕以及类实例都创建完毕后执行
  */
private void initializeKafkaConsumer() {
  executorService.submit(() -> {
    try {
      KafkaConsumer<String, String> consumer = createKafkaConsumer();
      kafkaConsumerRef.set(consumer);
      while (true) {
        long totalLag = fetchTopicUnconsumedCount(kafkaConfiguration.getTopicName());
        nowTotalLag.set(totalLag);
        TimeUnit.SECONDS.sleep(60L);
      }
    } catch (WakeupException e) {
      // Ignore exception if closing
      log.warn("KafkaConsumer woken up", e);
    } catch (Exception e) {
      log.error("Error in KafkaConsumer. Will recreate after 5 seconds", e);
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    } finally {
      KafkaConsumer<String, String> consumer = kafkaConsumerRef.getAndSet(null);
      if (consumer != null) {
        consumer.close();
      }
    }
  });
}

image-20241006101345092

image-20241006101357166

image-20241006101417983

4. 原理分析

4.1 KafkaConsumer线程安全性

KafkaConsumer不是线程安全的。Kafka的设计假设每个消费者实例由单个线程使用。当多个线程尝试同时访问同一个KafkaConsumer实例时,会导致并发修改异常。

https://mermaid.ink/img/pako:eNpNkE1vwjAMhv9K5DMctmMPk0bpLhPSBL01PViJSyNoXJlE2kT573MLQ_PJfvT6672CY09QwFFw7E29tdFovDef2J2w5HjJA0lr1us3s2nqXgi9eWkfqgWXf_j1gTcznvaKJlM1NY_BmS-UFFLg-Gwu_6vuqFqQLnVZhGIyO_ahCw6Xxurb0Thnk_loKhGWFlag1w0YvH5wnYdYSD0NZKHQ1FOH-Zws2HhTKebEh5_ooEiSaQXC-dhD0eH5olUePSbaBlQnhiclHxLL7u7RYtXtF86GY3s?type=png

4.2 Consumer Group和Offset管理

Kafka使用Consumer Group来管理消费者和它们的offset。每个Consumer Group独立维护自己的offset,这允许不同的应用程序以不同的速率消费同一个topic而不相互影响。

https://mermaid.ink/img/pako:eNptkUFrwzAMhf-K0bmFLsccBmvSdIeVjXa3OAcRK4mhsYMjH0rpf5_nhJCO6aTn7z1kpDvUVhGk0DocOvFxlkaEeiu_7aDrSmy3r2JffqFjzdoasatmQyTZirw8kXxFkpnsIzmUmTWj78mJo7N-WILZhCeRr8UULP4Gk6dgsQ7O4hDFsfxsmpFYnNBgSz0ZXoYW0fH-jyOpYANhWI9ahf3cf_0SuAtUQhpaRQ36K0uQ5hGs6NlebqaGlJ2nDYQvth2kDV7HoPygkCnXGPbcL6-kNFt3mi4QD_H4AVXjd5Q?type=png

4.3 Kafka再均衡机制

当Consumer Group中的消费者数量发生变化时(如新消费者加入或现有消费者离开),Kafka会触发再均衡过程。这个过程会重新分配分区给消费者,可能会暂时中断消息处理。

https://mermaid.ink/img/pako:eNqdkk1LxDAQhv9KmHMF095yWLAVBKsgq6fSy9jOdoNtUvMhyLL_3alrZdftXswpyfMQXibvDhrbEijw9B7JNHSrsXM41EbwGtEF3egRTRCFFOhFYY2PAzkhF4z0xEgXjOzEyM6NfBJyZ9_IncOqnGhlbUk0zkIhr1arqlTi3moj7pyN4w9IL4HsAqhKBrkSL053Hedb0yv2yEM54JxpIZW48V53RjxNyYK2Rlwf8XSByyOeLfD0T64Hwg_6R7A14Vm0ZH59TrcgSUiA_2NA3XITdpNfQ9jSQDUo3ra0wdiHGmqzZxVjsM-fpgEVXKQEOGe3BbXB3vMpji2GuUa_t9TqYN3joWvfldt_AWh9yVM?type=png

5. 结论

通过三次优化,我们解决了初始实现中的主要问题:

  1. 并发访问问题:通过为每次查询创建新的KafkaConsumer实例解决。
  2. 查询精确性:通过指定topic和使用独立的consumer group确保只查询目标topic的堆积量。
  3. 内存管理:使用try-with-resources确保及时释放资源。
  4. 减少再均衡影响:避免不必要的subscribe操作,减少对现有消费者的影响。
  5. 资源利用:增加内存缓存,减少进行查询的次数。

6. 最终方案

最终的实现方案具有以下特点:

  1. 使用独立线程对的KafkaConsumer实例进行访问操作,比如:newSingleThreadExecutor。(最佳实践
  2. 使用专用的consumer group进行查询,避免影响生产环境的消费者。(最佳实践)
  3. 直接查询分区信息和offset,而不进行subscribe操作。(最佳实践)
  4. 通过比较每个分区的end offset和consumed offset来精确计算未消费的消息数量。

总结

经过三轮修改,优化了Kafka消息堆积查询服务,提高了系统的稳定性和查询准确性,第一版虽解决了一部分问题,但引入了新的内存管理问题。第二版通过合理的资源管理和消费者组控制,达到了预期效果。第三版通过使用newSingleThreadExecutor减少KafkaConsumer频繁创建与释放以及通过缓存减少操作的频率。

在使用KafkaConsumer时,我们应更注重类的相关特性,以及类的并发安全和资源管理的,此案例梳理了几个最佳实践,同时我们第三版这个方案保证了查询的准确性,又避免了对现有Kafka消费者的影响,个人理解为 Kafka消费者深入研发的一次最佳实践。

0%