博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot Kafka @KafkaListener 批处理消息
阅读量:6463 次
发布时间:2019-06-23

本文共 5572 字,大约阅读时间需要 18 分钟。

转载自 » https://www.cnblogs.com/dyg-ljx/p/9072981.html

 

application.properties

kafka.consumer.servers=10.100.136.33:9092,10.100.136.34:9092,10.100.136.35:9092
kafka.consumer.enable.auto.commit=false
kafka.consumer.session.timeout=15000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.group.id=kafka-test-group
kafka.consumer.concurrency=10
kafka.consumer.maxPollRecordsConfig=100

kafka.producer.servers=10.100.136.33:9092,10.100.136.34:9092,10.100.136.35:9092
kafka.producer.retries=1
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
KafkaConsumerConfig.java
@Configuration @EnableKafka public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.consumer.maxPollRecordsConfig}") private int maxPollRecordsConfig; @Bean public KafkaListenerContainerFactory
> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); factory.setBatchListener(true);//@KafkaListener 批量消费 每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式 return factory; } public ConsumerFactory
consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map
consumerConfigs() {
Map
propsMap = new HashMap<>(8); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);//每个批次获取数 return propsMap; } }
 
ConsumerController.java
@KafkaListener(topics = {"test-topic"},containerFactory = "kafkaListenerContainerFactory") public void consumerMsg(List
records , Acknowledgment ack){
try {
for (ConsumerRecord record : records) {
printStream.append(String.format("offset = %d, key = %s, value = %s%n \n", record.offset(), record.key(), record.value())); } } catch (Exception e) {
e.printStackTrace(); } finally {
ack.acknowledge();//手动提交偏移量 } }
KafkaProducerConfig.java
/**
 * Producer-side Spring Kafka configuration.
 *
 * <p>Reads the {@code kafka.producer.*} properties and exposes a {@code KafkaTemplate} bean
 * whose keys and values are serialized with {@code StringSerializer}.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;

    @Value("${kafka.producer.retries}")
    private int retries;

    @Value("${kafka.producer.batch.size}")
    private int batchSize;

    @Value("${kafka.producer.linger}")
    private int linger;

    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /**
     * Creates the template used to send messages.
     *
     * @return a {@code KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Creates the producer factory backed by {@link #producerConfigs()}.
     *
     * @return a new default producer factory
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Assembles the Kafka producer properties from the injected {@code kafka.producer.*} values.
     *
     * @return a mutable map of producer configuration entries
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
 
ProducerController.java
// Send a message to "test-topic" and register callbacks on the returned future.
ListenableFuture<?> listenableFuture = kafkaTemplate.send("test-topic", "测试测试测试车");

// Callback invoked after a successful send.
SuccessCallback<Object> successCallback = new SuccessCallback<Object>() {
    @Override
    public void onSuccess(Object result) {
        System.out.println("发送成功");
    }
};

// Callback invoked when the send fails.
FailureCallback failureCallback = new FailureCallback() {
    @Override
    public void onFailure(Throwable ex) {
        System.out.println("发送失败");
    }
};

listenableFuture.addCallback(successCallback, failureCallback);
 

 

转载于:https://www.cnblogs.com/dyg-ljx/p/9072981.html

你可能感兴趣的文章
Android学习笔记21-ImageView获取网络图片
查看>>
线段树分治
查看>>
git代码冲突
查看>>
poll
查看>>
解析查询 queryString 请求参数的函数
查看>>
学生选课系统数据存文件
查看>>
我的毕设总结所用的技术和知识要点 基于stm32F4的AGV嵌入式控制系统的设计
查看>>
JMeter—断言
查看>>
C++的新类创建:继承与组合
查看>>
asp操作access提示“无法从指定的数据表中删除”
查看>>
git bash 风格调整
查看>>
bzoj4589 Hard Nim
查看>>
java实现pdf旋转_基于Java实现PDF文本旋转倾斜
查看>>
python time库3.8_python3中datetime库,time库以及pandas中的时间函数区别与详解
查看>>
贪吃蛇java程序简化版_JAVA简版贪吃蛇
查看>>
poi java web_WebPOI JavaWeb 项目 导出excel表格(.xls) Develop 238万源代码下载- www.pudn.com...
查看>>
linux 脚本map,Linux Shell Map的用法详解
查看>>
如何在linux系统下配置共享文件夹,如何在windows和Linux系统之间共享文件夹.doc
查看>>
linux操作系统加固软件,系统安全:教你Linux操作系统的安全加固
查看>>
linux中yum源安装dhcp,24.Linux系统下动态网络源部署方法(dhcpd)
查看>>