Spring Batch : ItemReader / ItemProcessor / ItemWriter / ItemStream

    이 강의는 인프런 정수원님의 강의를 수강하고 복습하며 작성한 글입니다.

    ItemReader

    ItemReader는 인터페이스고, 여러 구현체가 존재한다. 여러 구현체는 다양한 소스를 하나씩 읽어오는 기능을 구현해준다. ItemReader는 ChunkOrientedTasklet을 사용할 때, 필수적으로 설정해줘야한다. ItemReader가 읽어올 수 있는 소스는 다음과 같다.

    • Flat 파일 : csv, txt 
    • XML,JSON
    • DB
    • JMS(Java Message Service), RabbitMQ와 같은 Messag Queuing 서비스
    • Custom Reader → 사용자 설정 Reader. 직접 구현 시, 멀티 스레드 환경에서 스레드에 안전하게 구현해야함.

     

    ItemReader의 메서드

    ItemReader는 reade 메서드를 가진다. 이 메서드는 입력 데이터를 하나씩 읽고 다음 데이터를 읽는다. 이 때, 더 읽어올 데이터가 없으면 Null을 Return한다. 하나의 아이템이라는 것은 csv 파일 한줄, DB의 1 Row, XML의 한 엘리먼트가 될 수 있다. 

     

    read 메서드는 만약 데이터를 읽다가, 더 읽을 수 없는 데이터가 없는 경우에는 예외 발생하지 않고 다음 단계로 넘어간다. read 메서드를 통해 읽어온 값은 Input Chunk로 만들어진다. 

     

    ItemReader 클래스 다이어그램

    ItemReader는 다음과 같이 여러 구현체를 가진다. 눈여겨볼 것은 Reader 구현체는 대부분 ItemStream, ItemReader 인터페이스를 동시에 구현하고 있다는 것이다. 한 가지 더 알아 두어야 할 것은 모든 구현체가 멀티 쓰레드 환경에서 안전한 구현체가 아니라는 점이다.

    멀티쓰레드 환경에서 안전한 것은 JdbcPagingItemReader / JpaPagingItemReader 뿐이며, 나머지 ItemReader를 이용해 병렬처리를 할 경우 쓰레드를 고려한 동기화가 필요하다. 

     

    ItemReader는 Item을 처음 읽어올 때, 처음으로 트랜잭션이 시작된다. 

     

    ItemStream 인터페이스의 역할은?

    ItemStream 인터페이스는 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 입력 장치 초기화 등의 작업을 한다. 또한, ExecutionContext에 read와 관련된 여러 상태 정보를 저장하는 역할을 한다. 

     

    ItemWriter

    ItemWriter는 Chunk 단위로 데이터를 받아, Chunk 단위로 데이터를 처리하는 역할을 한다. 인터페이스며, 다양한 형식을 지원하기 위해 다양한 구현체가 존재한다. 또한, ItemReader와 마찬가지로 ChunkOrientedTasklet을 사용하려면 반드시 설정해야하는 값이다.

    • Flat 파일 : csv, txt 
    • XML,JSON
    • DB
    • JMS(Java Message Service), RabbitMQ와 같은 Messag Queuing 서비스
    • 메일링 서비스

    주의 해야할 점은 ItemWriter는 ItemReader와 다르게 Chunk 단위로 한꺼번에 데이터를 전달받는 다는 점이다. 

     

    ItemWriter 인터페이스 

    ItemWriter는 write 메서드를 통해 받아온 Output Chunk를 처리해준다. 이 때, Chunk를 List 형태로 받는다. 즉, 덩어리를 한꺼번에 처리해주는 것으로 이해를 할 수 있다. writer 메서드가 끝나면 ItemReader에서 열어준 Transaction이 끝이나고, 남은 Chunk를 확인하기 위해 ChunkOrientedTaskLet으로 돌아가 추가적인 작업을 한다.

     

    ItemWriter 클래스 다이어그램

    ItemWriter는 인터페이스다. 스프링 배치는 ItemWriter의 여러 구현체를 지원한다. 이 때, 대부분의 구현체는 ItemStream 인터페이스도 함께 구현을 해준다. ItemReader에서 본 것과 유사한 형태다. 여기서도 ItemStream은 스트림을 읽거나 닫고, DB 커넥션을 열거나 종료, 입출력 장치의 초기화 등을 담당하게 된다. 

     

    ItemProcessor

    ItemReader가 입력받은 Input Chunk의 실질적인 비즈니스 로직을 적용해주는 역할을 한다. 당연한 이야기지만, Reader에게 받은 Chunk를 다른 형태의 Output Chunk로 바꿔 넘겨주는 것도 가능하다. 또한 필터 과정도 가능하다. ItemProcessor는 ItemReader와 마찬가지로 Chunk 단위가 아닌 단건 단위로 데이터를 처리해준다. 

    비즈니스 로직을 완료한 Item을 Return 해주면 이게 Output Chunk에 하나씩 저장이 된다. 그런데 이 때, Null 값을 입력해주면 Output Chunk에 반환되지 않기 때문에 ItemWriter에 전달되는 것을 막을 수 있게 된다. 

     

    ItemProcessor 메서드

    ItemProcessor 인터페이스 process 메서드를 가지고 있다. 이 process 메서드에서 Item을 단건으로 전달받아 단건으로 처리하게 된다. 그리고 돌려준 값은 Output Chunk로 저장되어, 추후에 ItemWriter에게 전달되게 된다. 이 때, Null을 돌려주면 그 값은 Output Chunk에 저장이 되지 않는다. 

     

    ItemProcessor 클래스 다이어그램

    ItemProcessor는 ItemReader / ItemWriter와 다르게 ItemStream 인터페이스를 구현하지 않는다. 또한, 스프링 배치에서 제공해주는 구현체가 굉장히 적다. 이유는 ItemPrcoess가 비즈니스 로직을 구현하는 인터페이스기 때문이다. 비즈니스 로직은 사람마다 달라, 직접 구현하는 경우가 많기 때문이다. 

     

    ItemStream

    ItemStream은 ItemReader / ItemWriter 구현체들이 보통 함께 구현하는 인터페이스다. ItemStream은 ItemReader / ItemWriter를 돕는 역할을 하는데, 주로 처리 과정 중 상태를 저장, 오류 상태 저장을 돕는다. 또한, 리소스를 열고 닫는 역할도 한다.

    ItemStream이 하는 역할은 구체적으로 이런 역할을 한다고 볼 수 있다. 예를 들어 ChunkSize가 100인 Chunk가 있다고 해보자. 이 때, 50개를 읽은 후 오류가 발생했다. 이 때 ItemStream은 현재 상태를 ItemReader가 사용하는 ExecutionContext 등에 저장해두고, 값을 캐싱해두기 때문에 따로 값을 다시 불러오지 않고 50번째부터 다시 시작할 수 있게 된다.

    스프링 배치가 제공하는 ItemReader / ItemWriter 구현체를 사용할 때는 신경을 쓸 필요가 없으나, 사용자가 구현하는 Reader / Writer를 사용할 경우 ItemStream도 함께 구현해줘야한다. 

     

    ItemStream 메서드 

    ItemStream은 Open / Close / Update 메서드를 가진다. Open은 리소스를 읽어오고 초기화 하는 역할을, Close는 리소스를 닫는 역할을 한다. Update는 Chunk 과정에서 중간 중간 데이터를 ExecutionContext 등에 업데이트 하는 역할을 해준다. 즉, Update 메서드를 통해서 현재까지 진행된 모든 상태를 잘 저장하도록 한다. 

     

    ItemStream의 역할

    In ItemReader 

    1. ItemReader에서 처음으로 리소스를 불러올 때, itemStream은 open() 메서드를 이용해서 리소스를 열고 초기화해준다.
    2. ItemReader가 Item을 읽어온다.
    3. ItemStream은 읽어온 Item에 대한 현재 상태를 Update() 메서드를 이용해 ExecutionContext에 저장한다. 이 과정을 ChunkSize만큼 반복한다. 
    4. 완료 후, close() 메서드를 이용해 사용한 리소스를 갈무리해준다. 

     

    In ItemWriter

    1. ItemWriter에서 처음으로 리소스를 불러올 때, itemStream은 open() 메서드를 이용해서 리소스를 열고 초기화해준다.
    2. ItemWriter가 Item을 쓴다. 
    3. ItemWriter는 쓰기가 완료된 Item에 대한 현재 상태를 Update() 메서드를 이용해 ExecutionContext에 저장한다. 이 과정을 ChunkSize만큼 반복한다. 
    4. 완료 후, close() 메서드를 이용해 사용한 리소스를 갈무리해준다. 

     

    ItemStream 구현 해보기

    ItemSream을 포함하는 Reader / Writer를 사용해야한다. 따라서, ItemStreamReader / ItemStreamWriter를 구현하면 된다. ItemStreamReader를 구현하기 위해서는 read / open / close / update 메서드를 구현해야한다. 

    public CustomItemStreamReader(List<String> items) {
        this.items = items;
        this.index = 0;
    }

    생성자는 다음과 같이 구현한다. items는 Reader가 읽어올 리소스를 가리키고, index는 현재 어디까지 읽어왔는지를 알려주기 위해 내부적으로 가지는 필드다. 

     

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        String item = null;
    
        // 성공 로직
        if (this.index < items.size()){
            item = this.items.get(index);
            index ++;
        }
    
        // 실패 로직 (실패 하지 않은 상태에서 Index가 6이면서, restart가 False인 경우)
        if (this.index == 6 && !restart){
            throw new RuntimeException("Restart is required");
        }
        return item;
    }

    read() 메서드는 items라는 리소스로부터 item을 하나씩 가져오는 역할을 한다. 실패 로직은 chunk Size를 의미한다. ChunkSize는 5로 주어졌는데,  현재 Index가 6이 되는 순간 Exception을 터뜨려주는 역할을 한다. 이 때, index가 6일 때 restart가 True이면, 실패 후 6번부터 읽는 상황이기 때문에 읽을 수 있도록 로직을 짠다. 

     

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // executionContext에 index가 포함되어있다면, DB에 이미 실행 이력이 저장되어있음.
        if(executionContext.containsKey("index")){
            index = executionContext.getInt("index");
            this.restart = true; // 기존 인덱스 값을 넣고, 재시작을 할 수 있도록 한다.
        }else{
            // 처음 시작하는 경우
            index = 0;
            executionContext.put("index", index);
        }
    }

    Open 메서드는 현재 상태 정보를 초기화해주는 역할을 한다. 매개변수로 ExecutionContext를 받는다. ExecutionContext에 Index 값을 저장하는 역할을 한다. 만약 저장된 index가 있다면, 이전에 저장해둔 값이기 때문에 그 값을 불러와주고 재시작 상태임을 알려준다. 

     

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // 현재 상태값을 계속 저장한다.
        // Job이 재시작할 때, 이 index부터 시작하게 됨.
        executionContext.put("index", index);
    }

    Update는 현재 값을 얼마나 읽어왔는지를 업데이트 해주는 역할을 한다. 그래서 ExecutionContext에 현재 Index를 꾸준히 넣어준다.

     

    @Override
    public void close() throws ItemStreamException {
        System.out.println("ItemStreamReader Close");
    }

    Close 메서드는 실제로는 리소스를 제거하는 역할등을 해야하지만, 여기서는 단순히 로그만 찍는 역할을 했다. 

     

    테스트 코드

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }
    
    public CustomItemWriter itemWriter() {
        return new CustomItemWriter();
    }
    
    public CustomItemStreamReader itemReader() {
        List<String> items = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            items.add(String.valueOf(i));
        }
        return new CustomItemStreamReader(items);
    }

    다음과 같이 테스트 코드를 작성했다. Chunk Size는 5고, Reader에 전달되는 리소스는 크기가 10짜리의 리스트다. 

     

    실행 단계 확인

    AbstractStep의 Execute 메서드

    AbstractStep의 Execute 메서드로 넘어온다.

    execute 메서드에는 open과 doexecute 메서드가 있다. open 메서드는 ItemStreamReader의 Open 메서드를 실행해서 초기화해준다. doExecute는 실제 Reader에서 값을 RepeatTemplate을 이용해서 읽어오는 메서드다.

    TaskletStep - doInTransaction

    doExecute를 타고 가보면, DoInTransaction 메서드에서 tasklet.execute를 한 후, stream.Update 메서드를 해주는 것을 볼 수 있다. 이 둘은 RepeatTemplate 에서 반복되기 때문에 동일하게 Chunk Size만큼 읽고 업데이트 되는 것을 알 수 있다.

     

     

    정리 

    1. ItemWriter / ItemReader 구현체는 ItemStream 인터페이스도 함께 구현한다.
    2. ItemStream 인터페이스는 불러오는 리소스에 대한 정보를 초기화 해주고, 현재 상태를 JobRepository와 함께 업데이트 해주는 역할을 한다. 
    3. ItemStrea은 open / close / update 메서드가 있는데 update는 Chunk Size만큼 반복하고 나머지는 1번씩만 사용된다.

     

    댓글

    Designed by JB FACTORY