SpringBoot配置Kafka
- 设置kafka可以被外网访问(修改kafka安装目录下的 config/server.properties)
# 开启监听,监听地址为服务器ip
listeners=PLAINTEXT://192.168.111.100:9092
# 配置该kafka可以被访问的外网地址(将ip替换为实际的外网地址)
advertised.listeners=PLAINTEXT://ip:9092
配置后重启kafka
- SpringBoot配置kafka application.yml
spring:
  kafka:
    # 刚才配置的kafka可以被外网访问的地址
    bootstrap-servers: 192.168.111.100:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 到kafka的config目录下的consumer.properties中查看group.id
      group-id: test-consumer-group
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- Producer
/**
 * Service that publishes string messages to the {@code users} Kafka topic.
 */
@Service
public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    /** Name of the topic every message is sent to. */
    private static final String TOPIC = "users";

    // Typed as <String, String> instead of the raw type so that send() is
    // checked by the compiler; both the topic key and the payload are strings here.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Logs the message at INFO level and then sends it to {@link #TOPIC}.
     *
     * <p>The value returned by {@code send} is discarded, so this method does not
     * report whether the send eventually succeeded.
     *
     * @param message the payload to publish
     */
    public void sendMessage(String message) {
        // Placeholder logging: the text is only assembled when INFO is enabled.
        logger.info("生产者的消息为:{}", message);
        this.kafkaTemplate.send(TOPIC, message);
    }
}
- Consumer
/**
 * Service that listens on the {@code users} topic and logs every message it receives.
 */
@Service
public class Consumer {
    // One shared logger per class, matching the convention used by Producer.
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /**
     * Logs a received message at INFO level.
     *
     * <p>Registered through {@code @KafkaListener} for topic {@code users} with
     * consumer group {@code test-consumer-group}.
     *
     * @param message the message payload handed to the listener
     */
    @KafkaListener(topics = "users", groupId = "test-consumer-group")
    public void consume(String message) {
        // Placeholder logging: the text is only assembled when INFO is enabled.
        logger.info("消费者收到的消息为:{}", message);
    }
}
- Controller
/**
 * REST controller mapped under {@code /kafka} that forwards a request parameter
 * to the {@link Producer}.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    // Assigned once in the constructor and never replaced afterwards.
    private final Producer producer;

    /**
     * Creates the controller with the producer it delegates to.
     *
     * @param producer the service used to publish messages
     */
    @Autowired
    public KafkaController(Producer producer) {
        this.producer = producer;
    }

    /**
     * Passes the {@code message} request parameter to {@link Producer#sendMessage(String)}.
     *
     * <p>NOTE(review): the mapping does not restrict the HTTP method; consider limiting
     * it to POST once existing callers have been checked.
     *
     * @param message the text to publish, taken from the {@code message} request parameter
     * @return the literal string {@code "success"} once the producer call has returned
     */
    @RequestMapping("/publish")
    public String sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
        return "success";
    }
}
|