SpringBoot Kafka 主动消费

一般在SpringBoot使用kafka,通常用@KafkaListener注解来进行监听消费。
然而某些时候,我们不需要监听而要以定时拉取的方式进行消费,本文主要就是简单记录此方式的实现方法。

// Max number of records consumed per poll batch (demo value).
private static Integer batchSize = 3;
// Target duration of one poll cycle, in seconds; also used as the poll timeout.
private static Integer batchTime = 5;

// Spring Boot's auto-configured Kafka properties (spring.kafka.* from application config).
@Resource
private KafkaProperties kafkaProperties;

/**
 * Demonstrates actively pulling from Kafka with a manually driven poll loop
 * instead of {@code @KafkaListener}: manual partition assignment, manual
 * offset commit, and seek-back to the batch's minimum offsets on failure.
 *
 * <p>Runs forever (demo); the consumer is intentionally never closed.
 */
@Test
void kafkaTest() {

    // Build the consumer configuration from Spring Boot's Kafka properties.
    Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");       // consumer group
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);  // records per poll batch
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);    // commit manually after processing
    // Create the consumer.
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    // Discover every partition of the topic.
    List<PartitionInfo> partitionList = kafkaConsumer.partitionsFor("test-consumer");
    Map<TopicPartition, Integer> topicPartitionMap = MapUtil.newHashMap();
    partitionList.forEach(item
            -> topicPartitionMap.put(new TopicPartition(item.topic(), item.partition()), item.partition()));
    // Manually assign all partitions (no group rebalancing) and seek each to a start offset.
    // NOTE(review): the map value is the PARTITION INDEX, so partition N is seeked to
    // offset N. That only makes sense as a demo; in real code seek to a meaningful
    // offset (committed offset, 0, or end) — TODO confirm intended start position.
    kafkaConsumer.assign(topicPartitionMap.keySet());
    topicPartitionMap.forEach(kafkaConsumer::seek);

    // Demo switch: when true, processing "fails" so the seek-back/retry path is exercised.
    // (The original snippet threw unconditionally, which made commitSync() unreachable —
    // a compile error in Java.)
    boolean simulateFailure = true;

    // Poll loop (example only — runs forever).
    Duration duration = Duration.ofSeconds(batchTime);
    long batchTimeMs = batchTime * 1000L;
    // Tracks, per partition index, the record with the smallest offset of the current
    // (uncommitted) batch, so a failure can rewind to the start of the batch.
    // NOTE(review): keyed by partition index only — assumes a single topic.
    Map<Integer, ConsumerRecord<String, String>> recordMap = MapUtil.newHashMap();
    while (true) {
        try {
            TimeInterval interval = DateUtil.timer();
            ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);

            int count = records.count();
            log.info("测试消费获取到数据 => {} 条", count);
            if (count > 0) {
                // Collect payloads and record each partition's minimum offset in one pass.
                // (The original added every value twice: once in a forEach and again here.)
                List<String> values = CollUtil.newArrayList();
                for (ConsumerRecord<String, String> item : records) {
                    values.add(item.value());
                    ConsumerRecord<String, String> original = recordMap.get(item.partition());
                    if (original == null || item.offset() < original.offset()) {
                        recordMap.put(item.partition(), item);
                    }
                }
                // Business logic would go here; optionally simulate a failure.
                if (simulateFailure) {
                    throw new RuntimeException("测试错误");
                }
                // Commit offsets synchronously only after successful processing.
                kafkaConsumer.commitSync();
                // Batch fully committed — drop the rewind bookkeeping.
                recordMap.clear();
            }

            // Batch was full: more data is likely waiting, poll again immediately.
            if (batchSize == count) {
                continue;
            }
            // Otherwise sleep off the remainder of the batch window.
            long used = interval.intervalMs();
            if (used < batchTimeMs) {
                ThreadUtil.safeSleep(batchTimeMs - used);
            }

        } catch (Exception e) {
            log.error("消费出错 => {}", e.getMessage());
            // Rewind each partition to the first offset of the failed batch so it is re-consumed.
            recordMap.forEach((k, v) -> kafkaConsumer.seek(new TopicPartition(v.topic(), v.partition()), v.offset()));
            log.error(ExceptionUtil.stacktraceToString(e));
            ThreadUtil.safeSleep(batchTimeMs);
        }
    }
}

(备注:主要涉及依赖:spring-kafka、hutool)