KStream sourceStream = builder.stream(STOCK_TRANSACTIONS_TOPIC, Consumed.with(stringSerde, stockTransactionSerde).withOffsetResetPolicy(EARLIEST)).peek((key, value) -> System.out.println("value = " + value)); // 리파티셔닝을 할 때는 내부적으로 Internal Topic을 하나 더 만든다. // 그것은 Broker에게 Log를 전송한다는 것이다. 따라서 Key / Value Serde가 필요하다. KGroupedStream stringShareVolumeKGroupedStream = sourceStream .mapValues(value ->..
Kafka Streams의 레코드는 각 노드에서 바뀐다. Kafka Streams는 Kafka Consumer를 이용해서 Kafka Broker에서 메세지를 받아온다. 그리고 받아온 메세지 원본을 각 StreamTask의 Partition Group에다가 각각 저장해둔다. 이 때 메세지는 각 노드의 프로세스를 지날 때 마다 변경되어서 전달된다. final Record toProcess = new Record( record.key(), record.value(), processorContext.timestamp(), processorContext.headers() ); maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySe..
이 글은 코드를 따라보면서 작성한 글입니다. 따라서 실제와는 다를 수 있으니 참고만 해주세요. 들어가기 전 Ktable은 기본적으로 KStream과 다르게 UpdateStream을 할 수 있다. 무슨 말이냐면 모든 이벤트를 downstream으로 보내는 것이 아니라 Cache Buffer를 이용해서 특정 조건을 만족하는 경우에만 downStream으로 보내서 Stream Process의 양을 줄이는 역할을 할 수 있다. KTable Cache cache.max.bytes.buffering commit.interval.ms 개발자 입장에서는 KTable의 Cache는 크게 두 가지 파라메터를 이용해서 통제할 수 있다. 여기서 cache.max.bytes.buffering은 StreamTask가 작업을 하는..
이 글은 코드를 따라가보면서 뇌피셜로 작성한 글이기 때문에 틀릴 수도 있습니다. 틀릴 경우 댓글을 남겨주시면 수정하겠습니다. StateStore StateStore는 StreamTask별로 고유하게 주어진다. 그리고 StreamTask는 고유한 파티션을 가진다. 따라서 StateStore는 고유한 파티션에 대응되는 상태 저장소라고 볼 수 있다. 또한 StateStore는 이름으로 관리되는데, 동일한 이름을 가진 StateStore를 사용할 수 없기 때문에 각 StreamTask가 가지고 있는 노드끼리는 StateStore를 공유할 수 없는 것으로 보인다. StreamTask의 StateStore 관리 StreamTask는 내부적으로 stateDirectory라는 필드를 가지고 있다. 이 stateDire..
들어가기 전 Kafka Streams에서는 특정 TopicPartition은 하나의 StreamTask에게 할당된다. StreamTask는 본인에게 할당된 TopicPartition만 가지고 작업을 하기 때문에 데이터는 격리된다. 따라서 StreamTask의 데이터 격리성 때문에 동시성 문제를 걱정할 필요는 없다. 그렇다면 StreamTask는 어떻게 만들어질까? StreamTask 생성방식 StreamTask가 생성되는 과정을 살펴보면 다음과 같다. Consumer가 현재 구독하고 있는 TopicPartition 정보를 Broker에게서 받아온다. 받아온 TopicPartition 정보를 ConsumerCoordinator에게 전달한다. ConsumerCoordinator는 전달받은 TopicParti..