들어가기 전 이 글은 Kafka Streams에서 Aggregate의 동작 방식을 코드를 보고 정리한 글입니다. 정리 Aggregate() 메서드를 이용하게 되면 Init, Add, Remove 함수를 각각 선언해줘야한다. 여기서 궁금한 점은 Init, Add, Remove 함수는 각각 언제 호출되느냐이다. 정리하면 다음과 같다. 레코드의 Key로 StateStore에서 값을 찾아왔을 때 없다면 Init Value가 호출된다. 레코드의 Key로 StateStore에서 값을 찾아왔을 때 존재한다면, Aggreagator의 Remove 메서드를 호출한다. 도착한 레코드에 새로운 값이 존재한다면, Aggregator의 Add 메서드를 호출한다. Aggregator 클래스에서 전체적으로 메서드가 호출되는 순서는..
들어가기 전 이번 포스팅에서는 Kafka Streams에서 제공하는 Tumbling Window / Hopping Window가 실제로 어떻게 동작하는지 코드 단위로 확인해보고자 한다. 알아둬야 할 부분 Tumbling Window와 Hopping Window는 TimeWindow를 바탕으로 동작한다. TimeWindow는 고정된 기간이 있는 것을 의미한다. Tumbling Window는 고정된 기간이 끝나면 새로운 Window를 생성해서 집계하는 동작을 하고, Hopping Window는 TimeWindow에 설정된 기간동안 집계는 하되 advanceMs에 설정된 기간동안 새로운 TimeWindow를 만들어서 좀 더 자주 집계하는 동작을 한다. TimeWindow를 구할 때 가장 먼저 Window Cl..
KStream sourceStream = builder.stream(STOCK_TRANSACTIONS_TOPIC, Consumed.with(stringSerde, stockTransactionSerde).withOffsetResetPolicy(EARLIEST)).peek((key, value) -> System.out.println("value = " + value)); // 리파티셔닝을 할 때는 내부적으로 Internal Topic을 하나 더 만든다. // 그것은 Broker에게 Log를 전송한다는 것이다. 따라서 Key / Value Serde가 필요하다. KGroupedStream stringShareVolumeKGroupedStream = sourceStream .mapValues(value ->..
Kafka Streams의 레코드는 각 노드에서 바뀐다. Kafka Streams는 Kafka Consumer를 이용해서 Kafka Broker에서 메세지를 받아온다. 그리고 받아온 메세지 원본을 각 StreamTask의 Partition Group에다가 각각 저장해둔다. 이 때 메세지는 각 노드의 프로세스를 지날 때 마다 변경되어서 전달된다. final Record toProcess = new Record( record.key(), record.value(), processorContext.timestamp(), processorContext.headers() ); maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySe..
이 글은 코드를 따라보면서 작성한 글입니다. 따라서 실제와는 다를 수 있으니 참고만 해주세요. 들어가기 전 Ktable은 기본적으로 KStream과 다르게 UpdateStream을 할 수 있다. 무슨 말이냐면 모든 이벤트를 downstream으로 보내는 것이 아니라 Cache Buffer를 이용해서 특정 조건을 만족하는 경우에만 downStream으로 보내서 Stream Process의 양을 줄이는 역할을 할 수 있다. KTable Cache cache.max.bytes.buffering commit.interval.ms 개발자 입장에서는 KTable의 Cache는 크게 두 가지 파라메터를 이용해서 통제할 수 있다. 여기서 cache.max.bytes.buffering은 StreamTask가 작업을 하는..