Spring Batch : SynchronizedItemStreamReader 정리

    이 글은 인프런 정수원님의 강의를 복습하며 작성한 글입니다.

    Spring Batch : SynchronizedItemStreamReader

    Spring Batch는 여러 멀티 쓰레드 환경에 Chunk 흐름을 제공한다. 멀티 쓰레드 환경에서 가장 조심해야 할 부분은 각 쓰레드가 서로에 영향을 미치지 않고 동작을 해야한다는 것이다. 

    ChunkOrientedTasklet → Provide

    먼저 데이터를 전달하는 관점에서 살펴본다. 위의 이미지는 ChunkOrientedTasklet의 Provider의 Provide 메서드다. Provide 메서드는 시작하자마자 새로운 Chunk를 만드는 것을 볼 수 있다. 이것은 Input Chunk인데, Input Chunk에 값을 채워서 Processor에 넘겨주고, Processor는 또 새로운 Chunk를 만들어서 전달해준다. 즉, 데이터를 전달하는 관점에서 각 쓰레드는 서로에게 영향을 미치지 않는다는 것이다. 

    AbstractPagingItemReader

    JdbcPagingItemReaer / JpaPagingItemReader의 부모 클래스는 AbstractPaingItemReader다.부모 클래스는 doRead() 메서드를 통해 자식을 호출하는데, 이 때 synchronzied로 비동기 처리가 되어있다. 따라서 이런 PagingItemReader 계열을 사용하면 쓰레드에 안전하다. 그렇지만 모든 ItemReader가 쓰레드에 안전하지는 않다. 

    병렬 처리가 된 ItemReader 같은 경우에는 Synchronized 키워드가 걸려있다. 즉, Lock을 획득한 쓰레드만 해당 메서드에 접근해서 일을 처리할 수 있고, 나머지 Thread는 Lock을 획득할 때까지 기다린다. 그리고 Lock을 획득하면 순차적으로 Item을 읽어온다.

    반면 왼쪽은 동기화 처리가 안되어있는데, 보면 여러 쓰레드가 동시에 ItemReader에 접근해서 각각 DB에서 Item을 읽어올 수 있다. 이렇게 될 경우, 중복된 데이터를 읽어올 수 있게 되면서 멀티 쓰레드 환경에서 안전하지 않게 된다. 

    SynchronizedItemStreamReader

    SynchronizedItemStreamReader는 이런 멀티 쓰레드 환경에 안전하지 않은 ItemReader들에게 손쉽게 멀티 쓰레딩 환경을 제공해준다. 보시다시피 synchronized 키워드로 묶여있고, 전달받은 delegate라는 ItemReader를 read() 해준다. 그래서 간단하게 멀티 쓰레딩 환경을 제공 해주는 것으로 이해를 하면 될 것 같다. 

     

    코드 실습

        @Bean
        public Job SynchronizedItemStreamReaderJob() {
            return jobBuilderFactory.get("SynchronizedItemStreamReaderJob")
                    .incrementer(new RunIdIncrementer())
                    .start(synchronizedItemStreamReaderStep())
                    .build();
        }
    
        @Bean
        public Step synchronizedItemStreamReaderStep() {
            return stepBuilderFactory.get("SynchronizedItemStreamReaderStep")
                    .<Customer, Customer2>chunk(10)
                    .reader(myItemStreamReader())
    //                .reader(myNonSynchronizedItemReader())
                    .processor(new ItemProcessor<Customer, Customer2>() {
                        @Override
                        public Customer2 process(Customer item) throws Exception {
                            System.out.println(" >> Now Thread : " + Thread.currentThread().getName() + " Item : " + item.getCustomer_id());
    
                            return Customer2.builder()
                                    .id(item.getCustomer_id())
                                    .firstName(item.getFirstName())
                                    .lastName(item.getLastName())
                                    .birthDate(item.getBirthDate())
                                    .build();
                        }
                    })
                    .writer(myItemStreamWriter())
                    .taskExecutor(myTaskExecutor())
                    .build();
        }

    Job과 Step은 위 코드처럼 등록했다. taskExecutor()를 등록해줘서 멀티 쓰레딩 환경의 Chunk로 만들었다. 

    @Bean
    public TaskExecutor myTaskExecutor() {
    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setThreadNamePrefix("not-safety-thread");
    
        return executor;
    }

    이 때, taskExecutor()는 ThreadPoolTaskExecutor()로 만들었다. 이걸 사용한 이유는 ThreadPool 형식으로 관리하기 때문에 새로운 쓰레드가 만들어지고 파괴되는데 필요한 비용을 아낄 수 있기 때문이다.

    @StepScope
    @Bean
    public JdbcBatchItemWriter<Customer2> myItemStreamWriter() {
        return new JdbcBatchItemWriterBuilder<Customer2>()
                .sql("insert into Customer2(customer2_id, birth_date, first_name, last_name) values(:id, :birthDate, :firstName, :lastName)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    Writer는 JdbcBatchItemWriter를 사용했다. Writer 중에 가장 좋아하는 라이터인데, Bulk Insert라서 나는 좋아해서 선호한다. 

     

    ItemReader 코드 처리

    @StepScope
    @Bean
    public JpaCursorItemReader<Customer> myNonSynchronizedItemReader() {
        return new JpaCursorItemReaderBuilder<Customer>()
                .queryString("select c from Customer c")
                .name("myNonSynchronizedItemReader")
                .maxItemCount(100)
                .currentItemCount(0)
                .entityManagerFactory(emf)
                .build();
    }
    • JpaCursorItemReader를 사용했다. JpaCursorItemReader는 비동기식 ItemReader다.
    • Cursor 방식으로 움직인다. 

     

    @StepScope
    @Bean
    public SynchronizedItemStreamReader<Customer> myItemStreamReader() {
        SynchronizedItemStreamReaderBuilder<Customer> returnBuilder = new SynchronizedItemStreamReaderBuilder<>();
        SynchronizedItemStreamReader<Customer> build = returnBuilder.delegate(myNonSynchronizedItemReader()).build();
        return build;
    }

    SynchronizedItemStreamReader를 이용해줬다. Builder를 만들고, Builder에 deligate 값을 추가해주기만 하면 전달된 ItemReader는 동기화 처리가 된다. 

     

    코드 실행 결과 확인 

    코드 실행 결과 멀티 쓰레드 환경에서 쓰레드에 안전하게 Item을 읽어오는 것을 확인했다. 내가 혼동했던 부분은 DataSource를 어떻게 나누는지에 대한 것이었다. 나는 이 부분을 이렇게 단계를 나누어 이해했다. 

    먼저 Data Source에서 가져올 전체 ItemCount는 최대 100개다. 따라서 지정된 Cursor는 최대 100번까지만 움직일 것이다. 그리고 트랜잭션은 한번 타졌을 때, Cursor는 계속 유지한다. 정리하면 트랜잭션이 연결된 상태에서 Cusor만 움직이면 Item을 하나씩 읽어올 것이다. 

    결국 SynchronizedItemStreamReader는 doRead(), 그러니까 read() 메서드에 Lock을 걸어주는 것이다. 그러면 각 쓰레드는 한번씩 사이좋게 Item을 하나씩 읽어서 간다. 예를 들어 1번 쓰레드가 read 하고, 메서드에서 빠져나오는 순간 다음 쓰레드가 read를 한다. 

    위 방식을 종합해보면 멀티 쓰레드 환경에서 JdbcCursorItemReader / JpaCursorItemReader는 다음과 같이 움직이는 것을 유추할 수 있다.

    Read를 할 때 마다 커서가 움직이는데, Read는 한번에 하나의 Item만 읽는다. 즉, 각 쓰레드는 한번에 하나의 Item만 읽으면서 각 Cursor를 순차적으로 이동시킬 것이다. 그렇게 생각해보면 위처럼 멀티 쓰레드가 동작한 것이 이해가 간다. 

     

     

    테스트 코드

     

     

    GitHub - chickenchickenlove/springbatchstudy

    Contribute to chickenchickenlove/springbatchstudy development by creating an account on GitHub.

    github.com

     

    'Spring > Spring Batch' 카테고리의 다른 글

    Spring Batch : Retry 기능  (0) 2022.03.27
    Spring Batch : Multi-threaded Step  (0) 2022.03.24
    Spring Batch : Step Skip  (0) 2022.03.19
    Spring Batch : Partition Step  (0) 2022.03.19
    Spring Batch : JobExplorer, JobRegistry,JobOperation,  (0) 2022.03.18

    댓글

    Designed by JB FACTORY