Mastering Spring Batch AggregateItemReader with Example
Spring Batch is a powerful framework for batch processing in Java-based applications. It provides various components that enable the development of robust and efficient batch jobs. One of these components is the `AggregateItemReader`, which allows you to read items from multiple sources and aggregate them into a single stream. In this tutorial, we will walk through an example of how to use the `AggregateItemReader` in a Spring Batch application.
Introduction to Spring Batch AggregateItemReader
As noted above, the `AggregateItemReader` allows you to read items from multiple sources and aggregate them into a single stream. This is beneficial when you need to combine data from different sources before processing it or writing it to an output destination.
Understanding the AggregateItemReader
The `AggregateItemReader` is part of the Spring Batch framework and serves as a composite `ItemReader`. It aggregates the data read from multiple `ItemReader` implementations and presents it as a single stream of items. This abstraction allows you to read items from various sources, such as files, databases, or web services, and merge them into a cohesive data flow.
Key Features of AggregateItemReader
The `AggregateItemReader` provides the following key features:
- Aggregation of Multiple Data Sources: The primary function of the `AggregateItemReader` is to combine data from multiple `ItemReader` sources. This allows you to create complex batch jobs that require data from various origins.
- Dynamic Configuration: The `AggregateItemReader` allows you to configure the delegates (individual `ItemReader` instances) dynamically at runtime. This flexibility enables you to switch between different data sources based on business requirements or external conditions.
- Fault Tolerance: Like other Spring Batch components, the `AggregateItemReader` benefits from the framework's built-in fault-tolerance mechanisms. If any delegate reader fails, the job's state is persisted, and processing resumes from the point of failure after a restart.
How the AggregateItemReader Works
The `AggregateItemReader` follows a straightforward workflow to combine data from multiple sources:
- Configuration: First, you define and configure the individual `ItemReader` instances that will act as delegates. Each delegate reader is responsible for reading data from a specific source.
- Aggregation: The `AggregateItemReader` aggregates these configured delegates into a single stream of items. It reads items from each delegate in turn and presents them as a unified sequence.
- Read and Delegate Selection: During a read operation, the `AggregateItemReader` internally selects the next delegate to read from. This selection is usually cyclic or based on a defined pattern.
- End of Data: The `AggregateItemReader` determines the end of data when all delegate readers have exhausted their data sources. At that point, `read()` returns `null` to signal the end of the stream.
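The workflow above can be sketched in plain Java. The sketch below is our own simplified illustration, not Spring Batch's actual classes: it mimics the `ItemReader` contract with a tiny functional interface so it stays dependency-free, drains each delegate sequentially, and returns `null` once every delegate is exhausted.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AggregateReaderSketch {

    // Minimal stand-in for Spring Batch's ItemReader contract:
    // read() returns the next item, or null at end of data.
    @FunctionalInterface
    public interface Reader<T> {
        T read() throws Exception;
    }

    // Drains each delegate in turn; null from a delegate means "exhausted",
    // so we advance to the next one until all delegates are used up.
    public static class AggregatingReader<T> implements Reader<T> {
        private final Iterator<Reader<T>> delegates;
        private Reader<T> current;

        public AggregatingReader(List<Reader<T>> readers) {
            this.delegates = readers.iterator();
            this.current = delegates.hasNext() ? delegates.next() : null;
        }

        @Override
        public T read() throws Exception {
            while (current != null) {
                T item = current.read();
                if (item != null) {
                    return item; // current delegate still has data
                }
                // current delegate exhausted: move on to the next one
                current = delegates.hasNext() ? delegates.next() : null;
            }
            return null; // all delegates exhausted: end of the aggregated stream
        }
    }

    // Helper: wrap a fixed list as a reader that ends with null.
    public static <T> Reader<T> fromList(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) throws Exception {
        Reader<String> csv = fromList(Arrays.asList("csv-1", "csv-2"));
        Reader<String> db = fromList(Arrays.asList("db-1"));
        Reader<String> aggregate = new AggregatingReader<>(Arrays.asList(csv, db));

        String item;
        while ((item = aggregate.read()) != null) {
            System.out.println(item);
        }
    }
}
```

This sketch uses sequential delegate selection (drain one source fully, then move to the next); as noted above, a real implementation could instead select delegates cyclically.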
Implementing the AggregateItemReader
To implement the `AggregateItemReader`, you need to follow these steps:
- Create and configure individual `ItemReader` implementations: Implement separate classes for reading data from different sources. Each class should implement the `ItemReader` interface and handle the reading logic for its respective data source.
- Define the `AggregateItemReader` bean: In your batch configuration class, create a bean of type `AggregateItemReader` and set its delegates to the previously configured `ItemReader` implementations.
- Use the `AggregateItemReader` in your step: Inject the `AggregateItemReader` bean into your Spring Batch step, and it will be ready to read aggregated data from multiple sources.
Example
Prerequisites
To follow this tutorial, you should have the following prerequisites in place:
- Basic knowledge of Spring Batch concepts and annotations.
- Java Development Kit (JDK) installed on your system (version 8 or higher).
- A Java IDE (such as Eclipse or IntelliJ) or a text editor to write and run Java code.
- Spring Boot project setup. You can use Spring Boot Initializr to generate a basic Spring Boot project.
Project Setup
Let’s start by setting up a new Spring Boot project with the necessary dependencies.
- Open your IDE or text editor and create a new Spring Boot project using Spring Initializr or your preferred method.
- Add the Spring Batch starter dependency to your `pom.xml` (if using Maven) or `build.gradle` (if using Gradle):
```xml
<!-- Spring Batch dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
```
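If your project uses Gradle instead of Maven, the equivalent declaration (a configuration fragment, shown for convenience) would be:

```groovy
// build.gradle — equivalent of the Maven dependency above
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
}
```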
Now, let’s create a JPA entity class that represents the items stored in the database. For this example, we’ll use a `ProductEntity` class (the readers themselves will work with a plain `Product` POJO):
```java
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class ProductEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;
    private double price;

    // Constructors, getters, and setters (omitted for brevity)
}
```
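The readers later in this tutorial construct and return a plain `Product` POJO, but the original does not show its definition. A minimal version is sketched below (the field names follow the CSV columns and the entity above; the exact shape is our assumption):

```java
// Plain POJO used by the readers; the no-arg constructor and setters are
// required by BeanWrapperFieldSetMapper, which binds CSV fields by name.
public class Product {

    private Long id;
    private String name;
    private double price;

    public Product() {
    }

    public Product(Long id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    @Override
    public String toString() {
        return "Product{id=" + id + ", name='" + name + "', price=" + price + "}";
    }
}
```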
Create a Product Repository
Create a Spring Data JPA repository for `ProductEntity` to perform CRUD operations:
```java
import org.springframework.data.jpa.repository.JpaRepository;

public interface ProductRepository extends JpaRepository<ProductEntity, Long> {
}
```
Implementing the AggregateItemReader
Next, we will implement the `AggregateItemReader` to read items from multiple sources. In this example, we’ll use two separate `ItemReader` implementations: one to read products from a CSV file and another to read products from a database.

Create two separate classes that implement the `ItemReader<Product>` interface, one for reading products from a CSV file and another for reading products from the database. Let’s name them `CsvProductReader` and `DatabaseProductReader`, respectively.
CsvProductReader.java
```java
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

public class CsvProductReader implements ItemReader<Product> {

    private final FlatFileItemReader<Product> reader;

    public CsvProductReader() {
        reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("products.csv"));
        reader.setLinesToSkip(1); // Skip the header row
        reader.setLineMapper((line, lineNumber) -> {
            // Split the line on commas and bind the fields by name
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames("id", "name", "price");
            tokenizer.setDelimiter(",");
            BeanWrapperFieldSetMapper<Product> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
            fieldSetMapper.setTargetType(Product.class);
            return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
        });
        // A FlatFileItemReader must be opened before it can be read from
        reader.open(new ExecutionContext());
    }

    @Override
    public Product read() throws Exception {
        return reader.read();
    }
}
```
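The line-mapping step above can be sanity-checked in isolation. The sketch below is a deliberately simplified, dependency-free stand-in for what the `DelimitedLineTokenizer` plus `BeanWrapperFieldSetMapper` pair does for this tutorial's three-column layout: split on commas, then bind `id`, `name`, and `price` by position (it assumes field values never contain commas, unlike the real tokenizer, which supports quoting):

```java
public class CsvLineMappingSketch {

    // Naive tokenizer: split on commas, keeping trailing empty fields.
    public static String[] tokenize(String line) {
        return line.split(",", -1);
    }

    public static void main(String[] args) {
        String[] fields = tokenize("1,Product A,10.0");
        // Bind the three columns by position, mirroring the "id,name,price" header
        long id = Long.parseLong(fields[0]);
        String name = fields[1];
        double price = Double.parseDouble(fields[2]);
        System.out.println(id + " | " + name + " | " + price);
    }
}
```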
DatabaseProductReader.java
```java
import org.springframework.batch.item.ItemReader;

import java.util.Iterator;
import java.util.List;

public class DatabaseProductReader implements ItemReader<Product> {

    private final Iterator<ProductEntity> productIterator;

    public DatabaseProductReader(List<ProductEntity> products) {
        productIterator = products.iterator();
    }

    @Override
    public Product read() {
        if (productIterator.hasNext()) {
            ProductEntity productEntity = productIterator.next();
            return new Product(productEntity.getId(), productEntity.getName(), productEntity.getPrice());
        }
        return null; // signals end of data
    }
}
```
Now, let’s create a configuration class for our Spring Batch job and define the `AggregateItemReader` bean. We’ll also create a simple processor and writer for demonstration purposes.
BatchConfig.java
```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job processProductsJob(Step step) {
        return jobBuilderFactory.get("processProductsJob")
                .flow(step)
                .end()
                .build();
    }

    @Bean
    public Step step(ItemReader<Product> aggregateItemReader,
                     ItemProcessor<Product, Product> itemProcessor,
                     ItemWriter<Product> itemWriter) {
        return stepBuilderFactory.get("step")
                .<Product, Product>chunk(10)
                .reader(aggregateItemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public AggregateItemReader<Product> aggregateItemReader(CsvProductReader csvProductReader,
                                                            DatabaseProductReader databaseProductReader) {
        AggregateItemReader<Product> aggregateItemReader = new AggregateItemReader<>();
        aggregateItemReader.setDelegates(List.of(csvProductReader, databaseProductReader));
        return aggregateItemReader;
    }

    @Bean
    public ItemProcessor<Product, Product> itemProcessor() {
        // Pass-through processor; replace with custom logic if needed
        return product -> product;
    }

    @Bean
    public ItemWriter<Product> itemWriter() {
        // Simple writer for demonstration: print each product to the console
        return items -> {
            for (Product product : items) {
                System.out.println(product);
            }
        };
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
        // JobCompletionListener is a user-defined JobExecutionListener (not shown in this tutorial)
        return new JobCompletionListener();
    }
}
```
Finally, let’s create a CSV file named `products.csv` in the `src/main/resources` folder with sample product data. The content should look like this:
products.csv
```csv
id,name,price
1,Product A,10.0
2,Product B,20.0
3,Product C,30.0
```
Update `application.properties` as follows:
```properties
# H2 Database Configuration
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.h2.console.enabled=true
spring.h2.console.path=/h2-console
```
Testing the Batch Job
With all the components in place, let’s test our Spring Batch job with the `AggregateItemReader`.
- Run the Spring Boot application; the batch job should start automatically.
- The job reads products from both the CSV file and the database and prints each product’s information to the console via the `itemWriter`.
- Verify that the products are processed correctly by checking the console output.
The `AggregateItemReader` in Spring Batch is a powerful tool for combining data from different sources into a single stream. With it, you can create sophisticated batch jobs that handle diverse data inputs and process them as a cohesive data flow. Leveraging Spring Batch’s fault tolerance and dynamic configuration capabilities, the `AggregateItemReader` is a valuable component for building robust and flexible batch applications.
By Kurukshetran