Spring Batch : Chunk 개념

    이 게시글은 인프런 정수원님의 강의를 듣고 복습하며 정리한 글입니다. 

     

    Spring Batch의 Chunk

    Spring Batch에서 'Chunk'는 여러 개의 아이템을 묶은 하나의 덩어리를 지칭한다. 한번에 여러 개의 데이터를 입력 받아 Chunk로 만들고, Chunk 단위로 트랜잭션 내에서 데이터를 처리해준다. Chunk 단위로 데이터가 처리되기 때문에 Transaction의 Rollback, Commit은 모두 Chunk 단위로 이루어지게 된다. 

     

    Chunk의 흐름도

    Chunk의 큰 흐름도는 위와 같다.

    1. 여러 개의 Source를 1개씩 ItemReader가 불러와서 Input Chunk에 넣어준다. 이 때,  Transaction이 시작된다.
    2. ItemReader는 ChunkSize만큼 Source에서 데이터를 불러온 후, Chunk를 ItemProcessor에 전달한다.
    3. ItemProcessor는 Chunk를 하나씩 핸들링 해서 다시 한번 Output Chunk로 만들어준다. 이 때, ItemProcessor가 없다면 Input Chunk는 아무런 처리 없이 Output Chunk로 변경된다. 
    4. ItemWriter는 Output Chunk를 전달받어 한꺼번에 트랜잭션을 커밋하고 종료한다. 

    이 때 유의할 점은 이것이다. 

    1. 트랜잭션은 ItemReader가 소스를 읽기 시작할 때 시작, ItemWriter가 처리할 때 Commit한다. 만약, Exception이 발생할 경우 Job으로 돌아가고, Job은 실패로 끝난다.
    2. ItemReader / ItemProcessor는 하나씩 처리한다. ItemWriter는 Chunk 단위로 처리한다. 

    위에서 알 수 있듯이, Chunk 자체는 이미 Transaction 내에서 처리되기 때문에 개발자가 Transaction을 신경쓰지 않아도 된다. 

    좀 더 자세히 살펴보면 다음과 같다. ChunkSize만큼 ItemProcessor는 Source로부터 단건으로 데이터를 읽어와 Chunk를 만들어주고 이것을 ItemProcessor로 넘긴다. ItemProcessor는 개발자가 구현한 비즈니스 로직을 하나씩 처리해서 Output Chunk를 만들어준다. 그리고 이 Output Chunk는 ItemWriter로 넘어가고, ItemWrite는 한번에 DB로 트랜잭션 커밋 처리를 해준다. 

     

    Chunk의 내부 메서드

    	private List<W> items = new ArrayList<>(); // Chunk Item
    	private List<SkipWrapper<W>> skips = new ArrayList<>(); // 오류 발생 시, Skip한 item
    	private List<Exception> errors = new ArrayList<>(); // Skip 시 예외 목록
    
    
    	// ChunkIterator 내부 클래스
    	public class ChunkIterator implements Iterator<W> {
    		final private Iterator<W> iterator;
    
    		public ChunkIterator(List<W> items) {
    			iterator = items.iterator();
    		}
        }
    • Chunk 객체는 내부적으로 처리를 위한 Item을 List 형태로 가지고 있다. 이 값의 최대 크기는 Chunk Size다.
    • Chunk 객체는 중간에 예외가 발생한 객체와 에러를 저장하는 List를 가지고 있다. 
    • Chunk 객체는 내부 클래스로 Iterator를 가지고 있다. 이 내부 클래스에서 Chunk는 ChunkSize만큼의 반복 처리를 해준다. 

     

     

    Chunk의 코드 분석

    Tasklet 객체는 TaskletStep에 포함된다. 이것과 마찬가지로 Chunk 객체는 ChunkOrientedTasklet에 포함된다. ChunkOrientedTaskLet부터 어떤 흐름으로 Chunk가 진행되는지 확인해보고자 한다. 

    ChunkOrientedTasklet

    ChunkOrientedTasklet은 내부적으로 ChunkProcessor와 ChunkProvider를 가지고 있다. ChunkProvider는 내부적으로 ItemReader를 가지고 있고, ChunkProcessor는 내부적으로 ItemProcessor, ItemWriter를 가지고 있다. 

    ChunkOrientedTasklet

    ChunkOrientedTasklet은 execute() 메서드의 chunkProvider.provide를 이용해 Input Chunk를 읽어온다. 

    ChunkProvider

    ChunkProvider 인터페이스의 구현체 SimpleChunkProvider는 내부적으로 ItemReader와 RepeatOperations를 가진다. 이 RepeatOperations은 후에 좀 더 살펴보기로 한다. 

    ChunkProvider

    여튼 chunkProvider의 provide 메서드로 들어와면, Chunk 객체를 하나 만들어 inputs라는 이름을 붙인다. 그리고 앞서 만들었던 반복 객체를 통해 Iterator를 돌려주는 것을 볼 수 있다. 

    ChunkProvider

    이 때, iterate 메서드에 RepeatCallBack 함수를 익명 클래스로 구현을 하는데, doInIteration 안에서 item을 read해오는 것을 확인할 수 있다. 

    ChunkProvider

    read 메서드를 타고가보면 doRead()라는 메서드의 결과를 반환하는 것을 볼 수 있다. 

    ChunkProvider

    ChunkProvider는 doRead 메서드를 이용해 데이터를 읽어온다. 이 때, ItemReader.read()를 통해 Item을 읽어오고 Item을 반환해준다.

    ChunkProvider

    찾아온 Item은 ChunkProvider가 내부적으로 가지는 inputs 객체에 하나씩 담기는 것을 확인할 수 있다. 따라서 이와 같이 ChunkProvider는 내부적으로 Iterator를 이용해서 소스를 하나씩 읽어와서 저장하는 것을 확인할 수 있다. 

    ChunkOrientedTasklet

    다시 ChunkOrientdTasklet으로 돌아왔다. ChunkOrientedTasklet은 chunkProvider.provider 메서드를 사용했다. 이 메서드를 통해 ItemReader는 소스로부터 데이터를 하나씩 읽어 Chunk 객체를 만들어서 반환했고, ChunkOrientedTasklet은 이것을 inputs라는 객체에 저장했다. 

    ChunkOrientedTasklet

    Input Chunk를 전달받은 chunkProcessor는 process 메서드를 이용해 Chunk를 chunkProcessor에 전달해준다. 

    SimpleChunkProcessor

    ChunkProcess 인터페이스를 구현한 SimpleChunkProcessor로 넘어온다. SimpleChunkProcessor는 내부적으로 ItemProcessor, ItemWriter를 가진다. 

    SimpleChunkProcessor

    SimplChunkProcessor의 process 메서드로 넘어가보자. process 메서드에서는 transform이라는 메서드에 Input Chunk를 넘겨주고 output Chunk를 받는 것을 확인할 수 있다. 

    SimpleChunkProcessor

    transform 메서드에서는 Output Chunk를 만드는 것을 볼 수 있다. 내부적으로 input을 For문 형태로 반복문을 돌리고, For문의 각 요소를 하나씩 doProcess를 통해서 처리해주는 것을 볼 수 있다. 

    SimpleChunkProcessor

    doProcess는 다음과 같이 itemProcessor.process 메서드에 넘어온 Item 단건을 넘겨주면서 결과를 반환받는 것을 알 수 있다. 이 때, itemProcessor는 StepBuilderFactory에서 지정해준 itemProcessor가 사용된다. 

    SimpleChunkProcessor

    받아온 output은 outputs 리스트에 하나씩 차곡차곡 쌓아서 For문을 끝까지 도는 것을 볼 수 있다. For문을 끝까지 돌면, 이 Output Chunk를 반환해준다. 

    SimpleChunkProcessor

    transform 메서드가 끝나면 전달받은 input / outpt을 write 메서드에 넘겨주는 것을 확인할 수 있다. 

    Write 메서드에서는 doWrite에 output 전체를 넘겨주는 것을 볼 수 있다. 

    doWrite를 따라가보면 writeItems → itemWriter.write를 통해서 items를 한방에 처리하는 것을 볼 수 있다. 즉, ItemWriter는 단건이 아닌 Chunk 단위로 처리하는 것을 확인할 수 있다. 

     

    정리

    1. Chunk는 덩어리를 의미하고, ChunkOrientedTaskLet은 Chunk 단위로 데이터를 처리 / 커밋한다.
    2. ChunkOrientedTaskLet은 내부적으로 ChunkProvider / ChunkProcessor를 가지고 있다.
    3. ChunkProvider는 ItemReader를 가지고 있고, Input Chunk를 만들어준다.
    4. ChunkProcessor는 ItemWriter, ItemProcessor를 가지고 있고 OutPut Chunk를 만들어 커밋 처리를 해준다.
    5. ItemReader / ItemProcessor는 ChunkSize만큼 단건씩 처리해준다. ItemWriter는 Chunk를 한방에 처리해준다.
    6. ChunkOrientedStep은 Transaction 내에서 처리된다. 그리고 Chunk 단위로 데이터 처리 / 커밋된다.

    댓글

    Designed by JB FACTORY