Kafka Streams의 레코드는 각 노드에서 바뀐다.
Kafka Streams는 Kafka Consumer를 이용해서 Kafka Broker에서 메세지를 받아온다. 그리고 받아온 메세지 원본을 각 StreamTask의 Partition Group에다가 각각 저장해둔다. 이 때 메세지는 각 노드의 프로세스를 지날 때 마다 변경되어서 전달된다.
final Record<Object, Object> toProcess = new Record<>(
record.key(),
record.value(),
processorContext.timestamp(),
processorContext.headers()
);
maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
StreamTask의 Process() 메서드에는 다음과 같은 흐름이 있다.
- 만들어진 record를 다시 한번 record를 만드는데, 이 때 toProcess 객체로 생성한다.
- 생성된 toProcess 객체를 currNode.process()를 호출할 때 넘겨준다.
즉, 다음 노드에서 처리해야 할 record의 값이 현재 노드에서 새로 생성되서(toProcess) 전달되는 것을 볼 수 있다.
예를 들어 이 때는, Record의 value 타입이 ShareVolume인 것을 알 수 있다.
store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
tupleForwarder.maybeForward(
record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
.withTimestamp(newTimestamp));
}
이것은 KStreamReduce 객체의 process() 메서드다. 여기서는 새로운 형태의 레코드를 다시 한번 생성해서 전달하는 것을 볼 수 있다.
- tupleForwarder.maybeForward()를 이용해서 다음 노드로 넘어간다.
- 이 때 record.withValue().withTimeStamp() 등을 이용해서 새로운 레코드를 생성해서 전달한다.
여기서 record의 value는 Change 타입인 것을 확인할 수 있다.
결론
원본 레코드는 StreamTask에 PartitionGroup에 저장되어있지만, 재귀적으로 다음 노드에서 Process를 할 때에는 일반적으로 현재 노드에서 처리된 내용을 바탕으로 새로운 메세지 객체(예를 들면 toProcess)를 새로 생성해서 다음 Processor에게 넘기는 형태가 된다.
'Kafka eco-system > KafkaStreams' 카테고리의 다른 글
Kafka Streams : Tumbling Window / Hopping Window 동작 (0) | 2022.10.31 |
---|---|
Kafka Streams : 리파티셔닝의 노드 구성 (0) | 2022.10.30 |
Kafka Streams : KTable과 Cache 내부동작 (0) | 2022.10.25 |
KafkaStreams : StateStore (0) | 2022.10.24 |
Kafka Streams : StreamTask가 생성되는 과정 (0) | 2022.10.22 |