Spring Batch : Step Skip

    이 게시글은 인프런 정수원님의 강의를 복습하며 작성한 글입니다.

    Step Skip → Chunk에서 사용됨. 

    Step을 실행하다보면, 실행 중에 Exception이 발생하는 경우가 있다. 그런데 이 Step의 데이터가 중요하지 않은 경우, 예를 들어 Log를 찍는 상황이라면 무시해도 충분할 것이다. 이처럼 중요하지 않은 Step에서 Exception이 발생했을 때, 이 부분을 무시하고 실행할 수 있는 것이 Step Skip이다. 

     

    Skip Step 선언 

    Skip Step을 선언하기 위해서는 FaultTolerantBuilder를 만들어야한다. 이것은 Chunk 상태에서만 사용할 수 있는데, Chunk를 해서 ChunkStepBuilder로 만든 후 FaultTolerant() API를 이용하면 FaultTolerantBuilder를 통해서 Skip과 관련된 API를 설정할 수 있다. 

     

    Step Skip의 구성도 

    ItemReader Exception

    ItemReader에서 Exception이 발생하면, Exception이 발생한 부분을 Skip으로 넘기고 ItemReader는 바로 뒤에 있는 것을 읽는다. 예를 들어 Item2를 읽다가 Exception이 발생하면, Skip하고 Item3을 읽는다. 

     

    ItemProcessor Exception

    ItemProcessor에서 Exception이 발생하면 ItemReader로 돌아간다. 그리고 ItemReader가 캐싱하고 있던 Input Chunk를 다시 받아와서, Input Chunk의 첫 부분부터 다시 읽는다. 그리고 Exception이 발생한 부분에 도달하면, 그 부분을 Skip하고 그 다음 Item을 읽는다.

    예를 들어 Item2에서 Exception이 발생하면, 다시 Input Chunk를 받아와서 Item1을 읽고, Item2를 건너뛰고 Item3을 읽는 방식으로 처리된다.

     

    ItemWriter Exception

    ItemWriter에서 Exception이 발생하면 ItemReader로 돌아간다. 동일한 방식으로 ItemProcessor를 통과한다. 그런데 ItemProcessor에서 ItemWriter로 Item을 넘겨줄 때, List 형태로 넘겨주는 것이 아니라 item 단건을 넘겨준다. 기존에 List 형태로 넘겨주다가 Exception이 발생했는데, 단건 Exception이 발생했다. 이 부분을 고려한다면 단건 Exception을 해결해주기 위해 단건씩 Item을 넘기는 건 합리적이다. 

     

     

    SkipPolicy → Skip 유무를 결정

    Exception이 발생했을 때, Skip이 가능한지 아닌지는 Skip Policy가 정의한다. Skip Policy는 크게 두 가지를 고려해서 Skip을 하느냐 하지 않느냐를 판단한다. 정확한 것은 Skip Policy마다 다르긴 하지만 말이다.

    1. 현재 Skip Count <=Skip Limit 인 경우
    2. Skip 가능한 Exception일 때 

    기본적으로 위 두 조건을 모두 만족하면 Skip이 가능하다. 그런데 이 조건은 Skip Policy마다 다르기 때문에 잘 알아봐야한다. 

     

    SkipPolicy의 Classifier

    SKipPolicy는 내부적으로 Classifier 객체를 참조한다. 이 Classifier 객체는 이 Skip Policy가 Skip 가능한지를 판단하는 근거가 되는 Exception 클래스들이 들어간다. 

    Map<Throwable, boolean>

    Classifier는 Map을 가지고 있는데, 이 Map은 Throwable 객체를 key로 하고 Value를 Boolean 값을 가진다. 이 객체의 값이 True면 Skip이 가능한 Exception으로 판단하고, False인 경우 Skip이 불가능한 객체로 판단한다. 

     

    SkipApi와 동작방식

    • Skip은 아래 API를 지원한다.
    • skip : Skip 가능한 Exception 타입을 설정한다. 여러 번 사용 가능하다.
    • noSkip : Skip 할 수 없는 Exception 타입을 설정한다. 
    • skipPolicy : 적용할 SkipPolicy를 설정할 수 있음. 
    • skipLimit : 최대 Skip 가능한 횟수를 설정한다. 이 API를 사용 시, LimitCheckingItemSkipPolicy가 생성됨. 

    위의 API를 사용해서 skip 관련 정책을 설정하고 적용할 수 있다. Skip에 들어가있는 Exception 클래스는 SkipPolicy의 Classifier 객체에 true 값으로 들어가고, noSkip에 들어가있는 Exception 클래스는 False로 들어가있는 것도 확인할 수 있다. True / False 값으로 Skip 유무를 나중에 판별한다. 

     

    Skip Policy 종류

    • AlwaysSkipItemSkipPolicy : 항상 Skip한다. 
    • ExceptionClassifierSkipPolicy : 예외 대상을 분류하여 Skip 여부 결정. (어떤 예외는 Skip, 어떤 예외는 No Skip 분류)
    • CompositeSkipPolicy : 여러 SkipPolicy를 탐색하며 Skip 여부를 결정. (하나라도 True 반환 시, Skip 가능)
    • LimitCheckingItemSkipPolicy : Skip 카운터 및 예외 등록 결과에 따라 skip 여부 결정. Default 값임. 
    • NeverSkipItemSkipPolicy : Skip을 하지 않음

    스프링 배치는 위와 같이 5개의 SkipPolicy를 제공하고, 필요 시 사용자가 구현하여 사용할 수 있다. 

     

    Skip Policy의 메서드

    SkipPolicy 인터페이스는 shouldSkip 메서드를 가진다. 이 메서드는 boolean 값을 가지는데, 현재 Exception에 대해서 skipcount를 넘겨주고 skip 가능한지를 True / False로 판단해서 넘겨주는 식으로 동작한다. 따라서 사용자가 구현할 때는 위의 내용을 참고해서 구현하면 된다. 

     

    Skip Step 실습

    @Configuration
    @RequiredArgsConstructor
    public class SkipStepConfig {
    
        private final StepBuilderFactory stepBuilderFactory;
        private final JobBuilderFactory jobBuilderFactory;
    
        @Bean
        public Job skipJob() {
            return jobBuilderFactory.get("skipJob")
                    .incrementer(new RunIdIncrementer())
                    .start(skipStep1())
                    .build();
        }
    
        @Bean
        public Step skipStep1() {
            return stepBuilderFactory.get("skipStep1")
                    .<String, String>chunk(5)
                    .reader(customReader())
                    .processor(customProcessor())
                    .writer(customWriter())
                    .faultTolerant()
                    .skip(SkipException1.class)
                    .noSkip(noSkipException.class)
                    .skipLimit(3)
                    .build();
        }
    
        private ItemWriter<String> customWriter() {
            return new ItemWriter<String>() {
                @Override
                public void write(List<? extends String> items) throws Exception {
                    for (String item : items) {
                        if (item.equals("item12") || item.equals("item13")) {
                            System.out.println("itemWriter Exception = " + item);
                            throw new SkipException1();
                        } else {
                            System.out.println("itemWriter Completed = " + item);
                        }
                    }
                }
            };
    
    
        }
    
        private ItemProcessor<String, String> customProcessor() {
            return new ItemProcessor<String, String>() {
                @Override
                public String process(String item) throws Exception {
                    if (item.equals("item3")) {
                        System.out.println("itemProcessor Exception = " + item);
                        throw new SkipException1();
                    } else {
                        System.out.println("itemProcessor Completed = " + item);
                        return item;
                    }
                }
            };
        }
    
        private ItemReader<String> customReader() {
            return new ItemReader<String>() {
                int i = 0;
                @Override
                public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                    i++;
                    System.out.println("itemReader = item" + i);
                    if ((i == 6) || (i == 7)) {
                        System.out.println("itemReader Exception = item" + i);
                        throw new SkipException1();
                    } else {
                        return i > 20 ? null : "item" + i;
                    }
                }
            };
        }
    }

    전체 코드는 위와 같이 작성했다. ItemReader / ItemProcessor / ItemWriter에서 각각 Exception이 발생하도록 했다. 코드의 작성 목적은 ItemReader / ItemProcessor / ItemWriter의 동작 방식을 이해하기 위해서였다. 

    실행 결과는 위와 같다.

    1. 1~5까지가 첫번째 청크다. 1~5까지 실행하던 도중 ItemProcessor에서 "Item3"을 진행하다가 Exception이 발생했다. 따라서 ItemReader에 캐시된 Input Chunk를 다시 가져와 Input Chunk의 처음부터 ItemProcessor 처리를 시작한다. 그리고 Item3을 Skip해준다. 그리고 Writer까지 정상적으로 진행된 것을 확인했다.
    2. 6~12까지가 두번째 청크다. ItemReader에서 6,7번에서 Exception이 발생해서 바로 Skip하고 8~12번까지 읽었다. 여기서 알 수 있는 점은 ItemReader는 Skip Count가 허락한다면, Chunk Size만큼 채운다는 것이다. 
    3. 이후 ItemWriter에서 Item12에서 Exception이 발생했다. 이 때 캐시된 Input Chunk로부터 값이 다시 ItemProcessor로 전달된다. ItemProcessor는 처리하는 족족 ItemWriter로 넘겨주는 것을 알 수 있다. ItemWriter에서 Skip을 하지 않을 경우에는 ItemProcessor는 Output Chunk를 만들어서 한방에 전달해주지만, ItemWriter에서 Skip이 발생한 경우는 Processor에서 생긴 것을 바로바로 전달해주는 것을 볼 수 있다.

    ItemWriterException을 좀 더 정확히 살펴보자. ItemWriter에서 Item13을 처리할 때 문제가 생겼다. 다시 input Chunk를 받아와서 ItemProcessor가 Item을 하나씩 넘겨주는데, 문제가 있었던 Item13도 ItemWriter로 다시 넘겨준다. 그렇지만 이것은 Skip Count로 치지 않는다. 이미 앞에서 한번 집계했기 때문이다. 

    이후에 Item14에서 발생했는데, 다시 앞으로 돌아가지 않고 Item15, 16, 17이 차례대로 전달된다. 그리고 이 Item14는 처음 발생한 것이기 때문에 SkipCount에 집계가 된다. 결론은 문제가 발생하면 ItemProcessor에서 ItemReader로 단건씩 전달되고, 이 때 Skip 되는 부분도 같이 전달이 되지만 Count로 집계는 되지 않는 방식으로 동작한다는 것이다. 

    이렇게 동작하는 이유는 내가 생각하기에는 Writer에서 Exception이 발생할 때 마다 Input Chunk로 돌아가면 최악의 경우 O(N^2)만큼의 반복을 해야할 수 있기 때문이라고 본다. 그래서 처음에는 Exception이 없을 것이라 가정해서 List로 전달하고, Exception 발생 시, 단건씩 전달하면서 O(N)만에 처리하는 방식인 것 같다. 

     

     

     

    정리 

    ItemReader Skip

    ItemReader Skip 발생 시, Exception 부분은 바로 Skip하고 Chunk Size만큼 채울 때까지 Item을 Input Chunk에 담아둔다. Skip Count가 허락한다면 말이다.

     

    ItemProcessor Skip

    ItemProcessor Skip 발생 시, ItemReader로 돌아가 캐싱된 Input Chunk를 다시 받아온다. 그리고 Input Chunk의 처음부분부터 다시 살펴보고, Skip 발생한 지점은 바로 Skip하고 넘어간다. 

     

    ItemWriter Skip

    ItemWrite Skip 발생 시, ItemReader에 캐싱된 Input Chunk를 요청해 ItemProcessor에 넘긴다. ItemProcessor는 Item 단건을 처리할 때 마다 ItemWriter로 넘긴다. 이 때, 이전에 Skip된 Item도 같이 넘기는데 이 녀석은 Skip Count에 포함되지 않는다. 단건씩 넘기면서 Skip이 또 발생할 수 있는데, 이 때는 그냥 ItemReader가 Skip하는 것처럼 Skip Count만 올리고 Skip해버린다. 이런 이유는 Exception 발생할 때 마다 처음으로 돌아가면 시간 복잡도가 O(N^2)이 되며 대용량 처리에 적합하지 않을 수 있기 때문으로 생각한다.

     

     

    테스트 코드

    https://github.com/chickenchickenlove/springbatchstudy/tree/main/SpringBatchLecture/main/java/io/springbatch/springbatchlecture/skip

    댓글

    Designed by JB FACTORY