ksqlDB : Repartition

    들어가기 전

    Stream / Table은 Key에 따라서 파티셔닝된다. 그런데 기존에 생성된 스트림을 그냥 읽기만 하는 경우에는 큰 문제가 없다. 왜냐하면 ksqlDB 내부에서 데이터를 처리할 필요가 없기 때문이다. 그렇지만 Join 같은 작업들을 하게 되면 파티션의 갯수는 문제가 된다. 왜냐하면 Join의 제약 조건 중 하나는 동일한 파티션 갯수를 가지는 것이기 때문이다. 

    동일한 파티션 갯수를 가지지 않는 스트림/테이블을 Join 하기 위해서는 파티션의 갯수를 맞춰줘야한다. 이렇게 파티션의 갯수를 맞춰주는 방법으로 Repartition이 존재한다. 여기서는 Repartition에 대해서 간략히 정리하고자 한다. 

     

    리파티션 하기 

    • 파티션 1개만 가지는 토픽을 만들고, 스트림을 생성한다.
    • 생성한 스트림에서 파티션을 6개 가지는 스트림을 생성한다. 

     

    파티션 1개만 가지는 토픽을 만들고, 스트림을 생성

    여기서는 위의 두 가지를 실습하고자 한다. 

    # 파티션 1개인 토픽 생성 
    $ docker exec -it broker kafka-topics --bootstrap-server localhost:9092 --create --topic USERPROFILE --partitions 1

    위 명령어를 이용해서 토픽을 생성할 수 있다. 

    CREATE STREAM USERPROFILE1(
    >userid VARCHAR, firstname VARCHAR, lastname VARCHAR, rating DOUBLE, ts BIGINT)
    >WITH(
    >kafka_topic = 'USERPROFILE',
    >value_format = 'json');

    위 명령어를 이용해서 파티션이 1개인 토픽에서 스트림을 하나 생성할 수 있다. 

    생성된 스트림을 확인해보면 파티션이 1개인 것을 확인할 수 있다. 

    또한 Key가 생성되어 있지 않은 것을 볼 수 있다. 이것은 스트림을 생성할 때 Key를 지정해주지 않았기 때문이다.

    • print 'USERPROFILE' 명령어를 이용하면 현재 들어오고 있는 변화를 확인할 수 있다. 
    • 이 명령어를 이용해서 현재 파티션 상태를 확인해보니 모든 값이 파티션 0으로 들어오는 것을 확인할 수 있다. 

     

    파티션 6개로 리파티션

    CREATE STREAM userprofile_rekeyed10
    >WITH (partitions = 6) as
    >SELECT * FROM USERPROFILE 
    >PARTITION BY firstname;
    • 위 명령어를 이용해서 파티셔닝 된 새로운 파티션을 생성한다. 
    • 이 명령어는 6개의 파티션을 새로 생성하고, 파티셔닝 할 때 사용할 Key로는 'firstname'을 사용하겠다는 것을 의미한다. 

    • 생성된 스트림을 확인해보면 FIRSTNAME이 Key가 된 것을 확인할 수 있다. 

    • 또한 파티션이 6개로 생성된 것을 확인할 수 있다. 
    • 이 때 KAFKA_TOPIC이 USERPROFILE_REKEYED10으로 되어있는 것을 볼 수 있다. 이것은 이 스트림에서 사용할 데이터를 저장하기 위해 USERPROFILE_REKEYD10이라는 토픽을 카프카 브로커에 생성했다는 것을 의미한다. 

    Docker Desktop에서 Broker의 Volume을 확인해보면 Topic이 생성되었고 파티셔닝이 0~5번까지 되어있는 것을 확인할 수 있다. 

    파티션이 정상적으로 나누어져서 들어가는지 확인하기 위해서 PRINT 'USERPROFILE_REKEYED10'을 사용했다. 파티션이 3 → 3 → 5 등으로 정상적으로 나누어져서 들어가는 것을 확인할 수 있었다. 

     

     

    스트림 Join 시, 리파티셔닝 관련

    1. Key가 없는 스트림과 Key가 있는 스트림을 Join 해본다. 
    2. 서로 다른 Key로 파티셔닝 된 스트림을 Join 해본다. 

    파티셔닝한 스트림 생성

    ---- Key 없는 스트림 생성
    CREATE STREAM USERPROFILE_RE1 
    WITH (PARTITIONS = 6)
    SELECT * FROM USERPROFILE
    
    --- Key = firstname인 스트림 생성
    CREATE STREAM USERPROFILE_RE2
    WITH (PARTITIONS = 6)
    SELECT * FROM USERPROFILE 
    PARTITION BY firstname
    
    --- Key = lastname인 스트림 생성
    CREATE STREAM USERPROFILE_RE3
    WITH (PARTITIONS = 6)
    SELECT * FROM USERPROFILE 
    PARTITION BY lastname
    
    --- Key = firstname인 스트림 생성
    CREATE STREAM USERPROFILE_RE4
    WITH (PARTITIONS = 6)
    SELECT * FROM USERPROFILE 
    PARTITION BY firstname

    위 명령어를 이용해서 파티셔닝한 스트림을 생성한다. 이렇게 생성한 스트림으로 새로운 스트림을 Join 해본다.

     

    키 없는 스트림 / 키 있는 스트림의 Join 실행

    CREATE STREAM USERPROFILE_JOIN as
    SELECT re1.firstname, re2.lastname, re2.userid
    FROM USERPROFILE_RE1 re1
    LEFT JOIN USERPROFILE_RE2 WITHIN 10 minutes ON re2.firstname = re1.firstname;

    위 명령을 이용해서 키 없는 스트림과 키 있는 스트림의 Join을 실행할 수 있다. 

    Name                 : USERPROFILE_RE1                                         
     Field       | Type                                                            
    -------------------------------                                                
     USERID      | VARCHAR(STRING)                                                 
     FIRSTNAME   | VARCHAR(STRING)                                                 
     LASTNAME    | VARCHAR(STRING)                                                 
     COUNTRYCODE | VARCHAR(STRING)                                                 
     RATING      | DOUBLE                                                          
     TS          | BIGINT                                                          
    --------------------------------------
    Name                 : USERPROFILE_RE2
     Field       | Type                   
    --------------------------------------
     FIRSTNAME   | VARCHAR(STRING)  (key)
     USERID      | VARCHAR(STRING)
     LASTNAME    | VARCHAR(STRING)
     COUNTRYCODE | VARCHAR(STRING)
     RATING      | DOUBLE
     TS          | BIGINT
    --------------------------------------

    먼저 Join을 하게 되면서 USERPROFILE_RE1에 대해서 새롭게 Key가 설정되었다거나 하지 않는 것을 볼 수 있다. 

    Kafka topic          : USERPROFILE_RE1
    Max lag              : 0
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 0          | 0      | 0
     1         | 0            | 0          | 0      | 0
     2         | 0            | 0          | 0      | 0
     3         | 0            | 0          | 0      | 0
     4         | 0            | 0          | 0      | 0
     5         | 0            | 19         | 19     | 0
    ------------------------------------------------------
    
    Kafka topic          : USERPROFILE_RE2
    Max lag              : 0
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 1          | 1      | 0
     1         | 0            | 0          | 0      | 0
     2         | 0            | 1          | 1      | 0
     3         | 0            | 6          | 6      | 0
     4         | 0            | 5          | 5      | 0
     5         | 0            | 6          | 6      | 0
    ------------------------------------------------------
    
    Kafka topic          : _confluent-ksql-default_query_CSAS_USERPROFILE_JOIN_29-Join-left-repartition
    Max lag              : 0
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 1          | 1      | 0
     1         | 0            | 0          | 0      | 0
     2         | 1            | 1          | 1      | 0
     3         | 0            | 6          | 6      | 0
     4         | 1            | 5          | 5      | 0
     5         | 1            | 6          | 6      | 0
    ------------------------------------------------------

    실행 결과를 살펴보면 다음과 같다. 

    • USERPROFILE_RE1은 5번 파티션에만 메세지가 있었다.
    • USERPROFILE_RE2는 여러 파티션에 메세지가 있었다.
    • JOIN 스트림을 만들 때, Kafka Topic명을 보면 '****-Join-left-repartition'이 있는 것을 볼 수 있다. 즉, USERPROFILE이 리파티셔닝 되서 새로운 쿼리가 만들어졌다는 것을 의미한다. 
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Carol"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Alice"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Grace"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:38.264 Z, key: Smith, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
    rowtime: 2022/10/10 12:14:38.264 Z, key: Smith, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:38.264 Z, key: Smith, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
    rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Alice"}, partition: 1
    rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
    rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
    rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
    rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
    rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Alice"}, partition: 1
    rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
    rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
    rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
    rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
    rowtime: 2022/10/10 12:14:34.216 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
    rowtime: 2022/10/10 12:14:39.274 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
    rowtime: 2022/10/10 12:14:39.274 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
    rowtime: 2022/10/10 12:14:39.274 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
    rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
    rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
    rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 0
    rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 0
    rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 0

    아직까지는 정확히 이해할 수는 없지만, USERPROFILE_JOIN의 결과에서 파티션이 5번이 아니라는 것을 볼 수 있다. 

     

    서로 다른 키의 스트림의 Join 실행

    CREATE STREAM USERPROFILE_JOIN1 as
    SELECT re2.firstname, re2.lastname, re3.userid
    FROM USERPROFILE_RE2 re2
    LEFT JOIN USERPROFILE_RE3 WITHIN 10 minutes ON re3.firstname = re2.firstname;

    RE2는 firstName이 Key다. 그리고 RE3는 LastName이 Key다. 따라서 서로 다른 Key를 가진 스트림을 Join 할 때 어떻게 동작하는지를 확인한다. 

    Kafka topic          : _confluent-ksql-default_query_CSAS_USERPROFILE_JOIN2_49-Join-right-repartition
    Max lag              : 0
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 3          | 3      | 0
     1         | 0            | 0          | 0      | 0
     2         | 1            | 1          | 1      | 0
     3         | 0            | 0          | 0      | 0
     4         | 1            | 2          | 2      | 0
     5         | 1            | 3          | 3      | 0
    ------------------------------------------------------
    
    Kafka topic          : USERPROFILE_RE2
    Max lag              : 6
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 4          | 4      | 0
     1         | 0            | 0          | 0      | 0
     2         | 0            | 2          | 2      | 0
     3         | 0            | 6          | 0      | 6
     4         | 0            | 7          | 7      | 0
     5         | 0            | 9          | 9      | 0
    ------------------------------------------------------
    
    Kafka topic          : USERPROFILE_RE3
    Max lag              : 0
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 6          | 6      | 0
     1         | 0            | 15         | 15     | 0
     2         | 0            | 0          | 0      | 0
     3         | 0            | 7          | 7      | 0
     4         | 0            | 0          | 0      | 0
     5         | 0            | 0          | 0      | 0
    ------------------------------------------------------

    실행 결과를 살펴보면 다음과 같다.

    • RE2 / RE3가 만들어 진 후에 나중에 JOIN 스트림이 만들어져서 숫자 자체는 맞지 않다.
    • 새롭게 생성된 Join 스트림의 이름에 'Join-right-repartition' 라는 것이 있다. 즉 Join의 오른쪽에 있는 스트림이 Repartition 되었다는 것을 의미한다. 
    • 왼쪽은 FirstName이 Key 였고, 오른쪽은 LastName이 Key였다. 그리고 Join은 FirstName으로 했다. 따라서 오른쪽을 FirstName을 Repartition 하고 그 결과로 Join 해서 스트림을 생성했다. 

     

    같은 키로 파티셔닝 된 스트림의 Join

    CREATE STREAM USERPROFILE_JOIN4 as
    SELECT re2.firstname, re4.lastname
    FROM USERPROFILE_RE2 re2
    LEFT JOIN USERPROFILE_RE4 re4 WITHIN 10 minutes ON re2.firstname = re4.firstname;

    위 명령어를 이용해서 같은 키로 파티셔닝 된 스트림의 조인을 해볼 수 있다. 

    Consumer Group       : _confluent-ksql-default_query_CSAS_USERPROFILE_JOIN4_53
    
    Kafka topic          : USERPROFILE_RE2
    Max lag              : 9
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 4          | 0      | 4
     1         | 0            | 0          | 0      | 0
     2         | 0            | 3          | 3      | 0
     3         | 0            | 7          | 7      | 0
     4         | 0            | 8          | 8      | 0
     5         | 0            | 9          | 0      | 9
    ------------------------------------------------------
    
    Kafka topic          : USERPROFILE_RE4
    Max lag              : 0
    
     Partition | Start Offset | End Offset | Offset | Lag
    ------------------------------------------------------
     0         | 0            | 0          | 0      | 0
     1         | 0            | 0          | 0      | 0
     2         | 0            | 1          | 1      | 0
     3         | 0            | 1          | 1      | 0
     4         | 0            | 1          | 1      | 0
     5         | 0            | 0          | 0      | 0
    ------------------------------------------------------

    생성된 스트림을 살펴보면 앞의 스트림과는 다르게 이름에 'Repartition'이 표기된 스트림이 없는 것을 볼 수 있다. 즉, 같은 키로 파티셔닝 된 스트림을 조인할 때는 새롭게 리파티셔닝을 하지 않는 것을 확인할 수 있다. 

     

     

    리파티션 결론

    • PARTITION BY를 이용해서 리파티션을 할 수 있다. 
    • 새로운 스트림을 생성하게 되면 브로커에 토픽이 생성된다. 이 때, 파티션 갯수에 따라 파티션 된 것도 브로커 토픽에 반영되어있다. 
    • Join을 할 때 파티셔닝 된 키가 다르다면, 내부적으로 해당 Key를 기준으로 파티셔닝을 한 토픽을 새로 생성한 후에 Join을 수행한다
    • Join을 할 때 사용하는 Key와 파티셔닝에 사용된 Key가 다른 경우, ksqlDB는 내부적으로 Join Key를 기준으로 파티셔닝 된 스트림을 토픽 형식으로 생성하고, 그 토픽으로 Join을 한다. 

     

     

     

    'Kafka eco-system > ksqlDB' 카테고리의 다른 글

    ksqlDB : Materialized View  (0) 2022.10.14
    ksqlDB : 실시간 스트림 처리의 동작 방식  (0) 2022.10.14
    ksqlDB : Window 정리  (0) 2022.10.12
    ksqlDB : ksqlDB의 Join  (1) 2022.10.11
    ksql DB : 기본 개념  (0) 2022.10.10

    댓글

    Designed by JB FACTORY