Spring Batch : Partition Step

    이 포스팅은 인프런 정수원님의 강의를 복습하며 작성한 글입니다. 

    Partition Step

    파티셔닝 Step은 멀티 쓰레드 환경을 이용해 보다 빠른 Batch Job 처리를 위해 스프링 배치가 지원하는 Step이다. 위의 그림으로 Partitioning Step을 이해할 수 있다. 메인 쓰레드는 Job을 실행시키고, 이 Job은 Master Step을 가진다. Master Step은 내부적으로 Slave Step을 하나 가지는데, 이 Slave Step을 멀티 쓰레드에서 분산처리 하도록 처리한다. 

    즉, 하나의 Slave Step이 있는데, 쓰레드 갯수만큼 Slave Step을 생성해서 동시에 처리하는 것으로 이해할 수 있다. 기본개념은 아래에서 확인할 수 있다. 

     

    기본개념

    • MasterStep이 Slave Step을 실행시키는 구조
    • SlaveStep은 각 스레드에 의해 독립적으로 실행됨 (SlaveStep은 하나만 구현함) 
    • SlaveStep은 독립적인 StepExecution 파라미터 환경을 구성함 → 멀티 쓰레드에서 안전함
    • SlaveStep은 ItemReader / ItemProcessor / ItemWriter등을 독립적으로 가지고 동작 및 실행함. 
    • MasterStep은 Partitioning Step이고 Slave Step은 TaskletStep, FlowStep 등이 올 수 있음. 

    위 개념을 살짝 정리하면 Master Step이 멀티 쓰레드 환경을 구성해서 Slave Step에 각각 뿌리는 구조로 이해를 할 수 있다. 쓰레드 관점에서 보면 메인 쓰레드에서 Master Step을 실행하고, 이 때 멀티 쓰레드를 만드는데 멀티 쓰레드의 갯수만큼 Slave Step을 만들어서 동시에 처리해주는 것으로 동작할 수 있다. 

     

    Partition Step 살펴보기

    Partition Step은 내부적으로 PartitionHandler / StepExecutionSplitter / StepExecutionAggregator를 가진다. 간단하게 생각해보면 StepExecutionSplitter를 이용해서 쓰레드 갯수만큼 StepExecution을 만들어 PartitionHandler를 통해 멀티 쓰레드를 실행시킨다. 그리고 완료된 Slave Step 정보를 StepExecutionAggregator를 이용해서 집계한다. 아래에서 좀 더 자세히 알아본다. 

     

    PartitionStep

    • PartitionStep은 파티셔닝 기능을 수행하는 구현체. Master Step이라고도 불림.
    • 파티셔닝을 수행한 후 StepExecutionAggregator를 이용해서 최종 집계를 한다.
    • partitionHandler는 Step을 멀티 쓰레드 환경에서 실행해준다.
    • stepExecutionSplitter는 멀티 쓰레드만큼 StepExecution을 만들어준다. 

     

    Partition Handler

    • Partition Handler는 Partition Step에 의해서 호출된다.
    • 내부적으로 TaskExecutor를 통해서 Work Step(Slave Step)을 병렬로 실행시킨다. 
    • Slave Step들이 사용할 Step Execution은 StepSplitter, Partitioner를 통해서 생성한다.
    • Partition Handler는 병렬로 실행 후 최종 결과를 담은 StepExecution을 Partition Step에 반환해줌. 

     

    StepExecutionSplitter

    • Slaver Step에서 사용할 Step Execution을 gridSize만큼 생성해준다. grid Size는 파티션을 나누는 사이즌데, 쓰레드 갯수로 이해하면 된다. 
    • Partitioner를 통해서 Execution Context를 얻어서 Step Execution에 맵핑함.

     

    Partitioner

    • 파티셔너는 StepExecution에 매핑 할 Execution Context를 gridSize만큼 생성함.
    • 각 ExecutionContext에 저장된 정보를 바탕으로 Slave Step에서는 구역을 나누어 쓰레드마다 독립적으로 참조 및 활용한다. 

     

    Partition Step 실행 관점 살펴보기 

    여기서는 Partition Step이 동작하는 것에 대략적으로 살펴보고자 한다. 

    PartitionStep.doExecute()

    partitionStep은 doExecute() 메서드에서 Step을 실행한다. 정확하게는 handle 메서드에서 실행을 하는데, 실행 후 StepExecution을 Collection 형태로 받는 것을 확인할 수 있다. Handle을 할 때, Splitter도 함께 넘기는 것을 볼 수 있는데, 아직까지 StepExecution이 만들어지지 않은 것을 의미한다. 

    partitionerHandler 구현체 → JsrPartitionHandler.handle() 

    파티션 핸들러 구현체인 JsrPartitionHandler.handle()로 넘어온다. 여기서는 Future 타입으로 StepExecution을 가진 tasks가 있고, StepExecution Result, 그리고 taskExecutor가 있는 것을 확인할 수 있다. 

    partitionerHandler 구현체 → JsrPartitionHandler.handle() 

    splitStepExecution() 메서드를 실행하면서 파티션에서 사용할 stepExecution을 생성해서 partitionStepExecution에 반환하는 것을 확인할 수 있다. 

    jsrPartitionHandler.splitStepExecution()

    splitStepExecution()으로 넘어오면 stepSplitter를 이용해서 파티션에서 사용할 StepExecution을 생성해서 반환해주는 것을 확인할 수 있다. 이 때, grid Size만큼 StepExecution을 생성해서 반환해준다. 

    partitionerHandler 구현체 → JsrPartitionHandler.handle() 

    파티션에서 사용할 StepExecution를 받아온 다음, For문을 통해 StepExecuton을 하나씩 꺼내온다. 그리고 Step Execution과 실행시킬 Slave Step을 받은 후, createTask 메서드를 통해 실행할 작업을 만든다. 그리고 taskExecutor.execute()를 통해 slave Step을 다른 쓰레드에 배정 후 실행시켜준다.  그리고 작업이 완료되었으면 tasks에 추가해준다. 

    여기서 중요한 점은 Slave Step을 여러개를 만드는 것이 아니라 하나의 Slave Step을 여러 StepExecution + 쓰레드에 전달해서 멀티로 실행시켜준다는 것이다. 즉, 멀티 쓰레드 환경이라는 소리다.

    partitionerHandler 구현체 → JsrPartitionHandler.handle() 

    실행이 완료되면, set에 실행 결과인 StepExecution을 담고, 그 set인 result를 반환해준다. 

    partitionStep.doExecute()

    result는 executions로 반환된다. executions는 각 Slave Step의 실행 결과를 가지고 있다. 그리고 aggreator를 통해 Step 실행 결과에 대한 것들을 집계한다. 그리고 stepExecution을 통해 실패한 Step일 경우, Exception을 터뜨려 주게 된다. 

     

     

     

     

    Partition Step 실행

    Partition Step을 실행하는 과정을 모식도로 좀 더 살펴보고자 한다. 

    1. Job이 실행하며 Master Step을 실행시킨다. 이 때, Master Step은 Partition Step이 된다.
    2. Partiotn Step은 Partition Handler를 통해 handle()을 실행시켜준다. 
    3. 이 때, StepExecutionSplitter는 split() 메서드를 이용해서 Partitioner에 접근한다.
    4. Partitioner는 Grid Size만큼 Execution Context를 만든다. Execution Context는 파티셔닝할 데이터 정보가 들어가게 된다. 실제로 여기에 데이터가 들어가는 것이 아니라, 어디부터 어디까지 보겠다는 형식의 데이터가 들어간다. 
    5. Partitioner는 ExecutionContext를 반환하고, Splitter는 ExecutionContext를 각 StepExecution에 맵핑해주고 반환해준다. 
    6. 파티션 핸들러는 전달받은 StepExecution 들과 Step을 바탕으로 해야할 작업을 Future Task로 감싸서 만들고, 그것을 taskExecutor를 통해서 실행시켜준다. 이 때, 쓰레드는 Futuer Task를 가지고 있고, Future Task에는 Slave Step이 포함됨. 
    7. 실행한 결과를 모아서 StepExecutionAggregator에 실행 결과를 취합해준다. 

     

     

     

    Partitioning 과정 이해하기

    파티셔닝 과정을 이해하기 전에 반드시 알아야 하는 것은 @StepScope 개념이다. @StepScope는 Step의 실행 시점을 기준으로 설정한다는 의미고, 이 어노테이션이 붙은 빈은 프록시 빈으로 만들어 스프링 컨테이너에 등록하되, 타겟값은 실제 실행 시점에 공급해준다는 의미다. 

    파티셔닝은 멀티 쓰레드로 처리를 하지만, 멀티 쓰레드 관점에서는 안전하다고 했다. 먼저 StepExecution을 앞에서 쓰레드 갯수만큼 생성되기 때문에 독립적인 데이터 및 실행 결과를 가질 수 있다. 여기에 실제로 일하는 ItemReader, ItemWriter, ItemProcessor도 각 쓰레드에 독립적으로 생성된다면 멀티 쓰레드 환경에서 안전하게 동작할 수 있따. 

    따라서 멀티 쓰레드에서 안전한 환경을 만들어 주기 위해서 ItemReader / ItemWriter / ItermProcessor에 @StepScope를 통해서 프록시 빈을 통해서 쓰레드가 요청할 때 마다 새로운 빈을 만들어서 제공해야한다. 

    말이 길었는데 다시 한번 정리하면 Partition Job은 멀티 쓰레드 환경에서 동작하고, 멀티 쓰레드 환경에서 안전하기 위해서는 독립적인 데이터와 taskLet을 가져야한다. 이 때 독립적인 데이터는 StepExecutionSplitter를 통해서 제공되며, 독립적인 taskLet은 @StepScope를 통해 프록시 빈을 제공하면서 처리할 수 있다.  

     

    좀 더 쉽게 말하면 Master Step은 1개, Slave Step도 1개만 설정한다. Master Step은 쓰레드를 만들어서 Slave Step에 StepExecution과 함께 전달해준다. 그럼 멀티 쓰레드가 각각의 Slave Step을 갖고 동작을 한다. Slave Step은 내부적으로 ItemReader, ItemProcessor, ItemWriter를 따로 가지고 동작한다.  이 때, StepExecution에 파티셔닝 관련 정보가 담겨 있고, 각 Slave Step은 이 파티셔닝 정보를 바탕으로 Chunk를 처리하면 된다. 

     

     

    Partition Step API

    return stepBuilderFactory.get("masterStep")
            .partitioner(slaveStep().getName(), partitioner()) // partitioner 타입 설정하는 API 사용하자.
            .step(slaveStep()) // Slave 스텝 설정
            .gridSize(4) // 쓰레드 설정
            .taskExecutor() // 쓰레드 풀 + 쓰레드 실행할 Executor 설정
            .build();

    Partition Step은 Step을 만들 때, 다음과 같은 API를 통해 설정을 해서 만들 수 있다.

    • partitioner : 처리할 파티션을 나눌 파티셔너를 제공하고, SlaveStep의 이름을 제공해준다. StepExecution의 이름으로 이해해도 된다. 
    • Step : Master Step 밑에서 실제로 일을 할 Slave Step을 설정해서 전달한다
    • gridSize : 파티션을 얼마만큼 나눌지 설정한다. 쓰레드 풀의 숫자를 의미하기도 한다.
    • taskExecutor : 쓰레드 풀 및 쓰레드를 실행할 Executor를 설정한다.

     

    테스트 코드 작성

    Job + Master Step 작성

    @Bean
    public Job partitionBatchJob() {
        return jobBuilderFactory.get("partitionBatchJob")
                .incrementer(new RunIdIncrementer())
                .start(masterStep())
                .build();
    }
    
    
    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get("partitioningStep")
                .partitioner("slaveStep", customPartitioner)
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }
    • 먼저 Master Step을 가지는 Job을 선언한다. 이 때, Master Step은 partitioner API를 이용했다. 이렇게 되면 masterStep()을 실행했을 때 반환받는 Step은 Partition Step이다. 
    • masterStep에서는 partitioner를 통해 customPartitioner를 DI 해주었고, 각 slave Step의 이름을 "slaveStep"으로 지정해주었다. 
    • gridSize()를 이용해 4개의 쓰레드를 이용하는 것으로 했다. 
    • taskExecutore는 simpleAsyncTaskExecutor()를 생성해서 주입해주었다. 

     

    Partitioner 생성

    @Component
    @RequiredArgsConstructor
    public class CustomPartitioner implements Partitioner {
    
        private final EntityManager em;
    
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
    
            Tuple singleResult = em.createQuery("select min(c.id), max(c.id) from Customer c", Tuple.class).getSingleResult();
            int minValue = ((Long) singleResult.get(0)).intValue();
            int maxValue = ((Long) singleResult.get(1)).intValue();
            int eachValue = ((maxValue - minValue) / gridSize) + 1 ;
    
            HashMap<String, ExecutionContext> partition = new HashMap<>();
    
            int start = minValue;
            int end = eachValue;
    
            for (int i = 0; i < gridSize; i++) {
    
                // 값 셋팅
                ExecutionContext executionContext = new ExecutionContext();
                executionContext.put("start", start);
                executionContext.put("end", end);
    
                // 파티셔너에 넣기
                partition.put(String.valueOf(i), executionContext);
    
                // 다음 값을 셋팅하기
                start = end + 1;
                end = end + eachValue;
    
            }
            return partition;
        }
    }
    • Custome Partitioner를 작성했다. 
    • Partitioner의 목적은 Step Execution에 넣어줄 Execution Context들을 만들어서 반환해주는 것이다. 
    • Execution Context는 실제 데이터를 가지는 것은 아니고, 각각 ItemReader , ItemWriter, ItemProcessor가 참고해서 파티셔닝을 할 정보를 넣어줘야한다. 이 때, 나는 전체 Row를 grid Size만큼 간격으로 나누어서 그 정보를 넣어주었다. 

     

     

    SlaveStep 생성

    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStepMaster")
                .<Customer, Customer2>chunk(1000)
                .reader(pagingItemReader(null,null))
                .writer(batchWriter())
                .processor(batchProcessor())
                .build();
    }
    
    • Slave Step을 생성했다.
    • Slave Step은 멀티 쓰레드에 의해 일을 할 Step을 구현해주면 된다. 
    • Slave Step은 각 ItermReader, ItemWriter, ItemProcessor를 프록시 형태로 가지고 있기 때문에 매개변수를 넘겨줘야한다면 null로 넘겨준다. 

     

    ItemReader 생성

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['start']}") Integer start,
            @Value("#{stepExecutionContext['end']}") Integer end
            ){
    
        System.out.println("start = " + start);
        System.out.println("end = " + end);
    
    
        HashMap<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("customer_id", Order.ASCENDING);
    
    
        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("pagingBuilder")
                .dataSource(dataSource)
                .fetchSize(1000)
                .beanRowMapper(Customer.class)
                .selectClause("customer_id, first_name, last_name, birth_date")
                .fromClause("from customer")
                .whereClause("where customer_id >= " + start + " and customer_id <= " + end)
                .sortKeys(sortKeys)
                .build();
    }
    • ItemReader는 JdbcReader를 이용했다. 
    • 멀티 쓰레드 환경을 제공하기 위해 @StepScope를 통해 요청할 때 마다 Reader를 만들어서 제공하도록 했다. 
    • @StepScope를 사용하게 되면, Job이 가지고 있는 StepExecution 등의 값을 가질 수 있다. 파티셔너에서 제공한 값을 가지고 와서, Query 문에 포함했다. 즉, 파티셔닝 된 값만 불러오도록 기능을 하게 했다. 

     

    ItemWriter + ItemProcessor 코드

    @Bean
    @StepScope
    public ItemProcessor<? super Customer,? extends Customer2> batchProcessor() {
        return (ItemProcessor<Customer, Customer2>) item -> Customer2.builder()
                .id(item.getCustomer_id())
                .birthDate(item.getBirthDate())
                .lastName(item.getLastName())
                .firstName(item.getFirstName()).build();
    }
    
    @Bean
    @StepScope
    public ItemWriter<? super Customer2> batchWriter() {
        return new JdbcBatchItemWriterBuilder<Customer2>()
                .sql("INSERT INTO Customer2(customer2_id, birth_date, first_name, last_name) values (:id, :birthDate, :firstName, :lastName)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }
    • ItemProcessor / ItemWriter 코드를 작성했다. 동일하게 @StepScope를 이용해서 작성했다. 
    • ItemProcessor는 Custom 객체를 Custom2 객체로 바꾸는 작업을 한다.
    • ItemWriter는 JdbcBatchWriter를 이용해서 Batch Insert를 하도록 구현했다. 

     

    실행 결과

    내 Customer Table에는 2101 ~ 24696의 ID가 분포하고 있었다. 이 값을 모두 불러와서, 파티셔닝을 하는 것이 JdbcItemReader에서 하는 일이었다. 그 로그를 남긴 것이었는데, 정상적으로 파티셔닝이 이루어진 것을 확인할 수 있었다. 

    Customer ID 값을 확인해봐도, 2101번부터 시작해서 24696번까지 들어가는 것이 확인되었다. 

     

    테스트 코드 주소

    https://github.com/chickenchickenlove/springbatchstudy/tree/main/SpringBatchLecture/main/java/io/springbatch/springbatchlecture/partitioning

     

     

    댓글

    Designed by JB FACTORY