Spring Batch : JdbcCursorItemReader

    이 게시글은 인프런 정수원님의 강의를 수강하고 복습하며 작성한 글입니다. 

    JdbcCursorItemReader 기본 개념

    JdbcCursorItemReader는 Cursor 기반의 JDBC 구현체다. ResultSet과 함께 사용되고, Data Source로부터 SQL을 통해 값을 불러온다. JdbcCursorItemReader는 쓰레드 안정성을 보장하지 않기 때문에 멀티 쓰레드 환경에서 사용한다면 동기화 처리를 통한 동시성 문제를 해결해줘야 한다. 

     

    Cursor 기반이라는 것은 Cursor가 가리키는 지점에서 한번에 Fetch Size만큼 DB에서 메모리로 퍼올린다는 이야기다. Cursor를 기반으로 구현할 때는 Fetch Size와 Chunk Size를 맞춰주는 것이 좋다. 왜냐하면 Chunk Size 단위로 커밋을 하기 때문에, 이 숫자에 맞춰주는 것이 좋다. 예를 들어 Fetch Size = 10, Chunk Size = 10을 한다면 한번에 메모리에서 10개를 가져와서 Input Chunk로 만들어주고, OutPut Chunk가 만들어져 10개씩 Commit 되게 된다.

     

    JdbcCursorItemReader의 API

    • name : Reader의 이름을 붙인다.
    • fetchSize : Cursor 방식으로 DataSource에서 데이터를 가지고 올 때, 메모리로 한번에 가지고 올 사이즈를 설정함.
    • dataSource : DB 접근 위한 데이터 소스. DI로 처리.
    • rowMapper : 쿼리 결과로 반환되는 데이터를 객체 형식으로 변경하기 위한 rowMapper 지정
    • beanRowMapper : 별도의 rowMapper를 설정하지 않고, 클래스 타입 설정 시, 자동으로 객체에 맵핑됨.  rowMapper와 둘 중에 하나만 사용 가능. 
    • sql : DB에 보낼 SQL문
    • queryArguments : SQL문에 사용될 쿼리 파라미터 설정. 
    • maxItemCount : 조회할 최대 Item 수
    • currentItemCount : 조회 Item의 시작 지점
    • maxRows : ResultSet 오브젝트가 포함할 수 있는 최대 행수 

     

    API 사용 시, 헷갈리는 부분

    SQL문

    .sql("select id, first_Name, last_Name from customer order by last_Name")

    SQL문에서 사용할 Column 명은 모두 DB 테이블이 기준이 되어야한다. JPQL에서는 객체 맵핑이 되어서 객체 필드명으로 SQL을 넣어도 되지만, 직접 SQL문으로 작성할 때는 DB 테이블의 Column명을 참고해서 Query를 보내야한다. 

     

    MaxRows

    MaxRows는 ResultSet 객체가 가질 수 있는 최대 행수를 의미한다. ItemReader는 내부적으로 read를 할 때, 내부적으로 ResultSet 객체가 가질 수 있는 최대 행을 설정하는 값이다. 

     

    CurrentItemCount

    CurrentItemCount는 현재 ItemCount 갯수를 센다. MaxItemCount와 연동되어서 사용하는 개념이라고 볼 수 있다. 예를 들어 MaxItemCount가 20, CurrentItemCount가 20이면 모든 Item을 다 센것으로 판단해서 Reader는 Item을 더 이상 읽어오지 않는다. 

     

    JdbcCursorItemReader 순서도

    1. Step(Abstract Step) 이 실행 전 open() 메서드를 통해 ItemStream의 Open 기능을 활성화한다. 이 때, DB Connection / PrepareStatement / ResultSet 등이 만들어진다. 
    2. Step은 Data Source를 Open 한 후, Read() 메서드를 이용해 데이터를 하나씩 가져온다.
    3. Read 메서드를 사용하기 위해 SimpleChunkProvider까지 넘어오고, read 메서드를 통해 JdbcCursorItemReader로 넘어간다.
    4. JdbcCursorItemReader는 부모 클래스인 AbstractCursorItemReader 클래스의 doRead() 메서드로 넘어가고, 여기서 readCursor() 메서드를 통해 rowMapper.mapRow()를 통해 ResultSet 객체를 만들고 값을 불러온다. 이 때, doRead()는 내부적으로 ResultSet의 Next 값이 있는지 확인을 해준다. 즉, 다음 Cursor가 가리키는 것이 있는지 확인해준다. 이것의 반복은 SimpleChunkProvider의 Repeat Template을 통해서 반복된다. 
    5. 동작이 완료되었으면, Close 메서드를 통해 Resource를 정리한다.

     

    MaxItemCount의 의미

    MaxItemCount는 Reader가 읽어올 수 있는 최대값을 의미한다. 예를 들어 테이블에 10,000개의 데이터가 있다고 했을 때, MaxItemCount가 1,000이고, Chunk가 10이면, MaxItemCount 1,000개에 대한 값만 처리를 한다. 예를 들어 이런 상황에서 ItemWriter가 DB에 값을 밀어넣는 상황이라고 가정하면, 실제로 DB에는 1,000개의 데이터만 들어가게 된다. 

     

    결론을 정리하면 다음과 같다고 이해를 하면 될 것 같다

    • Chunk : 한번에 처리할 단위의 데이터 크기
    • maxItemCount : 처리할 데이터의 전체 크기. DB에 30,000개가 있을 때, 이 값을 10,000으로 설정하면 실제 결과물은 10,000개만 만들어진다.

     

     

     

    코드로 따라가보기

    abstractStep.execute()

    Step을 실행하면, TaskletStep의 부모 클래스인 AbstractStep에서 execute가 된다. execute 메서드에는 open, doexecute 메서드가 순차적으로 실행된다. 

    abstractStep.open

    abstractStep.open()으로 들어가보면, stream.open이 실행되는 것을 확인할 수 있다. 이것을 타고 들어가보면 JdbcCursorItemReader가 가진 open 메서드로 들어간다. 

    JdbcCursorItemReader → abstractCursorItemReader.doOpen

    이렇게 하면, JdbcCursorItemReader에 있는 doOpen 메서드로 들어오게 된다. 여기서 커넥션을 초기화해주고, openCursor() 메서드를 실행하는 것을 볼 수 있다. 

    jdbcCursorItemReader.openCursor()

    openCursor로 들어오면 preparedStatement, ResultSet(rs)를 준비하는 것을 확인할 수 있다.

    abstractStep.execute()

    이렇게까지 JdbcCursorItemReader가 구현한 ItemStream의 open 메서드가 완료되었다면, 다시 abstractStep으로 돌아온다. 그리고 doExecute() 메서드를 실행한다. 

    TaskLetStep.doExecute()

    TaskletStep의 doExecute로 넘어온다. 여기서 stepOperation이라는 repeat Template을 가지고, 이걸 Iterater를 하는 것을 확인할 수 있다. 여기서는 Data Source에 다음 Chunk 값이 남았는지를 확인하고, 남았으면 계속 돌려주는 루프로 이해를 하면 된다. 

    TaskLetStep.doExecute()

    TaskLetStep의 Repeat Template 안에서 Transaction Template을 만들고, 내부적으로 chunk에 대한 일을 수행해준다. 

    TaskletStep.doInTransaction

    트랜잭션 템플릿을 통해 일을 하게 되면 결국은 taskLetStep의 doInTransaction으로 온다. execute를 통해 트랜잭션 내에서 이제 일을 시킨다. 

    ChunkOrientedTasklet.execute()

    chunkOrientedTasklet.execute()로 넘어온다. 여기서는 chunkProvider.provide를 통해 Item을 읽어온다. 

    SimpleChunkProvider.provide()

    SimpleChunkProvider는 내부적으로 repeatTemplate을 가진다. 이 repeatTemplate은 repeatOperations라는 이름으로 활동하고, Item을 하나씩 불러와서 Chunk 사이즈만큼 맞춰주는 역할을 해준다. 그리고 item은 read() 메서드를 통해 불러와진다. 

    SimplChunkProvider.read()

    read() 메서드로 넘어오면 doRead()로 넘어가는 것을 확인할 수 있다. 

    SimpleChunkProvider.doRead()

    SimpleChunkProvider의 doRead() 메서드에서는 itemReader.read()를 통해 Item을 불러온다.

    AbstractItemCountingItemStreamItemReader.read()

    이 때 itemReader.read()는 실질적으로 AbstractItemCountingItemStreamItemReader.read()가 불러와진다. 여기서 item = doRead()가 되는 것을 확인할 수 있다.

    AbstractCursorItemReader.doread()

    AbstractCursorItemReader.doRead() 메서드는 rs(ResultSet)의 next 값이 있는지 확인하고, 있는 경우 readCursor 메서드를 통해서 Item을 불러오는 것을 확인할 수 있다. 

    jdbcCursorItemReader.readCursor()

    이 때, JdbcCursorItemReader.readCursor()로 넘어가게 되고, 여기서 주입받은 rowMapper를 통해 데이터를 맵핑해서 Item 형태로 돌려주게 된다. 이 때, beanRowMapper를 사용했을 경우 rowMapper는 BeanPropertyRowMapper가 된다. 

     

    테스트 코드

    https://github.com/chickenchickenlove/springbatchstudy/tree/main/SpringBatchLecture/main/java/io/springbatch/springbatchlecture/dbitemreader

    댓글

    Designed by JB FACTORY