Spring Batch : Multi-threaded Step
- Spring/Spring Batch
- 2022. 3. 24.
이 게시글은 인프런 정수원님의 강의를 복습하며 작성한 글입니다
Multi-Threaded Step
Multi-Threaded Step은 Chunk 기반의 멀티 쓰레드 스텝이다. Chunk 기반으로 처리를 하게 되는데, 이 때 멀티 쓰레딩 환경으로 동작하게 되면서 일반적인 Single 쓰레드 방식의 Chunk 쓰레드 방식보다 조금 더 빠르게 동작할 가능성이 있다. Chunk는 각 ItemReader / ItemProcessor / ItemWriter마다 Chunk를 새로 만들어서 전달해주는 방식 때문에 멀티 쓰레딩 환경에서 동기화를 처리하는데 좀 더 수월하게 접근할 수 있다.
Multi-Threaded Step의 구성
- Multi-Threaded Step은 TaskExecutorRepeatTemplate을 통해서 반복된다.
- 쓰레드를 생성한 만큼 Tasklet이 동시에 수행되게 된다(병렬성)
Multi-Threaded Step이 실행되는 순서를 살펴보면 다음과 같다.
- Job이 실행되고, Step이 실행된다.
- Step은 TaskExecutorRepeatTemplate을 이용한다. 이 때, Thread만큼 Runnable을 만들고, 각 쓰레드는 Runnable을 실행한다.
- Runnable은 RepeatCallBack 함수를 실행한다.
- RepeatCallBack 함수는 ChunkOrientedTasklet을 수행한다.
- ChunkOrientedTasklet은 ItemReader / ItemProcessor / ItemWriter가 나눠서 처리를 한다. 이 때, 각 쓰레드는 독립적으로 ItemReader / ItemProcessor / ItemWriter를 이용한다.
주의해야 할 부분은 각 쓰레드 간의 영향을 미치지 않기 위해서 동기화 처리를 해줘야 한다. 그런데 기본적으로 Chunk는 쓰레드마다 새로운 Chunk를 만들어 ItemReader / ItemProcessor / ItemWriter에 넘겨준다. 따라서 자원을 넘겨주는 관점에서는 동기화 처리를 할 필요가 없다.
그렇지만 ItemReader는 동기화 처리를 반드시 해야한다. 기본적으로 DB에 접근을 하게 될텐데, 동시에 DB에 접근하게 되면 동시성 문제가 발생할 수 있기 때문이다. 이럴 때는 JdbcPagingItemReader / JpaPagingItemReader를 사용하거나 SynchronziedItemStreamReader를 이용해 동기화 처리를 해주면 된다.
TaskExecutorRepeatTemplate 살펴보기
- TaskExecutorRepeatTemplate은 기본적으로 4개의 쓰레드를 만들어 동작하게 된다.
- TaskExecutroRepeatTemplate의 ThreadExecutor는 SyncTaskExecutor()가 사용된다. 쓰레드 풀을 지원하는 Executor가 아니기 때문에 쓰레드 풀을 지원하는 Executor를 사용하는 것이 추천된다.
- TaskExecutorRepeatTemplate의 쓰레드들은 내부적으로 가지고 있는 ExecutingRunnable을 실행한다.
- ExecutingRunnable은 내부적으로 callback 함수, Contexts, Que를 가진다. CallBack은 taskletStep이고, Que는 내부적으로 반복 / 실행 중인지 / 종료했는지를 판별할 때 사용한다.
- Que에는 ExecutingRunnable의 RepeatStatus가 저장된다.
- ExecutinRunnable 함수는 run() 메서드를 실행시킨다.
- Run() 메서드는 내부적으로 callback 함수를 실행시키는데, 이 과정에서 우리가 알고 있던 Tasklet이 수행된다.
- 실행 후 Executing Runnable 자신은 Que에 저장된다. 이유는 현재 ExecutinRunnable의 상태가 완료된 것인지 아닌지를 판단하고, 반복문을 종료할지를 결정해주는데 사용된다.
Multi-Threaded Step의 실행 구조
앞서 이야기 했던 것의 반복이다. 다시 간략히 정리해보면 다음과 같다.
- taskExecutor는 지정된 수만큼의 쓰레드를 생성한다. 그리고 그 쓰레드는 Runnable을 가지고 있으며, Runnable을 실행한다.
- Runnable은 ChunkOrientedTasklet이다. ChunkOrientedTaskLet은 내부적으로 쓰레드 별로 Chunk를 생성해서 Reader → Processor → Writer 순으로 전달된다. 따라서 전달 과정은 쓰레드 프리하다.
SimpleChunkProvider는 Input Chunk를 제공해준다. Input Chunk는 보시다시피 매번 새롭게 만들어져서 전달된다. 따라서 쓰레드 프리하다는 것을 알 수 있다.
실제로 디버깅 환경에서 확인해보면 멀티 쓰레드가 동작을 하고 있고, 멀티 쓰레드에서 input chunk의 주소가 서로 다른 것을 확인할 수 있다. 즉, Input Chunk는 쓰레드 프리하게 동작하고 있다는 이야기다. 따!라!서! 동기화 처리된 ItemReader만 사용해주면 Chunk 환경의 멀티 쓰레드는 아주 편리하게 적용할 수 있다.
Multi-Threaded Step의 API
- taskExecutor() API를 적용하기만 하면 Multi-Threaded Step 이 된다. taskExecutor()를 따로 설정해주지 않으면, 기본적으로는 SyncTaskExecutor를 이용한다.
이 외에는 설정할 것이 없고, 동기화 처리된 Reader만 입력해주면 된다는 점에서 굉장히 사용히 간편하다. 그리고 Processor / Writer / Reader는 모두 프록시 객체일 필요가 없다. 왜냐하면 구조 자체가 ItemReader / ItemStream / ItemWriter는 하나씩 사용하는데, 각 쓰레드가 서로 다른 Chunk를 사용하기 때문에 멀티 쓰레딩 환경이 구성되기 때문이다.
코드 실습
Job 코드
@Bean
public Job multiThreadedJob() {
return jobBuilderFactory.get("multiThreadJob")
.incrementer(new RunIdIncrementer())
.start(multiThreadedStep())
.build();
}
Step 코드
@Bean
public Step multiThreadedStep() {
return stepBuilderFactory.get("multiThreadStep")
.<Customer, Customer2>chunk(1000)
.reader(multiThreadItemReader())
.processor((ItemProcessor<Customer, Customer2>) item -> Customer2.builder()
.id(item.getCustomer_id())
.firstName(item.getFirstName())
.lastName(item.getLastName())
.birthDate(item.getBirthDate())
.build())
.writer(multiThreadItemWriter())
.taskExecutor(myTaskExecutor())
.build();
}
Step 코드는 다음과 같이 작성했다. 기본적으로는 taskExecutor API만 설정하는데 신경쓰면 된다.
TaskExecutor 코드
@Bean
public TaskExecutor myTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("Non-Thread-Free");
executor.setMaxPoolSize(8);
executor.setCorePoolSize(4);
return executor;
}
TaskExecutor는 다음과 같이 쓰레드 풀을 만들어 넣어주었다.
Reader + Writer 코드
@Bean
public ItemWriter<Customer2> multiThreadItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer2>()
.sql("insert into Customer2(customer2_id, birth_date, first_name, last_name) values(:id, :birthDate, :firstName, :lastName)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public JpaPagingItemReader<Customer> multiThreadItemReader() {
return new JpaPagingItemReaderBuilder<Customer>()
.name("builder Name")
.currentItemCount(0)
.pageSize(1000)
.maxItemCount(1000)
.entityManagerFactory(emf)
.queryString("select c from Customer c")
.build();
}
Reader와 Writer에서 굳이 @StepScope를 사용해서 프록시 객체를 사용해 줄 필요는 없다. 왜냐하면 multi-Threaded Step은 기본적으로 프록시 객체가 아닌 싱글톤 빈을 기준으로 동작하기 때문이다. 그렇지만 정말 께림찍하다면 프록시 객체를 만들어서 공급해주자!
실행 결과
실행 결과를 살펴보면 멀티 쓰레드와 싱글 쓰레드 간에 총 소요 시간이 2배 가까이 개선된 것을 볼 수 있다. 이론적으로는 쓰레드 갯수만큼 시간이 단축되어야 하지만 실상은 그렇지 않다. 아무래도 컨텍스트 스위칭 비용이라는 것이 존재하는데, 이것이 좀 큰 비용이기 때문에 이론치만큼의 값은 나오지 않은 것 같다. 그럼에도 불구하고 멀티 쓰레등을 이용할 경우 좋은 효과를 볼 수 있는 것 같다.
또한 앞의 그림에서 볼 수 있듯이 각각의 쓰레드가 생성되어서 독립적으로 값을 읽어오고 처리하는 것을 확인할 수 있었다.
테스트 코드
'Spring > Spring Batch' 카테고리의 다른 글
Spring Batch : Retry 기능 (0) | 2022.03.27 |
---|---|
Spring Batch : SynchronizedItemStreamReader 정리 (0) | 2022.03.23 |
Spring Batch : Step Skip (0) | 2022.03.19 |
Spring Batch : Partition Step (0) | 2022.03.19 |
Spring Batch : JobExplorer, JobRegistry,JobOperation, (0) | 2022.03.18 |