Apache Kafka & Spring Boot Integration Guide
Spring Boot provides a Kafka connector, which allows you to easily set up a Kafka producer or consumer in a Spring application. To integrate Kafka with a Spring Boot application, you will need to add the spring-kafka dependency to your project. You will also need to configure the Kafka connection properties and provide the necessary serializers and deserializers for the messages being sent or received. Once this is done, you can use the Spring Kafka template to send and receive messages to and from a Kafka topic.
Example of how to set up a Kafka producer in a Spring Boot application:
1. Add the spring-kafka dependency to your project's pom.xml file:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.4.RELEASE</version>
</dependency>
2. Create a configuration class that sets up the Kafka connection properties and the necessary serializers and deserializers.
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
3. Inject KafkaTemplate into your service class, and use it to send messages to a topic.
@Service
public class MyService {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
Example of how you can add error handling and retries to the Kafka producer setup in a Spring Boot application:
1. Create a class that implements the org.springframework.kafka.listener.ErrorHandler interface. This class will handle any errors that occur while sending messages to Kafka.
@Slf4j
public class KafkaErrorHandler implements ErrorHandler {
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
log.error("Error while sending message to Kafka: ", e);
}
}
2. In the configuration class, add the error handler bean and configure the retries.
@Configuration
public class KafkaProducerConfig {
// existing configuration
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setErrorHandler(new KafkaErrorHandler());
kafkaTemplate.setRetries(3);
kafkaTemplate.setRecoveryCallback(new RecoveryCallback<String, String>() {
@Override
public String recover(ProducerRecord<String, String> producerRecord) {
log.info("Recovering message: {}", producerRecord);
return producerRecord.value();
}
});
return kafkaTemplate;
}
}
With this setup, any errors that occur while sending messages to Kafka will be handled by the KafkaErrorHandler class, and the message will be retried up to 3 times before giving up. The RecoveryCallback can be used to implement any custom logic for recovering failed messages.
Comments
Post a Comment