栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

Spring-ReactiveKafkaConsumer(反应式消费kafka消息)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring-ReactiveKafkaConsumer(反应式消费kafka消息)

1.引入相应的kafka架包
    
        org.springframework.boot
        spring-boot-starter-parent
        2.6.6
    
            
        
            org.springframework.cloud
            spring-cloud-starter-stream-kafka
        
        
            org.springframework.kafka
            spring-kafka
        
        
            org.springframework.cloud
            spring-cloud-stream
        
        
            io.projectreactor.kafka
            reactor-kafka
            1.3.11
        

关键架包reactor-kafka

2.创建监听监听Template
package com.kittlen.cloud.reactivekafka.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;


@Component
public class ReactiveConsumerConfig {

    @Bean
    public ReceiverOptions kafkaReceiverOptions(@Value(value = "${kafka.consumer.topic}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate(ReceiverOptions kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}

3.根据Template创建对应的实际监听业务
package com.kittlen.cloud.reactivekafka.consumers;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverRecord;


@Service
public class ReactiveConsumerService implements CommandLineRunner {

    protected Log log = LogFactory.getLog(ReactiveConsumerService.class);

    @Autowired
    ReactiveKafkaConsumerTemplate requestMsgReactiveKafkaConsumerTemplate;

    private Flux> dgkConsummer() {
        Flux> monoFlux = requestMsgReactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .map(cr -> handler(cr))
                .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
        return monoFlux;
    }

    //返回类型根据实际需求自己进行调整
    //在该方法里面如果直接抛出异常,会直接导致停止对该topic的监听
    protected Mono handler(ConsumerRecord consumerRecord) {
        try{
             
            return Mono.just(true);
        }catch (Exception e) {
            return Mono.error(e);
        }
    }

    @Override
    public void run(String... args) {
        dgkConsummer().subscribe(m -> m.subscribe());
    }
}

4.监听多topic 使用同一个ReactiveKafkaConsumerTemplate

创建kafkaReceiverOptions时订阅多个topic

 @Bean
    public ReceiverOptions kafkaReceiverOptions(KafkaProperties kafkaProperties) {
        ReceiverOptions basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Stream.of("topic1", "topic2").collect(Collectors.toList()));
    }

处理消息时根据topic进行判断

  protected Mono handler(ConsumerRecord consumerRecord) {
        try{
            if(consumerRecord.topic().equals("topic1")){
             /
            return Mono.just(true);
        }catch (Exception e) {
            return Mono.error(e);
        }
    }
定义多个监听template,再根据template创建对应的实际监听业务
    @Bean
    public ReceiverOptions kafkaReceiverOptions1(KafkaProperties kafkaProperties) {
        ReceiverOptions basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList("topic1"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate1(ReceiverOptions kafkaReceiverOptions1) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions1);
    }
    @Bean
    public ReceiverOptions kafkaReceiverOptions2(KafkaProperties kafkaProperties) {
        ReceiverOptions basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList("topic2"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate2(ReceiverOptions kafkaReceiverOptions2) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions2);
    }

    @Resource(name = "reactiveKafkaConsumerTemplate1")
    ReactiveKafkaConsumerTemplate requestMsgReactiveKafkaConsumerTemplate1;
    
    @Resource(name = "reactiveKafkaConsumerTemplate2")
    ReactiveKafkaConsumerTemplate requestMsgReactiveKafkaConsumerTemplate2;

     private Flux> dgkConsummer1() {
        Flux> monoFlux = requestMsgReactiveKafkaConsumerTemplate1
                .receiveAutoAck()
                .map(cr -> handler1(cr))
                .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
        return monoFlux;
    }

    //返回类型根据实际需求自己进行调整
    protected Mono handler1(ConsumerRecord consumerRecord) {
        try{
             
            return Mono.just(true);
        }catch (Exception e) {
            return Mono.error(e);
        }
    }
     private Flux> dgkConsummer2() {
        Flux> monoFlux = requestMsgReactiveKafkaConsumerTemplate2
                .receiveAutoAck()
                .map(cr -> handler2(cr))
                .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
        return monoFlux;
    }

    //返回类型根据实际需求自己进行调整
    protected Mono handler2(ConsumerRecord consumerRecord) {
        try{
             
            return Mono.just(true);
        }catch (Exception e) {
            return Mono.error(e);
        }
    }

    @Override
    public void run(String... args) {
        dgkConsummer().subscribe(m -> m.subscribe());
        dgkConsummer2().subscribe(m -> m.subscribe());
    }
转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1036450.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号