ksqlDB : Materialized View
- Kafka eco-system/ksqlDB
- 2022. 10. 14.
들어가기 전
Materialized View는 현재 시점을 기준으로 가장 최신화 된 데이터를 의미한다. 그런데 RDBMS에서는 최신화 된 데이터를 가져오기 위해서 모든 행에 쿼리를 날려서 필요한 통계치를 내서 작성을 했었다. 이런 경우 데이터가 많아지면 속도에 문제가 있다. 그렇지만 ksqlDB는 이런 부분을 RocksDB를 이용해 최적화해서 실시간으로 집계된 데이터(Materialized View)를 바로바로 보여줄 수 있다.
가장 쉬운 예로는 톨게이트 직원이 징수한 요금 합계를 보여주는 것이다. 차가 지나갈 때 마다(메세지) 전체를 모두 다시 계산하는 것은 느릴 수 있다. ksqlDB에서는 징수한 요금 합계를 보여주기 위해 '누적합'과 같은 비슷한 형태로 업데이트를 해서 빠르게 Materialized View를 제공한다. 즉, ksqlDB는 메세지가 도착하면 그 메세지에 대해서 최소한의 변경점만으로 Materialized View를 업데이트 한다.
// 스트림 생성
CREATE STREAM readings (
sensor VARCHAR KEY,
area VARCHAR,
reading INT
) WITH (
kafka_topic = 'readings',
partitions = 2,
value_format = 'json'
);
// 데이터 넣기
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'wheel', 45);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'motor', 41);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'wheel', 92);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'engine', 13);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'engine', 90);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'motor', 95);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'engine', 67);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'wheel', 52);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'engine', 55);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'engine', 37);
예를 들어 다음과 같이 스트림을 생성하고 데이터를 넣는다고 가정해보자.
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE avg_readings AS
SELECT sensor,
AVG(reading) AS avg
FROM readings
GROUP BY sensor
EMIT CHANGES;
그리고 스트림을 통해서 평균적으로 읽은 값이 얼마인지를 확인하고 싶을 때가 있다. 이 때는 Push 쿼리를 이용했다. Materialized View는 Pull 쿼리로도 가능하다. Push 쿼리는 실시간으로 값이 변할 때 마다 그 값을 보여주고, Pull 쿼리는 쿼리 시점의 실시간 데이터만 빠르게 보여준다.
이 때 아래의 표에 표기되는 것이 Materialized View다. Persistence Query에 메세지가 도착해서 통과할 때 마다 집계되고, 이 값은 실시간 상황을 반영해서 바로 보여준다. 그렇다면 어떻게 이렇게 고속으로 동작할 수 있을까?
- ksqlDB는 ksqlDB 서버의 로컬 디스크에 RocksDB를 실행시킨다. RocksDB는 ksqlDB 서버에서 프로세스에서 실행되는 Key / Value값을 관리한다. (RocksDB는 고성능으로 동작한다.)
- ksqlDB는 입력 스트림의 파티션 당 하나의 RocksDB 인스턴스를 생성하고, 파티션 별로 격리해준다. 이것의 의미는 수신되는 메세지는 모두 동일한 Key 값을 가져야 한다는 것이다.
- RocksDB에서 처리된 메세지는 Broker의 Change Log에 파티션별로 저장된다. Change Log에는 Log Compaction 형태로 저장된다.
ksqlDB는 RocksDB를 이용해서 Materialized View에 대한 값을 빠르게 불러와서 이벤트 수신에 대한 Delta만큼의 연산하고 RocksDB에 저장한다. 그리고 그 변경 로그를 Kafka Broker의 Change Log에 저장하면서 빠르게 동작한다.
리파티셔닝
ksqlDB에서는 GROUP BY, PARTITION BY등을 이용해서 다시 한번 파티셔닝을 하면서 Materalized View를 보여줄 수 있다. 이 때 리파티션닝을 하게 되면 내부적으로 리파티셔닝을 위한 Persistence Query가 생성되고, 그 Persistence Query의 결과가 집계하는 쿼리로 전달되어서 최종적으로 원하는 쿼리가 실행된다.
Repartition Query -> Materialized Query
즉, 위와 같은 순서대로 실행된다는 것을 의미한다. 그런데 한 가지 주의해야할 점은 리파티셔닝을 할 경우, 리파티셔닝 된 스트림의 파티션별 순서는 보장되지 않는다는 점이다. 예를 들어 아래와 같은 경우가 있을 수 있다.
예를 들어 다음과 같은 경우가 생길 수 있다. 따라서 가급적이면 데이터를 리파티셔닝 하는 경우를 줄이는 것이 좋다. 혹은 데이터를 제공할 때 처음부터 리파티셔닝을 고려한 녀석을 제공하는 것이 좋다.
Materialized View의 복구
ksqlDB는 서버의 Local 디스크에 있는 RocksDB를 이용해서 Materialized View를 보여준다. 따라서 ksqlDB가 죽을 경우 RocksDB는 어디 저장되지 않으므로 저장된 Materialized View 데이터는 모두 날아간다. ksqlDB Server가 다시 복구되면 ksqlDB Server는 kafka Broker의 Change Log를 읽어와서 Materialized View를 복구한다.
새로운 ksqlDB 서버가 온라인 상태가 되고 sum() 함수 같은 것들로 집계를 시작하면 ksqlDB는 다음과 같이 동작한다.
- RocksDB에 sum() 관련 Stream에 대한 Materalized View가 있는지 확인한다.
- Stream에 대한 Change Log를 Persistence Query에 제공해서 Materalized View에 업데이트 한다.
이런 동작은 RocksDB에 값이 있건 없건 간에 동일하다. ksqlDB가 클러스터 환경에서 동작하고 있을 때, 또 다른 서버가 그 자리를 인계 받았을 수도 있기 때문이다. 따라서 Change Log의 값을 불러와서 RocksDB를 최신 상태로 복구한 후 Persistence Query는 값을 제공한다.
ksqlDB는 Materalized View를 ksqlDB Server의 로컬 디스크의 RocksDB와 Broker에 Change Log에 각각 저장한다. 이 때 Change Log는 Log Compaction 형태로 저장되기 때문에 RocksDB를 복구하는 과정에서 필요한 업데이트 숫자는 매우 적다. Log Compaction의 동작을 감안한다면 Key당 몇 회 수준의 업데이트만 발생할 것이기 때문이다.
'Kafka eco-system > ksqlDB' 카테고리의 다른 글
ksqlDB : Pseudo Column (자동 생성 컬럼) (0) | 2022.12.17 |
---|---|
ksqlDB : ksqlDB의 간단한 구조 및 배포 모드 (0) | 2022.10.15 |
ksqlDB : 실시간 스트림 처리의 동작 방식 (0) | 2022.10.14 |
ksqlDB : Window 정리 (0) | 2022.10.12 |
ksqlDB : ksqlDB의 Join (1) | 2022.10.11 |