Apache Kafka & Spring Boot Integration Guide

Spring for Apache Kafka (the spring-kafka project) makes it easy to set up a Kafka producer or consumer in a Spring Boot application. To integrate Kafka, add the spring-kafka dependency to your project, configure the Kafka connection properties, and provide the necessary serializers and deserializers for the messages being sent or received. Once this is done, you can use the KafkaTemplate to send messages to a Kafka topic and a @KafkaListener-annotated method to receive them.

Example of how to set up a Kafka producer in a Spring Boot application:


1. Add the spring-kafka dependency to your project's pom.xml file:


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.4.RELEASE</version>
</dependency>
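If the project uses Gradle instead of Maven, the equivalent dependency declaration (same coordinates as the Maven entry above) is:

```groovy
dependencies {
    implementation 'org.springframework.kafka:spring-kafka:2.5.4.RELEASE'
}
```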



2. Create a configuration class that sets up the Kafka connection properties and the necessary serializers and deserializers.


import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
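The ${kafka.bootstrap-servers} placeholder above is resolved from the application's configuration. A minimal application.properties entry (the host and port here are assumptions for a local broker) would be:

```properties
kafka.bootstrap-servers=localhost:9092
```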


3. Inject KafkaTemplate into your service class, and use it to send messages to a topic.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
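The service above covers the sending side; a sketch of the receiving side using @KafkaListener is shown below. The topic name and group id are illustrative assumptions, and Spring Boot's auto-configuration is assumed to supply the underlying consumer factory from spring.kafka.* properties.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MyConsumer {

    // Collects received payloads; thread-safe because listener containers
    // may invoke the callback from their own threads.
    private final List<String> received = new CopyOnWriteArrayList<>();

    // Topic and group id are illustrative assumptions.
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        received.add(message);
    }

    public List<String> getReceived() {
        return received;
    }
}
```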


Example of how you can add error handling and retries to the Kafka producer setup in a Spring Boot application:


1. Create a class that implements the org.springframework.kafka.support.ProducerListener interface; Spring Kafka invokes it after each send, so its onError callback can handle any errors that occur while sending messages to Kafka. (The org.springframework.kafka.listener.ErrorHandler interface belongs to the consumer side and is not applied to producers.)


import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.support.ProducerListener;

@Slf4j
public class KafkaErrorHandler implements ProducerListener<String, String> {

    @Override
    public void onError(ProducerRecord<String, String> producerRecord, Exception e) {
        // Note: from spring-kafka 2.6 onwards this callback also receives a RecordMetadata argument.
        log.error("Error while sending message to Kafka: {}", producerRecord, e);
    }
}


2. In the configuration class, configure the retries on the producer factory and register the listener on the KafkaTemplate. Retries are a native Kafka producer setting (ProducerConfig.RETRIES_CONFIG), not a method on KafkaTemplate.


@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Retry failed sends up to 3 times, waiting one second between attempts.
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setProducerListener(new KafkaErrorHandler());
        return kafkaTemplate;
    }
}


With this setup, the Kafka producer will retry a failed send up to 3 times (with a one-second backoff between attempts) before giving up, and any error that still occurs while sending a message to Kafka is reported to the KafkaErrorHandler class. For custom per-message recovery logic, the future returned by kafkaTemplate.send() can also be inspected.
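For reference, the retry-related settings used above boil down to plain producer property keys. The sketch below shows them as raw strings so they can be inspected without the Kafka client classes on the classpath; the backoff and acks values are illustrative assumptions, not requirements.

```java
import java.util.HashMap;
import java.util.Map;

public class RetrySettings {

    // Builds on a base config map, adding the raw keys behind
    // ProducerConfig.RETRIES_CONFIG and related delivery settings.
    public static Map<String, Object> withRetries(Map<String, Object> base, int retries) {
        Map<String, Object> props = new HashMap<>(base);
        props.put("retries", retries);          // ProducerConfig.RETRIES_CONFIG
        props.put("retry.backoff.ms", 1000);    // pause between retry attempts (assumption)
        props.put("acks", "all");               // require full acknowledgment before a send counts as successful
        return props;
    }
}
```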
