이 게시글은 인프런의 정수원님의 강의를 듣고 복습하며 작성한 글입니다.
ChunkOrientedTaskLet
ChunkOrientedTaskLet은 Spring Batch에서 제공하는 TaskLet 인터페이스의 구현체다. 내부적으로 Chunk를 가지고 있어 Chunk 프로세스 처리가 가능하다. ChunkOrientedTaskLet은 내부적으로 ItemReader / ItemProcessor / ItemWriter를 가지고 있으며 이들이 Chunk 프로세스 처리를 해준다.
내부적으로는 Repeat Templeate을 가지고 있어, ChunkOrientedTasklet이 실행될 때 마다, 새로운 트랜잭션이 생성되어 처리가 이루어진다. 이 때, 트랜잭션 과정에 Exception이 발생하면 Roll Back이 되고, 이미 Commit이 완료된 Chunk는 유지된다. 이런 동작은 ChunkOreientedTaskLet이 Chunk 단위로 트랜잭션을 커밋하기 떄문에 일어난다.
ChunkOrientedTaskLet의 동작방향
- TaskletStep은 execute()를 통해 ChunkOrientedTasklet을 실행해준다.
- ChunkOrientedTasklet은 내부적으로 chunkProvider.provide()를 통해 1개씩 Resource로부터 Chunk에 저장할 Item을 읽어온다. 이 때, 처음 불러오기 시작하면 트랜잭션을 열어준다. 그리고 이 데이터는 ChunkSize만큼 반복해준다.
- 불러온 Input 데이터는 Input Chunk로 만들어진다. Input Chunk는 ChunkProcessor에 전달된다.
- ChunkProcessor에 전달된 Input Chunk는 ItemProcessor에서 단건씩 처리되며, 처리된 결과가 Output Chunk에 하나씩 들어가는 것을 확인할 수 있다.
- Output Chunk는 ItemWriter에 전달되어 일괄로 write한다.
전체의 Iterator는 두 가지가 있다. 바로 내부적으로 ItemProcessor / ItemReader의 내부적으로 있는 Iterator들이다. 또 다른 하나는 ChunkOrientedTaskLet이 가지는 Repeat Template이다. ChunkOrientedTaskLet은 입력받은 소스가 20개인데, Chunk Size가 2라면 10번을 반복해야한다. 이런 이유 때문에 Repeat Template으로 여러번 반복하게 된다.
ChunkOrientedTaskLet의 API
- <I,O> chunk(int chunkSize) // chunkSize 설정. commit 인터벌을 이야기함. I : Input / O : output
- <I,O> chunk(CompletionPolicy) // Chunk Process 완료 위한 설정 클래스 지정 (둘 중 하나)
- reader() // ItemReader 구현체 설정
- writer() // ItemWriter 구현체 설정
- processor() // ItemProcessor 구현체 설정, Optional 함.
- stream() // 재시작 데이터를 관리하는 콜백에 대한 스트림 등록
- readerIsTransactionalQueue() // Item이 JMS 같은 트랜잭션 외부에서 읽혀지고 캐시할 것인지 지정
- listener // 리스너 설정
- build
ChunkOrientTaskLetStep을 위해 StepBuilder는 다음과 같은 API를 제공해준다. 중요한 것은 Chunk의 Size다. Chunk는 최대 Size만큼의 크기를 가진다. 예를 들어 20개의 소스를 처리해야하고, Chunk Size가 2인 경우 Chunk는 2개씩 처리되기 때문에 총 10번의 반복이 필요하다. 즉 10번의 트랜잭션을 탄다는 소리로 이해를 할 수 있다.
ChunkOrientedTaskLet에서 알아둘 것.
Chunk 처리 중 예외가 발생하여 재시도할 경우, 다시 데이터를 읽지 않고 버퍼에 담아 놓았던 데이터를 가지고 온다. (ChunkContext에 있는 값)
ChunkOrientedTaskLet 내부 확인
- chunkContext.getAttribute() : ChunkContext에 이전에 실행한 Chunk가 캐싱되어있는지를 확인함. 예전에 실행하던 Chunk가 재시도 하는 경우 캐싱을 해야하기 때문이고, 이 메서드를 통해 버퍼에 담아두었던 데이터를 Input Chunk로 만들어줌.
- chunkProvider.provide() : Item을 ChunkSize만큼 반복해서 읽은 다음 Input Chunk로 만들어서 반환해줌.
- chunkContext.setAttribute : Chunk를 캐싱하기 위해 Chunk Context에 저장해줌.
- chunkProcessor.process : InputChunk를 Output Chunk로 가공해줌.
- chunkContext.removerAttribute : Chunk 단위 입출력이 완료되면, 다음 Chunk 처리를 위해 캐싱한 것 제거해줌.
- return RepeatStatuscontinueIf : 읽을 Item이 더 존재하는지 체크해서 존재하면 Chunk 프로세스를 반복해줌.
ChunkOrientedTasklet은 Input Chunk를 메모리에 캐싱해둔다. 따라서, Chunk 처리 중에 예외가 발생해서 재시도 할 경우, ChunkContext에서 이전에 진행했던 Chunk를 불러와서 다시 실행을 해준다.
Chunk의 생성
ChunkOrientedTasklet은 Repeat Template을 이용해서 Chunk Size 단위로 Chunk를 반복해서 처리해준다. 이 때, Chunk는 매번 다시 만들어지는 것을 알 수 있다.
Chunk Provider
Chunk Provider는 ItemReader를 가지고 있다. 따라서 소스로부터 아이템을 Chunk Size만큼 하나씩 읽어 Chunk 단위로 만들어 제공하는 도메인 객체다. 내부적으로 Iterator를 사용해서 itemReader.read()를 계속 호출하면서 item을 chunk에 쌓아준다.
Chunk Provider는 provider.provide()로 호출될 때 마다 새로운 Chunk를 생성해준다. 내부적으로는 Repeat Operation이 있는데, 이 객체를 통해 Iterator를 돌리며 각 소스를 하나씩 읽어오게 된다.
ChunkProvider의 provide 메서드
- new Chunk() : ChunkProvider는 provide 메서드가 호출될 때 마다 새로운 Chunk를 만든다
- repeatOperation.iterate : Chunk Size만큼 반복 실행해준다. 내부적으로 doInIteration 클래스를 구현해줌.
- read(Contribution, inputs) : ItemReader를 통해 소스로부터 Item을 읽어옴
- inputs.setEnd() : 읽어온 Item이 없는 경우, 종료한다.
- inputs.add(item) : 읽어온 Item이 있는 경우 Input Chunk에 넣어준다.
Chunk Processor
Chunk Processor는 ItemProcessor를 이용해 Input Chunk를 Output Chunk로 가공해준다. 또한, ItemWriter를 이용해 Output Chunk를 Chunk 단위로 저장, 출력해준다. 또한, Chunk Provider와 마찬가지로 Chunk Processor가 만드는 OutPut Chunk는 매번 호출될 때 마다 새로운 Chunk가 만들어진다.
Item Processor는 필수 사항이 아니다. Item Processor가 없는 경우에는 Input Chunk가 Output Chunk로 바로 전환되어 ItemWriter에 전달된다. ItemWriter는 일을 마치면, 트랜잭션이 종료되고, 반복문에서 Step 내의 반복문에서 ChunkOrientedTasklet이 새롭게 실행된다.
Chunk Process의 메서드
Chunk Processor의 Process는 다음과 같은 메서드로 내부 구성되어있다.
- transform() : Input Chunk를 itemProcessor에 전달해서 Output Chunk를 만들어온다.
- incrementFilterCount : Processor 과정에서 Filter를 통과한 녀석들의 값을 저장해줌.
- write : Output Chunk를 Item Wirter에게 전달해준다.
- transform 메서드는 새로운 Chunk를 만들어준다.
- for문을 통해 doProcess(Item)을 ItemSize만큼 처리해준다. 즉, Item을 단건으로 Processing 해준다는 이야기다.
write 메서드에서는 doWrite에 output Chunk의 List를 넘겨준다. 그리고 doWrite 메서드는 writeItems에게 Items 전체를 전달해주는 모습을 볼 수 있다. 즉, Reader는 전체적으로 한방에 작업하는 것을 알 수 있다.
테스트 코드
@Configuration
@RequiredArgsConstructor
public class ChunkConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("batchJob")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(5) // chunkSize 설정. 5개만큼 처리한다. / input, output 타입 설정 가능
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5"))) //Item을 읽어온다.
.processor(new ItemProcessor<String, String>() { // Processor 처리
@Override
public String process(String item) throws Exception {
Thread.sleep(3000); // 약간의 시간차
System.out.println("item = " + item);
return "my" + item;
}
})
.writer(new ItemWriter<String>() { // 매개변수로 Items가 전달된다. 즉 일괄 처리가 된다.
@Override
public void write(List<? extends String> items) throws Exception {
Thread.sleep(3000);
System.out.println("items = " + items);
}
})
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2 has executed");
return null;
}
})
.build();
}
다음과 같이 ChunkOrientedTasklet을 가지는 Job을 만들 수 있다. Processor, Write, Reader는 개발자가 직접 해당 인터페이스를 구현해서 사용해도 된다. 뿐만 아니라 Spring Batch에서는 여러가지 Processor, Write, Reader를 제공하니 이걸 사용하는 것도 방법이 될 수 있다.
'Spring > Spring Batch' 카테고리의 다른 글
Spring Batch : ChunkOrientedTaskletStep 순서도 (0) | 2022.03.09 |
---|---|
Spring Batch : ItemReader / ItemProcessor / ItemWriter / ItemStream (0) | 2022.03.09 |
Spring Batch : Chunk 개념 (0) | 2022.03.08 |
Spring Batch : JobRepository 관련 정리 (0) | 2022.03.07 |
Spring Batch : TaskLetStep의 AllowStartIfComplete() (0) | 2022.03.07 |