Kafka Streams와 ksqlDB 정복 : 상태가 있는 처리 (4장)

    들어가기 전

    이 글은 KafkaStreams와 KsqlDB 완전 정복의 4장을 공부하며 작성한 글입니다. 

     


    4.0 상태가 있는 처리

    3장에서는 Stateless한 스트림을 처리하는 방법에 대해서 공부를 했다. 이 때는 이벤트가 발생했다는 '사실'만이 흘러가는 것을 단편적으로 처리하는 형태였다. 하지만 Kafka Streams는 소비되는 이벤트 정보를 기억해서 새로운 문맥을 파악하는 기능을 제공한다. 이벤트 정보를 기억하기 때문에 이런 종류의 Kafka Streams를 'Stateful'하다고 한다.


    4.1 상태가 있는 처리의 이점

    이벤트가 무한히 흐르는 Stateless Stream에서 특정 시점의 정보를 기억해서 특정 시점(point-in-time)의 정보를 표현할 수 있다. 특정 시점의 정보를 기억하기 위해서 StateStore(상태 저장소)를 이용하는데 기본적으로는 내부적으로만 사용된다. 만약 StateStore가 물리화(Materialized)되면 외부에서 쿼리도 가능해진다. 

    Fact vs Behavior

    Fact와 Behavior는 무슨 의미일까? 이벤트는 끊임없이 스트림을 타고 흐른다. 이렇게 흐르는 이벤트는 그 자체로 독립적이다. 즉, 그런 이벤트가 발생했다는 사실(fact)만 흐르는 상태다. 이것은 불변의 의미로 인식되고 처리된 후 삭제된다. 그렇지만 현실에서의 모든 이벤트는 연결되어있다. 사실들을 수집하고 기억하며 그 의미를 좀 더 이해할 수 있게 되어 행동(Behavior)를 감지할 수 있게 된다. 따라서 상태를 기억하는 것도 충분히 의미가 있다. 

    상태가 있는 연산자

    기본적으로 데이터 조인, 데이터 집계, 데이터 윈도윙, 윈도윙 집계 등을 이용해서 상태를 기억할 수 있다. 상태가 있는 연산자들은 상태를 기억하기 위해 여러 형태의 StateStore를 가지고 있게 된다. 

    사례 목적 연산자
    데이터 조인 서로 다른 스트림 / 테이블을 조인해서 새로운 문맥을 만든다. join
    leftJoin
    outerJoin
    데이터 집계 지속적으로 업데이트 되는 이벤트의 수학적인 계산 aggregate
    count
    reduce
    데이터 윈도윙 시간상 근접성을 갖는 이벤트의 그룹화 windowedBy

     


    4.2 StateStore

    Stateful Processor들은 State를 저장할 수 있는 장소로 StateStore를 사용한다. StateStore는 기본적으로는 내부적으로(ProcessorTopology) 사용되고 Materialized(물리화)되면 외부에서 쿼리를 해서 사용할 수 있게 된다. StateStore는 여러 형태로 구현될 수 있고 그에 맞게 서로 다른 특성을 가진다. 따라서 이런 특성을 잘 이해해서 필요한 형태의 StateStore를 사용하는 것이 요구된다. 

     

    4.2.1 공통 특성 : 임베딩

    StateStore는 카프카 스트림즈 어플리케이션 내의 StreamTask에 각각 임베딩 되어있다. 이 말은 Kafka Streams 어플리케이션이 실행되고 있는 노드의 Local Storage 영역에 StateStore가 저장되어있다는 것이다.

    • StateStore는 각 StreamTask에게 각각 분배된다. 따라서 동시성 문제가 차단된다.
    • StateStore는 각 Kafka Streams에 분배되어있다. 한 곳에 모여있을 경우, 실패지점이 하나가 되어 가용성이 떨어지지만 분산 처리되어 가용성이 증가한다.
    • StateStore는 내부적으로 RocksDB를 Local Storage에서 사용한다. RocksDB는 데이터를 저장할 때 직렬화 / 역직렬화로 저장하는 기능까지 제공한다. 

     

    4.2.2 공통 특성 : FaultTolerance

    StateStore의 내결함성을 위해 Broker에는 ChangeLog(변경 로그) 토픽이 내부적으로 생성된다. StateStore에 저장된 값들은 ChangeLog 토픽에 Append 할 수 있다. 이처럼 ChangeLog 토픽에 StateStore의 상태를 Append 하는 것을 Logging이라고 하는데, Logging을 할 경우 FaultTolearnce를 확보할 수 있다. 

    • StateStore의 장애가 발생했을 경우, ChangeLog를 Replay해서 StateStore를 복구할 수 있다.
    • StateStore의 복사본으로 Standby Replicas를 생성할 수 있다. StateStore가 죽었을 경우, Stadnby Replicas를 가진 StreamTask로 리밸런싱해서 빠르게 복구한 뒤 작업을 할 수 있다. 

     

    4.2.3 공통 특성 : Key-Value 기반

    StateStore는 Key - Value 형태의 저장소로 구현되어있다. 키는 단순하거나 때에 따라 복합적이거나 다차원일 수 있다. 

     

    4.2.4 Persistent vs In-Memory

    StateStore는 In-Memory와 Persist 형태를 제공한다. 두 형태의 StateStore의 장단점을 명확하게 이해해야한다.

    • PersistentStateStore
      • State를 RocksDB를 이용해서 Local Storage에 저장한다.
      • 카프카 스트림즈가 장애에서 복구될 경우, State가 Local Storage에 저장되어있기 때문에 장애기간동안 생성된 로그만 ChangeLog에서 Replay하면 된다. 즉, State Restore가 빠르다.
      • In-Memory에 비해 더 많은 양의 State를 저장할 수 있다. (메모리 문제)
      • Local Disk에서 메세지를 읽어오기 때문에 In-memory에 비해 느릴 수 있다.
    • In-Memory StateStore
      • 메모리에 데이터를 저장하기 때문에 빨리 가져올 수 있다.
      • 휘발성 데이터이기 때문에 장애 복구 시, ChangeLog 전체를 Replay해야한다. 복구 시간이 오래 걸릴 수 있다. 이 부분은 Standby Replicas로 해결할 수 있다.
      • 메모리의 크기에 따라 저장할 수 있는 최대 크기가 존재한다.

    기본적으로 Standby Replicas를 설정하는 경우에만 In-Memory StateStore를 사용해볼법하다. 그 외의 상황이라면 기본적으로는 Persistent StateStore를 사용하는 것이 권장된다. 

     


    4.3 KStream / KTable / GlobalKTable

    Source Processor에서 데이터를 불러올 때 카프카 토픽을 어떤 녀석으로 추상화 할지를 결정해야한다. 각 데이터의 특성을 고려해서 KStream / KTable / GlobalKTable 중 어떤 것으로 추상화 할지를 결정하면 된다.

    KStream

    • 키가 없는 경우
    • 이벤트 발생 사실(Fact) 자체만 중요한 경우

    KTable은 Key를 기반으로 동작한다. 따라서 Key가 없는 경우라면 처음에는 KStream을 써야만 한다. 또한 최신 레코드에만 관심을 가지는 것이 아니라 전반적인 메세지의 흐름이 필요할 때 KStream을 이용하면 된다. 

    KTable

    • 키가 있는 경우
    • Stateful한 데이터가 필요할 때
    • 키의 사이즈가 클 경우
    • 시간 동기화 처리가 필요할 경우

    KTable은 키가 있는 경우에만 사용할 수 있다. 따라서 키가 있는 경우에는 KTable 사용을 고려해볼 수 있다. GlobalKTable과 함께 고려해야하는 부분은 Key가 차지하는 공간이 아주 큰 경우에는 KTable을 사용해야한다는 것이다. Key가 차지하는 공간이 크다는 이야기는 유니크한 Key가 너무 많아서 너무 많은 메모리 공간을 차지할 것이 기대될 때다. 

    시간 동기화 처리의 의미는 현재 데이터를 받았을 때를 의미한다. KTable은 StreamThread Consumer를 이용해서 메세지를 받아온 시점에 데이터를 In-Depth를 기반으로 처리한다. 데이터를 받아온 시점에 StateStore에 넣으면서 데이터를 처리하기 때문에 현재 데이터는 '시간 동기화'가 정확하게 되어있다. 이런 종류의 처리가 필요할 때는 KTable을 사용한다. 

     

    GlobalKTable

    • 키 공간이 작은 경우
    • 시간 동기화 처리가 필요하지 않을 경우

    GlobalKTable은 GlobalStreamThread가 일반 StreamThread와는 별개로 동작한다. GlobalStreamThread는 GlobalConsumer를 가지고 있고, 이 녀석을 통해 매순간 새로운 데이터를 가져온다. 따라서 ProcessorTopology에서 처리되고 있는 데이터와 GlobalKTable은 시간 동기화가 되어있지 않다. GlobalKTable은 항상 최신의 데이터와 Join이 필요한 경우에 선택하면 된다. 

     

    KTable vs GlobalKTable

    두 테이블의 차이는 다음과 같다. 토픽은 파티셔닝 되어서 보관되고 있는데, KTable은 추상화 결과로 동일한 파티션을 담당하는 StreamTask의 StateStore에 저장된다. 반면 GlobalKTable은 각 StreamTask가 파티션 상관 없이 모두 동일한 데이터를 가지고 있게 된다. 

     


    4.4 Join

    카프카 스트림즈에서 Join은 Stream - Stream / Stream - Table / Table - Table / Stream - GlobalKTable이 가능하다. Join은 같은 Key를 가지는 메세지들끼리 새로운 레코드를 생성하는 것으로 이해할 수 있다. Merge는 같은 유형의 데이터가 흐르는 스트림끼리 합치는 것이기 때문에 State가 필요없다. 

    카프카 스트림즈는 Depth-First로 처리가 된다. 그렇기 때문에 현재 흐르는 메세지를 Join하기 위해서는 이전의 상태를 기억하고 그 상태와 Join을 해야한다. 즉, Join은 Stateful한 연산이다. Join 연산을 어떻게 구현하는지, 동작은 어떻게 하는지를 아래에서 살펴보고자 한다. 

     

    4.4.1 조인 연산자들

    카프카 스트림즈는 다음 조인 연산자를 지원한다. 각 조인은 서로 다른 방식으로 동작하기 때문에 아래 내용을 숙지해야한다.

    연산자 설명
    join Inner Join을 의미한다.
    양쪽 입력의 레코드가 동일 키를 공유할 때 Join 트리거.
    leftJoin Stream-Table : 왼쪽에 레코드가 들어왔을 때 Join 트리거. 오른쪽에 Value가 없다면 null로 설정됨.
    Stream-Stream : 왼쪽에 레코드가 들어왔을 때 Join 트리거.  오른쪽에 Value가 없다면 null로 설정됨.
    Table - Table : 왼쪽, 오른쪽에 레코드가 들어왔을 때 Join 트리거. 오른쪽이 Join 발현시켰는데, 왼쪽에 데이터가 없다면 전달되지 않음. 왼쪽이 Join 발현시켰는데 오른쪽에 데이터가 없다면 null로 설정됨. 
    outerJoin 양쪽에서 레코드를 받으면 Join 트리거

     

    4.4.2 조인 종류

    카프카 스트림즈는 다양한 Join을 제공한다. 각 Join의 특성을 아래에 정리했다. Stream - Table Join은 OuterJoin은 제공하지 않는다. 뇌피셜로는 Stream은 StateStore에 보관되지 않기 때문으로 생각된다. Table에 데이터가 도착해서 Join을 하려고 하면 이전의 Stream 데이터들이 남아야하는데 Stream 데이터들이 남아있지 않기 때문에 Stream - Table의 outerJoin은 지원되지 않는 것 같다. 

    종류 윈도우 여부 연산자들 CoPartitioning 필요
    Stream - Stream O join / leftJoin / outerJoin O
    Table - Table X join / leftJoin / outerJoin O
    Stream - Table X join / leftJoin O
    Stream - GlobalKTable X join / leftJoin X

     

     

    4.4.3 Co-Partitioning

    카프카에서 메세지는 프로듀서에 의해서 파티셔닝되서 제공된다. Key가 존재한다면 기본적인 파티셔닝 전략은 Key 값에 따른 Hashing이다. 따라서 같은 Key를 가지면 같은 파티션에 데이터가 저장된다. 카프카 스트림즈는 메세지와 파티션의 이런 관계를 이용해서 처리한다. "같은 파티션에 같은 데이터가 존재해야 같이 처리할 수 있다"를 보장해야한다. 이를 위해서 Co-Partitioning이 필요하다. Co-Partitioning은 다음과 같다.

    1. Join 해야할 토픽들은 동일한 갯수의 파티션을 가져야 한다.
    2. Join 해야할 토픽들은 동일한 필드를 Key로 사용하고, 동일한 파티셔닝 전략으로 파티셔닝 되어야 한다. 

    위 두 가지 조건을 만족할 때 Co-Partitiong 되었다고 한다. Co-Partitioning 되지 못한 녀석들이라면 데이터 정합성은 보장할 수 없다.

    위의 그림이 코파티셔닝의 의미를 표현한다. Key가 없는 메세지들은 Round Robin 방식으로 배정되기 때문에 같은 Key지만 서로 다른 파티션에 존재한다. 이런 녀석들을 Join해서 처리하게 된다면 잘못 처리될 가능성이 높다. 따라서 selectKey() 메세드 등을 이용해서 Co-Partitioning을 해서 데이터를 처리할 준비를 한다.

     

    4.4.4 ValueJoiner

    ValueJoiner는 카프카 스트림즈에서 두 개의 레코드가 Join할 때 어떻게 결합되어야 하는지를 지정하는 클래스다. ValueJoiner에게는 현재의 메세지와 그에 대응되는 상대방 메세지를 StateStore에서 찾아와서 서로 결합하는 형태가 된다.  ValueJoiner는 크게 두 가지 방법으로 구현한다고 한다.

    • 두 개의 레코드 객체를 필드로 가지는 Wrapper 클래스 생성
    • 두 개의 레코드를 직접 조인한 객체를 생성

    위의 형태로 주로 사용한다고 한다. Wrapper 클래스는 아래와 같은 형태를 가지는 녀석이다. ValueJoiner에게 SecoreEvent / Player 객체가 메세지로 전달되면 ValueJoiner는 ScoreWithPlayer 객체를 생성해서 반환하는 형태로 구현할 수 있다.

     

    @Data
    @AllArgsConstructor
    public class ScoreWithPlayer {
    
        private ScoreEvent scoreEvent;
        private Player player;
    
    }

    두 개의 레코드를 직접 Join한 객체는 아래와 같은 형태로 만들 수 있다. ScoreWithPlayer, Producer 메세지를 각각 받아서 이 메세지의 필요한 값들만 새롭게 생성한 객체의 필드에 저장하고 반환하는 형태다.

    @Data
    public class Enriched implements Comparable<Enriched>{
    
        private Long playerId;
        private Long productId;
        private String playerName;
        private String productName;
        private Double score;
    
        public Enriched(ScoreWithPlayer scoreWithPlayer, Product product) {
            this.playerId = scoreWithPlayer.getPlayer().getId();
            this.productId = product.getId();
            this.playerName = scoreWithPlayer.getPlayer().getName();
            this.productName = product.getName();
            this.score = scoreWithPlayer.getScoreEvent().getScore();
        }
    
        @Override
        public int compareTo(Enriched o) {
            return Double.compare(o.getScore(), score);
        }
    }

    다시 정리하면 ValueJoiner를 이용해서 Join한 클래스를 생성할 수 있는 기능을 제공해주기만 하면 된다. Join한 결과는 각 메세지를 가지고 있는 Wrapper 클래스 일 수도 있고, 필요한 필드만 취사 선택해서 저장하고 있는 Enrich된 클래스 일 수도 있다. 


    4.5 KeyValueMapper

    Stream - GlobalKTable을 Join할 때 사용되는 클래스다. Stream에 어떤 메세지가 들어왔을 때 GlobalKTable과 Join 할 Key를 선택하는 기능을 제공해준다. KeyValueMapper에서 반환된 Key는 GlobalKTable에서 해당 Key를 가지는 메세지를 불러와서 Join을 시도한다. 

    KeyValueMapper<String, ScoreWithPlayer, String> keyValueMapper = new KeyValueMapper<>() {
        @Override
        public String apply(String key, ScoreWithPlayer value) {
            return String.valueOf(value.getScoreEvent().getProductId())
        }
    };

     

     


    4.6. Record Groupping : 집계 전단계

    카프카 스트림에서 집계 연산을 수행하기 전에는 반드시 그룹핑을 해야한다. 그룹핑을 수행하면 중간에서 잠깐 사용되는 GroupedKStream, GroupedKTable이 생성되는데 이 종류의 객체에게서만 집계 연산을 수행할 수 있다. 따라서 집계 연산이 필요한 경우 반드시 KStream, KTable을 각각 그룹핑 해두어야 한다. 

    KStream의 그룹핑 

    KStream은 groupBy(), groupByKey()를 이용해서 그룹핑가능하다. 각각의 차이는 아래와 같다.

    • groupBy() : 리파티셔닝 토픽 생성. 추가 네트워크 비용 필요해짐.
    • groupByKey() : 리파티셔닝 토픽 생성되지 않음. 따라서 추가 네트워크 비용 없음. 

    KTable의 그룹핑

    KTable은 groupBy()로만 그룹핑 가능하다.

    그룹핑 예시 코드

    그룹핑 예시 코드는 아래에서 참고할 수 있다. Grouping에서 Repartition을 하게 되면 카프카 브로커와 네트워크를 다시 한번 타야하기 때문에 Groupd.with()를 이용해서 Serde 객체를 필요로 하는 것으로 생각된다.

    KGroupedStream<String, Enriched> enrichedKGroupedStream = enrichedStream.groupByKey(
            Grouped.with(Serdes.String(), MyCustomSerdes.EnrichedSerde()));

    4.7 집계

    그룹핑 된 KStream, KTable은 GroupedKStream, GroupedKTable을 반환한다. 이 객체들에게서는 집계 연산을 할 수 있다. 집계 연산은 단순히 카운트를 세는 것도 될 수 있고, 복잡한 통계 계산이 될 수도 있다. 쉽게 생각해서 여러 값들을 결합해서 하나의 메세지를 생성하는 것이다. 카프카 스트림즈는 집계 연산을 위해 아래 메서드를 지원한다.

    • aggregate : 출력 클래스가 다른 클래스가 될 수 있음.
    • reduce : 입력 클래스 = 출력 클래스
    • count

    aggregate와 reduce는 동일한 기능을 제공하고 차이점은 처리 결과를 다른 클래스로 바꿀 수 있는지의 유무다. 또한, 집계연산은 StateStore 생성하기 때문에 Grouped 파라메터에서 반드시 Key/Value Serdes를 가지고 있어야 한다.

     

    4.7.1 aggregate, reduce의 파라메터

    aggregate(), reduce() 메서드는 각각 Initiailizer, Aggregator, Subtractor 클래스를 파라메터로 전달받는다. 각각은 집계 연산에서 초기화, 메세지를 받았을 때 집계 결합, 어떤 키가 삭제되었을 때의 행위를 각각 정의한다.

     

    4.7.2. Initiailizer 구현 : Stream / Table 집계 사용

    카프카 스트림즈 Topology에 새로운 Key가 들어왔을 때 StateStore에는 어떠한 값도 들어가있지 않다. 이 때, StateStore에 새로운 Key를 초기화 해 줄 녀석이 필요한데 Initializer가 그 역할을 맡는다. 다음은 특정 Key가 몇번 들어왔는지 Count 연산을 할 때 사용할 수 있는 Initializer 구현 코드다.

    Initializer<Long> initializer = () -> 0L;

    Key가 처음으로 들어왔기 때문에 이 Key에 대한 초기값은 0이 된다. Initializer는 이런 의미를 가지고 생성해주면 된다. 

     

    4.7.3 Aggregator구현 : Stream / Table 집계 사용

    카프카 스트림즈에 메세지가 도착했을 때, 도착한 메세지와 기존의 집계된 State를 합쳐주는 연산이 필요하다. 이 연산을 맡아줄 클래스가 Adder 클래스다. 예를 들어서 특정 Key가 몇번 들어왔는지 Count 연산을 할 때 사용되는 Adder는 아래와 같이 구현할 수 있다. 

    Aggregator<String, Enriched, Long> stringEnrichedLongAggregator = (key, value, aggregate) -> aggregate++;

    새로운 메세지가 도착하면 메세지의 Key 값으로 StateStore에서 Aggregate(집계값을 저장하는 녀석)을 불러온다. 그리고 이 녀석들을 Aggregator에게 넘겨주면서 기존 집계에 현재 메세지가 가지는 집계를 더하는 연산을 한다. 위의 코드에서는 다음과 같은 의미가 될 수 있다.

    1. StateStore에서 Key로 aggregate를 검색한다. 이 때, aggregate는 각 Key 값이 몇번 들어왔는지를 기록한 count다.
    2. Aggregator가 호출되면, value와 aggregate를 다시 한번 집계하는 연산을 하고 집계 결과를 반환한다. 

     

    4.7.4 Subtractor : Table 집계에서만 사용

    Subtractor는 Table 집계에서만 사용한다. Subtractor는 어떤 키가 삭제되면 호출된다. Subtractor는 호출될 때 Key, Value, 해당 Key에 대한 Aggregate(상태 저장)을 받는다. 이 때 Key가 삭제 되었을 때 Aggregate에 어떤 작업을 해주지를 작성하면 된다. Table을 이용해서 Count 집계를 구현했다면 아래와 같은 형태로 구성할 수 있다. 

    (key, value, aggregate) -> aggregate - 1L

    4.8 Stateful Kafka Streams의 튜토리얼

    이번 포스팅에서 튜토리얼로 실행했던 카프카 스트림즈의 ProcessorTopology다.

    1. score-events 토픽은 Kstream으로 추상화한다. Key가 없기 때문에 SelectKey로 리파티셔닝한다.
    2. Players는 최신 업데이트를 봐야하기 때문에 KTable, Products는 Key Arear가 크지 않기 때문에 GlobalKTable로 추상화한다.
    3. Score-events / Players를 Join하고, Join한 결과를 Products와 다시 한번 Join해 Enriched 객체를 생성한다.
    4. Enriched Stream을 Key로 Grouping 한 다음 Aggregate 연산을 한다. 이 때, 집계 연산은 상위 3개의 점수를 집계하는 연산이다.

    이 때, 집계된 값은 StateStore에 저장된다. 만약 StateStore를 Materialized를 이용해서 물리화 할 경우, 외부에서 API를 이용해서 집계 결과를 확인할 수 있게 된다. 

    코드는 아래와 같이 작성했다.  Materialized StateStore를 추가하지 않아서 아직은 Queryable 하지는 않다. 

    public class Chapter4Study {
    
        public static void main(String[] args) {
    
            StreamsBuilder builder = new StreamsBuilder();
    
            // Source
            KStream<String, ScoreEvent> scoreEventKStream = builder.stream("score-events",
                    Consumed.with(Serdes.String(), MyCustomSerdes.scoreEventSerde()));
            
            // Source
            KTable<String, Player> playerKTable = builder.table("players",
                    Consumed.with(Serdes.String(), MyCustomSerdes.playerSerde()));
    
            // Source
            GlobalKTable<String, Product> productGlobalKTable = builder.globalTable("products",
                    Consumed.with(Serdes.String(), MyCustomSerdes.productSerde()));
            
            // Key가 없음. 리파티셔닝 필요함.
            KStream<String, ScoreEvent> repartitionScoreEventStream = scoreEventKStream.selectKey((key, value) -> String.valueOf(value.getPlayerId()));
    
            // Joiner는 Join한 StateStore를 저장할 형태를 의미한다.
            KStream<String, ScoreWithPlayer> scoreWithPlayerKStream = repartitionScoreEventStream.join(playerKTable,
                    (readOnlyKey, scoreEvent, player) -> new ScoreWithPlayer(scoreEvent, player),
                    Joined.with(Serdes.String(), MyCustomSerdes.scoreEventSerde(), MyCustomSerdes.playerSerde()));
    
            // Joiner는 Join한 StateStore를 저장할 형태를 의미한다.
            KStream<String, Enriched> enrichedStream = scoreWithPlayerKStream.join(productGlobalKTable,
                    (key, value) -> String.valueOf(value.getScoreEvent().getProductId()),
                    (readOnlyKey, scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product)
            );
    
            // 상위 3개를 뽑을꺼임. 그러니까 빈 Treeset을 주자.
            KGroupedStream<String, Enriched> groupedStream = enrichedStream.groupByKey(
                    Grouped.with(Serdes.String(), MyCustomSerdes.EnrichedSerde()));
    
            // 집계 함수 생성
            Initializer<HighScores> highScoresInitializer = HighScores::new;
            Aggregator<String, Enriched, HighScores> aggregator = (key, value, aggregate) -> aggregate.add(value);
    
            // 집계 시작
            KTable<String, HighScores> highScoreKStream =
                    groupedStream.aggregate(highScoresInitializer, aggregator,
                            Materialized.with(Serdes.String(), MyCustomSerdes.highScoresSerde()));
            
            // 결과 출력
            highScoreKStream.toStream().print(Printed.toSysOut());
    
    
            Properties customKafkaStreamsProperty = CustomKafkaStreamsProperty.createCustomKafkaStreamsProperty();
            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), customKafkaStreamsProperty);
            kafkaStreams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        }
    }
    

     


    4.9 대화형 쿼리

    카프카 스트림즈는 StateStore의 값을 외부에서도 손쉽게 접근해서 확인할 수 있다는 점이다. 이런 값들은 실시간으로 대시보드를 만든다거나 할 때 유용하게 사용될 수 있다. StateStore의 대화형 쿼리를 위해서는 몇 가지 알아둬야할 것이 있다. 

    4.9.1 저장소 물리화 (Materialized)

    StateStore를 물리화(Materialized)하지 않는 경우 Processor Topology가 내부적으로만 참조해서 사용한다. 단순히 StateStore에 이름을 붙이는 것만으로는 Processor Topology에서만 사용하고, 반드시 Materialized 클래스를 이용해야 외부에서 쿼리가 가능하다.  

    Materialized<String, HighScores, KeyValueStore<Bytes, byte[]>> stateStore = 
            Materialized
            .as(keyValueBytesStoreSupplier)
            .with(Serdes.String(), MyCustomSerdes.highScoresSerde());

     

    4.9.2 Materialized StateStore 접근

    Materialized StateStore는 외부에서 읽기 전용으로 접근할 수 있다. 기본적으로 StateStore에 접근하기 위해서는 두 가지 값을 알아야 한다.

    • StateStore의 이름
    • StateStore의 타입 
      • QueryableStoreTypes.keyValueStore()
      • QueryableStoreTypes.timestampedKeyValueStore()
      • QueryableStoreTypes.windowStore()
      • QueryableStoreTypes.timestampedWindowStore()
      • QueryableStoreTypes.sessionStore()

    다음 두 값을 알게 되면 카프카 스트림즈의 store() 메서드를 이용해서 StateStore 객체를 가져올 수 있게 된다. 예를 들어 아래와 같은 코드를 작성해주면 된다. 

    ReadOnlyKeyValueStore<String, HighScores> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                    "my-store-name",
                    QueryableStoreTypes.keyValueStore()));

     

    4.9.3 비윈도우 KeyValue Store 쿼리하기

    각 StateStore는 여러 종류의 쿼리를 지원한다. 따라서 필요한 쿼리를 잘 사용해야한다. 

    • KeyValueStore : get(), range(), entry(), all()
    • WindowStore : 시간 범위 룩업

     

    get : 포인트 룩업 

    특정 Key를 StateStore에서 검색한다. Key가 존재하지 않으면 Null값을 반환한다.

    range : 범위 스캔

    범위 스캔은 범위 내의 모든 키를 포함하는 Iterator를 반환한다. 사용 완료한 Iterator는 꼭 close() 해줘야한다. 그렇지 않으면 메모리 누스가 발생한다. 이 부분을 잘 해결할 수 있는 것은 try-with-resources를 이용하는 것이다. 

    all : 모든 엔트리

    StateStore에 있는 모든 Key에 대한 Value를 가져온다. Iterator를 반환하고, 마찬가지로 사용이 끝난 Iterator는 닫아줘야한다. 

    approximateNumEntries() : 엔트리 개수

    RocksDB에서 정확한 엔트리 갯수를 세는 연산은 매우 비싸다. 따라서 여기서 반환된 갯수는 대략적인 값이다. 

     

    4.9.4 Local Query vs Remote Query

    카프카 스트림즈 인스턴스는 로컬 쿼리와 원격 쿼리를 제공한다. Queryable StateStore에서 데이터를 잘 보여주기 위해서는 로컬 쿼리와 원격 쿼리를 모두 사용할 수 있어야 한다. 각각의 Query가 어떻게 동작하는지를 살펴본다. 

    Config 설정하기

    가장 먼저 원격 쿼리를 사용할 수 있도록 다음 설정값을 설정해야한다. 여기서 설정한 Host / Port 정보는 API로 접근할 수 있게 도와준다. 만약 이 값을 설정하지 않았다면 각 카프카 스트림즈는 로컬 쿼리만 가능하다. 원격 쿼리를 하려면 반드시 이 설정값을 설정해야한다. 

    그렇지만 이 Host + Port 정보가 값을 요청했을 때 주는 것은 아니다. 단지 Kafka Stremas 클러스터끼리 Metadata를 주고 받는 용도로 사용된다. 

    props.setProperty(StreamsConfig.APPLICATION_SERVER_CONFIG, "myapp:8080);
    props.setProperty(StreamsConfig.APPLICATION_SERVER_CONFIG, "myapp:9200);

    로컬 쿼리

    카프카 스트림즈는 기본적으로 자신의 Materialized StateStore에 로컬 쿼리를 할 수 있다. 로컬 상태는 전체 카프카 스트림즈 클러스터의 일부분이기 때문에 전체를 표현하지는 못한다. 따라서 카프카 스트림즈 클러스터 전체를 정확하게 표현하기 위해서는 원격 쿼리를 이용해야한다. 

    원격 쿼리

    카프카 스트림즈는 로컬에 모든 데이터가 존재하는 것은 아니기 때문에 원격 쿼리를 해야 클러스터의 전체 상태를 알려줄 수 있다. 따라서 원격 쿼리를 해야한다. 원격 쿼리는 다음 단계로 진행 되어야 한다.

    1. kafkaStreams 인스턴스를 이용해 하고자 하는 것에 대한 메타 정보를 받아온다.
    2. 받아온 메타 정보의 Host / Port 정보로 API 요청을 보내서 데이터를 받아온다. 이 때 API 요청은 스프링 등으로 직접 구축한다. 

    카프카 스트림즈는 Meta정보 쿼리를 위해서 다음 메서드를 제공한다. 이 때 queryMetadatForKey 메서드는 실제로 데이터가 존재하는 것을 확인하는게 아니라 어디에 Key가 존재하는지 결정하기 위해 Stream Partitioner를 이용한다고 한다.

    // 모든 Kafka Stream 클라이언트 조회
    kafkaStreams.metadataForAllStreamsClients();
    
    // 현재 Kafka Streams의 StreamThread 조회
    kafkaStreams.metadataForLocalThreads();
    
    // 모든 KafkaStream 클라이언트에서 StateStore 이름을 가지는 메타 정보 불러오기
    kafkaStreams.streamsMetadataForStore();
    
    // 모든 KafkaStream 클라이언트에서 특정 Key를 가진 Meta 정보 불러오기 
    kafkaStreams.queryMetadataForKey()

    카프카 스트림즈의 로컬 쿼리와 원격 쿼리의 차이점은 아래 그림에서 확인할 수 있다.

    1. 클라이언트의 요청을 받은 카프카 스트림즈가 store.get() 메서드를 하면 자신의 StateStore에서만 값을 찾아온다.
    2. 다른 카프카 스트림즈 클라이언트의 StateStore 조회를 위해서 API 요청을 보내야한다. Local Query로 해결할 수 있으면 본인에게서 종료한다. Local Query로 해결할 수 없으면, API 요청을 보내고 데이터를 받는다. 그리고 받은 데이터를 취합해서 돌려준다. 

     

     

    알아두면 좋은 것들

    selectKey

    selectKey() 메서드를 사용하면 이 연산자를 통과하는 데이터가 리파티셔닝 될 것을 표시한다. 리파티셔닝 표시가 되었을 때, 리파티셔닝 된 레코드를 있는 하위 Processor가 추가되면 카프카 스트림즈는 내부적으로 다음 작업을 한다.

    1. 새로 생성한 데이터를 내부적으로 생성된 Repartition Topic에 전송한다.
    2. Repartition Topic에서 데이터를 카프카로 다시 Consume 한다. 

    이 과정은 새로운 SubTopology가 생기는 것을 의미한다. 

     

    groupBy, groupByKey

    두 메서드는 모두 그룹핑 할 때 사용된다. 그렇지만 두 메서드에는 차이가 존재한다.

    • groupBy() : 새로운 Key로 그룹핑한다. 따라서 카프카 스트림즈는 리파티셩 플래그를 설정한다. groupBy()에 하위 스트림을 추가하면 자동으로 리파티션 토픽을 추가하고 다시 데이터를 읽어온다.
    • groupByKey() : 기존에 존재하는 Key로 그룹핑한다. 따라서 리파티셔닝이 요구되지 않고 내부 토픽이 생성되지 않는다. 

     

     

     

     

     

    댓글

    Designed by JB FACTORY