父工程:org.springframework.boot : spring-boot-starter-parent : 2.6.6
依赖:
- org.springframework.cloud : spring-cloud-starter-stream-kafka
- org.springframework.kafka : spring-kafka
- org.springframework.cloud : spring-cloud-stream
- io.projectreactor.kafka : reactor-kafka : 1.3.11
1.关键依赖包:reactor-kafka
// 2. Create the listener template (ReceiverOptions + ReactiveKafkaConsumerTemplate beans).
package com.kittlen.cloud.reactivekafka.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;

/**
 * Reactive Kafka consumer configuration.
 *
 * <p>Declares the {@link ReceiverOptions} (built from Spring Boot's
 * {@code spring.kafka.consumer.*} properties) and the
 * {@link ReactiveKafkaConsumerTemplate} used by the consumer service.
 */
@Configuration // was @Component; @Configuration is the correct stereotype for @Bean factory methods
public class ReactiveConsumerConfig {

    /**
     * Builds the receiver options from Boot's Kafka consumer properties and
     * subscribes to the single topic configured under {@code kafka.consumer.topic}.
     *
     * @param topic           topic name injected from {@code kafka.consumer.topic}
     * @param kafkaProperties Boot's auto-configured Kafka properties
     * @return receiver options subscribed to the configured topic
     */
    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(
            @Value(value = "${kafka.consumer.topic}") String topic,
            KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    // 3. Build the actual listener template from the receiver options.
    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}
package com.kittlen.cloud.reactivekafka.consumers; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.kafka.receiver.ReceiverRecord; @Service public class ReactiveConsumerService implements CommandLineRunner { protected Log log = LogFactory.getLog(ReactiveConsumerService.class); @Autowired ReactiveKafkaConsumerTemplate4.监听多topic 使用同一个ReactiveKafkaConsumerTemplaterequestMsgReactiveKafkaConsumerTemplate; private Flux > dgkConsummer() { Flux > monoFlux = requestMsgReactiveKafkaConsumerTemplate .receiveAutoAck() .map(cr -> handler(cr)) .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage())); return monoFlux; } //返回类型根据实际需求自己进行调整 //在该方法里面如果直接抛出异常,会直接导致停止对该topic的监听 protected Mono handler(ConsumerRecord consumerRecord) { try{ return Mono.just(true); }catch (Exception e) { return Mono.error(e); } } @Override public void run(String... args) { dgkConsummer().subscribe(m -> m.subscribe()); } }
创建kafkaReceiverOptions时订阅多个topic
@Bean public ReceiverOptionskafkaReceiverOptions(KafkaProperties kafkaProperties) { ReceiverOptions basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties()); return basicReceiverOptions.subscription(Stream.of("topic1", "topic2").collect(Collectors.toList())); }
处理消息时根据topic进行判断
protected Mono定义多个监听template,再根据template创建对应的实际监听业务handler(ConsumerRecord consumerRecord) { try{ if(consumerRecord.topic().equals("topic1")){ / return Mono.just(true); }catch (Exception e) { return Mono.error(e); } }
@Bean public ReceiverOptionskafkaReceiverOptions1(KafkaProperties kafkaProperties) { ReceiverOptions basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties()); return basicReceiverOptions.subscription(Collections.singletonList("topic1")); } @Bean public ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate1(ReceiverOptions kafkaReceiverOptions1) { return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions1); } @Bean public ReceiverOptions kafkaReceiverOptions2(KafkaProperties kafkaProperties) { ReceiverOptions basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties()); return basicReceiverOptions.subscription(Collections.singletonList("topic2")); } @Bean public ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate2(ReceiverOptions kafkaReceiverOptions2) { return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions2); }
@Resource(name = "reactiveKafkaConsumerTemplate1") ReactiveKafkaConsumerTemplaterequestMsgReactiveKafkaConsumerTemplate1; @Resource(name = "reactiveKafkaConsumerTemplate2") ReactiveKafkaConsumerTemplate requestMsgReactiveKafkaConsumerTemplate2; private Flux > dgkConsummer1() { Flux > monoFlux = requestMsgReactiveKafkaConsumerTemplate1 .receiveAutoAck() .map(cr -> handler1(cr)) .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage())); return monoFlux; } //返回类型根据实际需求自己进行调整 protected Mono handler1(ConsumerRecord consumerRecord) { try{ return Mono.just(true); }catch (Exception e) { return Mono.error(e); } } private Flux > dgkConsummer2() { Flux > monoFlux = requestMsgReactiveKafkaConsumerTemplate2 .receiveAutoAck() .map(cr -> handler2(cr)) .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage())); return monoFlux; } //返回类型根据实际需求自己进行调整 protected Mono handler2(ConsumerRecord consumerRecord) { try{ return Mono.just(true); }catch (Exception e) { return Mono.error(e); } } @Override public void run(String... args) { dgkConsummer().subscribe(m -> m.subscribe()); dgkConsummer2().subscribe(m -> m.subscribe()); }