本片不对kafak原理做过多解释,只是介绍一下kafka和spring boot耦合的方式。
依赖引入
在gradle依赖配置当中引入以下依赖:
1 2
| "org.springframework.kafka:spring-kafka", "com.google.code.gson:gson:2.8.5"
|
编写生产者
生产者代码第一要使用@Component
注册到spring容器当中:
1 2 3 4 5 6 7 8 9 10 11
| @Component public class KafkaProducer {
@Autowired private KafkaTemplate<String,String> kafkaTemplate;
public void sendKafkaMessage(KafkaMessage kafkaMessage){ this.kafkaTemplate.send("myTopic", new Gson().toJson(kafkaMessage)); }
}
|
编写消费者
和生产者一样,消费者也需要注册到spring 容器当中,并且要用KafkaListener
表示监听的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Component public class KafkaConsumer {
@KafkaListener(topics = "myTopic",groupId = "myGroup") public void obtainMessage(ConsumerRecord<String, String> consumerRecord){ System.out.println("ontainMessage invoked!"); String topic =consumerRecord.topic(); String key = consumerRecord.key(); String value = consumerRecord.value(); int partition = consumerRecord.partition(); long timeStamp = consumerRecord.timestamp();
System.out.println("topic: " + topic); System.out.println("key: " + key); System.out.println("value: " + value); System.out.println("partition: " + partition); System.out.println("timeStamp: " + timeStamp);
}
}
|
kafka配置信息
在application.yml文件当中配置相应的broker地址、序列化方式等信息:
1 2 3 4 5 6 7 8 9 10
| kafka: producer: bootstrap-servers: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: bootstrap-servers: localhost:9092 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
对外提供webapi
新建一个controller,实现在web端发送消息的功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @RestController @RequestMapping(value = "/kafka", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public class KafkaController {
@Autowired private KafkaProducer kafkaProducer;
@RequestMapping(value = "message", method = RequestMethod.GET) public KafkaMessage sendKafkaMessages(@RequestParam(name = "id") long id, @RequestParam(name = "username") String username, @RequestParam(name = "password") String password){ System.out.println("send kafka message invoked!"); KafkaMessage kafkaMessage = new KafkaMessage(); kafkaMessage.setId(id);; kafkaMessage.setUsername(username); kafkaMessage.setPasseord(password); kafkaMessage.setDate(new Date()); kafkaProducer.sendKafkaMessage(kafkaMessage); return kafkaMessage; }
@RequestMapping(value = "sendMessage2", method = RequestMethod.POST) public KafkaMessage sendMessage2(@RequestBody KafkaMessage kafkaMessage){ System.out.println("send kafka message2 invoked!"); kafkaMessage.setDate(new Date()); this.kafkaProducer.sendKafkaMessage(kafkaMessage); return kafkaMessage; }
}
|