Spring Batch : JdbcPagingItemReader

    이 글은 인프런 정수원님의 강의를 보고 복습하며 작성한 글입니다.

    JdbcPaingItemReader 

    기본 개념

    JdbcPagingItemReader는 Paiging 기반의 JDBC 구현체다. Paging 기반이기 떄문에 OffSet / Limit을 설정해서 페이징 기능을 사용하는 SQL 쿼리문을 만들어줘야한다. 이 때, OffSet은 시작 페이지, Limit은 페이지 사이즈로 이해를 할 수 있다. 

    JdbcPagingItemReader는 반드시 정렬 기준을 설정해줘야한다. JdbcPagingItemReader가 페이지 단위로 쿼리를 보내서 값을 가져오기 때문이다. JdbcPagingItemReader는 페이지 단위로 SQL 쿼리를 새로 만들어서 실행한다. 따라서 정합성 있는 데이터를 가져오기 위해서는 항상 정렬된 상태에서 페이징을 해야한다. 따라서 OrderBy 기능을 사용해야한다.

    JdbcPaingItemReader는 reade를 할 때 synchronized 구문이 있다. 따라서 이 구문의 Lock을 획득한 쓰레드만 작업을 할 수 있기 때문에 멀티 쓰레드 환경에서 안정적이다. 그렇지만, 다른 쓰레드들은 락을 획득할 때까지 기다려야 하기 때문에(?) 자칫 잘못하면 오랜 시간이 걸릴 수 있다. 

     

    PagingQueryProvider → 구현체 : SQLPagingQueryProviderFactoryBean

    JdbcPagingItemReader는 Builder 자체에서도 Query를 만들기 위한 Builder 문법을 제공한다. 그런데 이 문법은 PagingQueryProvider에서 구현한 문법으로 이해를 할 수 있다. 따라서 JdbcPagingItemReader에서 Builder 문법으로 직접 각 SQL 절을 만들어줘도 되고, PaginQueryProvider에서 SQL 문법을 완성해서 JdbcPagingItemReader에 전달해주어도 된다. 

    QueryProvider의 구현체는 SQLPagingQueryProviderFactoryBean를 사용하면 된다. 이 Provider는 HashMap을 가지고 있고, HashMap에는 각 DB에 대한 QueryProvider가 들어가있는 것을 볼 수 있다. 왜냐하면 DB마다 페이징 전략이 다르기 때문이다. QueryProvider는 제공되는 DataSoruce가 무엇인지 확인하고, 이것에 따라 Provider를 자동으로 선택해준다. 

     

    JdbcPaingItemReader API

    • name : itemReader의 이름 설정
    • pagesize : 페이지의 사이즈를 설정해준다. (쿼리 당 요청할 레코드 수)
    • dataSoruce : 데이터 소스 설정. DI로 처리
    • queryProvider : 쿼리 문을 만들어서 전달해주면 됨. (SqlPaigingQueryProviderFactoryBean 이용)
    • rowMapper : DB에서 불러온 값을 객체에 전달해주기 위한 mapper 설정.
    • beanRowMapper : 객체 클래스를 넣으면 자동으로 DB 데이터가 객체에 맵핑됨.
    • parameterValues : 쿼리 파라미터 설정. Map 형태로 설정한다.
    • maxItemCount : 한번에 조회할 최대 item 수 설정
    • currentItemCount : 조회 Item의 시작 지점 설정
    • maxRows : ResultSet이 가질 수 있는 최대 행수를 설정함. 
    • build()

    위는 기본적으로 제공하는 API다. 위를 이용해서 Query를 만들어서 사용할 수 있다. queryProvider를 직접 만들어서 넣는다면, 저렇게 처리를 하면되는데 QueryProvider를 직접 만들기 보다는 itemReader에 직접 설정하는 방법도 있다. 아래 API를 reader에 사용할 수 있다. 

    • selectClause : Select 절 설정함. select 제외하고, 가져올 Column을 설정한다.
    • fromClause : From 절 설정함. from부터 설정함.
    • whereClause : where 절 설정함.
    • groupclause : group 절 설정함.
    • sortKeys : 정렬키 값을 설정 
    sortKey.put("first_name", Order.ASCENDING);

    정렬 키는 HashMap 형태로 전달을 해주면 된다. 여러 개를 넣어도 전달할 수 있는데, HashMap 자체는 순서가 없을텐데 어떤 방식으로 우선순위를 가지고 정렬이 될지 궁금하기도 하다. 이건 나중에 테스트를 통해서 확인해보고 업데이트 하겠다. 

     

     

    JdbcPagingItemReader 순서도

    1. Step은 Open 메서드를 이용해 ItemStream을 만들고, 현재 상태를 Update 해준다. 
    2. Stem은 read() 메서드를 통해 jdbcPagingItemReader로 간다.
    3. jdbcPagingItemReader는 doReadPage()를 통해 JdbcTempleate을 만을고, JdbcTempleate은 Query를 통해 ResultSet 객체를 만든다.
    4. ResultSet은 next()를 통해 다음이 있을 경우, DB에 Query를 날려서 결과값이 ResultSet에 저장된다. 이 때, 최대 페이지 사이즈만큼의 값을 가져온다. 
    5. 저장된 ResultSet은 rowMapper를 통해 List 형식으로 바뀌고, 이 객체는 ItemReader에 반환된다. 

     

    MaxItemCount의 의미

    MaxItemCount는 Reader가 읽어올 수 있는 최대값을 의미한다. 예를 들어 테이블에 10,000개의 데이터가 있다고 했을 때, MaxItemCount가 1,000이고, Chunk가 10이면, MaxItemCount 1,000개에 대한 값만 처리를 한다. 예를 들어 이런 상황에서 ItemWriter가 DB에 값을 밀어넣는 상황이라고 가정하면, 실제로 DB에는 1,000개의 데이터만 들어가게 된다. 

     

    결론을 정리하면 다음과 같다고 이해를 하면 될 것 같다

    • Chunk : 한번에 처리할 단위의 데이터 크기
    • maxItemCount : 처리할 데이터의 전체 크기. DB에 30,000개가 있을 때, 이 값을 10,000으로 설정하면 실제 결과물은 10,000개만 만들어진다.

     

     

     

    JdbcPagingItemReader 코드로 따라가보기 

    AbstractStep.execute()

    AbstractStep.execute()에서 doExecute() 메서드로 Step을 실행시킨다.

    tasklet.doInTransaction()

    tasklet.doInTransaction()에서 tasklet.execute를 통해 tasklet을 실행시킨다.

    ChunkOrientedTasklet.execute()

    ChunkOrientedTasklet.execute()에서 chunkProvider.provide를 통해서 ItemReader를 제공한다. 

    SimpleChunkProvider.provide()

    SimpleChunkProvider.provide()에서 repeatTemplate을 가지고, read 메서드를 반복해서 item을 읽어오는 것을 확인할 수 있다. 즉, item을 단 건씩 가지고 와서 여기에서 추가한다고 볼 수 있다. 

    SimpleChunkProvider.read()

    SimpleChunkProvider.read()로 들어오면, doRead() 메서드로 다시 넘어간다.

    simpleChunkProvider.doRead()

    simpleChunkProvider.doRead() 메서드를 이용해 itemReader로부터 값을 읽어온다. 

    AbstractItemCountingItemStreamItemReader.read()

    AbstractItemCountingItemStreamItemReader.read()를 통해 값을 읽어온다. 여기서 doRead() 메서드로 다시 한번 이동한다.

    AbstractPaigingItemReader.doRead()

    AbstractPaigingItemReader.doRead()로 넘어온다. 여기는 synchronized를 통해서 동기화 되는 구간인 것을 이해할 수 있다. 즉, Item을 읽어오는 것 자체가 Lock으로 동기화 처리가 되어있다. 멀티 스레드 환경에서 안전하다. 이 메서드에 있는 doReadPage()를 통해 Item을 Page 단위로 불러오는 일이 생긴다.

    JdbcPagingItemReader.doReadPage()

    JdbcPagingItemReader.doReadPage()로 넘어오면 먼저 Results를 만든다. 이 리스트의 용도는 DB에서 가져온 값을 저장하는 용도다. 

    JdbcPagingItemReader.doReadPage()

    위의 구문을 통해서 jdbcTemplate에서 페이지 단위로 값을 가져온다. 

    JdbcPagingItemReader.doReadPage()

    가져온 Query에는 다음과 같이 페이지 사이즈만큼의 객체가 들어있는 것을 확인할 수 있다. 

    JdbcPagingItemReader.doReadPage()

    Query를 results에 추가해준다.

    AbstractPaigingItemReader.doRead()

    results에는 위의 과정을 통해 10의 객체가 저장되어있다. 여기서 리스트 인덱싱으로 객체를 단건씩 넘겨주는 역할을 한다. 

     

     

    테스트 코드

    https://github.com/chickenchickenlove/springbatchstudy/tree/main/SpringBatchLecture/main/java/io/springbatch/springbatchlecture/dbitemreader

     

    GitHub - chickenchickenlove/springbatchstudy

    Contribute to chickenchickenlove/springbatchstudy development by creating an account on GitHub.

    github.com

     

     

    테스트 코드 실행 결과 → ChunkSize / PageSize 관계 확인하기

    ChunkSize = 10, PageSize = 2인 경우 ItemReader는 ChunkSize만큼 채우기 위해서 여러 번 페이징 쿼리를 보낸다. 따라서 ChunkSize만큼의 InputChunk를 만들고, 그 값을 ItemProcessor에 전달해준다. 이 때, 전체 테이블 데이터가 100개라면, ItemReader / ItermProcessor / ItemWriter는 총 10번 실행된다. 

    위처럼 여러 번 실행이 되는 것이 확인된다. 이상하게 JDBC로 엔티티를 가져오면, Id가 맵핑이 안되는 상황이 확인이 되고 있는데.. 이 부분은 해결되면 뭐가 문제였는지 적어보려고 한다. 

    'Spring > Spring Batch' 카테고리의 다른 글

    Spring Batch : ItemReaderAdapter  (0) 2022.03.13
    SpringBatch : JpaPagingItemReader  (0) 2022.03.13
    SpringBatch : JpaCursorItemReader  (1) 2022.03.13
    Spring Batch : JdbcCursorItemReader  (0) 2022.03.13
    Spring Batch : Step 도메인 이해  (0) 2022.03.09

    댓글

    Designed by JB FACTORY