31. Spring Boot Integration with Kafka
Introduction
In real-time stream processing, Kafka is often paired with Flink: Flink receives a stream of data from Kafka and processes it. But where does that continuous stream of Kafka data come from? In production there is plenty of real traffic, but in a test environment we need a way to simulate a production system generating large volumes of real-time data.
Below we use Spring Boot to simulate a production environment that continuously produces data.
Scenario
We simulate sending one message per second to a Kafka topic, implemented with a scheduled task.
POM
Create a new Spring Boot project and add the following dependencies:
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.69</version>
</dependency>
YML
Configure the Kafka producer, the consumer, and the broker address and port:
spring:
  kafka:
    bootstrap-servers: 121.5.160.142:9092
    producer:
      retries: 0
      acks: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: consumer-group
      enable-auto-commit: true
      auto-commit-interval: 100
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
  port: 80
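For reference, the producer portion of this YML maps onto Kafka's native client property names (the dotted keys used by the kafka-clients library). A minimal sketch using plain java.util.Properties, showing how these same settings would be expressed programmatically; the class name ProducerProps is hypothetical:

```java
import java.util.Properties;

public class ProducerProps {
    // Build the same producer settings as the YML above,
    // using Kafka's native dotted property names.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "121.5.160.142:9092");
        props.put("retries", "0");
        props.put("acks", "1");                 // wait for the leader's ack only
        props.put("batch.size", "16384");       // max bytes per batch
        props.put("buffer.memory", "33554432"); // 32 MB send buffer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("acks"));
    }
}
```

Spring Boot performs this mapping for you; the sketch is just to show which broker-side semantics (acks, batching, buffering) each YML line controls.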
Kafka Producer
Declare a Kafka producer to send messages. Here we serialize a User entity to a JSON string as the message payload, and use the @Scheduled(fixedDelay = 1000 * 1) annotation to send one message every second.
package com.example.kafka.controller;

import com.alibaba.fastjson.JSON;
import com.example.kafka.dto.User;
import com.example.kafka.util.DateUtil;
import com.example.kafka.util.NumberUtil;
import com.example.kafka.util.UUIDUtil;
import com.example.kafka.util.id.IdGenerator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.text.ParseException;

@RestController
@Slf4j
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Scheduled(fixedDelay = 1000 * 1)
    public void sendMessage() throws ParseException {
        // Build a randomized User entity
        User user = new User();
        user.setId(IdGenerator.nextCommonId());
        user.setUsername(UUIDUtil.nextUUID().substring(0, 10));
        user.setPassword(UUIDUtil.nextUUID().substring(0, 10));
        user.setAddress(UUIDUtil.nextUUID().substring(0, 10));
        user.setEmail(UUIDUtil.nextUUID().substring(0, 10));
        user.setProfile(UUIDUtil.nextUUID().substring(0, 10));
        user.setBirthday(DateUtil.randomDate());
        user.setRegisterDay(DateUtil.randomDate());
        user.setLoginDay(DateUtil.randomDate());
        user.setStatus(NumberUtil.getStatusInt());
        user.setAccount(NumberUtil.accountDecimal());
        user.setBalance(NumberUtil.balanceDecimal());
        user.setAge(NumberUtil.getAgeInt());
        user.setSex(NumberUtil.getSexInt());
        user.setAvatar(UUIDUtil.nextUUID().substring(0, 10));
        user.setLevel(NumberUtil.getLevelInt());
        // Send the message
        kafkaTemplate.send("kafka", JSON.toJSONString(user));
        log.info("kafka producer send message : {}", JSON.toJSONString(user));
    }
}
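Note that fixedDelay measures the one-second gap from the end of one invocation to the start of the next (unlike fixedRate, which fires on a fixed clock). Outside of Spring, the same cadence can be sketched with the JDK's ScheduledExecutorService; this stand-alone demo (class name FixedDelayDemo is hypothetical, and it uses a 50 ms delay instead of 1 s just to keep the run short):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {
    // Fire a task with `delayMs` between the end of one run and the start of
    // the next, for roughly `totalMs`, and return how many times it fired.
    static int runTicks(long delayMs, long totalMs) throws InterruptedException {
        AtomicInteger count = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // scheduleWithFixedDelay mirrors @Scheduled(fixedDelay = ...)
        scheduler.scheduleWithFixedDelay(count::incrementAndGet, 0, delayMs, TimeUnit.MILLISECONDS);
        Thread.sleep(totalMs);
        scheduler.shutdownNow();
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ticks fired: " + runTicks(50, 300));
    }
}
```

With a one-second task body this distinction barely matters here, but for slow producers fixedDelay prevents overlapping runs from piling up.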
Kafka Consumer
The Kafka consumer reads messages from the Kafka topic.
package com.example.kafka.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {

    // No group specified here; the group-id from the YML configuration is used by default
    @KafkaListener(topics = {"kafka"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            log.info("kafka consumer receive message : {}", msg);
        }
    }
}
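The Optional.ofNullable guard is there because a Kafka record's value can legitimately be null (a so-called tombstone record, used for log compaction). The pattern in isolation, with a hypothetical helper named describe:

```java
import java.util.Optional;

public class NullGuardDemo {
    // Return the message text if present, or a marker for tombstone (null) values.
    static String describe(Object value) {
        return Optional.ofNullable(value)
                .map(Object::toString)
                .orElse("tombstone (null value), skipped");
    }

    public static void main(String[] args) {
        System.out.println(describe("{\"id\":1}"));
        System.out.println(describe(null));
    }
}
```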
Application Class
Note: add the @EnableScheduling annotation to enable scheduled-task support.
package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
Result
The producer sends a message every second and the consumer consumes it; both sides show up in the application logs.
Source: https://blog.csdn.net/weixin_41405524/article/details/125762379