Mastering Real-time Data Processing with KafkaItemReader and KafkaItemWriter in Spring Batch
In this tutorial, we will explore the powerful KafkaItemReader and KafkaItemWriter components in Spring Batch. These components facilitate seamless integration with Apache Kafka, enabling efficient and scalable data processing in batch jobs. We will explain the concepts behind KafkaItemReader and KafkaItemWriter and delve into their various use cases.
Understanding KafkaItemReader
KafkaItemReader is a Spring Batch item reader implementation that allows us to consume data from Apache Kafka topics. It reads messages from Kafka partitions and converts them into Java objects for processing within the Spring Batch job. Here’s how it works:
- Configuration: To use KafkaItemReader, you need to set up a Kafka consumer configuration, specifying the bootstrap servers, consumer group, key and value deserializers, and other relevant properties.
- Polling: KafkaItemReader uses the KafkaConsumer API to poll the assigned partitions of a topic. Each poll returns a batch of records; the maximum batch size is set with the max.poll.records consumer property, and the reader's poll timeout controls how long it waits for new records.
- Data Conversion: The retrieved messages from Kafka are deserialized into Java objects based on the provided key and value deserializers. This ensures that the data is in a usable format for further processing in the Spring Batch job.
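As a rough illustration of the configuration and polling points above, the consumer settings can be collected in a plain java.util.Properties object. This is a minimal sketch: the class name ReaderConsumerConfig, the host, the group id, and the batch size are made-up example values, and String deserializers are assumed for both key and value. The property names themselves are standard Kafka consumer settings.

```java
import java.util.Properties;

final class ReaderConsumerConfig {

    // Builds the consumer settings a Kafka-backed reader needs.
    // All argument values are supplied by the caller; nothing here talks to Kafka.
    static Properties consumerProperties(String bootstrapServers, String groupId, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: keys and values are plain strings.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Upper bound on the number of records a single poll may return.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }
}
```

For example, ReaderConsumerConfig.consumerProperties("localhost:9092", "demo-group", 100) would cap each poll at 100 records.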
Use Cases for KafkaItemReader
- Real-time Data Processing: KafkaItemReader is ideal for real-time data processing scenarios, where you need to continuously process incoming data from Kafka topics.
- Data Transformation: It enables data transformation by reading data from Kafka topics and converting it into a different format before further processing.
- Microservices Communication: KafkaItemReader facilitates communication between microservices by consuming data from Kafka topics produced by other services and processing it in batch jobs.
Understanding KafkaItemWriter
KafkaItemWriter is a Spring Batch item writer implementation that writes data back to Apache Kafka topics. It takes processed Java objects from the Spring Batch job and sends them to Kafka topics for consumption by other systems or services. Here’s how it works:
- Configuration: KafkaItemWriter is configured with a KafkaTemplate, which carries the Kafka producer configuration (bootstrap servers, key and value serializers, and other relevant properties) and the default topic to write to. It also needs an item key mapper that derives the Kafka message key from each item.
- Batching and Sending: For each chunk handed over by the step, KafkaItemWriter sends the items to the template's default topic and then flushes the template. The number of items per write is the chunk size of the step, which you can tune to optimize performance.
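KafkaItemWriter is a key/value writer: for every item in a chunk it derives a key and sends the key together with the item. The idea can be sketched in plain Java without any Kafka dependency. The class KeyValueWriterSketch below is a hypothetical model for illustration only, not the Spring Batch class, and the sender callback stands in for the call that would publish to Kafka.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Plain-Java model of a key/value item writer (illustration only).
final class KeyValueWriterSketch<K, V> {

    private final Function<V, K> keyMapper;   // derives the message key from an item
    private final BiConsumer<K, V> sender;    // stands in for "send to the topic"

    KeyValueWriterSketch(Function<V, K> keyMapper, BiConsumer<K, V> sender) {
        this.keyMapper = keyMapper;
        this.sender = sender;
    }

    // Called once per chunk: map each item to a key, then send key and item.
    void write(List<? extends V> chunk) {
        for (V item : chunk) {
            sender.accept(keyMapper.apply(item), item);
        }
    }
}
```

Keeping the key mapper separate from the sender means the same writer can be reused with a different keying strategy, which matters in Kafka because the key decides which partition a record lands in.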
Use Cases for KafkaItemWriter
- Aggregating Data: KafkaItemWriter enables aggregating data from various sources in a Spring Batch job and producing it into Kafka for further consumption by downstream systems.
- Event Sourcing: It can be used to implement event sourcing by writing processed data into Kafka topics as events for auditing, historical tracking, or event-driven architectures.
- Data Synchronization: KafkaItemWriter is useful for synchronizing data between different systems by writing processed data into Kafka topics consumed by other services or applications.
Example
To understand KafkaItemReader and KafkaItemWriter, let us create a simple Spring Batch project. In this project, we’ll read JSON data from two Kafka topics, combine the data into a single JSON object, and then write the result to another Kafka topic.
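To make "combine the data into a single JSON object" concrete, here is a minimal plain-Java sketch of one possible combining step. It assumes each record has already been parsed into a flat map of field names to values; the class JsonCombiner and its methods are hypothetical names, and the JSON rendering is deliberately simplified (no escaping, no nesting). A real project would more likely use a JSON library such as Jackson.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class JsonCombiner {

    // Copies the fields of both records into one map, keeping insertion order.
    // If both records define the same field, the value from `second` wins.
    static Map<String, Object> combine(Map<String, Object> first, Map<String, Object> second) {
        Map<String, Object> combined = new LinkedHashMap<>(first);
        combined.putAll(second);
        return combined;
    }

    // Renders a flat map as a JSON object. Simplified: numbers and booleans are
    // written as-is, everything else as a quoted string, with no escaping.
    static String toJson(Map<String, Object> fields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            Object value = field.getValue();
            String rendered = (value instanceof Number || value instanceof Boolean)
                    ? value.toString()
                    : "\"" + value + "\"";
            json.add("\"" + field.getKey() + "\":" + rendered);
        }
        return json.toString();
    }
}
```

A flat merge is only one option; nesting each source record under its own field name would avoid field-name clashes at the cost of a deeper structure.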
Prerequisites
To follow this tutorial, you should have the following installed:
- Java Development Kit (JDK) 8 or higher
- Apache Kafka
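- Spring Boot 2.x (the job configuration below uses JobBuilderFactory and StepBuilderFactory, which are deprecated as of Spring Batch 5 and Spring Boot 3)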
- Maven or Gradle (for project setup)
- Start the Kafka server and create three topics: input_topic_1, input_topic_2, and output_topic.
Setting Up the Spring Batch Project
Let’s create a Spring Boot project and set up the necessary dependencies.
- Using Spring Boot Initializr, generate a new Spring Boot project with the following dependencies:
- Spring Batch
- Spring Kafka
- Spring Boot DevTools (optional, for development convenience)
- Import the project into your favorite IDE.
Implementing KafkaItemReader
In this section, we’ll implement the KafkaItemReader to read JSON data from the input_topic_1 and input_topic_2 Kafka topics.
Open the application.properties file and add the following Kafka configurations:
spring.kafka.bootstrap-servers=localhost:9092
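Create a new class named JsonItem to represent the JSON data structure:
public class JsonItem {
    // Define your JSON properties here
    private String property1;
    private int property2;
    // Getters and setters so a JSON (de)serializer can read and populate the fields
    public String getProperty1() { return property1; }
    public void setProperty1(String property1) { this.property1 = property1; }
    public int getProperty2() { return property2; }
    public void setProperty2(int property2) { this.property2 = property2; }
}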
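Implement the JsonItemReader class, which extends KafkaItemReader<String, JsonItem>. A KafkaItemReader reads the listed partitions of a single topic, so this reader covers partition 0 of input_topic_1; reading input_topic_2 needs a second reader set up the same way. The consumer properties must contain the bootstrap servers, a group id (json-combine-job is an example value), and the key and value deserializers:
import java.util.Properties;
import org.springframework.batch.item.kafka.KafkaItemReader;

public class JsonItemReader extends KafkaItemReader<String, JsonItem> {
    public JsonItemReader() {
        super(consumerProperties(), "input_topic_1", 0);
    }
    private static Properties consumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "json-combine-job");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
        props.put("spring.json.value.default.type", JsonItem.class.getName());
        return props;
    }
}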
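KafkaItemReader reads only the partitions it is told to read; its constructors accept the partition numbers along with the topic name. When a topic has more than one partition, the full list has to be supplied. The helper below is a small sketch of how such a list could be built; the class name PartitionLists is hypothetical, and the partition count is assumed to be known in advance (for example from how the topic was created).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PartitionLists {

    // Returns the partition numbers 0, 1, ..., partitionCount - 1.
    static List<Integer> allPartitions(int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        return IntStream.range(0, partitionCount)
                .boxed()
                .collect(Collectors.toList());
    }
}
```

For a topic created with three partitions, PartitionLists.allPartitions(3) yields the list that would be handed to the reader in place of a single partition number.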
Implementing KafkaItemWriter
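Next, we’ll implement the KafkaItemWriter to write the combined JSON data to the output_topic Kafka topic.
Implement the JsonItemWriter class, which extends KafkaItemWriter<String, JsonItem>. The writer sends each item to the default topic of its KafkaTemplate and needs an item key mapper; here every record is sent without a key:
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.item.kafka.KafkaItemWriter;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class JsonItemWriter extends KafkaItemWriter<String, JsonItem> {
    public JsonItemWriter() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        KafkaTemplate<String, JsonItem> template = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
        template.setDefaultTopic("output_topic"); // the writer sends to the template's default topic
        setKafkaTemplate(template);
        setItemKeyMapper(item -> null); // no message key
    }
}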
Creating the Spring Batch Job
Now, let’s put everything together and create the Spring Batch job.
Create a new class named JsonCombineJob:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class JsonCombineJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;

    @Bean
    public Job jsonCombineJob() {
        return jobBuilderFactory.get("jsonCombineJob")
                .start(step())
                .build();
    }

    @Bean
    public Step step() {
        // Items are read one at a time and written in chunks of 10
        return stepBuilderFactory.get("step")
                .<JsonItem, JsonItem>chunk(10)
                .reader(new JsonItemReader())
                .writer(new JsonItemWriter())
                .build();
    }

    public void runJob() throws Exception {
        // A changing "time" parameter makes every run a new job instance
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(jsonCombineJob(), jobParameters);
        System.out.println("Job Exit Status: " + execution.getStatus());
    }
}
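The chunk(10) call in the step above sets up chunk-oriented processing: items are read one by one until ten are collected, the ten are written together, and the cycle repeats until the reader signals the end of input by returning null. The loop can be modelled in plain Java as follows. This is a simplified sketch with a hypothetical class name, ChunkLoopSketch; the real Spring Batch step also handles transactions, restarts, and error handling, none of which is shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class ChunkLoopSketch {

    // Reads until the reader returns null, writing every `chunkSize` items
    // as one chunk. Returns the number of chunks written.
    static <T> int run(Supplier<T> reader, Consumer<List<T>> writer, int chunkSize) {
        int chunksWritten = 0;
        List<T> buffer = new ArrayList<>(chunkSize);
        T item;
        while ((item = reader.get()) != null) {
            buffer.add(item);
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer)); // hand over a copy of the full chunk
                buffer.clear();
                chunksWritten++;
            }
        }
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer));     // final, partially filled chunk
            chunksWritten++;
        }
        return chunksWritten;
    }
}
```

With 25 items and a chunk size of 10, this loop writes three chunks of 10, 10, and 5 items. A larger chunk size means fewer writes but more work to repeat if a chunk fails.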
Running the Job
Finally, let’s execute our Spring Batch job to start the data processing:
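import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class MainApplication {
    public static void main(String[] args) throws Exception {
        // Build the application context from the job configuration class
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JsonCombineJob.class);
        JsonCombineJob job = context.getBean(JsonCombineJob.class);
        job.runJob();
        context.close();
    }
}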
In this tutorial, we’ve successfully created a real-time data pipeline using KafkaItemReader and KafkaItemWriter in Spring Batch. We learned how to read JSON data from two Kafka topics, combine the data into a single JSON object, and then write the result to another Kafka topic. Spring Batch provides powerful abstractions for building data processing workflows, and when combined with Apache Kafka, it becomes a robust solution for real-time data processing.
Feel free to extend this example by adding more sophisticated JSON processing logic or integrating with other data sources and sinks. Happy coding!
By Kurukshetran