들어가기 전
이번 포스팅에서는 Kafka Streams에서 Session Window를 이용해서 집계를 했을 때, 실제 집계 동작이 어떻게 이루어지는지를 오픈소스 코드를 뜯어보며 이해하고자 한다.
알아두면 좋은 것
Session Window는 새로 들어온 메세지들에 대해서 비활성 시간을 바탕으로 같은 세션 윈도우에 묶일 수 있는지를 확인한다. 이 때 사용되는 개념은 In-activate Time과 Grace Period Time이다. 아래 그림에서는 실제 코드를 두 가지 경우로 나눠서 확인해보고자 한다.
Record가 도착하면 Window StateStore에서 현재 Record Timestamp를 기준으로 In-activate Time 간격 내에서 조회되는 모든 Session Window를 가져온다. 그리고 그렇게 가져온 Session Window들과 현재의 Record Timestamp를 Merge하는 작업을 한다.
예를 들어 현재 Record Timestamp ± In-activate Time 내에 존재하는 Other Session Window가 검색될 수 있다. 이 녀석은 현재 메세지를 포함할 수 있는 시간 범위이기 때문에 하나의 Session Window로 합쳐질 수 있다. Window Statestore에서 Session Window를 가져와서 Merge 한다는 것은 위의 작업을 해당 조건을 만족하는 것으로 검색되는 모든 Session Window에 대해서 위 작업을 반복한다는 것을 의미한다. 여기까지가 In-acitvate Time의 의미다. 정리하면 현재 레코드의 Timestamp를 기준으로 In-Activate Time 범위에 있는 Session들을 모두 검색하는데 사용된다. 즉. 세션이 합쳐질 수 있는 범위로 볼 수 있다. 그렇다면 Grace Periode Time은 무엇을 의미할까?
먼저 Record TimeStamp와 Observed TimeStamp가 다를 수 있다는 것을 인지하자. 그리고 이것이 다를 수 있다는 것은 처음 레코드가 생성된 후 프로세서에 전달되기까지 시간적인 딜레이가 있을 수 있다는 것을 의미한다. 이것은 Network Delay가 될 수도 있고 Process의 Delay가 될 수 있다. 아무튼 레코드가 생성된 Record Timestamp가 존재하는 것, 그리고 실제로 프로세서에 도착한 시간인 Observed Timestamp로 나눠서 볼 수 있다는 점을 이해하자.
Window Close Time은 Observed TimeStamp에서 (Grace Period Time + In-acctivate Gap)을 뺀 시간으로 정의된다. 위 그림에서 Window CloseTime은 빨간색 화살표로 표시된다. 이 때, 동일한 Key를 가진 또 다른 Session Window가 있다고 가정해보자. 다른 Session Window의 끝부분과 Window CloseTime을 비교해서 포함되는지를 확인한다. 이 경우에는 포함되지 않기 때문에 현재 레코드와 Session Window가 합쳐지지 않는다.
반대로 위와 같이 Observed Timestamp를 바탕으로 계산한 Window Close Time은 Session Window에 포함된다. 따라서 이 세션은 합쳐질 수 있다. 따라서 이 레코드는 해당 WindowSession과 합쳐진다. 정리하면, Grace Period Time은 메세지는 네트워크 / 프로세싱 딜레이로 인해서 생성된 것보다 무조건 늦게 전달될텐데 어떤 세션까지 합쳐질 수 있을지를 결정해준다.
정리하면 기본적으로는 다음과 같은 그림이 된다. 그러면 Grace Period Time이 실제로는 어떻게 해야 의미가 있어질까?
예를 들어 이런 형태가 되면 의미가 없어진다. 네트워크 딜레이로 인해 Observed Timestamp가 저 뒤에 찍히게 되면, 비록 Record Timestamp의 In-activate Time Range 내에 Other Session Window는 존재하지만, 이녀석은 Window Closed Time을 만족하지 못하기 때문에 결국 하나의 세션이 되지 못한다.
코드로 따라가보기
@Override
public void process(final Record<KIn, VIn> record) {
// if the key is null, we do not need proceed aggregating
// the record with the table
if (record.key() == null) {
logSkippedRecordForNullKey();
return;
}
final long timestamp = record.timestamp();
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long windowCloseTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
final List<KeyValue<Windowed<KIn>, VAgg>> merged = new ArrayList<>();
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
SessionWindow mergedWindow = newSessionWindow;
VAgg agg = initializer.apply();
try (
final KeyValueIterator<Windowed<KIn>, VAgg> iterator = store.findSessions(
record.key(),
timestamp - windows.inactivityGap(),
timestamp + windows.inactivityGap()
)
) {
while (iterator.hasNext()) {
final KeyValue<Windowed<KIn>, VAgg> next = iterator.next();
merged.add(next);
agg = sessionMerger.apply(record.key(), agg, next.value);
mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
}
}
if (mergedWindow.end() < windowCloseTime) {
logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
} else {
if (!mergedWindow.equals(newSessionWindow)) {
for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
store.remove(session.key);
maybeForwardUpdate(session.key, session.value, null);
}
}
agg = aggregator.apply(record.key(), record.value(), agg);
final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
store.put(sessionKey, agg);
maybeForwardUpdate(sessionKey, null, agg);
}
maybeForwardFinalResult(record, windowCloseTime);
}
Session Window는 KStreamSessionWindowAggregate 클래스에 의해서 구현된다.
- Key가 없는 메세지라면 Skipping 레코드가 된다. (StateStore에 Key - Value 형식으로 보관하기 때문임)
- Observed Timestamp를 구한다. Observed Timestamp는 레코드의 Timestamp, 그리고 현재 Processor에 도착한 값 중에 큰 값으로 결정된다. 대부분은 현재 Processor에 도착한 값이 될 것이다.
- WindowsCloseTime을 구한다. 이 시간은 Observed Timestamp - (비활성화 시간 + Grace Period)로 구해진다.
- 현재 레코드의 Timestamp를 기준으로 newSessionWindow를 생성한다. 그리고 이 값을 MergeWindow가 참조한다.
- 현재 레코드의 Key와 Timestamp의 ± 비활성화 시간을 입력으로 StateStore에서 해당되는 모든 SessionWindow를 찾아온다.
- 찾아온 SessionWindow를 대상으로 다음 연산을 반복한다.
- merged 리스트에 찾아온 SessionWindow 하나를 추가한다.
- 현재 가지고 있는 aggregator 값(현재 레코드 값은 포함 되지 않음)에 SessionWindow에서 찾아온 Aggregator 값을 함께 집계해준다.
- 현재 MergedWindow와 찾아온 SessionWindow의 값을 합쳐준다.
- 수정된 MergedWindow의 종료 시간이 WindowCloseTime보다 크거나 같을 경우(즉, Merged Window가 실제로 현재 레코드와 Merge 될 수 있는 경우)에는 아래 작업을 수행한다.
- mergedWindow와 초기에 생성한 newSessionWindow가 다르다면, merged 리스트에서 WindowSession을 하나씩 가져와서 store에서 이 WindowSession을 삭제한다. 왜냐하면 하나의 Merged Window로 Merge 되었기 때문에 이전에 StateStore에 있던 값을 필요 없어지기 때문이다.
- 최종적으로 집계값에 현재 메세지의 값을 추가해서 최종 집계값을 생성하고 그것을 agg에 저장한다.
- MergedWindow를 이용해서 새로운 WindowSession을 생성한다.
- WindowSession을 Key, Aggregator를 Value로 해서 Merge된 WindowSession을 stateStore에 저장한다.
'Kafka eco-system > KafkaStreams' 카테고리의 다른 글
Kafka Streams : CheckPoint (0) | 2022.11.13 |
---|---|
KafkaStreams : GlobalKTable (0) | 2022.11.12 |
Kafka Streams : Join 스트림의 내부동작 (2) | 2022.11.11 |
Kafka Streams In Action 5장 : KTable API (0) | 2022.11.01 |
Kafka Streams : Aggregate 동작 방식 (0) | 2022.11.01 |