ksqlDB : ksqlDB의 Join

    들어가기 전

    ksqlDB는 RDBMS에서 사용하는 Join과 마찬가지로 Join 기능을 제공한다. 스트림 - 스트림 / 스트림 - 테이블 / 테이블 - 테이블을 Join해서 새로운 스트림과 테이블을 만들어 낼 수 있다. 기본적으로 Join을 할 때는 파티션을 기준으로 한다. 파티션을 기준으로 한다는 말은 Key를 기준으로 한다는 의미가 될 수 있다. 이것을 미루어 보건데 Join의 대상이 되는 녀석들의 파티션이 중요하다는 것을 암시한다. 

     

    Join과 Key

    Table은 항상 PK를 가지고, PK를 기준으로 파티셔닝 된다. 그리고 ksqlDB에서 Table은 리파티셔닝이 되지 않는다. 따라서 Table을 Join 하는 경우 반드시 PK값을 사용해야만 한다. 

    Stream은 Key를 가지는 경우도 있고, Key를 가지지 않는 경우도 있다. Key를 가지는 경우 Join을 할 때 해당 Key를 이용해서 한다. 반면 Key를 가지지 않는 Stream이라면 Join 할 때 Join Key를 바탕으로 내부적으로 스트림을 다시 Repartition을 한 후에 Join 한다.

    Repartition을 했을 때, 각 메세지는 동일한 파티션에 있을 때만 순서가 보장된다. 
    https://ojt90902.tistory.com/1113

     

    Join의 제약 조건

    • Join을 위한 Key는 동일한 스키마를 가져야 함. 
    • Join을 할 대상은 동일한 수의 파티션을 가져야 함. 
    • Join을 할 파티션들은 동일한 파티션 전략으로 분배되어야 함.

    위의 세 가지 조건을 만족할 때만 Join이 가능하다. 아래에서 좀 더 자세히 해당 내용을 살펴보겠다. 

     

    Join을 위한 Key는 동일한 스키마를 가져야 함. 

    -- stream with INT userId
    CREATE STREAM clicks (
        userId INT KEY, 
        url STRING
      ) WITH (
        kafka_topic='clickstream', 
        value_format='json'
      );
    
    -- table with BIGINT id stored in the key:
    CREATE TABLE users (
        id BIGINT PRIMARY KEY, 
        fullName STRING
      ) WITH (
        kafka_topic='users', 
        value_format='json'
      );
    
    -- Join utilising a CAST to convert the left sides join column to match the rights type.
    SELECT 
      clicks.url, 
      users.fullName 
    FROM clicks 
      JOIN users ON CAST(clicks.userId AS BIGINT) = users.id
    EMIT CHANGES;

    clicks 스트림과 users 스트림을 Join을 하려고 한다. 이 때, userId를 기준으로 Join을 한다. 중요하게 봐야할 것은 clicks 스트림과 users 스트림의 userID의 타입이 다르다는 점이다. clicks에는 Int 타입이고 users에는 Big int 타입이다. 따라서 Join 하려고 하는 Key의 스키마가 맞지 않다. 

    Join Key의 스키마가 맞지 않는 경우는 Cast 함수를 이용해서 타입을 바꾸어서 해결해 줄 수 있다.

     

    Join을 할 대상은 동일한 파티션 수를 가져야 함. 

    Kafka Producer는 기본적으로 Key 값을 가진 메세지들을 Hash 알고리즘을 파티셔닝해서 메세지를 보낸다. 이 말을 바꿔서 이야기하면 파티션의 갯수가 다르면 서로 다른 키를 가진다는 것이다. 서로 다른 키를 가지는 경우는 Join 할 수 없다. 따라서 ksqlDB에서는 Join을 하기 위해서 Join 대상은 모두 동일한 파티션 갯수를 가져야 한다. 

    CREATE STREAM products_rekeyed 
      WITH (PARTITIONS=6) AS 
      SELECT * 
       FROM products
       PARTITION BY product_id;

    만약 Join 할 대상이 동일한 파티션 갯수를 가지지 않는다면 같은 파티션 갯수를 가지도록 ksqlDB에서 Repartition을 해줄 수 있다. Repartition은 위의 명령어를 이용해서 할 수 있다. 위 명령은 파티션을 6개로 만들고 이 때 Key로는 product_id라는 녀석을 사용하겠다는 것을 의미한다. 

    - 만약 PARTITION BY에 Null이 들어가는 경우 임의로 생성된다. 따라서 COALESCE 라는 함수를 이용할 것이 권장된다.
    - 만약 PARTITION BY를 사용하지 않을 경우 기존 Stream / Table에서 사용하던 Key를 그대로 사용해서 Repartition 한다.

     

    Join 할 파티션들은 동일한 파티션 분배 전략이 적용되어야 함.

    Producer는 파티션 분배 전략을 통해서 각 파티션에 메세지를 분배한다. Producer는 기본적으로 Hash 알고리즘을 이용해서 Key를 파티셔닝 하는데 이 때 서로 다른 파티션 분배 전략을 사용했다면 같은 파티션 번호에서 서로 다른 키가 존재할 수 밖에 없는 상황이 나타난다. 따라서 Join 할 파티션들은 동일한 파티션 분배 전략을 사용 해야한다. 사실 이 부분은 메세지를 제공해주는 Kafka Producer의 구현에 좌우된다.

     

    ksqlDB Join의 종류

    위의 제약 조건만 만족한다면 ksqlDB를 이용해서 Join을 실행할 수 있다. Join은 다음과 같은 경우에만 가능하다.

    • Stream / Stream Join 후 새로운 Stream을 생성함. 
    • Table / Table을 Join 후 새로운 Table을 생성함
    • Stream / Table을 Join 후 새로운 Stream을 생성함. 

    이 중에서 눈여겨 봐야할 부분은 Stream / Stream의 Join이다. Stream은 Window 개념을 이용해서 Join을 하는데 이 때, 실시간으로 동일한 Key가 여러 개 들어올 수 있기 때문에 전통적인 RDBMS에서 생각보다 많은 데이터가 발생할 수 있다는 것이다. 

    CREATE STREAM shipped_orders AS
    SELECT
    	o.id as orderId,
        o.itemid as itemId,
        s.id as shipmentId,
        p.id as paymentId
    FROM orders o
    INNER JOIN payments p WITHIN 1 HOURS ON p.id = o.id
    INNER JOIN shipments s WITHIN 2 HOURS ON s.id = o.id;

    기본적인 Join 문은 다음과 같이 생성할 수 있다.  이 때 WITHIN은 Stream끼리 Join할 때만 사용된다. Stream에는 동일한 Key가 들어온다. Join은 Key를 기준으로 Join 하기 때문에 기본적으로 실시간으로 동일한 Key가 계속 들어오면 해당 Key를 가진 모든 메세지에 대해서 Join을 해서 결과를 표현해준다. 따라서 이게 무한히 반복되는 경우에는 Join으로 발생하는 데이터가 기하급수적으로 증가하면서 문제가 될 것이다. 

     


    Stream - Stream Join의 Window

    앞서 이야기 했던 것처럼 Stream - Stream Join을 할 때 Join의 범위를 무한한 시간에 대해서 할 수 없다. 왜냐하면 너무나 많은 데이터가 생기기 때문이다. 따라서 이런 문제를 해결하기 위해서 Stream끼리 Join을 할 때는 특정한 기간 내에 들어온 이벤트들만 Join을 한다라고 설정할 수 있다. 이 때 사용하는 것이 'WITHIN'이라는 절이다. 

    CREATE STREAM shipped_orders AS
    SELECT
    	o.id as orderId,
    FROM orders o
    INNER JOIN payments p WITHIN 1 HOURS ON p.id = o.id

    위의 코드를 살펴보면 'WITHIN 1 HOURS' 라는 문구가 있다. 이 말은 현재 시점을 기점으로 1시간 이내에 들어온 메세지들만 Join의 대상이 된다는 것을 의미한다. 예를 들어 아래 상황을 가정해보자.

    • 데이터는 오후 5시부터 계속 들어오고 있음. 
    • 현재 시간은 7시 30분임. 

    위와 같은 경우 STREAM Join의 대상이 되는 이벤트는 오후 6시 30분 ~ 7시 30분 사이의 데이터들이다. 

     

    Stream - Stream Join의 동작 방식

    Stream - Stream Join은 그렇다면 실제로 어떻게 동작하길래 데이터가 저렇게 크게 생긴다는 것일까? 이것을 확인해보기 위해서 아래의 일을 진행했다.

    # 스트림1 생성
    CREATE STREAM Stream1(
    Key varchar key, value varchar) WITH (kafka_topics = 'topic1', value_format = 'delimited');
    
    # 스트림2 생성
    CREATE STREAM Stream2(
    Key varchar key, value varchar) WITH (kafka_topics = 'topic2', value_format = 'delimited');
    
    # Join 스트림 생성
    CREATE STREAM JOIN_STREAM AS
    SELECT s1.key, s1.value, s2.value
    FROM Stream1 s1
    LEFT JOIN Stream2 s2 WITHIN 10 minutes 
    ON s1.key=s2.key

    위 스트림을 각각 생성하고, Console Producer를 이용해서 메세지를 하나씩 발송해봤다.

    시간 KEY Stream1 Stream2 Join Stream
    0 A 1   1 | null
    1 A   a 1 | a
    2 A 2   2 | a
    3 A   b 1 | b
    2 | b
    4 A 3   3 | a
    3 | b
    5 A   c 1 | c
    2 | c
    3 | c
    6 A 4   4 | a
    4 | b
    4 | c
    7 A 5   5 | a
    5 | b
    5 | c
    8 A 6   6 | a
    6 | b
    6 | c

    시간별로 메세지를 발송하면 다음과 같은 결과가 발생한다.  5분을 기준으로 자세한 상황을 한번 살펴보자.

    • 5분 기준으로 Stream1의 Key 'A'에는 1,2,3 Value를 가진 메세지가 각각 존재한다. 
    • 5분 기준으로 Stream2의 Key 'A'에는 'c' Value를 가진 메세지가 도착했다. 따라서 b의 Join 대상은 1,2,3이 되는 것이다.

    이런 이유 때문에 c Value가 도착한 후 Join Stream에는 1|c, 2|c, 3|c라는 3개의 레코드가 생성되는 것이다.

     

    각 Join의 지원 범위

      Window inner left outer right outer full outer
    Stream - Stream O O O O O
    Table - Table X O O O O
    Stream - Table X O O X X

    ksqlDB에서 각 Join의 지원범위는 다음과 같다.  Stream - Table의 right Outer Join이 지원되지 않는 것은 Table의 Key를 기준으로 Join을 할 때, 스트림에는 해당 Key를 가진 이벤트가 아주 많을 것이다. 이 중에 어떤 메세지가 Table의 Key에 맵핑 되어야 할지 알 수 없기 때문에 지원하지 않는다. 

     

     

    각 Join의 동작 방식

    Stream - Stream Join

    Strema과 Stream이 Join할 경우, 각 스트림에 새로운 메세지가 들어올 때 마다 Join Stream에 새로운 쿼리 결과가 업데이트 된다. 

     

    Stream - Table Join

    Stream - Table Join은 Stream을 기준으로 만들어진다. 이 때 주의해야 할 부분은 다음과 같다.

    • Stream에 새로운 입력이 들어오면, 그 입력은 Join Stream의 쿼리가 업데이트 되도록 한다. 
    • Table에 새로운 입력이 들어와서 업데이트 된다하더라도 그 입력은 Join Stream에 트리거를 주지 않는다.

    예를 들어 StreamA 스트림에 들어온 메세지의 갯수를 집계하는 것이 TableA라고 해보자. 그리고 TableA는 StreamB와 조인을 했다고 가정해보자. 이 때 중요한 것은 StreamA에 메세지가 들어와서 TableA의 집계 값이 업데이트 된다고 하더라도, 그것이 Join Query 결과를 업데이트 하지 않는다는 것이다. 

    ksql> SELECT * from join_stream10 emit changes;
    
    +------------+-----------+-------+--------------+
    |SS1_KEY     |SS1_VALUE  |M1_KEY |M1_KSQL_COL_0 |
    +------------+-----------+-------+--------------+
    |a           |101        |a      |3             |
    |a           |101        |a      |3             |
    |a           |102        |a      |10            |

    위에서 볼 수 있는 것이 하나의 예시다. StreamA가 처음 3개의 메세지만 있다가, 10개의 메세지가 생겼다. 그래서 TableA는 10개의 메세지를 가지고 있다고 업데이트가 되었다. 그런데 실제로 EMIT CHANGES로 JOIN QUERY 결과를 살펴보면 나오는 그것에 대한 변화값은 없었다. 만약 이것에 대한 변화가 트리거를 했다면 3,4,5,6,7,8,9,10 순으로 갔어야 했다. 그렇지만 스트림 - 테이블 Join에서는 스트림의 변화만 쿼리를 촉발하기 때문에 다음과 같은 결과가 나온다. 

     

    Table - Table Join

    • 테이블끼리의 Join은 1:1(PK) / 1:N(FK) Join을 지원한다. (다대다 Join은 지원하지 않음)
      • PK Join은 Inner / Left Outer / Right Outer /Full Outer Join이 지원된다.
      • FK Join은 Inner / Left Outer JOin이 지원된다.

    테이블끼리의 Join은 항상 Window를 사용하지 않는 Join이다. 아래에서 tombstone이라고 하는 것은 나중에 더 찾아봐야겠지만, 실제로 tombstone이 아닐 수도 있다. 예를 들어 Key = 'ABC', value = null이라는 메세지를 보내면 ksqlDB의 Stream / Table 레벨에서 Parsing을 하지 않는다. Parsing을 하지 않는다는 말은, Topic에는 위 메세지가 Tombstone 형태로 들어가있지만 Stream / Table 레벨에서는 Parsing을 하지 않기 때문에 결과적으로 Stream / Table의 Topic에는 반영되지 않는다는 것을 의미한다. 

    또한 INSERT INTO 명령어로 NULL 값을 직접 넣어도, ksqlDB는 이 값을 자신의 Stream / Table에 넣지 않는다. 따라서 INSERT INTO 명령어의 결과도 실제로는 반영되지 않는다.

    쉽게 이야기 하면 카프카 토픽 입장에서는 변경점이 있지만, 스트림 + 테이블 입장에서는 읽을 수 없는 값이기 때문에 Parsing 하지 않아서 스트림 + 테이블과 관련된 토픽에는 해당 메세지가 기록되지 않는다. 거기에 Join 된 테이블에는 당연히 영향을 미치지 않게 된다.

     

    Table - Table PK Join

    Table - Table 외래키 Join을 할 경우 양쪽 테이블이 업데이트 될 때 마다 그 값은 바로 EMIT CHANGES에 반영된다.  

    • 모든 메세지는 동일한 Key인 'ABC'를 가진다.
    • 모든 메세지는 시간 순서대로 들어온다.

    이 때 상황을 더 쉽게 이해하기 위해서 다음과 같은 가정을 하고 시계열로 확인해보자. 아래 표를 살펴보자.

    시간 Table A Table B INNER LEFT RIGHT FULL
    1 null
    (tombstone)
             
    2   null
    (tombstone)
           
    3 A     [A, null]   [A,null]
    4   a [A,a] [A,a] [A,a] [A,a]
    5 B   [B,a] [B,a] [B,a] [B,a]
    6   b [B,b] [B,b] [B,b] [B,b]
    7 null
    (tombstone)
      null
    (tombstone)
    null
    (tombstone)
    [null, b] [null, b]
    8   null
    (tombstone)
        null
    (tombstone)
    null
    (tombstone)
    9 C     [C, null]   [C,null]
    • 1~2 번 : Value에 어떤 값도 들어오지 않으면 Tombstone이 되어서 무시된다.
    • 7번 : TableA의 'ABC'의 값이 null이 된다. 즉 없는 값이 되어 이 값은 무시된다. 따라서 TableA를 주로 바라 보는 INNER / LEFT JOIN은 JOIN 할 것이 없어지기 때문에 Tombstone이 된다. 반면 TableB의 값은 남아있기 때문에 Right, Full Outer Join은 그대로 존재한다.

     

    Table - Table FK Join 생각해보기 

    Table - Table 외래키 Join을 할 경우 양쪽 테이블이 업데이트 될 때 마다 그 값은 바로 EMIT CHANGES에 반영된다.  ㄷ

    1. Tombstone 입력을 넣으면 Join한 테이블에도 Tombstone이 발생할 수 있다.
    2. Inner Join / Left Join의 Tombstone이 발생 기전이 다르다.

    여기서 눈여겨 봐야할 점은 위의 두 가지다. 위의 가정을 유의해서 아래를 살펴보자. 

    CREATE TABLE FAMILY(
    family_name varchar primary key, 
    parent_name varchar) 
    WITH 
    (kafka_topic='family',
    value_format='delimited');
    
    CREATE TABLE CHILD(
    child_name varchar primary key,
    family_name varchar) 
    WITH 
    (kafka_topic='child', 
    value_format='delimited');
    
    
    ----- JOIN 쿼리 생성
    CREATE TABLE JOIN_TABLE as
    SELECT c.child_name, c.family_name, f.family_name, f.parent_name
    FROM CHILD c
    LEFT JOIN family f ON c.family_name=f.family_name;

    이 명령을 이용하면 JOIN 쿼리를 생성할 수 있다. 비록 토픽에서 바로 테이블을 생성해서 정적이지만, Join 쿼리를 생성하면 Join한 테이블은 쿼리로 생성할 수 있다. 이 때 시간별로 동작하는 것을 아래에서 살펴보면 다음과 같다. 아래 표에서는 []안에 있는 값들이 각 테이블의 필드를 나타낸다. 

    시간 CHILD Family INNER JOIN LEFT JOIN
    1 [C1,null] -> tombstone      
    2   [F1, null] -> tombstone    
    3 [C1, F1]     [C1,F1,null,null]
    4   [F1,P1] [C1,F1,F1,P1] [C1,F1,F1,P1]
    5   [F1,P2] [C1,F1,F1,P2] [C1,F1,F1,P2]
    6 [C2,F1]   [C2,F1,F1,P2] [C2,F1,F1,P2]
    7 [C2,null] -> tombstone   [C2] tombstone [C2] tombstone
    8   [F1, null] [C1] tombstone [C1,F1, null, null]
    • 4 ~ 5번 시간 : F1의 Value 값이 업데이트 되면, 이 값은 즉시 EMIT CHANGES로 변경되는 것이 확인된다.  EMIT CHANGES 말고 전체 쿼리로 읽어오면 RDBMS에서 만들어진것처럼 마지막 업데이트 값인 [C1,F1,F1,P2]만 보인다. 
    • 7번 시간 : Child에 [C2, null]을 보내면 C2를 PK로 가지는 INNER JOIN / LEFT JOIN Table에서 FK 값이 null이 된다. 따라서 Join된 값이 모두 사라진다. 이런 이유 때문에 각 테이블에서 [C2, null, null, null]이 되며 Tombstone 상태가 된다.
    • 8번 시간 : Family에 [F1, null]을 보내면 FK F1과 Join 하는 녀석들에게 영향을 준다. INNER JOIN의 경우, FK가 없어지기 떄문에 PK C1에 대한 값은 아예 없어진다. 따라서 tombstone이 된다. 반면 LEFT JOIN은 FK가 없어진다고 하더라도 c.family_name이 존재하기 때문에 Tombstone은 되지 않는다.

     

     

     

    N-Way Joins

    N-Way Joins는 ksqlDB에서 지원하는 Join 종류라면 여러 번 가능하게 만들어주는 것을 의미한다. 예를 들어 스트림 x 스트림 x 스트림 Join도 가능하다. 이렇게 여러 Join을 할 때는, 각 Join마다 중간 스트림을 만들어주고 그 스트림에 다시 한번 Join 하는 형식이 된다. 이렇게 여러번 Join하는 것의 단점은 중간 조인 결과 중 하나라도 실패한다면 전체 쿼리가 실패한다.

    여기서 하나 유의할 점은 중간 스트림을 만들어 준다는 것은 실제로 ksqlDB 내부에 어떠한 리소스가 새로 생기는 것을 의미하지 않는다. 중간 스트림에 대해서 Kafka Topic / Stream / Query / Table 등이 실제로 생기는지 확인해봤을 때 추가적으로 만들어지는 것은 없었다. 아래 예시의 쿼리를 실행하면 딱 joined라는 Stream과 Query가 생성된다. 

    그 외에도 ChangeLog / repartition 된 것 역시 생성된다. 아직까지 이 부분의 동작은 명확히 이해할 수 없다. 

    CREATE STREAM joined AS 
      SELECT * 
      FROM A
        JOIN B ON A.id = B.product_id
        JOIN C ON A.id = C.purchased_id;

    예를 들어 위와 같은 형식으로 Stream Join을 두 번 작성할 수 있다. 이 때, A는 Stream이고 B,C는 각각 Table로 볼 수 있다. 이유는 다음과 같다.

    • B가 스트림이었으면 A와 Stream - Stream Join이기 때문에 WITHIN절이 필요하다.
    • C가 스트림이었으면 A - B의 중간 결과물은 Stream이기 때문에 Stream - Stream Join이 된다. 따라서 WITHIN 절이 필요하다.

    이런 이유 때문에 위의 예시에서 이야기 한 것은 Stream - Table Join의 결과가 된다. 

    CREATE STREAM A(
    >id varchar, value varchar) WITH (kafka_topic = 'A', value_format='delimited');
    
    CREATE TABLE B(
    >product_id varchar primary key, value varchar) WITH (kafka_topic = 'B', value_format='delimited');
    
    CREATE TABLE C(
    >purchase_id varchar primary key, value varchar) WITH (kafka_topic = 'C', value_format='delimited');

    위 명령을 이용해서 쿼리를 생성하고 실제로 Join을 해볼 수 있다.

     

    'Kafka eco-system > ksqlDB' 카테고리의 다른 글

    ksqlDB : Materialized View  (0) 2022.10.14
    ksqlDB : 실시간 스트림 처리의 동작 방식  (0) 2022.10.14
    ksqlDB : Window 정리  (0) 2022.10.12
    ksqlDB : Repartition  (0) 2022.10.10
    ksql DB : 기본 개념  (0) 2022.10.10

    댓글

    Designed by JB FACTORY