第一步:application.yml的配置
# Application settings plus two independent Kafka connections ("one" and "two").
# The spring.kafka.one.* / spring.kafka.two.* keys are custom keys that the Java
# configuration classes read through @Value.
server:
  port: 8080

spring:
  application:
    name: demo
  kafka:
    # First Kafka cluster.
    one:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        # NOTE(review): the Java config classes in this tutorial only read
        # bootstrap-servers, group-id and enable-auto-commit; the two
        # deserializer entries below are not read there.
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        group-id: xxxx
        enable-auto-commit: true
    # Second Kafka cluster.
    two:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        group-id: xxxx
        enable-auto-commit: true
第二步:配置config
@EnableKafka @Configuration public class xxxxConfig { @Value("${spring.kafka.one.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.one.consumer.group-id}") private String groupId; @Value("${spring.kafka.one.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Bean public KafkaTemplatexxxxTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory > xxxxxxContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(20); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer return props; } }
@EnableKafka @Configuration public class xxxxConfig { @Value("${spring.kafka.two.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.two.consumer.group-id}") private String groupId; @Value("${spring.kafka.two.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Bean public KafkaTemplatexxxxxxxTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory > xxxxxxContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(6); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); // value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer return props; } }
注意!注意!注意!!!代码中 xxxx 之类的占位名称需要换成自己的名字。两个配置类的类名以及各自的 @Bean 方法名不能重复,否则会出现 Bean 名称冲突或被覆盖的问题。
第三步:
@Resource private KafkaTemplatexxxOneTemplate; @Resource private KafkaTemplate xxxxTwoTemplate;
在需要用到的类中直接注入即可。
跟着以上三步走,就可以简单地配置两个 Kafka 了。还有更高级的、基于 Spring 切面和切点的方法,作者还没有研究成功,就不献丑了。