1:引入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.12</version> </dependency> </dependencies>
2:application.yml配置
server:
port: 8081
spring:
application:
name: application-kafka
kafka:
bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的
producer:
batch-size: 16384 #批量大小
acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
retries: 10 # 消息发送重试次数
# transaction-id-prefix: tx_1 #事务id前缀
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger:
ms: 2000 #提交延迟
# partitioner: #指定分区器
# class: com.example.kafkademo.config.CustomizePartitioner
consumer:
group-id: testGroup #默认的消费组ID
enable-auto-commit: true #是否自动提交offset
auto-commit-interval: 2000 #提交offset延时
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
auto-offset-reset: latest
max-poll-records: 500 #单次拉取消息的最大条数
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session:
timeout:
ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
request:
timeout:
ms: 18000 # 消费请求的超时时间
listener:
missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
type: batch #设置批量消费
3:简单生产
package com.example.kafkademo.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* @Auther:
* @Date: 2023/10/19 16:22
* @Description: com.example.kafkademo.demo
* @version: 1.0
*/
@RestController
public class kafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/normal/{message}")
public void sendNormalMessage(@PathVariable("message") String message) {
kafkaTemplate.send("sb_topic", message);
}
}
4:简单消费
package com.example.kafkademo.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Auther:
* @Date: 2023/10/19 16:24
* @Description: com.example.kafkademo.demo
* @version: 1.0
*/
@Component
public class KafkaConsumer {
//监听消费
@KafkaListener(topics = {"sb_topic"})
public void onNormalMessage(ConsumerRecord<String, Object> record) {
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value());
}
}
5:带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法
/**
* 回调的第一种写法
* @param message
*/
@GetMapping("/kafka/callbackOne/{message}")
public void sendCallbackOneMessage(@PathVariable("message") String message) {
kafkaTemplate.send("sb_topic", message).addCallback(new SuccessCallback<SendResult<String, Object>>() {
//成功的回调
@Override
public void onSuccess(SendResult<String, Object> success) {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
}
}, new FailureCallback() {
//失败的回调
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败1:" + throwable.getMessage());
}
});
} /**
* 回调的第二种写法
* @param message
*/
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {
kafkaTemplate.send("sb_topic", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败2:"+throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}