转载自 »（原文链接缺失 — original source link missing from the reposted copy）
application.properties
kafka.consumer.servers=10.100.136.33:9092,10.100.136.34:9092,10.100.136.35:9092
kafka.consumer.enable.auto.commit=false
kafka.consumer.session.timeout=15000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.group.id=kafka-test-group
kafka.consumer.concurrency=10
kafka.consumer.maxPollRecordsConfig=100
kafka.producer.servers=10.100.136.33:9092,10.100.136.34:9092,10.100.136.35:9092
kafka.producer.retries=1
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
KafkaConsumerConfig.java
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.consumer.maxPollRecordsConfig}") private int maxPollRecordsConfig; @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); factory.setBatchListener(true);//@KafkaListener 批量消费 每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式 return factory; } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map consumerConfigs() { Map propsMap = new HashMap<>(8); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); 
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);//每个批次获取数 return propsMap; } }
ConsumerController.java
@KafkaListener(topics = {"test-topic"},containerFactory = "kafkaListenerContainerFactory") public void consumerMsg(Listrecords , Acknowledgment ack){ try { for (ConsumerRecord record : records) { printStream.append(String.format("offset = %d, key = %s, value = %s%n \n", record.offset(), record.key(), record.value())); } } catch (Exception e) { e.printStackTrace(); } finally { ack.acknowledge();//手动提交偏移量 } }
KafkaProducerConfig.java
@Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; @Bean public KafkaTemplatekafkaTemplate() { return new KafkaTemplate(producerFactory()); } public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
ProducerController.java
ListenableFuture listenableFuture = kafkaTemplate.send("test-topic", "测试测试测试车"); //发送成功后回调 SuccessCallback successCallback = new SuccessCallback() { @Override public void onSuccess(Object result) { System.out.println("发送成功"); } }; //发送失败回调 FailureCallback failureCallback = new FailureCallback() { @Override public void onFailure(Throwable ex) { System.out.println("发送失败"); } }; listenableFuture.addCallback(successCallback,failureCallback);