资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 软件开发 > 后端开发 > Java

spring boot配置双Kafka方法

Java 更新时间: 发布时间: 计算机考试归档 最新发布

spring boot配置双Kafka方法

第一步:application.yml的配置

server:
  port: 8080
spring:
  application:
    name: demo 
  kafka:
    one:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        group-id: xxxx
        enable-auto-commit: true
    two:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        group-id: xxxx
        enable-auto-commit: true

第二步:配置config

@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate xxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(20);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map producerConfigs() {
        Map props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map consumerConfigs() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}
@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate xxxxxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(6);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map producerConfigs() {
        Map props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map consumerConfigs() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        //        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}

注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行

第三步:

@Resource
private KafkaTemplate xxxOneTemplate;
@Resource
private KafkaTemplate xxxxTwoTemplate;

直接在你要用到的类中直接引用就行。

跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有研究成功就不献丑了。

转载请注明:文章转载自 http://www.konglu.com/
本文地址:http://www.konglu.com/it/1094426.html
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【spring boot配置双Kafka方法】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2