Building a Spring Boot Application for Efficient Large-Scale Data Processing with Spring Batch and Thread-Safe Parallel Processing

Processing large volumes of data efficiently is a common challenge in many applications. Spring Batch, a powerful framework for batch processing in Java, provides built-in features for handling such scenarios. In this article, we will explore how to leverage Spring Batch’s capabilities for processing huge data with thread-safe parallel processing.


Introduction:
Spring Batch offers robust support for batch processing in Spring Boot applications. In this tutorial, we’ll implement partitioning, parallel processing, and multi-threading to handle large-scale data efficiently.

Table of Contents:

  1. Setting Up the Spring Boot Project
  2. Defining the Job
  3. Configuring Reader, Processor, and Writer
  4. Implementing Partitioning for Parallel Processing
  5. Enabling Multi-Threading for Improved Performance
  6. Running the Application and Observing Results
  7. Conclusion

1. Setting Up the Spring Boot Project:

To begin, we’ll create a new Spring Boot project and include the necessary dependencies for Spring Batch. You can set up the project manually or use the Spring Initializr (https://start.spring.io) for convenience.

To get started, make sure you have the following prerequisites:

  1. Java Development Kit (JDK) 8 or above
  2. Spring Boot 2.x
  3. Maven or Gradle as a build tool
  4. An IDE of your choice (e.g., IntelliJ IDEA, Eclipse)

Here is a minimal pom.xml for the project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>spring-batch-example</artifactId>
    <version>1.0.0</version>

    <properties>
        <java.version>11</java.version>
        <spring-boot.version>2.5.2</spring-boot.version>
        <spring-batch.version>4.3.2</spring-batch.version>
        <spring-data-jpa.version>2.5.2</spring-data-jpa.version>
    </properties>

    <dependencies>
        <!-- Spring Boot dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>

        <!-- Note: spring-batch-core is pulled in transitively by
             spring-boot-starter-batch, so no separate Spring Batch
             dependency is required -->

        <!-- Database driver dependencies -->
        <!-- Add the appropriate database driver dependency based on your database vendor -->

        <!-- Other dependencies -->

    </dependencies>

    <build>
        <plugins>
            <!-- Maven compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Spring Boot plugin -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2. Defining the Job:

Next, we’ll define the Spring Batch job configuration. This involves creating a Java class annotated with @Configuration and @EnableBatchProcessing. Inside it, we’ll define the job and its steps as @Bean methods, and configure a TaskExecutor bean to enable parallel processing.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job myJob(Step myStep) {
        return jobBuilderFactory.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(myStep)
                .end()
                .build();
    }

    @Bean
    public Step myStep(TaskExecutor taskExecutor) {
        return stepBuilderFactory.get("myStep")
                .partitioner("slaveStep", partitioner())
                .step(slaveStep())
                .gridSize(5) // number of partitions requested from the partitioner
                .taskExecutor(taskExecutor)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    @Bean
    public Partitioner partitioner() {
        return new MyPartitioner();
    }

    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<String, String>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public ItemReader<String> reader() {
        return new MyReader(); // your reader implementation (see section 3)
    }

    @Bean
    public ItemProcessor<String, String> processor() {
        return new MyProcessor(); // your processor implementation (see section 3)
    }

    @Bean
    public ItemWriter<String> writer() {
        return new MyWriter(); // your writer implementation (see section 3)
    }
}
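The `.<String, String>chunk(10)` call above configures chunk-oriented processing: items are read one at a time, but written in batches of 10. As a plain-Java sketch (not Spring Batch code — the class name and the uppercase "processing" step are hypothetical), the loop it drives looks roughly like this:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the chunk-oriented loop configured by .<String, String>chunk(10):
// read up to chunkSize items, process each, then write the whole chunk in one call.
class ChunkLoopSketch {
    static List<List<String>> run(Iterator<String> source, int chunkSize) {
        List<List<String>> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        while (source.hasNext()) {
            chunk.add(source.next().toUpperCase()); // "process" step, here a trivial transform
            if (chunk.size() == chunkSize) {
                written.add(chunk); // "write" step: one call per full chunk
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            written.add(chunk); // final partial chunk
        }
        return written;
    }
}
```

Batching writes this way is what lets Spring Batch amortize transaction and I/O costs over many items, which is why larger chunk sizes often improve throughput for database writers.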

3. Configuring Reader, Processor, and Writer:

In this step, we’ll define the reader, processor, and writer components. The reader retrieves data from a data source, the processor performs any necessary data transformations, and the writer persists the processed data. We’ll utilize Spring Batch’s built-in ItemReader, ItemProcessor, and ItemWriter interfaces to implement these components.

@Component
public class MyReader implements ItemReader<String> {
    // Implement the reader logic
}

@Component
public class MyProcessor implements ItemProcessor<String, String> {
    // Implement the processor logic
}

@Component
public class MyWriter implements ItemWriter<String> {
    // Implement the writer logic
}
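The reader is the component that most needs attention for thread safety: when a step runs on multiple threads, concurrent read() calls must never hand out the same item twice. A minimal plain-Java sketch of that contract (the class name and list-backed source are hypothetical; a real implementation would implement Spring Batch's ItemReader<String>):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical list-backed reader illustrating the ItemReader contract:
// read() returns the next item, or null when the input is exhausted.
// The AtomicInteger cursor ensures concurrent read() calls get distinct items.
class InMemoryReader {
    private final List<String> items;
    private final AtomicInteger cursor = new AtomicInteger(0);

    InMemoryReader(List<String> items) {
        this.items = items;
    }

    public String read() {
        int index = cursor.getAndIncrement();
        return index < items.size() ? items.get(index) : null; // null ends the step's input
    }
}
```

Spring Batch ships thread-safe wrappers such as SynchronizedItemStreamReader for readers that are not inherently safe; the atomic-cursor approach above avoids locking entirely for in-memory sources.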

Alternatively, the same components can be defined with Spring Batch’s built-in builders (assuming a UserRepository and an EntityManagerFactory are autowired into the configuration class):

@Bean
public ItemReader<User> itemReader() {
    return new RepositoryItemReaderBuilder<User>()
            .name("userRepositoryItemReader")
            .repository(userRepository)
            .methodName("findAll")
            .sorts(Map.of("id", Sort.Direction.ASC))
            .build();
}

@Bean
public ItemProcessor<User, User> itemProcessor() {
    return user -> {
        // Perform data processing/transformation here
        // For simplicity, we return the user object as is
        return user;
    };
}

@Bean
public ItemWriter<User> itemWriter() {
    return new JpaItemWriterBuilder<User>()
            .entityManagerFactory(entityManagerFactory)
            .build();
}
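The pass-through processor above is where real transformation logic belongs. Keeping it stateless — a pure function of its input — is what makes it safe to run on many threads at once. A hypothetical example of such a transformation, shown here as a plain static method rather than a bean:

```java
// Hypothetical stateless transformation of the kind an ItemProcessor performs.
// Because it holds no mutable state, any number of threads can call it concurrently.
class NameNormalizer {
    static String normalize(String rawName) {
        if (rawName == null) {
            return null; // in Spring Batch, a null from a processor filters the item out
        }
        return rawName.trim().toUpperCase();
    }
}
```

If a processor must hold state (counters, caches), guard it with concurrent data structures or make the bean step-scoped so each partition gets its own instance.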

4. Implementing Partitioning for Parallel Processing:

To handle huge data sets efficiently, we’ll implement partitioning in Spring Batch. Partitioning involves dividing the data into smaller chunks or partitions, which can be processed in parallel. We’ll define a partition step and configure a partitioner to determine how to divide the data. This technique allows for optimal resource utilization and faster processing.

public class MyPartitioner implements Partitioner {
    private static final int GRID_SIZE = 5;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();

        // Implement the partitioning logic to divide the data

        return result;
    }
}

Alternatively, as a lambda that splits a list of users into index ranges (assuming an autowired UserRepository):

@Bean
public Partitioner partitioner() {
    return gridSize -> {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        List<User> allUsers = userRepository.findAll();
        int partitionSize = allUsers.size() / gridSize;

        int startIndex = 0;
        int endIndex = partitionSize - 1;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.put("startIndex", startIndex);
            context.put("endIndex", endIndex);
            partitions.put("partition" + i, context);

            startIndex = endIndex + 1;
            endIndex = i == gridSize - 2 ? allUsers.size() - 1 : endIndex + partitionSize;
        }

        return partitions;
    };
}
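The index arithmetic in the partitioner above is easy to get subtly wrong, especially the off-by-one at the last partition. Here is the same range-splitting logic isolated as a standalone sketch that can be tested on its own (class and method names are illustrative; it assumes totalItems >= gridSize):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch of the partitioner's index math: splits totalItems rows
// into gridSize contiguous [start, end] ranges, with any remainder rows
// assigned to the last partition.
class RangePartitioner {
    static Map<String, int[]> partition(int totalItems, int gridSize) {
        Map<String, int[]> partitions = new LinkedHashMap<>();
        int partitionSize = totalItems / gridSize;
        int start = 0;
        for (int i = 0; i < gridSize; i++) {
            // the last partition absorbs the remainder rows
            int end = (i == gridSize - 1) ? totalItems - 1 : start + partitionSize - 1;
            partitions.put("partition" + i, new int[]{start, end});
            start = end + 1;
        }
        return partitions;
    }
}
```

For example, partition(10, 3) yields the ranges [0,2], [3,5], [6,9] — the same boundaries the lambda above produces, with the two extra rows going to the last partition.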

5. Enabling Multi-Threading for Improved Performance:

In addition to partitioning, we can further improve performance by enabling multi-threading in Spring Batch. We’ll configure a TaskExecutor backed by a thread pool to execute the partitions concurrently. This approach maximizes CPU utilization and significantly improves overall processing speed.

Single-thread configuration (note that partitioning is configured on a master step, not on the job; with no TaskExecutor set, the partitions run sequentially on the default synchronous executor):

@Bean
public Step masterStep(Step partitionStep, Partitioner partitioner) {
    return stepBuilderFactory.get("masterStep")
            .partitioner("partitionStep", partitioner)
            .step(partitionStep)
            .gridSize(4) // Number of partitions
            .build();
}

@Bean
public Job partitionedJob(Step masterStep) {
    return jobBuilderFactory.get("partitionedJob")
            .start(masterStep)
            .build();
}

Multi-thread configuration (the same master step, now handed a thread-pool TaskExecutor so the partitions execute concurrently):

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(8);
    taskExecutor.setMaxPoolSize(16);
    taskExecutor.setThreadNamePrefix("batch-thread-");
    taskExecutor.initialize();
    return taskExecutor;
}

@Bean
public Step masterStep(Step partitionStep, Partitioner partitioner, TaskExecutor taskExecutor) {
    return stepBuilderFactory.get("masterStep")
            .partitioner("partitionStep", partitioner)
            .step(partitionStep)
            .gridSize(4) // Number of partitions
            .taskExecutor(taskExecutor) // Run partitions in parallel
            .build();
}

@Bean
public Job partitionedJob(Step masterStep) {
    return jobBuilderFactory.get("partitionedJob")
            .start(masterStep)
            .build();
}
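The pool sizes used above (8 core, 16 max) are illustrative. A common starting point is to derive them from the machine's CPU count, as in this hypothetical sizing helper (the names and the 2x multiplier are assumptions to tune for your workload, not fixed rules):

```java
// Hypothetical sizing helper: for CPU-bound batch work, a core pool near the
// number of available processors is a reasonable starting point; I/O-bound
// steps can often go higher because threads spend time blocked.
class PoolSizing {
    static int corePoolSize() {
        return Math.max(2, Runtime.getRuntime().availableProcessors());
    }

    static int maxPoolSize() {
        return corePoolSize() * 2; // headroom for brief I/O waits
    }
}
```

Whatever sizes you choose, keep gridSize and the pool size in proportion: a pool much smaller than the grid size leaves partitions queued, while a pool much larger wastes threads.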

6. Running the Application and Observing Results:

Once we have implemented partitioning and multi-threading, we can build and run the Spring Batch application. We’ll observe the logs to ensure that the batch job is executed with partitioning, parallel processing, and multithreading. You can monitor the performance improvements and verify the correctness of the data processing. Here’s the complete code.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.persistence.Entity;
import javax.persistence.EntityManagerFactory;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Bean
    public ItemReader<User> itemReader() {
        // Note: for truly partitioned reads, this reader should be @StepScope and
        // page through rows using the startIndex/endIndex values placed in each
        // partition's ExecutionContext; findAll() is used here for brevity.
        return new RepositoryItemReaderBuilder<User>()
                .name("userRepositoryItemReader")
                .repository(userRepository)
                .methodName("findAll")
                .sorts(Map.of("id", Sort.Direction.ASC))
                .build();
    }

    @Bean
    public ItemProcessor<User, User> itemProcessor() {
        return user -> {
            // Perform data processing/transformation here
            // For simplicity, we return the user object as is
            return user;
        };
    }

    @Bean
    public ItemWriter<User> itemWriter() {
        return new JpaItemWriterBuilder<User>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    @Bean
    public Step partitionStep(ItemReader<User> itemReader, ItemProcessor<User, User> itemProcessor, ItemWriter<User> itemWriter) {
        return stepBuilderFactory.get("partitionStep")
                .<User, User>chunk(100)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Partitioner partitioner() {
        return gridSize -> {
            Map<String, ExecutionContext> partitions = new HashMap<>();
            List<User> allUsers = userRepository.findAll();
            int partitionSize = allUsers.size() / gridSize;

            int startIndex = 0;
            int endIndex = partitionSize - 1;
            for (int i = 0; i < gridSize; i++) {
                ExecutionContext context = new ExecutionContext();
                context.put("startIndex", startIndex);
                context.put("endIndex", endIndex);
                partitions.put("partition" + i, context);

                startIndex = endIndex + 1;
                endIndex = i == gridSize - 2 ? allUsers.size() - 1 : endIndex + partitionSize;
            }

            return partitions;
        };
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(8);
        taskExecutor.setMaxPoolSize(16);
        taskExecutor.setThreadNamePrefix("batch-thread-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    public Step masterStep(Step partitionStep, Partitioner partitioner, TaskExecutor taskExecutor) {
        return stepBuilderFactory.get("masterStep")
                .partitioner("partitionStep", partitioner)
                .step(partitionStep)
                .gridSize(4) // Number of partitions
                .taskExecutor(taskExecutor) // Run partitions in parallel
                .build();
    }

    @Bean
    public Job partitionedJob(Step masterStep) {
        return jobBuilderFactory.get("partitionedJob")
                .start(masterStep)
                .build();
    }

    @Entity
    public static class User {
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;

        private String name;

        // Getters and setters
    }

    public interface UserRepository extends JpaRepository<User, Long> {
        @Query("SELECT u FROM User u")
        List<User> findAll();
    }
}

7. Conclusion:

In this tutorial, we explored how to leverage Spring Batch in a Spring Boot application for efficient data processing. By incorporating partitioning, parallel processing, and multi-threading techniques, we can handle huge data volumes effectively, significantly reducing processing time and improving application performance. Spring Batch’s flexibility and powerful features make it an excellent choice for batch processing requirements in various domains.

Implementing these strategies in your Spring Boot applications will empower you to process massive amounts of data efficiently, leading to enhanced productivity and scalability.

Remember to adapt the provided code and configurations based on your specific data processing requirements, such as customizing the reader, processor, and writer logic, adjusting thread pool sizes, and integrating with your data sources.

By following these best practices and leveraging Spring Batch’s capabilities, you can build robust, scalable data processing solutions. Remember to customize the code and configurations for your specific requirements and data sources.

Happy coding and batch processing!

BecomeGeeks Author: Raghavendran Sundararaman

About the Author: Software Engineer with almost 7 years of experience in Java and Spring Frameworks and an enthusiastic programmer.
