1:引入依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.5.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- FIX: scope the test starter to "test" so its dependencies (JUnit, Mockito, ...)
         do not leak onto the runtime classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- FIX: fastjson 1.2.28 has known remote-code-execution vulnerabilities
         (e.g. CVE-2017-18349, CVE-2022-25845); 1.2.83 is the patched release of the 1.x line. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- NOTE(review): both fastjson 1.x and fastjson2 are on the classpath;
         consider consolidating on fastjson2 only. -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.12</version>
    </dependency>
</dependencies>
2:application.yml配置
server:
  port: 8081
spring:
  application:
    name: application-kafka
  kafka:
    bootstrap-servers: localhost:9092 # Kafka broker address; must match the listener configured in server.properties
    producer:
      batch-size: 16384 # maximum size (bytes) of one record batch
      acks: -1 # ack level: how many replicas must confirm before the broker acks (0, 1, all/-1)
      retries: 10 # number of send retries on transient failures
      # transaction-id-prefix: tx_1 # transaction id prefix (enable to use Kafka transactions)
      buffer-memory: 33554432 # total producer buffer memory (bytes)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger:
          ms: 2000 # delay before sending a batch, to allow more records to accumulate
        # partitioner: # custom partitioner (uncomment to activate)
        #   class: com.example.kafkademo.config.CustomizePartitioner
    consumer:
      group-id: testGroup # default consumer group id
      enable-auto-commit: true # whether offsets are committed automatically
      auto-commit-interval: 2000 # interval (ms) between automatic offset commits
      # When Kafka has no initial offset, or the stored offset is out of range:
      #   earliest: reset to the smallest offset in the partition
      #   latest:   reset to the newest offset (only consume records produced after joining)
      #   none:     throw if any partition has no committed offset
      auto-offset-reset: latest
      max-poll-records: 500 # maximum number of records returned by a single poll
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session:
          timeout:
            ms: 120000 # session timeout: if no heartbeat is received within this window, a rebalance is triggered
        request:
          timeout:
            ms: 18000 # consumer request timeout — NOTE(review): 18000 looks like a typo for 180000; confirm, since it is far below the session timeout
    listener:
      missing-topics-fatal: false # when true, application startup fails if a listened-to topic does not exist; false tolerates missing topics
      type: batch # deliver records to listeners in batches
3:简单生产
package com.example.kafkademo.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint that publishes a plain text message to Kafka.
 *
 * NOTE(review): by Java convention this class should be named {@code KafkaProducer};
 * kept as-is to avoid renaming the public type (and its source file).
 *
 * @version 1.0
 */
@RestController
public class kafkaProducer {

    /** Destination topic for all messages sent by this controller. */
    private static final String TOPIC = "sb_topic";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Fire-and-forget send: publishes the path variable as the record value.
     *
     * @param message the payload taken from the request path
     */
    @GetMapping("/kafka/normal/{message}")
    public void sendNormalMessage(@PathVariable("message") String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}
4:简单消费
package com.example.kafkademo.demo; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @Auther: * @Date: 2023/10/19 16:24 * @Description: com.example.kafkademo.demo * @version: 1.0 */ @Component public class KafkaConsumer { //监听消费 @KafkaListener(topics = {"sb_topic"}) public void onNormalMessage(ConsumerRecord<String, Object> record) { System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" + record.value()); } }
5:带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调中监控消息是否发送成功,并在发送失败时做补偿处理。回调有两种写法:
/** * 回调的第一种写法 * @param message */ @GetMapping("/kafka/callbackOne/{message}") public void sendCallbackOneMessage(@PathVariable("message") String message) { kafkaTemplate.send("sb_topic", message).addCallback(new SuccessCallback<SendResult<String, Object>>() { //成功的回调 @Override public void onSuccess(SendResult<String, Object> success) { // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset); } }, new FailureCallback() { //失败的回调 @Override public void onFailure(Throwable throwable) { System.out.println("发送消息失败1:" + throwable.getMessage()); } }); }
/**
 * Callback style #2: register a single {@code ListenableFutureCallback} that
 * carries both the success and the failure handler.
 *
 * @param message payload taken from the request path
 */
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {
    kafkaTemplate.send("sb_topic", message).addCallback(
            new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    // topic-partition-offset of the acknowledged record
                    String meta = result.getRecordMetadata().topic()
                            + "-" + result.getRecordMetadata().partition()
                            + "-" + result.getRecordMetadata().offset();
                    System.out.println("发送消息成功2:" + meta);
                }

                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("发送消息失败2:" + ex.getMessage());
                }
            });
}