Mastering Real-time Data Processing with KafkaItemReader and KafkaItemWriter in Spring Batch

In this tutorial, we will explore the powerful KafkaItemReader and KafkaItemWriter components in Spring Batch. These components facilitate seamless integration with Apache Kafka, enabling efficient and scalable data processing in batch jobs. We will explain the concepts behind KafkaItemReader and KafkaItemWriter and delve into their various use cases.

Understanding KafkaItemReader

KafkaItemReader is a Spring Batch item reader implementation that allows us to consume data from Apache Kafka topics. It reads messages from Kafka partitions and converts them into Java objects for processing within the Spring Batch job. Here’s how it works:

  • Configuration: To use KafkaItemReader, you need to set up a Kafka consumer configuration, specifying the bootstrap servers, consumer group, key and value deserializers, and other relevant properties.
  • Polling: KafkaItemReader uses the KafkaConsumer API to poll the topic for new messages. Each poll fetches a batch of records; the batch size is governed by standard consumer properties such as max.poll.records, and the poll timeout is configurable on the reader (see the sketch after this list).
  • Data Conversion: The retrieved messages from Kafka are deserialized into Java objects based on the provided key and value deserializers. This ensures that the data is in a usable format for further processing in the Spring Batch job.
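As a quick illustration, here is a minimal sketch of that configuration using the KafkaItemReaderBuilder from Spring Batch. The broker address, group id, topic name, and partition are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;

public class ReaderSketch {
    public static KafkaItemReader<String, String> demoReader() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaItemReaderBuilder<String, String>()
                .name("demoReader")      // used to save read offsets in the execution context
                .topic("demo_topic")     // a KafkaItemReader consumes a single topic
                .partitions(0)           // the partition(s) to read
                .consumerProperties(props)
                .build();
    }
}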

Use Cases for KafkaItemReader

  • Real-time Data Processing: KafkaItemReader is ideal for real-time data processing scenarios, where you need to continuously process incoming data from Kafka topics.
  • Data Transformation: It enables data transformation by reading data from Kafka topics and converting it into a different format before further processing.
  • Microservices Communication: KafkaItemReader facilitates communication between microservices by consuming data from Kafka topics produced by other services and processing it in batch jobs.

Understanding KafkaItemWriter

KafkaItemWriter is a Spring Batch item writer implementation that writes data back to Apache Kafka topics. It takes processed Java objects from the Spring Batch job and sends them to Kafka topics for consumption by other systems or services. Here’s how it works:

  • Configuration: Similar to KafkaItemReader, KafkaItemWriter requires Kafka producer configuration, specifying the bootstrap servers, key and value serializers, and other relevant properties.
  • Batching and Sending: KafkaItemWriter writes each chunk of processed items to Kafka through a KafkaTemplate, which wraps the KafkaProducer API. Throughput is shaped by the step's chunk size and the usual producer settings such as batch.size and linger.ms (see the sketch after this list).
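Here is a minimal sketch of that setup. The producer properties and topic name are placeholders, and the item type is kept as String for brevity:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.batch.item.kafka.KafkaItemWriter;
import org.springframework.batch.item.kafka.builder.KafkaItemWriterBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class WriterSketch {
    public static KafkaItemWriter<String, String> demoWriter() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaTemplate<String, String> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
        template.setDefaultTopic("demo_topic"); // records go to the template's default topic
        return new KafkaItemWriterBuilder<String, String>()
                .kafkaTemplate(template)
                .itemKeyMapper(item -> item) // derive each record's key from the item
                .build();
    }
}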

Use Cases for KafkaItemWriter

  • Aggregating Data: KafkaItemWriter lets a Spring Batch job aggregate data from multiple sources and publish the result to Kafka for downstream systems to consume.
  • Event Sourcing: It can be used to implement event sourcing by writing processed data into Kafka topics as events for auditing, historical tracking, or event-driven architectures.
  • Data Synchronization: KafkaItemWriter is useful for synchronizing data between different systems by writing processed data into Kafka topics consumed by other services or applications.

Example

To understand KafkaItemReader and KafkaItemWriter in practice, let us create a simple Spring Batch project. We'll read JSON data from two Kafka topics, combine it into a single JSON object, and write the result to another Kafka topic.


Prerequisites

To follow this tutorial, you should have the following installed:

  1. Java Development Kit (JDK) 8 or higher
  2. Apache Kafka
  3. Spring Boot (latest version)
  4. Maven or Gradle (for project setup)

Before running the example, start the Kafka server and create three topics: input_topic_1, input_topic_2, and output_topic (example commands below).
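With a recent Kafka distribution, the topics can be created roughly like this (adjust paths, partition counts, and replication to your setup):

bin/kafka-topics.sh --create --topic input_topic_1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic input_topic_2 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic output_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1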

Setting Up the Spring Batch Project

Let’s create a Spring Boot project and set up the necessary dependencies.

  1. Using Spring Initializr, generate a new Spring Boot project with the following dependencies (see the pom.xml sketch after this list):
    • Spring Batch
    • Spring Kafka
    • Spring Boot DevTools (optional, for development convenience)
  2. Import the project into your favorite IDE.
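If you use Maven, the relevant dependencies look roughly like this (versions are managed by the Spring Boot parent):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>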

Implementing KafkaItemReader

In this section, we’ll implement the KafkaItemReader to read JSON data from the input_topic_1 and input_topic_2 Kafka topics.

Open the application.properties file and add the following Kafka configurations:

spring.kafka.bootstrap-servers=localhost:9092
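Depending on how much you lean on Spring Boot auto-configuration, you may also want properties along these lines (a sketch; adjust to your setup):

# Produce values as JSON through the auto-configured KafkaTemplate
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Don't auto-run batch jobs at startup; we launch the job manually later
spring.batch.job.enabled=false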

Create a new class named JsonItem to represent the JSON data structure:

public class JsonItem {
    // Define your JSON properties here
    private String property1;
    private int property2;

    public String getProperty1() { return property1; }
    public void setProperty1(String property1) { this.property1 = property1; }
    public int getProperty2() { return property2; }
    public void setProperty2(int property2) { this.property2 = property2; }
}

Implement a JsonItemReader class that extends KafkaItemReader. Note that a KafkaItemReader consumes a single topic, so we create one instance per input topic and pass the consumer configuration in:

import java.util.Properties;
import org.springframework.batch.item.kafka.KafkaItemReader;

public class JsonItemReader extends KafkaItemReader<String, JsonItem> {
    public JsonItemReader(String topic, Properties consumerProps) {
        // consumerProps must define bootstrap.servers, group.id,
        // key.deserializer and value.deserializer
        super(consumerProps, topic, 0); // read partition 0 only, for simplicity
    }
}

Implementing KafkaItemWriter

Next, we’ll implement the KafkaItemWriter to write the combined JSON data to the output_topic Kafka topic.

Implement a JsonItemWriter class that extends KafkaItemWriter. A KafkaItemWriter sends records through a KafkaTemplate to that template's default topic, and it needs an itemKeyMapper to derive each record's key:

import org.springframework.batch.item.kafka.KafkaItemWriter;
import org.springframework.kafka.core.KafkaTemplate;

public class JsonItemWriter extends KafkaItemWriter<String, JsonItem> {
    public JsonItemWriter(KafkaTemplate<String, JsonItem> kafkaTemplate) {
        kafkaTemplate.setDefaultTopic("output_topic"); // target topic
        setKafkaTemplate(kafkaTemplate);
        setItemKeyMapper(item -> item.getProperty1()); // key each record by property1
    }
}

Creating the Spring Batch Job

Now, let’s put everything together and create the Spring Batch job.

Create a new class named JsonCombineJob:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
@EnableBatchProcessing
public class JsonCombineJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private KafkaTemplate<String, JsonItem> kafkaTemplate; // auto-configured by Spring Boot

    @Bean
    public Job jsonCombineJob() {
        // A KafkaItemReader reads one topic, so we run one step per input topic
        return jobBuilderFactory.get("jsonCombineJob")
                .start(step("input_topic_1"))
                .next(step("input_topic_2"))
                .build();
    }

    private Step step(String topic) {
        return stepBuilderFactory.get("step-" + topic)
                .<JsonItem, JsonItem>chunk(10)
                .reader(new JsonItemReader(topic, consumerProperties()))
                .writer(new JsonItemWriter(kafkaTemplate))
                .build();
    }

    private Properties consumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json-combine-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonItem.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }

    public void runJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncher.run(jsonCombineJob(), jobParameters);
        System.out.println("Job Exit Status: " + execution.getStatus());
    }
}
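In this wiring the reader output goes straight to the writer. The actual "combining" logic would live in an ItemProcessor registered on the step via .processor(...). Here is a hypothetical sketch with a placeholder merge rule:

import org.springframework.batch.item.ItemProcessor;

public class JsonCombineProcessor implements ItemProcessor<JsonItem, JsonItem> {
    @Override
    public JsonItem process(JsonItem item) {
        // Placeholder transformation; replace with your real combine/merge logic
        JsonItem combined = new JsonItem();
        combined.setProperty1("combined-" + item.getProperty1());
        combined.setProperty2(item.getProperty2());
        return combined;
    }
}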

Running the Job

Finally, let's execute our Spring Batch job. Since this is a Spring Boot project, we bootstrap the application context with SpringApplication (which also provides the DataSource and KafkaTemplate the job needs) and then trigger the job manually:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class MainApplication {
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(MainApplication.class, args);
        context.getBean(JsonCombineJob.class).runJob();
        context.close();
    }
}

In this tutorial, we’ve successfully created a real-time data pipeline using KafkaItemReader and KafkaItemWriter in Spring Batch. We learned how to read JSON data from two Kafka topics, combine the data into a single JSON object, and then write the result to another Kafka topic. Spring Batch provides powerful abstractions for building data processing workflows, and when combined with Apache Kafka, it becomes a robust solution for real-time data processing.

Feel free to extend this example by adding more sophisticated JSON processing logic or integrating with other data sources and sinks. Happy coding!


By Kurukshetran


Discover more about Efficient Data Processing with Spring Batch.
