Kafka Streams : Punctuator
- Kafka eco-system/KafkaStreams
- 2022. 11. 13.
Punctuator
Kafka Streams에서는 Punctuator를 이용해서 레코드의 downstream 전달 시점을 제어할 수 있다. punctuator는 Commit() / Flush()와는 별개로 동작하고, 이 녀석들을 사용하지 않고도 레코드의 downstream 전달 시점을 제어할 수 있다. punctuator는 각 StreamTask가 가지고 있는 ProcessorContext의 schedule() 메서드를 호출해서 예약할 수 있다. 일단 예약되면, Stream Thread의 process() 처리 이후에 punctuator를 호출해서 레코드에 대한 일정 부분의 작업을 할 수 있다.
Punctuator의 설정
- PunctuationType.WALL_CLOCK_TIME : WALL_CLOCK_TIME을 기준으로 punctuator 실행 시점이 설정됨.
- PunctuationType.STREAM_TIME : 레코드가 전달되는 시간을 기준으로 punctuator 실행 시점이 설정됨.
Punctuator가 실행되는 시점을 전달하는 시점을 선택할 수 있다. 위의 두 가지 설정값이 지원된다. 그렇다면 각 모드는 정확히 어떻게 다른 것일까?
PunctuationType.STREAM_TIME
먼저 Stream Thread 아래에 여러 StreamTask가 존재하고, 각 StreamTask는 각각의 PartitionGroup을 가지고 있다. 그리고 이 PartitionGroup은 각각의 데이터를 TopicPartition 단위로 가지고 있다. 이 데이터는 Consumer가 넣어주는 것이다. 이것을 먼저 알아두고 흐름을 살펴보자. 먼저 정리하면 레코드가 Kafka Streams에 도착할 때 마다 PartitionGroup이 가지고 있는 Stream Time이 업데이트 되고, 이 시간을 기준으로 Puncator를 하겠다는 것이다. 바꿔 이야기하면 레코드가 도착하지 않으면 펑츄에이터 간격이 지났더라도 펑츄에이터가 실행되지 않는다. 또한 이것은 레코드에 기록된 시간을 바탕으로 펑츄에이터를 실행하겠다는 것을 의미한다.
- partitionGroup이 가지고 있는 streamTime()을 읽어온다. streamTime은 해당 partitionGroup이 가진 모든 topicPartition에 포함된 레코드들 중 가장 빠른 timestamp 값이다.
- PunctuationQue에서 pq에 들어있는 PunctuationSchedule 객체를 확인한다. 이 객체는 punctuator 실행 시간 + punctuator 간격을 더한 값을 timestamp로 가진다. 이 값은 다음 punctuator가 실행되어야 하는 시간을 의미한다. 만약 partitionGroup의 streamTime이 실행되어야 하는 시간보다 크다면 punctuator를 실행한다.
- punctuator를 실행한 후, PunctuationSchedule 객체의 timestamp(실행 되어야 할 시간)을 업데이트한다. 현재 시간 + punctuator 간격을 더한 값이 업데이트 된다. 업데이트 된 PunctuationSchedule 객체는 다시 PunctuationQue의 우선순위 큐(pq)에 들어간다.
위의 흐름을 반복하면서 Punctuator가 반복된다. 아래 상황을 예제로 이해해볼 수 있다.
- 실행 간격을 5초로 펑츄에이터 실행 예약함.
- 1초에 메세지 도착 → 펑츄에이터 실행함. 다음 펑츄에이터 예약은 6초
- 4초에 메세지 도착 → 6초보다 큰 값이 아니기 때문에 펑츄에이터 실행되지 않음.
- 8초에 메세지 도착 → 6초보다 큰 값이기 때문에 펑츄에이터 실행됨. 다음 펑츄에이터 시간은 6 + 5로 11초가 됨.
- 10초에 메세지 도착 → 11초보다 작은 값이기 때문에 펑츄에이터 실행되지 않음.
PunctuationType.WALL_CLOCK_TIME
WALL_CLOCK_TIME으로 설정하면, 펑츄에이터가 시스템 시간을 기준으로 실행된다는 것을 의미한다. Stream TIME은 레코드가 도착하지 않으면 펑츄에이터가 실행되지 않았지만, WALL_CLOCK TIME은 도착하지 않아도 실행된다. 실행은 다음 순서를 반복한다.
- 현재 systemTime을 punctuationQue에 넘겨준다.
- PunctuationQue에서 pq에 들어있는 PunctuationSchedule 객체를 확인한다. 이 객체는 punctuator 실행 시간 + punctuator 간격을 더한 값을 timestamp로 가진다. 이 값은 다음 punctuator가 실행되어야 하는 시간을 의미한다. 전달받은 시스템 시간이 punctuation timestamp보다 클 경우 펑츄에이터를 실행한다.
- punctuator를 실행한 후, PunctuationSchedule 객체의 timestamp(실행 되어야 할 시간)을 업데이트한다. 현재 시간 + punctuator 간격을 더한 값이 업데이트 된다. 업데이트 된 PunctuationSchedule 객체는 다시 PunctuationQue의 우선순위 큐(pq)에 들어간다.
Punctuator NextTime이 현재 시간보다도 작은 경우는?
펑츄에이터는 실행하고 나면 펑츄에이터 예약 시간에 인터벌을 더해서 다음 펑츄에이어 예약 시간을 구한다. 그런데 너무 펑츄에이터가 늦게 실행되는 경우, 다음에 실행될 펑츄에이터의 예약 시간을 구했는데 현재 시간보다도 작은 경우가 있다. 이런 경우에는 어떻게 처리될까?
public PunctuationSchedule next(final long currTimestamp) {
long nextPunctuationTime = timestamp + interval;
if (currTimestamp >= nextPunctuationTime) {
// we missed one ore more punctuations
// avoid scheduling a new punctuations immediately, this can happen:
// - when using STREAM_TIME punctuation and there was a gap i.e., no data was
// received for at least 2*interval
// - when using WALL_CLOCK_TIME and there was a gap i.e., punctuation was delayed for at least 2*interval (GC pause, overload, ...)
final long intervalsMissed = (currTimestamp - timestamp) / interval;
nextPunctuationTime = timestamp + (intervalsMissed + 1) * interval;
}
이 경우, 펑츄에이터가 몇번의 interval 간격을 놓친 것으로 처리를 한다.
예를 들어 Stream Time인 경우에는 여러번의 interval 동안 데이터가 도착 하지 않은 것으로 이해할 수 있고, wall ClockTime의 경우 GC, 혹은 데이터 처리에 오랜 시간이 걸려서 데이터가 늦게 왔다고 이해할 수 있다. 이런 경우에 펑츄에이터는 몇번의 Interval 간격을 놓쳤는지 계산하고, Interval 횟수 + 1을 더한 값으로 새로운 다음 펑츄에이션 타임을 구한다.
예를 들어 첫번째 펑츄에이션 시간이 5초고, 현재 시간이 21초다. 그리고 다음 펑츄에이션 시간을 계산해보니 10초다. 이 경우에 21-5 = 16초가 되고, 인터벌 5초로 나누면 3이 된다. 즉 3번의 인터벌을 놓쳤고, 여기에 1을 더한 인터벌을 계산해보면 4개다. 결론적으로 다음 펑츄에이션 시간은 5 + 5*4 = 25초가 되게 된다는 의미다.
Punctuator 실행 흐름 살펴보기
Kafka Streams의 Punctuator 실행 흐름은 다음과 같다.
- Processor는 Stream Thread에 의해서 초기화 되는데, 이 때 Puncuator는 processorContext.schedule()에 의해서 예약된다. 예약되면 Puncutate Schedule 객체가 생성되어 Punctuator Schedule Que에 저장된다.
- Kafka Streams는 실행되고 난 다음 Process() 메서드를 호출한 직후에 punctuate() 메서드를 호출해서 펑츄에이터를 실행하려고 한다. 가장 처음에는 초기화 과정에서 예약했던 펑츄에이터 스케쥴 객체를 찾아와서 이 녀석을 실행하게 되고, 이후에는 스케쥴 객체의 실행 시점에 펑츄에이션 interval을 더한 값을 다음 펑츄에이션 실행 시간으로 설정한다. 그리고 이 시간으로 스케쥴 객체를 새로 생성하고 다시 펑츄에이터 스케쥴 큐에 넣어준다.
Punctuator 실행 코드 따라가보기
펑츄에이터는 ProcessorContext.schedule() 메서드를 이용해서 실행을 예약한다. 이 메서드는 주로 Processor를 초기화 할 때 실행된다. 따라서 Processor가 초기화 될 때 Processor.init() 메서드가 호출되고, processor.init() 메서드가 호출될 때 ProcessorContext.schedule()이 호출되면서 예약된다.
// ProcessorContextImpl.java
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback) throws IllegalArgumentException {
throwUnsupportedOperationExceptionIfStandby("schedule");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
final long intervalMs = validateMillisecondDuration(interval, msgPrefix);
if (intervalMs < 1) {
throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
}
return streamTask.schedule(intervalMs, type, callback);
}
몇번의 메서드를 더 호출하면 StreamTask.schedule() 메서드로 넘어온다.
- 먼저 펑츄에이션 스케쥴 객체를 생성한다.
- STREAM_TIME인 경우 streamTimePunctuationQue에 넣어준다. WALL_CLOCK_TIME인 경우 systemTimePunctuationQue에 넣어준다.
여기서 볼 수 있는 점은 초기화 하는 과정에서 이미 펑츄에이션 스케쥴 하나가 예약되어 펑츄에이션 스케쥴링 큐에 들어가있다는 것이다. 즉, 설정된 Interval이 지나가게 되면 펑츄에이션이 계속 실행되는 것을 의미한다.
private Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator) {
if (processorContext.currentNode() == null) {
throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
}
final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);
switch (type) {
case STREAM_TIME:
// STREAM_TIME punctuation is data driven, will first punctuate as soon as stream-time is known and >= time,
// stream-time is known when we have received at least one record from each input topic
return streamTimePunctuationQueue.schedule(schedule);
case WALL_CLOCK_TIME:
// WALL_CLOCK_TIME is driven by the wall clock time, will first punctuate when now >= time
return systemTimePunctuationQueue.schedule(schedule);
default:
throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
}
}
펑츄에이터는 topology를 생성하는 과정에서 펑츄에이터 스케쥴링 큐에 예약된다. 실제 Kafka Streams는 Stream Thread에서 동작하게 되는데, 이 때 taskManager.process()가 실행된 이후 taskManager.punctuate() 메서드를 호출하면서 실행된다.
// StreamThread
void runOnce() {
...
final long pollLatency = pollPhase();
...
initializeAndRestorePhase();
...
final int processed = taskManager.process(numIterations, time);
...
// 펑츄에이터 실행
final int punctuated = taskManager.punctuate();
...
final int committed = maybeCommit();
...
}
taskManager.punctuate() 메서드를 호출하면 TaskExecutor.punctuate() 메서드를 호출한다. 이 메서드에서는 다음과 같은 작업을 한다.
- notPaused 상태인 StreamTask들을 대상으로 펑츄에이터를 실행해본다.
- 이 때 STREAM_TIME을 설정한 경우 maybePunctuateStreamTime() 메서드가 호출된다. 이 때, WALL_CLOCK_TIME을 설정한 경우 maybePunctuateSystemTime() 메서드가 호출된다.
// TaskExecutor.java
int punctuate() {
int punctuated = 0;
for (final Task task : tasks.notPausedActiveTasks()) {
try {
if (task.maybePunctuateStreamTime()) {
punctuated++;
}
if (task.maybePunctuateSystemTime()) {
punctuated++;
}
} ...
}
puncuateStreamTime() 메서드가 호출된 경우, 각 StreamTask의 파티션 그룹의 레코드들 중에서 가장 작은 timeStamp을 가져와서 streamTime에 저장한다. 그리고 이 시간을 streamTimePuncutationQue.mayPunctuate() 메서드를 호출해서 펑츄에이팅을 실행해본다.
// Streamtask.java
public boolean maybePunctuateStreamTime() {
final long streamTime = partitionGroup.streamTime();
// if the timestamp is not known yet, meaning there is not enough data accumulated
// to reason stream partition time, then skip.
if (streamTime == RecordQueue.UNKNOWN) {
return false;
} else {
final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
if (punctuated) {
commitNeeded = true;
}
return punctuated;
}
}
반대로 STREAM_TASK인 경우 maybePunctutateSystemTime()이 호출된다. 이 경우에는 현재 시간을 systemTime에 저장하고 이 값을 systemTimePunctutationQue에 전달해서 펑츄에이션을 실행해본다.
// Streamtask.java
public boolean maybePunctuateSystemTime() {
final long systemTime = time.milliseconds();
final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
if (punctuated) {
commitNeeded = true;
}
return punctuated;
}
PunctuationQue.mayPunctutate()가 호출된다. 이 녀석은 내부적으로 우선순위 큐를 가지고 있고, 이 우선순위 큐에는 PunctuationSchedule 객체가 저장되어있다.
- PunctutationSchedule 객체는 어떤 펑츄에이션을 실행해야하고, 해당 펑츄에이션 스케쥴 객체가 실행되어야 할 시간을 기억하고 있다.
- 우선순위 큐에서 펑츄에이션 실행 시간이 가장 빠른 녀석을 peek()으로 뽑아온다. 이 때, 전달된 timestamp가 펑츄에이션 시간보다 큰 경우, 즉 펑츄에이션이 실행되어야 할 시점이 지난 경우에는 우선순위 큐에서 poll()을 이용해서 제거하고, 펑츄에이션을 실행해준다.
- 펑츄에이션이 실행된 다음 sched.next() 메서드를 이용해서, 현재 펑츄에이션 스케쥴 객체를 기준으로 다음에 실행되어야 할 시간(기본적으로는 실행 시간 + 인터벌 시간 = 다음 실행해야 할 시간)을 구한 후 우선순위 큐에 다시 넣어준다.
// Punctuation Que.java
boolean mayPunctuate(final long timestamp, final PunctuationType type, final ProcessorNodePunctuator processorNodePunctuator) {
synchronized (pq) {
boolean punctuated = false;
PunctuationSchedule top = pq.peek();
while (top != null && top.timestamp <= timestamp) {
final PunctuationSchedule sched = top;
pq.poll();
if (!sched.isCancelled()) {
processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator());
// sched can be cancelled from within the punctuator
if (!sched.isCancelled()) {
pq.add(sched.next(timestamp));
}
punctuated = true;
}
top = pq.peek();
}
return punctuated;
}
}
정리하면 다음과 같다.
- 펑츄에이션은 처음에 Processor가 초기화 되는 과정에서 펑츄에이션 스케쥴링 큐에 실행이 예약된다.
- 각각 실행되고 난 다음에는 그 펑츄에이션 스케쥴링 객체를 바탕으로 새로운 펑츄에이션 스케쥴링 객체를 생성해서 다시 펑츄에이션 스케쥴링 큐에 넣어준다.
위와 같은 형태로 펑츄에이션 된다.
'Kafka eco-system > KafkaStreams' 카테고리의 다른 글
Kafka Streams : 모니터링과 성능 (0) | 2022.11.15 |
---|---|
Kafka Streams : 프로세서 API (0) | 2022.11.14 |
Kafka Streams : StandBy Task (0) | 2022.11.13 |
Kafka Streams : State의 복구 (0) | 2022.11.13 |
Kafka Streams : CheckPoint (0) | 2022.11.13 |