Mastering Spring Batch AggregateItemReader with Example

Spring Batch is a powerful framework for batch processing in Java-based applications. It provides components for building robust and efficient batch jobs, and its ItemReader abstraction makes it possible to combine several readers into one. An AggregateItemReader does exactly that: it reads items from multiple sources and aggregates them into a single stream. In this tutorial, we will walk through an example of how to use an AggregateItemReader in a Spring Batch application.


Introduction to Spring Batch AggregateItemReader

An AggregateItemReader is beneficial when you need to combine data from different sources before processing it or writing it to an output destination.

Understanding the AggregateItemReader

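The AggregateItemReader used in this tutorial serves as a composite ItemReader. It aggregates the data read from multiple ItemReader implementations and presents it as a single stream of items. This abstraction lets you read items from various sources, such as files, databases, or web services, and merge them into a cohesive data flow. Spring Batch's core modules do not provide a multi-source reader under this name. The AggregateItemReader in the Spring Batch samples groups multi-line records into lists instead. For that reason, the class configured in this tutorial is one you write yourself.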

Key Features of AggregateItemReader

The AggregateItemReader provides the following key features:

  1. Aggregation of Multiple Data Sources: The primary function of AggregateItemReader is to combine data from multiple ItemReader sources. This allows you to create complex batch jobs that require data from various origins.
  2. Dynamic Configuration: The delegates (individual ItemReader instances) are supplied to the AggregateItemReader as a list. You can therefore decide which readers to include when the bean is created, for example based on application properties, business requirements, or other external conditions.
  3. Fault Tolerance: Like other Spring Batch components, the AggregateItemReader can take part in the framework's restart support. Resuming from the point of failure requires the reader to save its position in the step's ExecutionContext, typically by implementing ItemStream. That position means which delegate it is on and how many items it has read from it. A reader that does not save state, such as the simple delegates in this tutorial, starts again from the first item after a restart.
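To make the restart behavior in point 3 concrete, here is a minimal sketch of an aggregate reader that saves and restores its position. It is hypothetical. `RestartableAggregateReader`, its context keys, and the local `ItemReader` and `ListItemReader` types are illustrative stand-ins; the last two mirror Spring Batch's interfaces of the same names. A plain `Map` plays the role of the step's `ExecutionContext`, so the code compiles with the JDK alone. The `open` and `update` methods imitate the `ItemStream` callbacks of the same names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Local stand-in for org.springframework.batch.item.ItemReader:
// read() returns the next item, or null when the input is exhausted.
interface ItemReader<T> {
    T read() throws Exception;
}

// List-backed delegate; a fresh instance models a reader re-created after a restart.
class ListItemReader<T> implements ItemReader<T> {
    private final Iterator<T> iterator;

    ListItemReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

// Hypothetical restartable aggregate. The Map stands in for the ExecutionContext.
class RestartableAggregateReader<T> implements ItemReader<T> {
    private final List<? extends ItemReader<? extends T>> delegates;
    private int current;       // index of the delegate being read
    private int readInCurrent; // items already returned from that delegate

    RestartableAggregateReader(List<? extends ItemReader<? extends T>> delegates) {
        this.delegates = delegates;
    }

    // Restore the saved position, fast-forwarding by reading and discarding items.
    void open(Map<String, Integer> context) throws Exception {
        current = context.getOrDefault("aggregate.delegateIndex", 0);
        int toSkip = context.getOrDefault("aggregate.readInDelegate", 0);
        readInCurrent = 0;
        while (readInCurrent < toSkip && current < delegates.size()
                && delegates.get(current).read() != null) {
            readInCurrent++;
        }
    }

    // Save the current position (in Spring Batch the step calls update() as part of each chunk commit).
    void update(Map<String, Integer> context) {
        context.put("aggregate.delegateIndex", current);
        context.put("aggregate.readInDelegate", readInCurrent);
    }

    @Override
    public T read() throws Exception {
        while (current < delegates.size()) {
            T item = delegates.get(current).read();
            if (item != null) {
                readInCurrent++;
                return item;
            }
            current++;         // delegate exhausted: move to the next one
            readInCurrent = 0;
        }
        return null;           // all delegates exhausted
    }
}
```

Fast-forwarding by re-reading and discarding items is the simplest way to restore a position, and it works with any delegate. The cost is the repeated reads. A delegate that can seek directly, for example by a database key, could restore its position more cheaply.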

How the AggregateItemReader Works

The AggregateItemReader follows a straightforward workflow to combine data from multiple sources:

  1. Configuration: First, you define and configure the individual ItemReader instances that will act as delegates. Each delegate reader is responsible for reading data from a specific source.
  2. Aggregation: The AggregateItemReader combines these delegates into a single stream of items. It reads from each delegate in turn and presents the results as one unified sequence.
  3. Read and Delegate Selection: On each read call, the AggregateItemReader decides which delegate to read from next. The simplest strategy is sequential: drain the first delegate until it returns null, then move to the next one. A round-robin (cyclic) strategy that interleaves the delegates is also possible.
  4. End of Data: The AggregateItemReader will determine the end of data when all delegate readers have exhausted their data sources. At this point, the reader will return null to signal the end of the data stream.
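The four steps above can be sketched in plain Java. This is a minimal, hypothetical implementation that assumes a sequential strategy: drain each delegate before moving to the next. It declares a local `ItemReader` interface with the same `read()` contract as Spring Batch's (return the next item, or `null` at the end of the data), so it runs without Spring on the classpath. In a real project you would implement `org.springframework.batch.item.ItemReader` instead. The `setDelegates` signature is an assumption, chosen to match the configuration used later in this tutorial.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Local stand-in for org.springframework.batch.item.ItemReader:
// read() returns the next item, or null when the input is exhausted.
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical sequential aggregate: drains delegate 0, then delegate 1, and so on.
class AggregateItemReader<T> implements ItemReader<T> {
    private List<? extends ItemReader<? extends T>> delegates = Collections.emptyList();
    private int current = 0; // index of the delegate currently being read

    // Step 1: the configured delegates are handed to the aggregate
    void setDelegates(List<? extends ItemReader<? extends T>> delegates) {
        this.delegates = delegates;
        this.current = 0;
    }

    @Override
    public T read() throws Exception {
        // Steps 2 and 3: keep reading from the active delegate
        while (current < delegates.size()) {
            T item = delegates.get(current).read();
            if (item != null) {
                return item;
            }
            current++; // this delegate is exhausted; switch to the next one
        }
        return null; // Step 4: every delegate is exhausted
    }
}

// List-backed delegate standing in for a file or database reader
class ListItemReader<T> implements ItemReader<T> {
    private final Iterator<T> iterator;

    ListItemReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}
```

Draining each delegate before moving on preserves the order of every source. A round-robin variant would instead advance `current` after each successful read. It would stop only after a full pass in which every delegate returned `null`.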

Implementing the AggregateItemReader

To implement the AggregateItemReader, you need to follow these steps:

  1. Create and configure individual ItemReader implementations: Implement separate classes for reading data from different sources. These classes should implement the ItemReader interface and handle the reading logic for their respective data sources.
  2. Define the AggregateItemReader bean: In your batch configuration class, create a bean of type AggregateItemReader and set the delegates to the previously configured ItemReader implementations.
  3. Use the AggregateItemReader in your Step: Inject the AggregateItemReader bean into your Spring Batch Step, and it will be ready to read aggregated data from multiple sources.
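Step 3 hands the reader to a chunk-oriented step. The sketch below is a simplified, JDK-only model of what such a step does with the reader. It is not Spring Batch's implementation. The loop reads up to `chunkSize` items, passes each one through the processor, writes the non-null results, and stops once the reader returns `null`. Transactions, skip/retry, and listeners are left out. `ChunkLoop` and the local `ItemReader` and `ListItemReader` types are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Local stand-in for org.springframework.batch.item.ItemReader.
interface ItemReader<T> {
    T read() throws Exception;
}

// List-backed reader standing in for the AggregateItemReader bean.
class ListItemReader<T> implements ItemReader<T> {
    private final Iterator<T> iterator;

    ListItemReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

final class ChunkLoop {
    private ChunkLoop() { }

    // Simplified chunk-oriented processing; returns the number of chunks read.
    static <I, O> int run(ItemReader<? extends I> reader,
                          Function<? super I, ? extends O> processor,
                          Consumer<? super List<O>> writer,
                          int chunkSize) throws Exception {
        int chunks = 0;
        while (true) {
            // Read phase: collect up to chunkSize items
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < chunkSize && (item = reader.read()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return chunks; // reader exhausted: the step ends
            }
            // Process phase: a null from the processor filters the item out
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                O output = processor.apply(input);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // Write phase: hand the surviving items to the writer in one call
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
            chunks++;
        }
    }
}
```

In the tutorial's configuration, `chunk(10)` corresponds to `chunkSize = 10`. With no items filtered out, 25 items would therefore be written in chunks of 10, 10, and 5.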

Example

Prerequisites

To follow this tutorial, you should have the following prerequisites in place:

  1. Basic knowledge of Spring Batch concepts and annotations.
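  2. Java Development Kit (JDK) 8 or higher installed on your system. The code uses the Spring Boot 2.x / Spring Batch 4 APIs (JobBuilderFactory, StepBuilderFactory, javax.persistence). Spring Batch 5 deprecates the builder factories, and Spring Boot 3 moves JPA to jakarta.persistence and requires JDK 17.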
  3. A Java IDE (such as Eclipse or IntelliJ) or a text editor to write and run Java code.
  4. Spring Boot project setup. You can use Spring Initializr (start.spring.io) to generate a basic Spring Boot project.

Project Setup

Let’s start by setting up a new Spring Boot project with the necessary dependencies.

  1. Open your IDE or text editor and create a new Spring Boot project using Spring Initializr or your preferred method.
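  2. Add the following dependencies to your pom.xml (if using Maven), or the equivalent entries to build.gradle (if using Gradle). Spring Data JPA and the H2 in-memory database are needed for the ProductEntity repository and the datasource configured later:
<!-- Spring Batch dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- JPA and the H2 in-memory database -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>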

Now, let’s create the JPA entity that stores products in the database. For this example, we’ll use a ProductEntity class. The readers convert these entities into a plain Product object, which is what the job processes:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class ProductEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private double price;

    // Constructors, getters, and setters (omitted for brevity)
}
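The readers and the writer in this tutorial work with a plain `Product` class that the article does not show. Here is a minimal sketch, assuming its fields mirror `ProductEntity`. `BeanWrapperFieldSetMapper` creates instances through the no-arg constructor and fills them through setters, and `DatabaseProductReader` uses the three-argument constructor. The class is declared without `public` only so the snippet compiles on its own. In your project it would be a public class in `Product.java`.

```java
// Plain (non-JPA) item type used by the readers, processor and writer.
// Hypothetical reconstruction: fields are assumed to mirror ProductEntity.
class Product {
    private Long id;
    private String name;
    private double price;

    // Needed by BeanWrapperFieldSetMapper, which instantiates the target type reflectively
    public Product() {
    }

    // Used by DatabaseProductReader when converting a ProductEntity
    public Product(Long id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    // Printed by the console ItemWriter
    @Override
    public String toString() {
        return "Product{id=" + id + ", name='" + name + "', price=" + price + "}";
    }
}
```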

Create a Product Repository

Create a Spring Data JPA repository for ProductEntity to perform CRUD operations:

import org.springframework.data.jpa.repository.JpaRepository;

public interface ProductRepository extends JpaRepository<ProductEntity, Long>{
}

Implementing the Delegate Readers

Next, we will implement the delegate readers that the AggregateItemReader combines. In this example, we’ll use two separate ItemReader implementations: one to read products from a CSV file and another to read products from a database.

Create two classes that implement the ItemReader<Product> interface: one that reads products from a CSV file and one that reads them from the database. We'll name them CsvProductReader and DatabaseProductReader, respectively.

CsvProductReader.java

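import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

public class CsvProductReader implements ItemReader<Product> {

    private final FlatFileItemReader<Product> reader;
    private boolean opened = false;
    private boolean finished = false;

    public CsvProductReader() {
        // Build the tokenizer and mapper once instead of on every line
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "name", "price");
        tokenizer.setDelimiter(",");
        BeanWrapperFieldSetMapper<Product> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Product.class); // Product needs a no-arg constructor and setters

        reader = new FlatFileItemReader<>();
        reader.setName("csvProductReader"); // prefix for the reader's execution-context keys
        reader.setResource(new ClassPathResource("products.csv"));
        reader.setLinesToSkip(1); // Skip the header row
        reader.setLineMapper((line, lineNumber) -> fieldSetMapper.mapFieldSet(tokenizer.tokenize(line)));
    }

    @Override
    public Product read() throws Exception {
        if (finished) {
            return null;
        }
        // FlatFileItemReader must be opened before read(). This wrapper is not registered
        // with the step as an ItemStream, so it opens the file itself on the first call.
        if (!opened) {
            reader.open(new ExecutionContext());
            opened = true;
        }
        Product product = reader.read();
        if (product == null) {
            reader.close(); // end of file: release the resource
            finished = true;
        }
        return product;
    }
}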

DatabaseProductReader.java

import org.springframework.batch.item.ItemReader;

import java.util.Iterator;
import java.util.List;

public class DatabaseProductReader implements ItemReader<Product> {

    private Iterator<ProductEntity> productIterator;

    public DatabaseProductReader(List<ProductEntity> products) {
        productIterator = products.iterator();
    }

    @Override
    public Product read() {
        if (productIterator.hasNext()) {
            ProductEntity productEntity = productIterator.next();
            return new Product(productEntity.getId(), productEntity.getName(), productEntity.getPrice());
        }
        return null;
    }
}

Now, let’s create a configuration class for our Spring Batch job and define the AggregateItemReader. We’ll also create a simple processor and writer for demonstration purposes.

BatchConfig.java

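import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;

// AggregateItemReader is assumed to be your own class in this package that implements
// ItemReader<T> and provides setDelegates(List<...>); it is not a core Spring Batch class.
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job processProductsJob(Step step, JobExecutionListener jobExecutionListener) {
        return jobBuilderFactory.get("processProductsJob")
                .listener(jobExecutionListener) // register the listener so it actually runs
                .flow(step)
                .end()
                .build();
    }

    @Bean
    public Step step(AggregateItemReader<Product> aggregateItemReader,
                     ItemProcessor<Product, Product> itemProcessor,
                     ItemWriter<Product> itemWriter) {
        // The concrete reader type avoids an ambiguous match: the CSV and
        // database readers are ItemReader<Product> beans as well
        return stepBuilderFactory.get("step")
                .<Product, Product>chunk(10)
                .reader(aggregateItemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public CsvProductReader csvProductReader() {
        return new CsvProductReader();
    }

    @Bean
    public DatabaseProductReader databaseProductReader(ProductRepository productRepository) {
        // Loads all ProductEntity rows once, when this bean is created
        return new DatabaseProductReader(productRepository.findAll());
    }

    @Bean
    public AggregateItemReader<Product> aggregateItemReader(CsvProductReader csvProductReader,
                                                            DatabaseProductReader databaseProductReader) {
        AggregateItemReader<Product> aggregateItemReader = new AggregateItemReader<>();
        // Arrays.asList keeps the code JDK 8 compatible (List.of requires Java 9+)
        aggregateItemReader.setDelegates(Arrays.asList(csvProductReader, databaseProductReader));
        return aggregateItemReader;
    }

    @Bean
    public ItemProcessor<Product, Product> itemProcessor() {
        // Pass-through processor; replace with your own logic if needed
        return product -> product;
    }

    @Bean
    public ItemWriter<Product> itemWriter() {
        // Console writer for demonstration; replace with a database or external-system writer
        return items -> {
            for (Product product : items) {
                System.out.println(product);
            }
        };
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
        // Inline listener in place of the JobCompletionListener class, which was never defined
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                // nothing to do before the job starts
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                System.out.println("processProductsJob finished with status " + jobExecution.getStatus());
            }
        };
    }
}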

Finally, let’s create a CSV file named products.csv in the src/main/resources folder with sample product data. The content should look like this:

products.csv

id,name,price
1,Product A,10.0
2,Product B,20.0
3,Product C,30.0
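To see what CsvProductReader does with these lines, here is a plain-Java approximation of its tokenizer and field-set mapper. It is an illustrative sketch, not Spring's code: it splits on commas without handling quoted fields. The `ProductCsvSketch` and `Row` names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Approximation of CsvProductReader's setup: skip one header line, split on commas,
// require exactly the three named columns, convert id and price to numbers.
final class ProductCsvSketch {
    private ProductCsvSketch() { }

    static final class Row {
        final long id;
        final String name;
        final double price;

        Row(long id, String name, double price) {
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    static Row mapLine(String line) {
        String[] tokens = line.split(",", -1); // limit -1 keeps trailing empty columns
        if (tokens.length != 3) {
            // DelimitedLineTokenizer in its default strict mode also rejects a wrong column count
            throw new IllegalArgumentException(
                    "Expected 3 columns but found " + tokens.length + ": " + line);
        }
        return new Row(Long.parseLong(tokens[0]), tokens[1], Double.parseDouble(tokens[2]));
    }

    static List<Row> readAll(List<String> lines) {
        List<Row> rows = new ArrayList<>();
        for (int i = 1; i < lines.size(); i++) { // starting at 1 mirrors setLinesToSkip(1)
            rows.add(mapLine(lines.get(i)));
        }
        return rows;
    }
}
```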

Update application.properties as follows:

# H2 Database Configuration
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.h2.console.enabled=true
spring.h2.console.path=/h2-console

Testing the Batch Job

With all the components in place, let’s test our Spring Batch job with the AggregateItemReader.

  1. Run the Spring Boot application, and the batch job should automatically start.
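  2. The job reads products from both the CSV file and the database, and the itemWriter prints each product's information to the console. The H2 database is in-memory and starts empty, so only the three CSV products are printed unless you insert ProductEntity rows before the database reader loads them.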
  3. Verify that the products are processed correctly by checking the console output.

An AggregateItemReader is a powerful way to combine data from different sources into a single stream in Spring Batch. With it, you can build batch jobs that handle diverse data inputs and process them as one cohesive data flow. Combined with configurable delegates and Spring Batch's restart support (for readers that save their state), it is a valuable component for building robust and flexible batch applications.


By Kurukshetran

