Spring Batch : Multi-threaded Step

    이 게시글은 인프런 정수원님의 강의를 복습하며 작성한 글입니다

    Multi-Threaded Step

    Multi-Threaded Step은 Chunk 기반의 멀티 쓰레드 스텝이다. Chunk 기반으로 처리를 하게 되는데, 이 때 멀티 쓰레딩 환경으로 동작하게 되면서 일반적인 Single 쓰레드 방식의 Chunk 쓰레드 방식보다 조금 더 빠르게 동작할 가능성이 있다. Chunk는 각 ItemReader / ItemProcessor / ItemWriter마다 Chunk를 새로 만들어서 전달해주는 방식 때문에 멀티 쓰레딩 환경에서 동기화를 처리하는데 좀 더 수월하게 접근할 수 있다. 

     

    Multi-Threaded Step의 구성

    • Multi-Threaded Step은 TaskExecutorRepeatTemplate을 통해서 반복된다.
    • 쓰레드를 생성한 만큼 Tasklet이 동시에 수행되게 된다(병렬성) 

    Multi-Threaded Step이 실행되는 순서를 살펴보면 다음과 같다.

    1. Job이 실행되고, Step이 실행된다. 
    2. Step은 TaskExecutorRepeatTemplate을 이용한다. 이 때, Thread만큼 Runnable을 만들고, 각 쓰레드는 Runnable을 실행한다.
    3. Runnable은 RepeatCallBack 함수를 실행한다. 
    4. RepeatCallBack 함수는 ChunkOrientedTasklet을 수행한다.
    5. ChunkOrientedTasklet은 ItemReader / ItemProcessor / ItemWriter가 나눠서 처리를 한다. 이 때, 각 쓰레드는 독립적으로 ItemReader / ItemProcessor / ItemWriter를 이용한다. 

    주의해야 할 부분은 각 쓰레드 간의 영향을 미치지 않기 위해서 동기화 처리를 해줘야 한다. 그런데 기본적으로 Chunk는 쓰레드마다 새로운 Chunk를 만들어 ItemReader / ItemProcessor / ItemWriter에 넘겨준다. 따라서 자원을 넘겨주는 관점에서는 동기화 처리를 할 필요가 없다. 

    그렇지만 ItemReader는 동기화 처리를 반드시 해야한다. 기본적으로 DB에 접근을 하게 될텐데, 동시에 DB에 접근하게 되면 동시성 문제가 발생할 수 있기 때문이다. 이럴 때는 JdbcPagingItemReader / JpaPagingItemReader를 사용하거나 SynchronziedItemStreamReader를 이용해 동기화 처리를 해주면 된다. 

     

    TaskExecutorRepeatTemplate 살펴보기

    • TaskExecutorRepeatTemplate은 기본적으로 4개의 쓰레드를 만들어 동작하게 된다.
    • TaskExecutroRepeatTemplate의 ThreadExecutor는 SyncTaskExecutor()가 사용된다. 쓰레드 풀을 지원하는 Executor가 아니기 때문에 쓰레드 풀을 지원하는 Executor를 사용하는 것이 추천된다. 

    • TaskExecutorRepeatTemplate의 쓰레드들은 내부적으로 가지고 있는 ExecutingRunnable을 실행한다. 
    • ExecutingRunnable은 내부적으로 callback 함수, Contexts, Que를 가진다. CallBack은 taskletStep이고, Que는 내부적으로 반복 / 실행 중인지 / 종료했는지를 판별할 때 사용한다. 
    • Que에는 ExecutingRunnable의 RepeatStatus가 저장된다.

    ExecutingRunnable.Run()

    • ExecutinRunnable 함수는 run() 메서드를 실행시킨다.
    • Run() 메서드는 내부적으로 callback 함수를 실행시키는데, 이 과정에서 우리가 알고 있던 Tasklet이 수행된다. 
    • 실행 후 Executing Runnable 자신은 Que에 저장된다. 이유는 현재 ExecutinRunnable의 상태가 완료된 것인지 아닌지를 판단하고, 반복문을 종료할지를 결정해주는데 사용된다. 

     

    Multi-Threaded Step의 실행 구조

    앞서 이야기 했던 것의 반복이다. 다시 간략히 정리해보면 다음과 같다.

    1. taskExecutor는 지정된 수만큼의 쓰레드를 생성한다. 그리고 그 쓰레드는 Runnable을 가지고 있으며, Runnable을 실행한다. 
    2. Runnable은 ChunkOrientedTasklet이다. ChunkOrientedTaskLet은 내부적으로 쓰레드 별로 Chunk를 생성해서 Reader → Processor → Writer 순으로 전달된다. 따라서 전달 과정은 쓰레드 프리하다.

    SimpleChunkProvider.provide()

    SimpleChunkProvider는 Input Chunk를 제공해준다. Input Chunk는 보시다시피 매번 새롭게 만들어져서 전달된다. 따라서 쓰레드 프리하다는 것을 알 수 있다. 

    쓰레드 4번 / inputs chunk 주소 확인
    쓰레드 3번 / inputs chunk 주소 확인

    실제로 디버깅 환경에서 확인해보면 멀티 쓰레드가 동작을 하고 있고, 멀티 쓰레드에서 input chunk의 주소가 서로 다른 것을 확인할 수 있다. 즉, Input Chunk는 쓰레드 프리하게 동작하고 있다는 이야기다. 따!라!서! 동기화 처리된 ItemReader만 사용해주면 Chunk 환경의 멀티 쓰레드는 아주 편리하게 적용할 수 있다. 

     

    Multi-Threaded Step의 API 

    1. taskExecutor() API를 적용하기만 하면 Multi-Threaded Step 이 된다. taskExecutor()를 따로 설정해주지 않으면, 기본적으로는 SyncTaskExecutor를 이용한다. 

    이 외에는 설정할 것이 없고, 동기화 처리된 Reader만 입력해주면 된다는 점에서 굉장히 사용히 간편하다. 그리고 Processor / Writer / Reader는 모두 프록시 객체일 필요가 없다. 왜냐하면 구조 자체가 ItemReader / ItemStream / ItemWriter는 하나씩 사용하는데, 각 쓰레드가 서로 다른 Chunk를 사용하기 때문에 멀티 쓰레딩 환경이 구성되기 때문이다.

     

     

    코드 실습

    Job 코드

    @Bean
    public Job multiThreadedJob() {
        return jobBuilderFactory.get("multiThreadJob")
                .incrementer(new RunIdIncrementer())
                .start(multiThreadedStep())
                .build();
    }

     

    Step 코드

    @Bean
    public Step multiThreadedStep() {
        return stepBuilderFactory.get("multiThreadStep")
                .<Customer, Customer2>chunk(1000)
                .reader(multiThreadItemReader())
                .processor((ItemProcessor<Customer, Customer2>) item -> Customer2.builder()
                        .id(item.getCustomer_id())
                        .firstName(item.getFirstName())
                        .lastName(item.getLastName())
                        .birthDate(item.getBirthDate())
                        .build())
                .writer(multiThreadItemWriter())
                .taskExecutor(myTaskExecutor())
                .build();
    }

    Step 코드는 다음과 같이 작성했다. 기본적으로는 taskExecutor API만 설정하는데 신경쓰면 된다.

     

    TaskExecutor 코드

    @Bean
    public TaskExecutor myTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("Non-Thread-Free");
        executor.setMaxPoolSize(8);
        executor.setCorePoolSize(4);
        return executor;
    }

    TaskExecutor는 다음과 같이 쓰레드 풀을 만들어 넣어주었다.

     

    Reader + Writer 코드

    @Bean
    public ItemWriter<Customer2> multiThreadItemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer2>()
                .sql("insert into Customer2(customer2_id, birth_date, first_name, last_name) values(:id, :birthDate, :firstName, :lastName)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    
    }
    @Bean
    public JpaPagingItemReader<Customer> multiThreadItemReader() {
        return new JpaPagingItemReaderBuilder<Customer>()
                .name("builder Name")
                .currentItemCount(0)
                .pageSize(1000)
                .maxItemCount(1000)
                .entityManagerFactory(emf)
                .queryString("select c from Customer c")
                .build();
    }

    Reader와 Writer에서 굳이 @StepScope를 사용해서 프록시 객체를 사용해 줄 필요는 없다. 왜냐하면 multi-Threaded Step은 기본적으로 프록시 객체가 아닌 싱글톤 빈을 기준으로 동작하기 때문이다. 그렇지만 정말 께림찍하다면 프록시 객체를 만들어서 공급해주자! 

     

     

    실행 결과

    좌 : 멀티 쓰레드(4개) / 우 : 싱글 쓰레드

    실행 결과를 살펴보면 멀티 쓰레드와 싱글 쓰레드 간에 총 소요 시간이 2배 가까이 개선된 것을 볼 수 있다. 이론적으로는 쓰레드 갯수만큼 시간이 단축되어야 하지만 실상은 그렇지 않다. 아무래도 컨텍스트 스위칭 비용이라는 것이 존재하는데, 이것이 좀 큰 비용이기 때문에 이론치만큼의 값은 나오지 않은 것 같다. 그럼에도 불구하고 멀티 쓰레등을 이용할 경우 좋은 효과를 볼 수 있는 것 같다. 

    또한 앞의 그림에서 볼 수 있듯이 각각의 쓰레드가 생성되어서 독립적으로 값을 읽어오고 처리하는 것을 확인할 수 있었다.

     

     

    테스트 코드 

     

     

    GitHub - chickenchickenlove/springbatchstudy

    Contribute to chickenchickenlove/springbatchstudy development by creating an account on GitHub.

    github.com

     

    댓글

    Designed by JB FACTORY