카프카 스트림즈와 ksqlDB 정복 : ksqlDB 스트림 처리 기초 (10장)

    들어가기 전

    이 글은 카프카 스트림즈와 ksqlDB 정복 10장을 공부하며 작성한 글입니다. 

     

     

    10.1 데이터 타입

    ksqlDB는 여러 형태의 데이터 타입을 지원해준다. 

    타입 설명
    ARRAY <element-type> 동일 데이터 타입 요소들의 집합. → ARRAY<STRING>
    BOOLEAN True / False
    INT 32비트 부호가 있는 정수
    BIGINT 64비트 부호가 있는 정수
    DOUBLE 754 부동 소수점 수
    DECIMAL(precision, scale) 전체 자릿수(precisio와 소수점 자릿수
    MAP(key-type, element-type) 각 데이터 타입과 일치하는 키와 값을 포함하는 객체
    <<STRING, INT>>
    STRUCT<field-name field-type [, ...]> 구조체
    STRUCT<foo INT, bar BOOLEAN>
    VARCHAR / STRING 유니코드 문자열 (UTF8)

    기본적으로는 쿼리를 생성할 때 각각의 컬럼명과 어떤 필드 타입을 사용하는지를 작성해줘야한다. 그렇지만 Schema Registry에 의해서 직렬화 / 역직렬화 되고 있는 레코드는 ksqlDB에서 Stream, Table을 생성할 때 자동으로 어떤 필드가 있는지, 필드의 타입이 무엇인지를 완성시켜준다.  그렇지만 레코드에서 어떤 값이 Key, Primary Key로 사용될지는 개발자가 직접 알려줘야한다. 아래 코드를 보면 더 명확히 알 수 있다. 

    // 사용자가 명시적으로 선언해도 됨. 
    CREATE TABLE titles (
    	id INT PRIMARY KEY,
        title VARCHAR
    ) WITH (
    	KAFKA_TOPIC = 'titles',
        VALUE_FORMAT = 'AVRO',
        PARTITIONS=4
    );
    
    // 사용자가 명시적으로 선언하지 않아도 스키마 레지스트리 + KSQLDB가 처리해 줌.
    CREATE TABLE titles (
    	id INT PRIMARY KEY,
        // title VARCHAR
    ) WITH (
    	KAFKA_TOPIC = 'titles',
        VALUE_FORMAT = 'AVRO',
        PARTITIONS=4
    );

     

    10.2 커스텀 타입

    커스텀 타입을 생성하는 것은 Java에서 새로운 클래스를 정의하는 것과 같은 행위다. 반복해서 사용될만한 데이터 타입이  있다면 커스텀 타입으로 생성하고 계속 사용하면 된다. 아래에서 커스텀 타입의 생성, 삭제, 조회 명령어를 살펴볼 수 있다.

    연산 문법
    타입 생성 CREATE TYPE <타입 이름> AS <타입>
    → CREATE TYPE season_length AS STRUCT<season_id INT, episode_count INT>;
    타입 조회 SHOW TYPES;
    타입 삭제 DROP TYPE <타입 이름>

    타입을 생성하면 11.1에서 본 데이터 타입처럼 Stream, Table을 생성할 때 사용할 수 있다. 예를 들어 아래와 같이 사용할 수 있다. 커스텀 타입을 생성하면 반복적으로 사용되는 녀석들에게 사용해서 재사용성을 올릴 수 있다. 

    CREATE STREAM test_stream(
    	my_field season_length,
        ...
    )...

     

     

    10.3 Collection (Stream, Table)

    카프카의 토픽을 카프카 스트림즈는 KStream, KTable로 추상화한다. 마찬가지로 ksqlDB는 카프카의 토픽을 Stream, Table로 추상화한다. 그리고 ksqlDB는 이것을 Collection이라고 부른다. ksqlDB는 Stream, Table을 크게 두 가지 방법으로 생성할 수 있다.

    1. 카프카 토픽에서 데이터를 읽어와서 Collection을 생성한다. → Source Collection
      • 생성된 Collection은 카프카로 전달되지 않는다.
    2. ksqlDB에서 생성한 스트림, 테이블에서 파생된 Collection을 생성한다. → Derived Collection
      • 생성된 Collection은 카프카로 전달된다.

     

     

    10.3.1 Source Collection 생성

    ksqlDB에서 Stream / Table을 생성하는 것은 SQL에서 생성하는 것과 매우 비슷하다. 아래 문법을 이용해서 생성하면 된다. 

    CREATE {STREAM | TABLE} [IF NOT EXSITS] <생성할 이름> (
    	column_name datatype [KEY / PRIMARY KEY],
        column_name datatype, ...
    ) WITH (
    	KAFKA_TOPIC = '...',
        VALUE_FORMAT = '...',
        PARTITIONS = 4
    );

    전체적인 생성틀은 위에서 살펴볼 수 있다. 잘 안 와닿을 수 있으니 예시로 하나 생성해보면 다음 쿼리를 참고하면 된다.

    10.3.2 Table 생성

    CREATE TABLE titles (
    	id INT PRIMARY KEY,
        title VARCHAR
    ) WITH (
    	KAFKA_TOPIC = 'titles',
        VALUE_FORMAT = 'AVRO',
        PARTITIONS=4
    );
    1. PRIMARY KEY 키워드는 테이블의 KEY 컬럼이 무엇인지를 지정하고, 카프카 레코드에서 이 값을 추출한다. 
    2. Table에서는 두 가지 레코드를 무시한다.
      1. Key 존재, Value X → Tombstone으로 처리
      2. Key 존재X → 무시함.
    3. Stream에서는 한 가지 레코드를 무시한다.
      1. Key 존재, Value X → Tombstone으로 처리
    4. WITH 절에서 PARTITIONS 속성을 지정하면, ksqlDB는 이 토픽이 존재하지 않을 때 토픽을 생성해준다. 

     

    10.3.3 Stream 생성

    아래 쿼리를 이용해서 스트림을 생성할 수 있다. 

    CREATE STREAM production_changes (
    rowkey VARCHAR KEY,
        uuid INT,
        title_id INT,
        change_type VARCHAR,
        before season_length,
        after season_length,
        created_at VARCHAR
    ) WITH (
    	KAFKA_TOPIC = 'production_changes',
        PARTITIONS = '4',
        VALUE_FORMAT = 'JSON',
        TIMESTAMP = 'created_at',
        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'
    );

    스트림은 PRIMARY KEY 컬럼이 없다. 왜냐하면 스트림은 변경 불가능한 INSERT 시멘틱을 가지고 있기 때문에 고유한 레코드를 식별하는 것은 불가능하다. 따라서 KEY 키워드만 지원한다. 이 KEY 키워드를 이용하면 Record Key Column에 대한 별칭으로 사용할 수 있다. Record Key Column은 카프카 레코드의 Key, Value의 Key를 의미한다. 

     

    10.3.4 WITH 절

    카프카 토픽에서 ksqlDB Collection을 생성할 때 WITH 절도 작성해야한다. WITH절에 들어갈 수 있는 Property는 다음과 같다. 

    속성 이름 설명 필수 여부
    KAFKA_TOPIC 읽어올 카프카 토픽 이름 O
    VALUE_FORMAT 메세지의 직렬화 포멧
    (AVRO, PROTOBUF, JSON, JSON_SR, KAFKA)
    O
    PARTITIONS ksqlDB가 파티션의 갯수가 이 갯수를 가진 토픽을 생성하고자 할 때 지정 X
    REPLICAS ksqlDB가 Replicas가 이 개수를 가진 토픽을 생성하고자 할 때 지정 X
    TIMESTAMP 타임스탬프를 포함하는 컬럼 이름을 알려줌. 
    윈도우 연산에서 TIMESTAMP에 지정된 Column으로 함.
    만약 지정되어 있지 않으면 Kafka Record의 Timestamp 기준으로 함.
    컬럼 데이터 타입이 BIGINT면 ksqlDB는 자동으로 파싱함.
    컬럼 타입이 VARCHAR라면 Java의 TIME_STAMP FORMAT 속성을 지정해야함.
    X
    TIMESTAMP_FORMAT java.time.format.DateTimeFormmater가 지원하는 포맷
    yyyy-MM-dd HH:mm:ss
    X
    VALUE_DELIMITER VALUE_FORMAT = 'delimit' 일 때, 필드 구분자로 사용하는 문자를 의미함. 
    default 값은 ','임
    X

     

    10.3.5 WITH 절의 Timestamp

    WITH 절에서 'TIMESTAMP'로 레코드의 컬럼을 설정하면 ksqlDB가 윈도우 연산을 할 때 사용하는 Time으로 이 컬럼이 사용된다. 이것은 카프카 메세지의 Payload 중의 특정 Column을 윈도우 연산에 사용하겠다는 의미다. 만약 TIMESTAMP가 지정되지 않는다면 Kafka Record의 내장 Timestamp으로 읽는다. 내장 Timestamp은 Event Time, Ingestion Time이 될 것이다. 

    TIMESTAMP으로 지정한 컬럼이 어떤 타입이냐에 따라서 TIMESTAMP_FORMAT 값도 지정해줘야한다. 예를 들어 TIMESTAMP에서 지정한 컬럼이 BIGINT 타입이라면 ksqlDB에서 자동으로 파싱한다. 그렇지 않은 경우라면 java.time.format.DateTimeFormatter가 지원하는 포멧으로 TIMESTAMP_FORMAT으로 지정해줘야한다. 

     

    10.4 스트림과 테이블로 작업하기

    Stream / Table에 어떤 명령어를 실행할 수 있는지를 살펴보고자 한다. 먼저 사용가능한 명령어를 정리하면 아래와 같다.

    // 전체 Stream, Table 살펴보기
    SHOW {STREAMS | TABLES} [EXTENDED];
    
    // 특정 Stream, Table 살펴보기
    DESCRIBE <이름> [EXTENDED];
    
    // 특정 Stream, Table 바꾸기
    ALTER {STREAMS | TABLES } <이름> alterOption [,...]
    
    // 특정 Stream, Table 제거하기
    DROP {STREAMS | TABLES} [IF EXISTS] <이름> [DELETE TOPIC]

     

    10.4.1 전체 Stream, Table 살펴보기

    // 전체 Stream, Table 살펴보기
    SHOW {STREAMS | TABLES} [EXTENDED];

    위 명령어를 사용하면 현재 ksqlDB에 등록된 모든 Stream / Table 정보를 살펴볼 수 있다.  기본 명령어에 EXTENED를 함께 넣어주면 추가적인 정보를 살펴볼 수 있다.

    • 기본 : 이름 / 카프카 토픽 / 포멧 / 윈도우 연산인지
    • EXTENDED : 실행 시간 통계, 완전한 DDL 문

     

    10.4.2 특정 Stream, Table 살펴보기

    DESCRIBE [EXTENDED] <이름>

    이 명령어를 이용해서 특정 Stream, Table 정보를 살펴볼 수 있다. Stream / Table을 지정하지 않는 이유는 ksqlDB는 컬렉션을 생성할 때 유니크한 이름으로만 생성할 수 있기 때문이다.

    • 기본 : 필드 + 타입 / Key가 무엇인지
    • EXTENDED : 통계 데이터

     

    10.4.3 스트림과 테이블 변경

    스트림과 테이블을 변경하고 싶을 때가 있다. 이럴 때는 ALTER 문을 이용해서 변경할 수 있다.

    ALTER {STREAM | TABLE} <이름> alterOption [, ...]

    ksqlDB 0.14.0 버전(현재는 0.27.0이다)에서는 ALTER 문으로 수행할 수 있는 유일한 연산은 컬럼의 추가다. 아래 명령어를 이용해서 컬럼을 추가할 수 있다.

    ALTER TABLE titles ADD COLUMN gener VARCHAR;

     

    10.4.4 스트림과 테이블 삭제

    생성한 스트림과 테이블은 삭제할 수 있다. 아래 명령어를 이용해서 삭제할 수 있다. 

    DROP {STREAM | TABLE} [ IF EXISTS ] <이름> [DELETE TOPIC]
    • DELETE TOPIC을 함께 사용되면 이 Collection과 관련된 토픽이 브로커에서 삭제된다. 따라서 조심해야한다.
    • 만약 이 STREAM을 삭제하려고 했을 때, 이 STREAM에 의존하는 STREAM이 있다면 삭제할 수 없다. 이 경우는 의존 STREAM을 먼저 삭제하고 다음을 삭제해야한다. 

     

    10.5 ksqlDB의 기본 쿼리 

    이 절에서는 ksqlDB에서 사용하는 기본 쿼리를 살펴보고자 한다. 

     

    10.5.1 INSERT 쿼리

    INSERT 쿼리는 Table과 Stream에서 서로 다르게 동작한다. 새로운 레코드가 발생하면 Stream은 Append 방식으로 동작하고 Table은 Update 방식으로 동작하기 때문이다. 

    • Stream : Insert 쿼리로 메세지를 추가하면 새로운 메세지가 Append 된다.
    • Table : Insert 쿼리로 메세지를 추가하면 같은 Key를 가진 메세지의 Value가 Update된다. 

    INSERT 쿼리에는 이것 외에 두 가지 알아두면 좋은 기능이 있다.

    • CREATE 문을 이용해서 생성된 스키마 순서대로 데이터를 넣는 경우, 어떤 필드인지를 생략해도 된다.
    • ROWKEY, ROWTIME 컬럼을 생성할 수 있다. 

     

    INSERT : 정해진 스키마 쿼리 효율

    INSERT 쿼리에서 정해진 스키마의 필드 순서대로 데이터를 넣는다면 이 경우에는 어떤 필드에 값을 넣어야하는지 생략할 수 있다. 정해진 순서대로 넣지 않을 경우에는 데이터를 넣은 Stream / Table에 어떤 필드에 넣을지를 직접 명시해줘야한다. 

    // 정해진 순서와 다르게 넣을 때
    INSERT INTO production_changes (
    	uuid,
        title_id
    ) VALUES (
    1,
        1,
    )
    
    // 정해진 순서대로 넣을 때
    INSERT INTO production_changes 
    VALUES (1, 1)

     

    ksqlDB가 자동으로 생성해주는 Column ROWKEY, ROWTIME

    ksqlDB는 ROWKEY, ROWTIME이라는 컬럼을 자동으로 생성해준다. INSERT 쿼리에서 이 컬럼을 사용하고 싶다면, INSERT의 Stream / Table의 필드를 정의하는 부분에 ROWKEY, ROWTIME을 넣어주기만 하면 된다. 값은 실제로 개발자가 고려해서 넣어줘야한다.

    INSERT INTO production_changes(
    	ROWKEY, ROWTIME, uuid, title_id
    ) VALUES (
    	'2', 1632981230, 2,2
    )

     

     

    10.5.2 간단한 SELECT 쿼리 : Transient push Query

    ksqlDB에서는 SELECT 쿼리를 이용해서 일시적인 내보내기 쿼리 (Transient push Query)를 제공해준다. Transient Push Query는 다음 특징을 가진다.

    • EMIT CHANGES로 끝난다.
    • ksqlDB 서버를 재시작하면 모두 중지된다.
    • 쿼리 결과가 카프카 브로커에 전달되지는 않는다. 

    코드를 예시로 들면 아래와 같이 생성할 수 있다. 기본적인 틀과 참고할만한 코드는 다음과 같다. 

    // 기본 쿼리
    SELECT select_expr [, ...] <-- 가져올 녀석들
    FROM from_item
    [LEFT JOIN join_collection ON join_criteria]
    [WINDOW window_expression]
    [WHERE condition]
    [GROUP BY grouping_expression ]
    [PARTITION BY partitioning_expression ]
    [HAVING having_expression ]
    EMIT CHANGES
    [LIMIT count];
    
    
    // 예시 코드
    SELECT * FROM production_changes EMIT CHANGES;

     

    10.5.3 프로젝션

    ksqlDB의 Collection들 중에서 필요한 Column만 가져와서 표현해주는 것을 프로젝션이라고 한다. 프로젝션은 SELECT 문에서 *를 사용하는대신 필요한 Column을 명시해주면 된다.

    SELECT title_id, before, after, created_at
    FROM production_changes
    EMIT CHANGES;

     

    10.5.4 필터링

    ksqlDB에서는 WHERE절을 이용해서 필요한 데이터만 필터링 할 수 있다.  아래 쿼리를 참고해서 작성해볼 수 있다.

    SELECT title_id, before, after, created_at
    FROM production_changes
    WHERE change_type = 'season_length'
    EMIT CHANGES;

     

    10.5.5 와일드카드

    ksqlDB는 와일드카드 필터링도 지원한다. 컬럼 값의 일부만 일치했을 때 필터링 하는 기능을 구현하고 싶다면 이 기능을 사용하면 된다. 예를 들면 LIKE + '%' 조합을 함께 사용해 볼 수 있다.

    SELECT title_id, before, after, created_at
    FROM production_changes
    WHERE cahnge_type LIKE 'season%' /// 와일드카드를 사용함. 
    EMIT CHANGES;

     

    10.5.6 논리 연산자 (AND / OR)

    WHERE 절에서 필터링을 할 때 논리 연산자를 사용해서 여러 필터링 조건들을 결합할 수 있다. 필터링 조건들을 결합할 때, 괄호를 사용해서 처리할 수 있다. 

    SELECT title_id, before, after, created_at
    FROM production_changes
    WHERE NOT change_type = 'release_date'
    AND ( after->episode_count >= 8 OR after->episode_count <= 20 )
    EMIT CHANGES;

     

    10.5.7 BETWEEN (범위 필터)

    BETWEEN 필터는 특정 숫자 범위 / 특정 알파벳 범위에 있는 레코드를 필터링 해야할 때 사용할 수 있다. 사용 가능한 영역은 다음과 같다.

    • 특정 숫자 범위
    • 특정 알파벳 범위
    SELECT title_id, before, after, created_at
    FROM production_changes
    WHERE change_type = 'season_length'
    AND after->episode_count BETWEEN 8 and 20
    EMIT CHANGES;

     

    10.5.8 구조체의 flatten

    ksqlDB에서는 구조체 타입을 지원한다. 구조체의 필드로 직접 접근하려면 → 연산자를 이용해서 접근해야하는데 이 부분을 flatten 개념을 이용해서 평탄화 해줄 수 있다. 앞서 이야기한 프로젝션과 크게 다르지 않은 내용같다. 

    SELECT
    title_id,
        after->session_id, // flatten
        after->episod_count, // flatten
        created_at
    FRO
    M production_changes
    WHERE change_type = 'season_length'
    EMIT CHANGES;

     

    10.5.9 ksqlDB의 조건식

    ksqlDB는 여러 조건식도 지원한다. 여러 상황에서 이 조건식을 사용할 수 있지만 일반적인 사례들 중 하나는 Stream / Table에서 NULL 컬럼에 대한 대체값을 제공해주는 것이다. 아래 내용을 참고해서 다른 조건식에도 사용해보자.

     

    COALESCE

    COALESCE 함수는 NULL 값을 대체해주는 함수다. 가장 앞부분부터 순서대로 NULL이 아닌 값을 넣어준다. 예를 들어서 first T의 값이 존재한다면 first T 값이 들어간다. 만약 first T 값이 NULL이라면 seconds T 값이 들어간다. 이렇게 매칭하다가 없는 경우에는 last T 값이 들어가게 된다. 따라서 last T 값은 특정한 상수값을 설정해줘야한다. 

    COALESCE(first T, seconds T, third T, ..., last T)

    위 함수의 사용 예시는 아래와 같이 될 수 있다. 

    SELECT COALESCE(after->season_id, before->season_id, 0) AS season_id
    FROM production_changes
    WHERE change_type = 'season_length'
    EMIT CHANGES;

     

    IFNULL

    IFNULL은 첫번째 값이 NULL이면 두번째 값을 넣어준다는 의미다. 두번째 값도 NULL이면 NULL이 들어가게 된다. 사용 예씨는 다음과 같다.

    SELECT IFNULL(after->season_id, before->season_id) AS season_id
    FROM production_changes
    WHERE change_type = 'season_length'
    EMIT CHANGES;

    위 쿼리문에서 after->season_id가 NULL인 경우 before->season_id를 season_id 컬럼에 넣어준다. 만약 둘다 NULL인 경우라면 NULL 값이 들어가게 된다.

     

    CASE 문

    CASE 문은 다음과 같이 동작한다. 

    • WHEN 절에 boolean 타입을 반환하는 평가식을 여러 개 작성한다.
    • WHEN 절들을 위에서부터 순서대로 실행하고, 이 중에서 첫번째로 true가 되는 조건의 값을 반환해준다. 

    CASE 문은 위처럼 동작하기 때문에 단순한 NULL 검사 뿐만 아니라 복잡한 검사들도 할 수 있다. 

    CASE expression
    	WHEN condition THEN result [, ...]
        WHEN condition THEN result [, ...]
        ...
        [ELSE result]
    END
    
    // 
    
    SELECT
      CASE
        WHEN after->season_id IS NOT NULL THEN after->season_id
        WHEN before->season_id IS NOT NULL THEN before->season_id
        ELSE 0
      END AS season_id // 결과값은 season_id column으로 표현됨
    FROM production_changes
    WHERE change_type = 'season_length'
    EMIT CHANGES;

     

     

    10.6 Persistent Query

    Transient Query는 EMIT CHANGES 키워드로 생성되는 쿼리다. 이 쿼리는 ksqlDB 클라이언트에 지속적으로 데이터를 보여주지만 카프카 브로커에는 데이터를 주지 않는다. 그리고 ksqlDB 서버가 종료되면 사라지게 된다. 일시적인 쿼리이기 때문에 이런 특성을 가진다.

    ksqlDB는 Persisten Query를 생성할 수 있다. persistent Query는 쿼리 결과를 카프카에게 전달한다. 그리고 서버가 재시작되어도 쿼리가 유지된다.

    쿼리 종류 ksqlDB Cli Kafka Broker ksqlDB 재시작 생성 방법
    Transient Query 쿼리 결과 확인 O 쿼리 결과 전달 X 사라짐 SELECT로 생성됨.
    Persistent Query 쿼리 결과 확인 X 쿼리 결과 전달 O 유지됨 CREATE로 생성됨.
    CTAS / CSAS

     

    10.6.1 파생 컬렉션 생성 (derived collections)

    파생 컬렉션은 이미 존재하고 있는 Stream / Table을 이용해서 다시 Stream / Table을 생성하는 것이다. 파생 컬렉션을 생성하면 Persistent 쿼리가 생성된다. 다시 말해 파생 컬렉션을 생성한다는 것은 카프카 브로커에 메세지를 공급하는 프로듀스를 만드는 것으로 이해할 수 있다.

    CREATE {STREAM | TABLE} [ IF NOT EXISTS] <이름>
    WITH(
    	property=value [, ...] // 생성된 스트림이 메세지를 보낼 카프카 관련 설정
    )
    AS SELECT select_expr [, ...]
    FROM from_item
    [LEFT JOIN join_collection ON join_criteria] 
    [WINDOW window_expression]
    [WHERE condition]
    [GROUP BY grouping_expression]
    [PARTITION BY partitioning_expression]
    [HAVING having_expression]
    EMIT CHANGES
    [LIMIT count];

    파생 컬렉션은 위의 쿼리로 생성할 수 있다. 이런 쿼리들을 종종 줄여서 부르기도 한다.

    • CSAS (CREATE STREAM AS) : 파생 스트림을 생성함.
    • CTAS (CREATE TABLE AS) : 파생 테이블을 생성함. 

    파생 컬렉션을 생성해서 카프카 토픽으로 메세지를 내보내는 Persistent Query를 생성하는 예제 쿼리는 아래에서 볼 수 있다.

    CREATE STREAM season_length_changes
    WITH (
    	KAFKA_TOPIC = 'season_length_changes',
        VALUE_FORMAT = 'AVRO',
        PARTITIONS = 4,
        REPLICAS = 1
    ) AS SELECT (
    ROWKEY, // 키 컬럼 포함
        title_id,
        IFNULL(after->season_id, before->season_id) AS season_id,
        before-> episode_count AS old_episode_count,
        after->episod_count AS new_episode_count,
        created_at
    FROM production_changes
    WHERE change_type = 'season_length'
    EMIT CHANGES
    )
    • WITH 절의 KAFKA TOPIC은 특정 토픽에서 데이터를 불러오는 것이 아니라, 특정 토픽으로 메세지를 공급해준다는 의미다.
    • ROWKEY : 프로젝션에 키 컬럼을 포함해야한다. 이렇게 하면 ksqlDB가 카프카 레코드에서 어떤 값을 키로 사용할지 알 수 있다. 

    이렇게 파생 컬렉션을 생성하면 ksqlDB cli에서 쿼리 생성을 확인하는 모습을 볼 수 있다.  CSAS문은 Persistent Query를 생성한다. Persistent Query는 사실은 ksqlDB 엔진이 SQL문을 파싱하고 AST를 만들어서 동적으로 Kafka Streams Topology를 생성하고, 카프카 스트림즈 인스턴스를 실행시켜준다. 쉽게 말해 Persistent Query가 생성된다는 것은 Kafka Streams 인스턴스가 하나 생성되서 동작한다는 것이다. 

     Message
    ----------------------------------------------------
     Created query with ID CSAS_SEASON_LENGTH_CHANGES_9
    ----------------------------------------------------

     

    10.7 쿼리 보기

    현재 ksqlDB에 등록된 쿼리가 무엇이 있는지를 살펴보려면 아래 명령어를 이용하면 된다. 이 명령어를 사용하면 현재 등록된 쿼리들에 대한 정보를 알려준다.

    SHOW QUERIES;

    위 명령어를 이용해서 쿼리를 조회하면 아래 양식으로 답변이 돌아온다.

    • Query ID : 쿼리 식별자 컬럼. 쿼리 중단 및 쿼리 설명을 보기 위해서 필요하다. 
    • Status : 현재 쿼리 상태를 보여준다. 동작중이면 Running이 되고, 아닌 경우에는 ERROR, UNRESPONSIVE가 될 수 있음.
    • Sink : Sink Node의 이름을 의미한다. 카프카 스트림즈의 Sink Node 개념
    • Sink Kafka Topic : 데이터가 기록될 카프카 토픽을 의미한다. 
    • Query String : 쿼리가 생성된 DDL을 알려준다. 
     Query ID                     | Query Type | Status    | Sink Name             | Sink Kafka Topic      | Query String
    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     CSAS_SEASON_LENGTH_CHANGES_9 | PERSISTENT | RUNNING:1 | SEASON_LENGTH_CHANGES | season_length_changes | CREATE STREAM SEASON_LENGTH_CHANGES WITH (KAFKA_TOPIC='season_length_changes', PARTITIONS=4, REPLICAS=1, VALUE_FORMAT='json') A
    S SELECT   PRODUCTION_CHANGES.ROWKEY ROWKEY,   PRODUCTION_CHANGES.TITLE_ID TITLE_ID,   IFNULL(PRODUCTION_CHANGES.AFTER->SEASON_ID, PRODUCTION_CHANGES.BEFORE->SEASON_ID) SEASON_ID,   PRODUCTION_CHANGES.BEFORE->EPISODE_COUNT OLD_EPISO
    DE_COUNT,   PRODUCTION_CHANGES.AFTER->EPISODE_COUNT NEW_EPISODE_COUNT,   PRODUCTION_CHANGES.CREATED_AT CREATED_AT FROM PRODUCTION_CHANGES PRODUCTION_CHANGES WHERE (PRODUCTION_CHANGES.CHANGE_TYPE = 'season_length') EMIT CHANGES;     
    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    For detailed information on a Query run: EXPLAIN <Query ID>;

     

     

    10.8 쿼리 설명 보기

    위에서는 현재 실행되고 있는 전체 쿼리들에 대한 정보를 확인했다. 그렇지만 특정한 쿼리에 대해 더 자세히 알고 싶은 경우가 있을 수 있는데, 이 때는 EXPLAIN 명령어를 사용하면 된다. EXPLAIN 명령어의 실행 결과로 다음 내용을 알 수 있다.

    • Query ID
    • Query Type
    • DDL
    • Host Query Status
    • Field Name + Field Type
    • 어떤 토픽에서 데이터 읽는지
    • 어떤 토픽으로 데이터 보내는지
    • 쿼리 실행 계획
    • Topology

    정말 막강한 기능인 것 같다. 사용해보려면 아래 코드를 이용하면 된다. 

    EXPLAIN {query id | query_statement}

    각 사용 예시는 아래 코드를 참고하면 된다.

    // 이미 실행되고 있는 쿼리
    EXPLAIN CSAS_SEASON_LENGTH_CHANGES_0;
    
    // 실행되지는 않았지만 만들어 볼 쿼리
    EXPLAIN SELECT ID, TITLE FROM TITLES;

    ksqlDB는 이미 실행되고 있는 쿼리와 실행되지 않은 쿼리들에 대해서도 자세한 실행 계획들을 살펴볼 수 있는 기능을 제공한다. 

     

    10.9 전체 쿼리

    이 장에서 필요한 전체 쿼리는 아래와 같이 작성했다.

    • 나는 스키마 레지스트리를 사용하지 않았기 때문에 VALUE FORMAT을 JSON으로 함. 
    • 나는 Producer에서 Key를 공급하지 않았기 때문에 ReKey 스트림을 생성해서 테이블을 생성함.
    CREATE TYPE session_length AS STRUCT<seasonId INT, episodeCount INT>;
    
    --
    CREATE STREAM titles_stream(
        id BIGINT,
        title VARCHAR,
        onSchedule Boolean
    )WITH(
        KAFKA_TOPIC = 'titles',
        VALUE_FORMAT = 'JSON'
    )
    --
    CREATE STREAM titles_temp_rekey_stream
    WITH (KAFKA_TOPIC = 'titles_rekey', PARTITIONS = 1) AS
    SELECT id, title, onSchedule
    FROM titles_stream
    PARTITION BY id
    -- 
    CREATE TABLE titles (
        id BIGINT PRIMARY KEY,
        title VARCHAR,
        onSchedule Boolean
    )WITH(
        KAFKA_TOPIC = 'titles_rekey',
        VALUE_FORMAT = 'JSON'
    )
    --
    CREATE STREAM production_change(
        uuid VARCHAR,
        titleId BIGINT,
        changeType VARCHAR,
        before session_length,
        after session_length,
        createdAt VARCHAR
    )WITH(
        KAFKA_TOPIC = 'production_changes',
        VALUE_FORMAT = 'JSON',
        TIMESTAMP = 'createdAt',
        TIMESTAMP_FORMAT = 'MMM dd, yyyy, hh:mm:ss a'
    );
    
    --
    CREATE STREAM season_length_changes 
    WITH (
        KAFKA_TOPIC = 'season_length_changes',
        VALUE_FORMAT = 'JSON',
        PARTITIONS = '4',
        REPLICAS = '1'
    ) AS 
        SELECT
            titleId, 
            CASE
                when after->seasonId IS NOT NULL then after->seasonId
                when before->seasonId IS NOT NULL then before->seasonId
            END as seasonId,
            before->episodeCount as oldEpisodeCount,
            after->episodeCount as newEpisodeCount
        FROM production_change
        WHERE changeType = 'season_length';
        EMIT CHANGES
    --
    
    CREATE TABLE titles_repartition
    WITH (
        PARTITIONS = 4, 
        REPLICAS = 1
    ) AS 
        SELECT
            id BIGINT,
            title VARCHAR, 
            onSchedule Boolean
        FROM titles
        EMIT CHANGES;

     

     

     

     

     

    여러 가지 

    show types;
    
    show queries [EXTENDED];
    explain queries
    
    show / list streams [EXTENDED]
    
    show / list tables [EXTENDED]
    
    show topics;
    
    describe [EXTENDED] <stream/table 이름>
    
    DROP {stream/table} [IF EXISTS] <이름> [DELETE TOPIC] 
    
    ALTER {STREAM/TABLE} <이름> alterOption [...]

     

     

    INSERT INTO <스트림/테이블 이름> [ (Column name... ) ]
    VALUES (
    
    	value [,...]
    
    );
    
    
    ---
    INSERT INTO production_changes(
    uuid,
        title_id,
        change_type,
        before,
        after,
        created_at
    ) VALUES(
    1,
        1,
        'season_length',
        STRUCT(season_id := 1, episode_count := 12),
        STRUCT(season_id := 1, episode_count := 8),
        '2021-02-08 10:00:00'
     );
     
    ---
    INSERT INTO production_changes(
    ROWKEY, //ksqlDB 자동으로 생성 생성 컬럼 
    	ROWTIME, // ksqlDB 자동으로 생성 컬럼
    	uuid,
        title_id,
        change_type,
        before,
        after,
        created_at
    ) VALUES(
    '2',
    158116140000,
    	1,
        1,
        'season_length',
        STRUCT(season_id := 1, episode_count := 12),
        STRUCT(season_id := 1, episode_count := 8),
        '2021-02-08 10:00:00'
     );
     
     
     
     ---
     생략 가능. 대신 스키마에 지정된 순서대로 입력해야함. 
     INSERT INTO titles VALUES (1, 'STranger Things');

     

    설정값

    SET 'auto.offset.reset' = 'earliest';

     

    추가로 알아봐야하는 것

    •  CSAS / CTAS에서 생성하는 ROWKEY는 무엇이지? 
      • 실제로 데이터를 읽어보면 어떤값이 오지? 
    • CSAS를 생성하면 영구적인 쿼리가 생성된다. CTAS도 생성되는걸까? 

     

    알아낸 것

     Persistent Query를 생성하면..

    1. 생성한 Persistent Query는 해당 Kafka Topic을 Sink로 해서 Sink Processor를 생성한다.
    2. 그것과 별개로 Stream이 생성되는데, 이 녀석은 1번에서 설정된 Sink 토픽을 Source 토픽으로 설정해서 데이터를 불러오는 녀석이다. 

    댓글

    Designed by JB FACTORY