消费者位移:Consumer Offset ,表示消费者消费到的位置
消息位移:Offset , 表示消息在分区中的位置
位移提交消费者每次拉取消息时拉取到的都是还没有消费过的消息,要实现这一点,就需要将消费的进度保存到某个地方,这样即使消费者重启也可以得到之前的消费位置继续消费。
在新版本的kafka中,将消费位移数据保存到_consumer_offsets这个topic中,这个持久化消费位移数据的过程叫位移提交
手动提交和自动提交从用户的角度来看,位移提交分为手动提交和自动提交
自动提交通过配置enable.auto.commit=true(默认值)来表示是否开启自动提交,自动提交并不是每消费一条消息就提交一次,而是定期提交
通过配置auto.commit.interval.ms来表示自动提交的时间间隔(单位:ms,默认值:5000)。
手动提交手动提交需要配置enable.auto.commit参数为false
对于手动提交,KafkaConsumer提供了两个API:
KafkaConsumer#commitSync()``KafkaConsumer#commitAsync()
分别表示同步提交和异步提交
同步提交普通的提交
try{ ConsumerRecordsconsumerRecords = kafkaConsumer.poll(Duration.ofMillis(10000)); while (isRunning.get()){ for (ConsumerRecord consumerRecord : consumerRecords) { // 处理consumerRecord } kafkaConsumer.commitSync(); } }catch (Exception e){ // 处理异常 }finally { kafkaConsumer.close(); }
带参数的位移提交
while (isRunning.get()){ ConsumerRecordsconsumerRecords = kafkaConsumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord consumerRecord : consumerRecords) { // 处理consumerRecord long offset = consumerRecord.offset(); TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); kafkaConsumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset +1))); }
在实际应用中,很少会每消费一条消息就提交一次
按分区粒度同步提交
while (isRunning.get()){ ConsumerRecords异步提交consumerRecords = kafkaConsumer.poll(Duration.ofMillis(10000)); for (TopicPartition partition : consumerRecords.partitions()) { List > partitionRecords = consumerRecords.records(partition); for (ConsumerRecord consumerRecord : consumerRecords) { // 处理consumerRecord } long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); kafkaConsumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastConsumedOffset+1))); } }
while (isRunning.get()){ ConsumerRecordsconsumerRecords = kafkaConsumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord consumerRecord : consumerRecords) { // 处理consumerRecord } // 普通的异步提交 // kafkaConsumer.commitAsync(); // 带回调的异步提交 kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (Objects.isNull(exception)){ System.out.println(offsets); }else { // 异常处理 } } }); }