카프카 스트림즈와 ksqlDB 정복 : ksqlDB 중급, 고급 스트림 처리(11장)

    들어가기 전

    이 글은 카프카 스트림즈와 ksqlDB 정복의 11장을 공부하며 작성한 글입니다. 


    11.1 데이터 보강 (Data Enrich)

    데이터 보강은 Raw Data에 새로운 데이터를 추가해서 더 의미있는 데이터를 만들어내는 행위를 의미한다. 단순히 데이터를 변경하는 작업과는 다르다. 데이터 보강의 대표적인 행위는 Join이 있다. 

     

    11.1.1 Join

    RDBMS에는 여러 테이블이 존재하고 각 테이블에서 데이터를 가져와 Join해서 의미있는 데이터를 만들어낸다. 마찬가지로 ksqlDB에서는 여러 Stream / Table에 데이터가 존재하고 여기서 메세지를 가져와 Join해서 의미있는 데이터를 만들어 낼 수 있다. Join은 두 가지 차원으로 분리할 수 있다. 

    • Join에 사용하는 식 : Inner, Left, Full
    • Join에 사용하는 Collection : Stream / Table 
    SQL 식 설명
    Inner Join Inner Join은 조인 양측의 입력 레코드가 동일한 Key일 때 동작.
    Left Join Left Join은 조인의 왼쪽 레코드를 받으면 동작함.
    Right Side에 동일 키로 일치하는 레코드가 없으면 오른쪽 값은 Null로 Join 된다.
    Full Join Full Join은 어느 쪽에 레코드가 들어와도 동작한다.
    레코드를 받고 조인을 시작할 때, 반대쪽에 해당하는 Key가 존재하지 않으면 Null로 Join된다.

    기본적으로 Join과 관련된 식은 위의 표에서 정리할 수 있다. 이 표를 보면 알 수 있겠지만 RDBMS와 유사한 Join이 제공된다. 그렇다면 Stream / Collection 관점에서는 어떤 Join이 지원될까?

    조인 종류 지원하는 식 Window
    Stream - Stream Inner Join
    Left Join
    Full Join
    O
    Stream - Table Inner Join
    Left Join
    X
    Table - Table Inner Join
    Left Join
    Full Join
    X

    위는 ksqlDB의 Collection 관점에서의 Join을 살펴본 내용이다. 특이한 점은 Stream - Stream Join을 하게 되면 반드시 Window가 필요하다는 것이다. Join은 State를 기억한 후에 StateStore에서 필요한 상태를 찾아서 Join을 하게 된다. 그런데 Stream은 무한하기 때문에 무한한 값을 StateStore에 기억할 수는 없다. 따라서 Stream - Stream Join에서는 Window가 필수적이다. 

     

    11.1.1.2 Join시 요구사항 → Copartition은 필수.

    ksqlDB에서 Join을 수행하기 위해서는 반드시 선행되어야 하는 조건이 있다. 이 조건이 만족되지 않는 경우 Join이 수행되지 않거나 수행된 Join 결과가 잘못될 수 있다. 따라서 반드시 아래 조건을 체크해야한다.

    • Join 대상이 되는 Collection의 파티션 갯수를 동일하게 맞춰야한다.
    • Join 대상이 되는 Collection이 동일한 Partition 전략으로 Partitioning 되어야 한다. 
    • Join 식 (ON 절)에서 참조하는 데이터는 같은 데이터 타입이어야 한다. 

    위에서 이야기한 내용은 카프카 스트림즈의 Co-Partition 내용이다. 위의 내용을 만족하면 아래 식을 이용해서 Join을 할 수 있게 된다.

    SELECT 
    	s.title_id,
        t.title,
        s.season_id,
        s.old_episode_count,
        s.new_episode_count,
        s.created_at
    FROM season_length_changes s
    INNER JOIN titles t
    ON s.title_id = t.id
    EMIT CHANGES;

     

     

    11.1.1.3 Stream Join / Table Join의 주의사항.

    스트림은 Join 할 때 아무 Column을 사용할 수 있다. 그렇지만 Table은 PRIMARY KEY라고 지정되어있는 컬럼만 조인할 수 있다. Table은 내부적으로 StateStore를 기반으로 Key - Value 형식으로 관리되고 있고 여기서 Key는 PRIMARY KEY이기 때문이다. 따라서, 반드시 Table Join을 할 때는 PRIMARY KEY로만 Join을 해야한다는 사실을 잊으면 안된다.

     

    11.1.1.4 Type Casting

    Join을 할 때 ON 절에는 같은 데이터 타입을 가진 녀석만 와야한다고 했다. 다른 데이터 타입을 가진 녀석들끼리 Join을 해야한다면 이 때는 Type Casting으로 해결해 볼 수 있다. ksqlDB는 CAST(필드 AS 타입명)으로 Type Casting을 지원해준다. 아래에서 예를 확인할 수 있다.

    SELECT *
    FROM season_length_changes s
    INNER JOIN titles t
    ON CAST(s.title_id AS INT) = t.id
    EMIT CHANGES;

     

    11.1.1.5 Repartitioning

    Join을 해야한다면 Join을 할 대상들끼리 파티션 갯수가 동일해야한다. 만약 파티션 갯수가 동일하지 않다면 파티션 갯수를 맞춰줘야한다. 파티션 갯수를 맞추기 위해서 Repartition(리파티션)을 할 수 있다. 이것은 WITH 절의 PARTITIONS 속성을 설정하는 것으로 해볼 수 있다. 

    CREATE TABLE titles_repartition
    WITH (PARTITIONS = 4) AS
    SELECT * FROM titles
    EMIT CHANGES;

    WITH절에 PARTITIONS를 설정할 경우 CTAS, CSAS 쿼리가 카프카 토픽을 생성할 때 PARTITIONS에 적힌 숫자만큼 파티션을 생성해준다. 비교해서 알아둬야 할 것은 PARTITION BY는 특정 Key 파티셔닝을 하는 명령어로 파티션 갯수를 늘려주지는 않는다. 

     

    11.1.1.6 Rekey

    Join의 요구사항은 궁극적으로 Kafka Producer 특성 때문에 그렇다. Kafka Producer는 Key를 가지는 메세지를 Hash 알고리즘을 이용해서 파티셔닝한다. 따라서 같은 Key는 항상 같은 파티션으로 간다. Join을 할 때 요구하는 내용들은 이 사실로부터 기인한다. Join을 할 때 파티셔닝이 서로 다른 Key로 되어있다면 모든 Key가 Join 되었다고 볼 수 없다. 따라서 필요하다면 Re-Key로 다시 Repartitioning을 해줘야한다. Rekey는 PARTITION BY 명령어로 해줄 수 있다.

    CREATE STREAM title_rekey
    AS
    	SELECT * 
        FROM some_table
        PARTITION BY other_column

    Rekey는 반드시 필요할 수도 있고, 아닐 수도 있다. 왜냐하면 아래 그림을 보면 된다. 다른 Key로 Partition 한 경우 누락되는 데이터가 있을 수 밖에 없다. 0_0 이런 녀석들은 StreamTask의 이름을 명시한 것인데 서로 다른 Key로 파티셔닝 되었을 때, Join 하고자 하는 Column(Key)가 같은 파티션에 있다는 보장을 할 수 없다. 따라서 필요한 경우 ReKey를 해야한다. 

     

     

    11.1.7 Persistent Join

    Persistent Join은 영구적인 Join 쿼리를 생성하는 것이다. SELECT 절로 생성된 쿼리는 서버가 재기동하면 사라지기 때문에 일시적이다. SELECT 절로 Join을 실행하고, 실행된 Join절(SELECT 절)을 CSAS / CTAS 쿼리로 만들어주면 생성된 Join 쿼리의 결과가 영구적으로 변하게 된다. 바꿔 이야기하면 Join된 메세지가 카프카 토픽으로 공급될 수 있도록 만들어주는 것이다. 

    CREATE STREAM season_length_changes_enriched
    WITH(
    KAFKA_TOPIC = 'season_length_changes_enriched',
        VALUE_FORMAT = 'JSON',
        PARTITIONS = 4,
        TIMESTAMP = 'create_at',
        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'
    ) AS
    SELECT
        s.title_id,
        t.title,
        s.season_id,
        s.old_episode_count,
        s.new_episode_count,
        s.created_at
    FROM season_length_changes s
    INNER JOIN titles t
    ON s.title_id = t.tile_id
    EMIT CHANGES;

    위 쿼리는 Persistent Join 쿼리를 의미한다. SELECT로 Join을 하고, 그 결과를 STREAM으로 생성하면서 Persistent Query를 생성한다. 

     

    11.1.8 Join Timestamp

    STREAM - STREAM Join에서 Stream은 무한한 이벤트의 흐름이기 때문에 반드시 Window Join을 해야한다고 이야기했다. Window Join은 Timestamp를 기준으로 수행된다. 그렇다면 어떤 Timestamp를 참고하는 것일까? 10장에서 공부했을 떄 Timestamp를 지정하는 방법은 다음과 같았다.

    CREATE STREAM title(
    	id int,
        created_at VARCHAR)
    WITH(
    	KAFKA_TOPIC = 'titles',
      	VALUE_FORMAT = 'JSON',
        TIMESTAMP = 'created_at',
        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'
    );

    기본적으로 다음과 같이 동작한다. WINDOW Join 자체에는 다음 값을 참고한다. 

    1. 생성할 때 Timestamp로 지정된 Column이 있으면, 이 Column을 Window Join에 참고한다.
    2. 지정된 Column이 없으면 Pseudo Column인 ROWTIME을 Window Join에 참고한다. 

    생성할 때는 다음과 같이 참고하는데 Join은 어떻게 참고할까? Join도 마찬가지다. CREATE STREAM WITH(timestamp)에 설정된 값을 Join 할 때, Join Window 기준으로 참고한다. 예를 들면 다음 쿼리에서는 created_at 타임을 기준으로 Window Join을 수행할 수 있도록 Column을 알려준다. (아래 쿼리는 Window Join을 하지는 않는다)

    CREATE STREAM season_length_changes_enriched
    WITH(
    KAFKA_TOPIC = 'season_length_changes_enriched',
        VALUE_FORMAT = 'JSON',
        PARTITIONS = 4,
        TIMESTAMP = 'create_at',
        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'
    ) AS
    SELECT
        s.title_id,
        t.title,
        s.season_id,
        s.old_episode_count,
        s.new_episode_count,
        s.created_at
    FROM season_length_changes s
    INNER JOIN titles t
    ON s.title_id = t.tile_id
    EMIT CHANGES;

     

     

    11.1.1.9 Window Join

    STREAM - STREAM을 Join할 때는 Window Join이 필수다. 이곳에서는 Window Join을 하는 방법에 대해 알아보고자 한다.  Window Join의 기본 문법은 아래와 같다.

    // JOIN 절 바로 아래에 와야함.
    WITHIN <시간> <시간 단위>
    
    
    // 예시
    INNER JOIN ~~~~
    WITHIN 2 HOURS

    여기서 시간에는 1,60, 241 같은 숫자가 올 수 있다. 시간 단위는 ksqlDB가 지원하는 시간 단위만 올 수 있다. ksqlDB가 지원하는 시간 단위는 아래와 같다.

    • DAY, DAYS
    • HOUR, HOURS
    • MINUTE, MINUTES
    • SECOND, SECONDS
    • MILLISECOND, MILLISECONDS

    하나의 예시로 다음 시나리오에 대한 쿼리를 작성해볼 수 있다. 아래 시나리오를 구현하기 위해서는 Window Join을 이용해 볼 수 있다.

    1. 시청-시작 / 시청-중지 이벤트를 각각의 별도 토픽으로 보낸다.
    2. 시청 세션이 끝날 때, 총 시청 시간이 2분 이내인 모든 세션을 구해야한다. 

    예시 코드를 작성해보면 다음과 같다.

    --
    CREATE STREAM start_watching_events(
        sessionId INT,
        titleId VARCHAR,
        createdAt VARCHAR
    ) WITH ( 
        KAFKA_TOPIC = 'start-session',
        VALUE_FORMAT = 'JSON',
        PARTITIONS = 4,
        TIMESTAMP = 'createdAt',
        TIMESTAMP_FORMAT = 'MMM dd, yyyy, hh:mm:ss a'
    )
    --
    CREATE STREAM end_watching_events(
        sessionId INT,
        titleId VARCHAR,
        createdAt VARCHAR
    ) WITH ( 
        KAFKA_TOPIC = 'start-session',
        VALUE_FORMAT = 'JSON',
        PARTITIONS = 4,
        TIMESTAMP = 'createdAt',
        TIMESTAMP_FORMAT = 'MMM dd, yyyy, hh:mm:ss a'
    )
    -- 
    CREATE STREAM aggregate_window_join
    WITH(
        KAFKA_TOPIC = 'aggregate_window_join',
        VALUE_FORMAT = 'JSON'
    ) AS 
    SELECT 
        A.titleId as titleId,
        A.sessionId as sessionId
    FROM start_watching_events A
    INNER JOIN end_watching_events B
    WITHIN 2 MINUTES
    ON a.sessionId = b.sessionId
    EMIT CHANGES;

     


    11.1.2 Aggregate (집계) → 연산의 결과는 항상 Table

    ksqlDB에서는 이벤트 스트림, 혹은 업데이트 스트림에서 집계를 이용해서 새로운 의미를 창출할 수도 있다. 한 가지 알아둬야 할 점은 Stream / Table 집계 모두 Aggregate 연산의 결과는 Table로 생성된다는 점이다. 카프카 스트림즈에서 집계 연산을 할 때 StateStore를 이용해서 한다는 것을 감안한다면 이 부분은 자연스러운 결과다.

    • 윈도우 집계
    • 비윈도우 집계

    집계는 크게 윈도우 집계 / 비윈도우 집계로 나눌 수 있다. 

     

    11.1.2.1 집계 기초 (비집계 쿼리)

    선요약하면 다음과 같다.

    • GROUP BY는 특정 KEY를 기준으로 집계를 시작한다.
    • GROUP BY에 있는 컬럼들만 SELECT 절에 집계 함수를 사용하지 않고도 SELECT 절에 들어갈 수 있다.
    • GROUP BY에 여러 컬럼이 올 수 있고, 이것은 복합키를 생성한다. 복합키는 'key1|+|key2' 형식으로 구성된다. 
      • 최근에는 각 필드로 존재하고, 필요할 경우 위와 같은 복합키를 생성하는 방식으로 접근할 수 있다. Key Format이 Kafka인 경우에는 Single 필드만 키로 존재해야하기 때문이다.

    Aggregate 연산을 하기 위해서는 필수적으로 GROUP BY를 사용해야한다. GROUP BY의 의미는 특정 COLUMN을 기준으로 STREAM의 레코드를 집계하겠다는 의미다. 이것은 STREAM을 특정 KEY를 기준으로 집계한다는 의미로 이해할 수 있다. 한 가지 예로 아래 쿼리를 볼 수 있다.

    SELECT
    	title_id,
        count(*) AS change_count,
        LATEST_BY_OFFSET (new_episode_count) AS latest_episode_count
    FROM season_length_changes_enriched
    GROUP BY title_id
    EMIT CHANGES;

    GROUP BY에 'title_id' 컬럼이 들어가는 것을 볼 수 있다. 이 말은 'title_id'가 같은 레코드끼리 그룹핑해서 쿼리의 결과를 생성한다는 것이다. 

    비집계(집계 함수를 사용하지 않는) 컬럼이 SELECT 절로 들어가기 위해서는 GROUP BY 절에 포함되어야 한다. 생각해보면 당연한데, GROUP BY를 하게 되면 GROUP BY에 있는 값은 1개가 되고 나머지는 N개가 된다. 집계 함수를 사용하는 것은 N개를 1개로 변경해주는 작업인데, 이 때 GROUP BY에 없는 녀석이 SELECT 컬럼에 포함되게 되면 1:N으로 되어서 GROUP BY 1개의 ROW에 대해서 N개를 표현할 수가 없다.

    비집계 COLUMN을 SELECT 절에 포함하는 유일한 방법은 GROUP BY에 컬럼을 같이 선언해주면 된다. 그러면 복합 키로 GROUP BY 되기 때문에 SELECT 절에 함께 들어갈 수 있게 된다. GROUP BY에 여러 개 컬럼을 선언한다는 것은 복합 Key를 생성하는 것을 의미하는데 ksqlDB 0.16.0 버전을 기준으로 복합키는 'keyA|+|keyB' 형식으로 생성되게 된다. 이런 쿼리의 예시는 아래에서 볼 수 있다.

    SELECT
    	title_id, → 비집계 열
        season_id, → 비집계 열
        COUNT(*) as CHANGE_COUNT,
        LATEST_BY_OFFSET(new_episode_count) AS latest_episode_count
    FROM season_length_changes_enriched
    GROUP BY title_id, season_id → 복합 키 생성
    EMIT CHANGES;

     

     

    11.2.2 윈도우 집계 (Window Aggregate) 

    비윈도우 집계는 특정 Column을 기준으로 집계한 결과를 보는 것이다. 비윈도우 집계는 하나의 차원에 대해서만 집계를 한 셈인데, 윈도우 집계는 특정 Column + 특정 기간을 할 수 있기 때문에 두 개의 차원에 대해서 집계를 할 수 있게 된다. 따라서 윈도우 집계는 비윈도우 집계의 좀 더 구체적인 버전이라고 봐도 무방할 것 같다.

    윈도우 종류 예시
    Tumbling Window WINDOW TUMBLING( SIZE 30 SECONDS)
    Hopping Window WINDOW HOPPING ( SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
    Session Window WINDOW SESSION ( 60 SECONDS)

    윈도우 집계는 GROUP BY와 함께 WINDOW 절을 사용하면서 구현할 수 있다. 정확하게는 GROUP BY 절은 WINDOW 절 앞에 와야한다. ksqlDB가 지원하는 WINDOW 개념은 위에서 확인할 수 있다. 예시로 볼만한 쿼리는 아래에 있다.

    SELECT
    	title_id,
        season_id,
        COUNT(*) AS change_count,
        LATEST_BY_OFFSET(new_episode_count) AS latest_episode_count'
    FROM season_length_changes_

    윈도우 집계는 너무 쉽게 할 수 있다. 그렇지만 윈도우 집계를 사용하면서 고려해야할 부분이 몇 가지 존재한다.

    • 언제 생성된 데이터를 Downstream으로 보내야할지
    • 지연되거나 순서가 바뀐 데이터는 어떻게 처리해야할지
    • 얼마나 오랜 시간 윈도우를 보관해야할지 

    이 부분은 아래에서 알아볼 것이다.

     

    11.2.3 윈도우 집계에서의 지연 시간(Grace Period Time)

    선요약은 다음과 같다.

    1. 메세지의 지연으로 Window에 포함될지를 결정하는 것은 StreamTime - GracePeriod로 계산한다.
    2. Grace Period를 설정하지 않으면 Window가 보관 기간이 만료되어 삭제될 때까지 윈도우가 열려있게 된다.
      • 즉, 지연 시간 이내라면 Window StateStore에 계속 업데이트가 된다.

    5장에서 배운 내용이 있다. 간단히 복습해보면 다음과 같다.

    1. 카프카의 특정 토픽에는 서로 다른 프로듀서에서 메세지를 보낸다. 이 때문에 실제 생성된 시점과 브로커에 저장된 오프셋은 다를 수 있다. 
    2. 카프카 스트림즈는 브로커에서 동기화 되지 못한 시간을 동기화 하기 위해 내부적으로 Stream Time을 가진다. 이것은 WallClock(벽시계)와는 다른 개념이다. 
    3. Stream Time은 Stream을 처리해나가면서 현재까지 처리한 Stream 중에서 가장 최신의 시간을 가진 녀석으로 업데이트 한다. 
    4. 정리하면 카프카 스트림즈는 내부적으로 Stream Time이라는 Stream 동기화 시간을 가지고 있고, 이 녀석을 기준으로 특정 메세지가 뒤늦게 도착했는지를 판단할 수 있다.

    위 사실에서 살펴보면 현재 Stream 시간을 Stream Time으로 관리하는데 Stream Time보다 이전에 생성된 메세지가 도착하면 지연된 메세지로 볼 수 있다는 것이다. 지연된 메세지를 Window에 포함할지를 결정하는 것은 Grace Period Time으로 설정한다. 아래 명령어를 이용해서 어느 정도 지연된 메세지가 Window에 포함될 수 있도록 해준다. 

    WINDOW <WINDOW TYPE>(
    	<WINDOW TIME> <WINDOW TIME UNIT>
        GRACE PERIOD <WINDOW TIME> <WINDOW TIMEUNIT>
    );

    WINDOW 쿼리에 대한 예시를 아래에서 살펴볼 수 있다.

    SELECT
    	title_id,
        season_id,
        COUNT(*) AS change_count,
        LATEST_BY_OFFSET(new_episode_count) AS episode_count
    FROM season_length_changes_enriched
    WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 10 MINUTES)
    GROUP BY title_id, season_id
    EMIT CHANGES;

     

    11.2.4 Window와 Stream Time, Grace Period 예시

    특정 메세지를 특정 시간에 발행한다고 가정했을 때, 특정 메세지가 Window에 포함되는지, 그리고 StreamTime은 어떻게 바뀌는지를 예시로 정리해보고자 한다. 

    // 동일한 KEY를 가진. 생성 시점이 다름. 
    1. 2021-02-24 10:00:00
    2. 2021-02-24 11:00:00
    3. 2021-02-24 10:59:00
    4. 2021-02-24 11:10:00
    5. 2021-02-24 10:59:00

    동일한 Key를 가지고 서로 다른 시점에 생성된 메세지가 서로 다른 시점에 카프카에 전달된다고 가정해보자. 이 때 각각의 동작 방식은 다음과 같다.

    No StreamTime WINDOW
    1 X → 10:00 10:00 ~ 11:00에 추가
    2 10:00 → 11:00 11:00 ~ 12:00에 추가
    3 11:00 10:00 ~ 11:00에 추가 (StreamTime - Grace Period = 10:50임)
    4 11:00 → 11:10 11:00 ~ 12:00에 추가
    5 11:10 X ( StreamTime - GracePeriod = 11:00 > 10:59)
    StreamTime에서 GracePeriod를 뺀 시간보다 더 이전에 발생한 메세지다.
    즉, Grace Period Time 내에 들어오지 못한 지연 메세지다. 

     

    11.2.5 Window 보관 기간(Retention Time)

    • WINDOW 절에 RETENTION 키워드로 WINDOW 보관 기간을 설정할 수 있다.
    • 윈도우 보관 기간 >= Window Size + Grace Period Time이어야만 한다.
    • Window 보관 기간을 설정하게 되면서 State를 작게 유지할 수 있다. 
    • Window Retention Time은 Stream Time을 따른다.

    Window StateStore를 계속 보관하면 ksqlDB는 많은 메모리, 로컬 스토리지를 사용하게 된다. StateStore가 커진다는 것은 장애가 발생했을 때, 복구 시간이 오래 걸리기 때문에 가급적이면 작은 State를 유지하는 것이 좋다. Window StateStore는 Retention Time을 설정하면 되고, 이 시간이 지나면 특정 Window를 나타내는 Window State는 삭제된다. 한 가지 주의해야 할 점은 한번 삭제된 Window는 더 이상 쿼리할 수 없다는 점이다. (삭제 되었으니 당연히 볼 수 없다!) 사용은 아래를 참고하면 된다. 

    WINDOW {HOPPING | TUMBLING | SESSION} (
    	<window_properties>,
        RETENTION <보관 기간> <TIME UNIT>
    );

    사용 관련된 예시 쿼리는 아래에서 확인할 수 있다.

    SELECT
    	title_id,
        season_id,
        LATEST_BY_OFFSET(new_episode_count) AS episode_count,
        COUNT(*) AS change_count
        FROM season_length_changes_enriched
        WINDOW TUMBLING (
        	SIZE 1 HOUR,
            RETENTION 2 DAYS,
            GRACE PERIOD 10 MINUTES
      	)
        GROUP BY title_id, season_id
        EMIT CHANGES;

     

     

    11.2.6 Materialized View → Table임.

    Materialized View는 앞서 살펴봤던 Materialized와는 다른 개념이다. Materialized 개념은 내부적으로만 사용하던 StateStore를 외부에 노출해서 외부에서 쿼리가 가능하게 한다는 개념이다. Materialized View는 논리적으로 생성하는 쿼리문을 실제로 실행하고 연산 결과를 바탕으로 테이블을 미리 만들어두어서 빠르게 쿼리해 나갈 수 있는 개념을 의미한다. ksqlDB에도 Materialized View 개념이 존재한다. 

    • Materialized View는 다른 Collection을 Query해서 파생된다.
    • Materialized View는 look-up 방식으로 쿼리한다 (ksqlDB의 Pull 쿼리를 이용)
    • ksqlDB(0.16.0 버전)의 Materialized View는 집계 쿼리로만 생성된다. 
    • 새로운 데이터가 들어오면 자동 갱신된다. 

    ksqlDB에서 Materialized View는 'Pull 쿼리를 실행할 수 있는 특정 종류의 테이블'을 의미한다. ksqlDB에서는 테이블을 다음 방식으로 생성할 수 있다. 이 때 Pull 쿼리를 지원하는 것은 집계 쿼리를 이용해서 생성된 테이블만이다. 따라서 ksqlDB에서는 집계 쿼리를 이용해서 생성된 테이블만 Materialized View를 지원하는 것으로 이해할 수 있다. 

    • Kafka Topic을 읽어서 바로 생성 → Pull 쿼리 지원 X
    • 다른 Stream, Table로부터 비집계 쿼리를 이용해서 생성 → Pull 쿼리 지원 X
    • 다른 Stream, Table로부터 집계 쿼리를 이용해서 생성 → Pull 쿼리 지원 O

    아래는 집계 쿼리를 이용해서 생성한 Materialized View 쿼리다. 다시 한번 이야기 하지만 ksqlDB에서의 Materialized View는 집계 쿼리를 이용해서 생성된 Table을 의미한다. 

    CREATE TABLE season_length_change_counts
    WITH (
    	KAFKA_TOPIC = 'season_length_change_counts',
        VALUE_FORMAT = 'JSON',
        PARTITIONS = 1
    ) AS
    SELECT
    	title_id,
        season_id,
        COUNT(*) AS change_count,
        LATEST_BY_OFFSET(new_episode_count) AS episode_count
    FROM season_length_changes_enriched
    WINDOW TUMBLING (
    	SIZE 1 HOUR,
        RETENTION 2 DAYS,
        GRACE PERIOD 10 MINUTES
    )
    GROUP BY title_id, season_id
    EMIT CHANGES;

    11.3 클라이언트

    이 장에서는 클라이언트를 구성하는 방법을 생각해볼 수 있다. ksqlDB는 이미 ksqlDB Client라는 훌륭한 클라이언트가 있기 때문에 직접 구성할 필요는 없다. 따라서 이런 기능이 있다는 것을 이해하면 된다. 

     

    11.3.1 가져오기 쿼리 (Pull 쿼리)

    Pull 쿼리를 이용해서 Table을 조회할 수 있는 것은 Materialized View가 유일하다. Materiazlied View에 대한 Pull 쿼리에는 Join, Aggregate를 지원하지 않는데 이것은 Materialized View를 생성할 때 사용할 수 있다. 따라서 가져오기 쿼리(Pull 쿼리)는 단순히 특정 Key 값에 대한 것을 찾아오는 Look up 쿼리로 이해를 하면 될 것 같다. 

    만약 Window View로 생성되었다면 자동으로 WINDOWSTART / WINDOWEND를 제공해준다. 이 WINDOW는 윈도우 집계를 했을 때 윈도우의 시작과 끝을 의미한다. 또한 WHERE 절에 필터링 조건으로도 이용할 수 있다. 

     

    11.3.2 CURL

    만약 쉘 스크립트로 쿼리를 실행하고 싶다면, CURL 명령어를 이용해서 POST 요청을 하면 된다. 아래 요청을 참고해서 작성하면 된다. Pull 쿼리는 요청 / 응답이 완료되면 연결이 끊어진다. 그렇지만 Push 쿼리는 새로운 데이터가 도착하면 결과를 계속 내보낼 수 있게 오랜 시간 유지되는 연결을 통해 쿼리 결과를 일련의 Chunk Stream으로 보내줄 것이다. 

    // Pull 쿼리 보내기
    curl -X POST "http://localhost:8088/query" \
    	-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
        --data $'{
        "ksql": "SELECT * FROM season_length_change_counts WHERE KEY=\'some\';",
        "streamsProperties": {}
        }'
        
    // Push 쿼리 보내기
    curl -X POST "http://localhost:8088/query" \
    	-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
        -d $'{
        "ksql": "SELECT * FROM season_length_changes_enriched EMIT CHANGES ;",
        "streamsProperties" : {}
         }'

    11.4 함수와 연산자

    ksqlDB는 여러 연산자를 제공하고 있으며 다양한 내장함수를 제공한다. 그리고 사용자가 직접 설정한 커스텀 함수(UDF) 기능까지 지원한다. 이 부분을 공부해보고자 한다.

     

    11.4.1 연산자

    ksqlDB는 여러 연산자를 포함하고 있다. 그리고 이 연산자들은 SQL문에서 사용할 수 있다.

    • 산술 연산자 : +, -, /, *, %
    • 문자열 결합 연산자 : +, ||
    • 배열의 색인 또는 맵의 키를 접근하는 첨자(subscript) 연산자 : [] → :[1], [2]
    • 구조체 참조 연산자 : ->

    이 연산자의 사용방법은 이곳(https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/operators/) 에서 확인할 수 있다. 

     

    11.4.2 함수 목록 + 설명 보기

    ksqlDB는 기본적으로 여러 형태의 내장함수를 지원한다. ksqlDB 서버에서 사용할 수 있는 내장함수 목록을 보는 방법과 내장함수의 자세한 설명을 보여주는 명령어를 이용해 볼 수 있다.

    // 사용 가능한 함수 리스트 보기
    SHOW FUNCTIONS;
    
    // 함수 설명 보기
    DESCRIBE FUNCTION <함수이름>

    ksqlDB에서는 함수의 타입이 Return Type을 기준으로 생성된다. ksqlDB에서 지원하는 함수의 타입은 총 세 가지가 있다.

    함수 종류 설명
    SCALAR 한 번에 한 ROW에 대한 연산을 수행하고, 하나의 결과를 반환하는 Stateless 함수
    AGGREGATE 데이터 집계에 사용하는 Stateful 함수. 하나의 결과만 반환
    TABLE 하나의 입력을 받아 0개 이상의 결과를 반환하는 Stateless 함수.
    이 함수는 카프카 스트림즈의 flatMap의 동작과 유사하다. 

    ksqlDB에서 지원하는 내장 함수는 ksql Docs(https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/functions/)에서 정확히 알아볼 수 있다.  

    함수의 사용 설명을 보면 다음과 같은 예시가 나온다. 여기서 살펴보면 좋은 것은 다음과 같다.

    • Jar : Internal은 내장함수를 의미한다. UDF를 구현해서 추가하게 되면 Jar는 internal이 아니라 다른 것으로 바뀌게 된다.
    • Variations : 사용 예시가 나온다. 하나 뿐만 아니라 여러 형태로 사용할 수 있는 것을 알 수 있다. 

    11.5 커스텀 함수 생성 (UDF, UDAF, UDTF)

    ksqlDB는 세 가지 타입의 함수를 제공한다. 따라서 ksqlDB에 User Define Function을 구현할 수 있는 타입도 세 가지만 존재한다. 사용자가 직접 구현해서 제공할 수 있는 함수는 아래 세 가지 형태로 정리할 수 있다.

    종류 설명
    User-Defined Function (UDF) 커스텀 Scala 값 반환 함수. Stateless 함수이며, 정확히 하나의 값만 반환함.
    User-Defined Aggregate Function (UDAF) 커스텀 Aggreagte 함수. Stateful 함수이며, 정확히 하나의 값만 반환함.
    User-Defined Table Function (UDTF) 커스텀 Table 함수. Stateless 함수이며, 0개 이상의 값을 반환한다.

    여기에서는 UDF를 구현하고 빌드해서 실제 ksqlDB에서 사용할 수 있도록 하는 과정을 다루고자 한다.

     

    11.5.1 UDF Dependency

    UDF, UDAF, UDTF를 만들기 위해서는 특정 어노테이션을 달아주어야 한다. 이 어노테이션은 UDF 관련 Dependency를 추가하면서 사용할 수 있게 된다. 아래 Dependency를 추가해주면 된다. 

        implementation 'io.confluent.ksql:ksqldb-udf:7.3.0'

     

    11.5.2 UDF 어노테이션

    자바 코드를 구현하는 과정에서 사용할 수 있는 UDF 어노테이션은 다음과 같다. @UdfParameter는 필수는 아닌 것 같다. @UdfParameter가 없어도 UDF는 잘 등록되고 잘 사용할 수 있다. 

    • @UDFDescription : 클래스에 단다. 이 UDF 전체에 대한 내용을 작성하는 곳이다.
    • @Udf : UDF에서 사용될 메서드에 단다. ksqlDB는 이 어노테이션이 달린 메서드를 호출한다.
    • @UdfParameter : @Udf 어노테이션이 있는 메서드의 파라메터에 단다. 이 어노테이션은 DESCRIBE Function을 했을 때, Variartion에 노출되는 부분이다. 

     

    11.5.3 구현

    주의사항은 다음과 같다.

    1. UDF에서 호출될 함수는 반드시 public 이어야한다.
    2. UDF에서 호출된 함수는 반드시 Non Static이어야 한다. 

    이 주의사항을 고려해서 인풋이 들어오면 1000을 더해서 반환해주는 간단한 함수를 구현했다. 

    @UdfDescription(
        name = "plus_1000",
        description = "TEST UDF : PLUS 1000",
        version = "0.0.0",
        author = "ME")
    public class PlusThouUdf {
    
        private Integer plusThousand(int input) {
            return input + 1000;
        }
    
        @Udf(description = "Plus 1000")
        public Integer apply(
                @UdfParameter(value = "source", description = "the raw source Integer")
                int source) {
            return plusThousand(source);
        }
    
        @Udf(description = "Plus 1000 with String. return type is Integer")
        public Integer apply(
                @UdfParameter(value = "source", description = "the raw source String")
                String source) {
            Integer integer = Integer.valueOf(source);
            return plusThousand(integer);
        }
    }

     

    11.5.4 Build + 사용

    요약하면 다음 단계를 수행해야한다.

    1. UDF를 Jar 파일로 생성한다.
    2. ksqlDB에서 Jar 파일을 읽을 폴더를 설정한다.
    3. 해당 폴더로 Jar 파일을 이동시킨다. 

    UDF를 ksqlDB에서 사용하기 위해서 UDF를 Jar 파일로 패키지해야한다. 아래 명령어를 이용하면 손쉽게 UDF 파일을 빌드할 수 있다. 

    $ gradle build

    ksqlDB의 설정을 수정해주면 외부에서 생성된 UDF 파일을 사용할 수 있게 된다. 두 가지 작업을 해야한다.

    1. ksqlDB를 컨테이너 환경에서 사용하고 있다면, UDF properties를 수정해준다.
    2. UDF properties에 설정한 폴더에 생성한 Jar 파일을 복사해준다. 
    ksqldb-server:
      image: confluentinc/ksqldb-server:0.28.2
      hostname: ksqldb-server
      container_name: ksqldb-server
      depends_on:
        - broker
      ports:
        - "8088:8088"
      environment:
        KSQL_LISTENERS: http://0.0.0.0:8088
        KSQL_BOOTSTRAP_SERVERS: broker:29092
        KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
        KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
        KSQL_KSQL_EXTENSION_DIR: "/home/appuser/udf"
      volumes:
        - "./udf/:/home/appuser/udf"

    나는 docker-compose를 이용해서 ksqlDB를 사용하고 있었기 때문에 docker-compose 파일에서 두 가지를 수정했다. 아래 두 작업을 마치고, ksqlDB를 재기동하면 UDF가 인식된다. 

    • KSQL_KSQL_EXTENSION_DIR에 UDF를 읽어올 디렉토리를 설정했다.
    • volumes를 이용해서 현재 UDF 파일이 저장되어있는 로컬 폴더와 ksqlDB 컨테이너의 폴더를 동기화했다.

    SHOW FUNCTION에서 UDF 리스트가 생성되는 것을 볼 수 있었고,  DESCRIBE FUNCTION PLUS_1000을 했을 때 상세 설명까지 잘 나오는 것을 볼 수 있었다.

     

    11.5.6 UDF 작성 관련 추가 자료

    • https://oreil.ly/HTb-F (미치 시모어, 카프카 서밋 2019)
    • https://oreil.ly/HMU9F (미치 시모어, 컨플루언트 블로그)
    • https://oreil.ly/qEafS (공식 ksqlDB 문서)

     

     

    복합 키 접근

    GROUP BY 절에 여러 Column이 포함되면 각 복합키가 생성된다. ksqlDB 0.16.0에서 복합키는 'keyA|+|keyB' 형식으로 생성된다. 아무튼 이렇게 생성된 레코드가 있을 때, 복합키에 쿼리를 해야한다면 당연하게도 위의 양식을 참고해서 쿼리해야한다. 아래 쿼리를 참고하면 된다. 

    SELECT *
    FROM T
    WHERE COL='keyA|+|keyB'

    0.15.0 이상 부터는 KEY_FORMAT이 KAFKA일 때 Single Key만 지원하도록 바뀌었다고 한다. 따라서 Group BY로 복합키를 사용한다면 KEY FORMAT을 JSON / PROTOBUF 등으로 해서 처리해야한다. 혹은 SELECT 절에 굉장히 복잡한 형태로 KEY를 만들어줘야한다. 아래에서 참고 가능하다.

    https://www.confluent.io/blog/ksqldb-0-15-reads-more-message-keys-supports-more-data-types/

     

    Stream Time을 따르는 것

    1. Window State의 Retention Time. 스트림 시간을 기준으로 삭제한다.
    2. Grace Period 관련. Stream Time - Grace Period를 했을 때 시간으로 특정 Window에 포함될 수 있는지 아닌지를 판단한다. 

    댓글

    Designed by JB FACTORY