ksqlDB : Repartition
- Kafka eco-system/ksqlDB
- 2022. 10. 10.
들어가기 전
Stream / Table은 Key에 따라서 파티셔닝된다. 그런데 기존에 생성된 스트림을 그냥 읽기만 하는 경우에는 큰 문제가 없다. 왜냐하면 ksqlDB 내부에서 데이터를 처리할 필요가 없기 때문이다. 그렇지만 Join 같은 작업들을 하게 되면 파티션의 갯수는 문제가 된다. 왜냐하면 Join의 제약 조건 중 하나는 동일한 파티션 갯수를 가지는 것이기 때문이다.
동일한 파티션 갯수를 가지지 않는 스트림/테이블을 Join 하기 위해서는 파티션의 갯수를 맞춰줘야한다. 이렇게 파티션의 갯수를 맞춰주는 방법으로 Repartition이 존재한다. 여기서는 Repartition에 대해서 간략히 정리하고자 한다.
리파티션 하기
- 파티션 1개만 가지는 토픽을 만들고, 스트림을 생성한다.
- 생성한 스트림에서 파티션을 6개 가지는 스트림을 생성한다.
파티션 1개만 가지는 토픽을 만들고, 스트림을 생성
여기서는 위의 두 가지를 실습하고자 한다.
# 파티션 1개인 토픽 생성
$ docker exec -it broker kafka-topics --bootstrap-server localhost:9092 --create --topic USERPROFILE --partitions 1
위 명령어를 이용해서 토픽을 생성할 수 있다.
CREATE STREAM USERPROFILE1(
>userid VARCHAR, firstname VARCHAR, lastname VARCHAR, rating DOUBLE, ts BIGINT)
>WITH(
>kafka_topic = 'USERPROFILE',
>value_format = 'json');
위 명령어를 이용해서 파티션이 1개인 토픽에서 스트림을 하나 생성할 수 있다.
생성된 스트림을 확인해보면 파티션이 1개인 것을 확인할 수 있다.
또한 Key가 생성되어 있지 않은 것을 볼 수 있다. 이것은 스트림을 생성할 때 Key를 지정해주지 않았기 때문이다.
- print 'USERPROFILE' 명령어를 이용하면 현재 들어오고 있는 변화를 확인할 수 있다.
- 이 명령어를 이용해서 현재 파티션 상태를 확인해보니 모든 값이 파티션 0으로 들어오는 것을 확인할 수 있다.
파티션 6개로 리파티션
CREATE STREAM userprofile_rekeyed10
>WITH (partitions = 6) as
>SELECT * FROM USERPROFILE
>PARTITION BY firstname;
- 위 명령어를 이용해서 파티셔닝 된 새로운 파티션을 생성한다.
- 이 명령어는 6개의 파티션을 새로 생성하고, 파티셔닝 할 때 사용할 Key로는 'firstname'을 사용하겠다는 것을 의미한다.
- 생성된 스트림을 확인해보면 FIRSTNAME이 Key가 된 것을 확인할 수 있다.
- 또한 파티션이 6개로 생성된 것을 확인할 수 있다.
- 이 때 KAFKA_TOPIC이 USERPROFILE_REKEYED10으로 되어있는 것을 볼 수 있다. 이것은 이 스트림에서 사용할 데이터를 저장하기 위해 USERPROFILE_REKEYD10이라는 토픽을 카프카 브로커에 생성했다는 것을 의미한다.
Docker Desktop에서 Broker의 Volume을 확인해보면 Topic이 생성되었고 파티셔닝이 0~5번까지 되어있는 것을 확인할 수 있다.
파티션이 정상적으로 나누어져서 들어가는지 확인하기 위해서 PRINT 'USERPROFILE_REKEYED10'을 사용했다. 파티션이 3 → 3 → 5 등으로 정상적으로 나누어져서 들어가는 것을 확인할 수 있었다.
스트림 Join 시, 리파티셔닝 관련
- Key가 없는 스트림과 Key가 있는 스트림을 Join 해본다.
- 서로 다른 Key로 파티셔닝 된 스트림을 Join 해본다.
파티셔닝한 스트림 생성
---- Key 없는 스트림 생성
CREATE STREAM USERPROFILE_RE1
WITH (PARTITIONS = 6)
SELECT * FROM USERPROFILE
--- Key = firstname인 스트림 생성
CREATE STREAM USERPROFILE_RE2
WITH (PARTITIONS = 6)
SELECT * FROM USERPROFILE
PARTITION BY firstname
--- Key = lastname인 스트림 생성
CREATE STREAM USERPROFILE_RE3
WITH (PARTITIONS = 6)
SELECT * FROM USERPROFILE
PARTITION BY lastname
--- Key = firstname인 스트림 생성
CREATE STREAM USERPROFILE_RE4
WITH (PARTITIONS = 6)
SELECT * FROM USERPROFILE
PARTITION BY firstname
위 명령어를 이용해서 파티셔닝한 스트림을 생성한다. 이렇게 생성한 스트림으로 새로운 스트림을 Join 해본다.
키 없는 스트림 / 키 있는 스트림의 Join 실행
CREATE STREAM USERPROFILE_JOIN as
SELECT re1.firstname, re2.lastname, re2.userid
FROM USERPROFILE_RE1 re1
LEFT JOIN USERPROFILE_RE2 WITHIN 10 minutes ON re2.firstname = re1.firstname;
위 명령을 이용해서 키 없는 스트림과 키 있는 스트림의 Join을 실행할 수 있다.
Name : USERPROFILE_RE1
Field | Type
-------------------------------
USERID | VARCHAR(STRING)
FIRSTNAME | VARCHAR(STRING)
LASTNAME | VARCHAR(STRING)
COUNTRYCODE | VARCHAR(STRING)
RATING | DOUBLE
TS | BIGINT
--------------------------------------
Name : USERPROFILE_RE2
Field | Type
--------------------------------------
FIRSTNAME | VARCHAR(STRING) (key)
USERID | VARCHAR(STRING)
LASTNAME | VARCHAR(STRING)
COUNTRYCODE | VARCHAR(STRING)
RATING | DOUBLE
TS | BIGINT
--------------------------------------
먼저 Join을 하게 되면서 USERPROFILE_RE1에 대해서 새롭게 Key가 설정되었다거나 하지 않는 것을 볼 수 있다.
Kafka topic : USERPROFILE_RE1
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 0 | 0 | 0
1 | 0 | 0 | 0 | 0
2 | 0 | 0 | 0 | 0
3 | 0 | 0 | 0 | 0
4 | 0 | 0 | 0 | 0
5 | 0 | 19 | 19 | 0
------------------------------------------------------
Kafka topic : USERPROFILE_RE2
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 1 | 1 | 0
1 | 0 | 0 | 0 | 0
2 | 0 | 1 | 1 | 0
3 | 0 | 6 | 6 | 0
4 | 0 | 5 | 5 | 0
5 | 0 | 6 | 6 | 0
------------------------------------------------------
Kafka topic : _confluent-ksql-default_query_CSAS_USERPROFILE_JOIN_29-Join-left-repartition
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 1 | 1 | 0
1 | 0 | 0 | 0 | 0
2 | 1 | 1 | 1 | 0
3 | 0 | 6 | 6 | 0
4 | 1 | 5 | 5 | 0
5 | 1 | 6 | 6 | 0
------------------------------------------------------
실행 결과를 살펴보면 다음과 같다.
- USERPROFILE_RE1은 5번 파티션에만 메세지가 있었다.
- USERPROFILE_RE2는 여러 파티션에 메세지가 있었다.
- JOIN 스트림을 만들 때, Kafka Topic명을 보면 '****-Join-left-repartition'이 있는 것을 볼 수 있다. 즉, USERPROFILE이 리파티셔닝 되서 새로운 쿼리가 만들어졌다는 것을 의미한다.
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Carol"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Alice"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Grace"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:37.250 Z, key: Fawcett, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:38.264 Z, key: Smith, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
rowtime: 2022/10/10 12:14:38.264 Z, key: Smith, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:38.264 Z, key: Smith, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Alice"}, partition: 1
rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
rowtime: 2022/10/10 12:14:40.285 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Alice"}, partition: 1
rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Dan"}, partition: 1
rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:42.306 Z, key: Coen, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Frank"}, partition: 1
rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 1
rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:44.328 Z, key: Smith, value: {"RE1_FIRSTNAME":"Bob"}, partition: 1
rowtime: 2022/10/10 12:14:34.216 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
rowtime: 2022/10/10 12:14:39.274 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
rowtime: 2022/10/10 12:14:39.274 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
rowtime: 2022/10/10 12:14:39.274 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Grace"}, partition: 0
rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 0
rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 0
rowtime: 2022/10/10 12:14:45.338 Z, key: Edison, value: {"RE1_FIRSTNAME":"Heidi"}, partition: 0
아직까지는 정확히 이해할 수는 없지만, USERPROFILE_JOIN의 결과에서 파티션이 5번이 아니라는 것을 볼 수 있다.
서로 다른 키의 스트림의 Join 실행
CREATE STREAM USERPROFILE_JOIN1 as
SELECT re2.firstname, re2.lastname, re3.userid
FROM USERPROFILE_RE2 re2
LEFT JOIN USERPROFILE_RE3 WITHIN 10 minutes ON re3.firstname = re2.firstname;
RE2는 firstName이 Key다. 그리고 RE3는 LastName이 Key다. 따라서 서로 다른 Key를 가진 스트림을 Join 할 때 어떻게 동작하는지를 확인한다.
Kafka topic : _confluent-ksql-default_query_CSAS_USERPROFILE_JOIN2_49-Join-right-repartition
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 3 | 3 | 0
1 | 0 | 0 | 0 | 0
2 | 1 | 1 | 1 | 0
3 | 0 | 0 | 0 | 0
4 | 1 | 2 | 2 | 0
5 | 1 | 3 | 3 | 0
------------------------------------------------------
Kafka topic : USERPROFILE_RE2
Max lag : 6
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 4 | 4 | 0
1 | 0 | 0 | 0 | 0
2 | 0 | 2 | 2 | 0
3 | 0 | 6 | 0 | 6
4 | 0 | 7 | 7 | 0
5 | 0 | 9 | 9 | 0
------------------------------------------------------
Kafka topic : USERPROFILE_RE3
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 6 | 6 | 0
1 | 0 | 15 | 15 | 0
2 | 0 | 0 | 0 | 0
3 | 0 | 7 | 7 | 0
4 | 0 | 0 | 0 | 0
5 | 0 | 0 | 0 | 0
------------------------------------------------------
실행 결과를 살펴보면 다음과 같다.
- RE2 / RE3가 만들어 진 후에 나중에 JOIN 스트림이 만들어져서 숫자 자체는 맞지 않다.
- 새롭게 생성된 Join 스트림의 이름에 'Join-right-repartition' 라는 것이 있다. 즉 Join의 오른쪽에 있는 스트림이 Repartition 되었다는 것을 의미한다.
- 왼쪽은 FirstName이 Key 였고, 오른쪽은 LastName이 Key였다. 그리고 Join은 FirstName으로 했다. 따라서 오른쪽을 FirstName을 Repartition 하고 그 결과로 Join 해서 스트림을 생성했다.
같은 키로 파티셔닝 된 스트림의 Join
CREATE STREAM USERPROFILE_JOIN4 as
SELECT re2.firstname, re4.lastname
FROM USERPROFILE_RE2 re2
LEFT JOIN USERPROFILE_RE4 re4 WITHIN 10 minutes ON re2.firstname = re4.firstname;
위 명령어를 이용해서 같은 키로 파티셔닝 된 스트림의 조인을 해볼 수 있다.
Consumer Group : _confluent-ksql-default_query_CSAS_USERPROFILE_JOIN4_53
Kafka topic : USERPROFILE_RE2
Max lag : 9
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 4 | 0 | 4
1 | 0 | 0 | 0 | 0
2 | 0 | 3 | 3 | 0
3 | 0 | 7 | 7 | 0
4 | 0 | 8 | 8 | 0
5 | 0 | 9 | 0 | 9
------------------------------------------------------
Kafka topic : USERPROFILE_RE4
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 0 | 0 | 0
1 | 0 | 0 | 0 | 0
2 | 0 | 1 | 1 | 0
3 | 0 | 1 | 1 | 0
4 | 0 | 1 | 1 | 0
5 | 0 | 0 | 0 | 0
------------------------------------------------------
생성된 스트림을 살펴보면 앞의 스트림과는 다르게 이름에 'Repartition'이 표기된 스트림이 없는 것을 볼 수 있다. 즉, 같은 키로 파티셔닝 된 스트림을 조인할 때는 새롭게 리파티셔닝을 하지 않는 것을 확인할 수 있다.
리파티션 결론
- PARTITION BY를 이용해서 리파티션을 할 수 있다.
- 새로운 스트림을 생성하게 되면 브로커에 토픽이 생성된다. 이 때, 파티션 갯수에 따라 파티션 된 것도 브로커 토픽에 반영되어있다.
- Join을 할 때 파티셔닝 된 키가 다르다면, 내부적으로 해당 Key를 기준으로 파티셔닝을 한 토픽을 새로 생성한 후에 Join을 수행한다
- Join을 할 때 사용하는 Key와 파티셔닝에 사용된 Key가 다른 경우, ksqlDB는 내부적으로 Join Key를 기준으로 파티셔닝 된 스트림을 토픽 형식으로 생성하고, 그 토픽으로 Join을 한다.
'Kafka eco-system > ksqlDB' 카테고리의 다른 글
ksqlDB : Materialized View (0) | 2022.10.14 |
---|---|
ksqlDB : 실시간 스트림 처리의 동작 방식 (0) | 2022.10.14 |
ksqlDB : Window 정리 (0) | 2022.10.12 |
ksqlDB : ksqlDB의 Join (1) | 2022.10.11 |
ksql DB : 기본 개념 (0) | 2022.10.10 |