Kafka Streams : StandBy Task

    StandBy Task

    Kafka Streams의 Task들은 토폴로지를 시작하기 전에 StateStore를 복구한다. Stream Thread의 모든 Task들의 StateStore가 복구된 이후 Topology가 실행된다. 이런 이유 때문에 StateStore의 복구 시간이 중요할 수 있다. StateStore의 복구 시간을 단축하는데 도움이 되는 것이 Standby Task다. 

    StandbyTask는 Stream Task가 죽었을 때 빠르게 복구할 수 있도록 항상 스탠바이 하도록 노력해주는 Task다. 예를 들어 Task1에 대해서 StandbyTask가 존재한다면, Task1이 죽었을 때 StandbyTask가 어느정도 생성해 둔 StateStore를 사용해서 복구 작업을 보다 빠르게 할 수 있다. StandByTask는 Task의 초기화 작업(StateStore의 복구 작업)을 단축시킨다. 

     

    특징

    StandByTask 실행 시점

    StandByTask가 실행되는 시점은 Active Task의 StateStore가 모두 복구된 이후다. Active Task(Stream Task)의 StateStore가 복구 되기 전까지 RestoreConsumer는 Active Task의 StateStore를 복구하는데만 이용한다. Active Task의 StateStore의 복구가 끝났으면, RestoreConsumer는 StandByTask의 StateStore를 복구하는데 사용된다. 

    StandByTask를 이용하면 Topology 동작 시간을 단축 시킬 수 있을까?

    단축은 된다. 한 가지 알아야 할 부분은 Stream Thread에 할당된 Task들의 Topology가 실행되기 위해서는 Stream Thread에 할당된 모든 Task들의 StateStore가 복구되어야 한다. 하나라도 StateStore가 복구되지 않았다면, Stream Thread의 Topology는 실행되지 않는다. 

     

    Standby 관련 속성

    // num.standby.replicas
    StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG

    num.standby.replicas는 Standby Replicas의 수다. Standby Replicas는 StateStore의 복사본이다. Kafka Streams는 nu.standby.replicas에 명시된 수의 복사본을 생성하려고 시도하고, 최신 상태를 유지하려고 한다. Standby Replicas는 Kafka Streams의 장애 조치의 지연을 최소화하는데 사용된다. 

    만약 특정 Kafka Streams 인스턴스가 다운되면, 각각의 Task들은 이 Task의 Standby를 가지고 있는 Kafka Streams 인스턴스에서 재실행된다. 따라서 StateStore를 복원하는데 걸리는 시간을 최소화 할 수 있다. 그렇지만 k8s의 statefulset 인스턴스 등을 이용하는 경우에는 문제가 될 수 있을 것 같다. 

     

    StandBy Thread Restore 복원 흐름

    StandBy Thread의 Restore 복원 흐름은 동일하다.

    1. StreamThread의 Restore Phase에서 Stream / Standby Thread의 StateStore는 복원된다.
    2. Restore Phase에서 Stream Thread가 Running이면 StandBy Thread만 계속 복구작업이 진행되고 아닌 경우 Stream Thread → StandBy Thread 순으로 복구 작업이 진행된다. 

     

     

     

     

    StandBy Thread의 StateStore 복원 관련 코드 따라가보기

    StreadmThread에서는 pollPhase() 메서드 호출 이후에 initializedAndRestorePhase() 메서드를 호출해서 StreamTask / StandbyTask의 StateStore를 복원 작업을 진행한다. 

    // StreamThread.java
    void runOnce() {
    	...
        final long pollLatency = pollPhase();
    	...
        // StreamTask의 StateStore, StandbyTask의 StateStore 복원 시도
        initializeAndRestorePhase();
    
        ...
                final int processed = taskManager.process(numIterations, time);
    	...
                final int punctuated = taskManager.punctuate();
    	...
                final int committed = maybeCommit();
    
    }

    StreamThread의 initializedAndRestorePhase() 메서드로 넘어온다. 이 메서드에서는 두 가지 작업을 한다.

    1. 이미 StreamThread가 Run인 상태, 즉 StreamThread의 모든 Task의 StateStore가 복원된 상태인 경우에는 StandBy StateStore를 복원한다. Stream Thread의 StateStore는 Process() 메서드에서 이미 업데이트 되고, 그 변경점이 ChangeLog에 반영되지만 StandBy Thread의 StateStore는 그렇지 못하기 때문이다.
    2. StreamThread가 Run 상태가 아니라면 먼저 changelogReader.restore()를 호출해서 StreamThread의 StateStore를 먼저 복원한 후, Standby Thread의 StateStore를 복원한다. 
    // StreamThread.java
    private void initializeAndRestorePhase() {
    
    	// 파티션이 할당되어있고, StreamThread가 Running 일 때
        if (stateSnapshot == State.PARTITIONS_ASSIGNED
            || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
            ...
            if (taskManager.tryToCompleteRestoration(now, partitions -> resetOffsets(partitions, null))) {
                // StandBy StateStore 업데이트
                changelogReader.transitToUpdateStandby();
                ...
            }
    
            ...
        }
    
        ...
        // StreamThread StateStore 업데이트 → StandbyThread StateStore 업데이트
        changelogReader.restore(taskManager.notPausedTasks());
        ...
    }

    먼저 transitToUpdateStandby() 메서드가 호출된 경우를 고려해보자. 이 때, standByRestoringChangeLogs() 메서드를 이용해서 이 Kafka Streams 인스턴스에서 가지고 있는 StandBy Thread의 ChangeLog 토픽들을 가져올 수 있다. 그리고 이것을 resumeChangelogsFromRestoreConsumer() 메서드에게 넘겨준다. 이 메서드를 통해서 StandBy Thread의 StateStore에게 필요한 레코드들을 RestoreConsumer를 이용해서 불러와서 StandBy Thread의 StateStore를 복원한다. 

    // StoreChangeLogReader.java
    @Override
    public void transitToUpdateStandby() {
        if (state != ChangelogReaderState.ACTIVE_RESTORING) {
            throw new IllegalStateException(
                "The changelog reader is not restoring active tasks (is " + state + ") while trying to " +
                    "transit to update standby tasks: " + changelogs
            );
        }
    
        log.debug("Transiting to update standby tasks: {}", changelogs);
    
        // resume all standby restoring changelogs from the restore consumer
        resumeChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
    
        state = ChangelogReaderState.STANDBY_UPDATING;
    }

    changelogReader.restore()를 이용해서 Stream Thread + StandBy Thread의 StateStore를 모두 복원하기도 한다. Stream Thread의 StateStore가 모두 복원되어야 Standby Thread의 StateStore가 복원될 수 있기 때문에 RestoreConsumer는 먼저 StreamThread의 StateStore를 restoreChangeLog() 메서드를 이용해서 복원한다. 이 메서드가 실행이 완료되면 StreamThread의 StateStore는 복원이 완료되었기 때문에 Standby Thread의 StateStore를 maybeUpdatelimitOffsetsForStandbyChangelogs() 메서드를 호출해서 처리할 수 있다. 

    //StoreChangelogReader.java
    
    @Override
    public void restore(final Map<TaskId, Task> tasks) {
        ...
                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
    	...
            for (final TopicPartition partition : polledRecords.partitions()) {
                bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition));
            }
    
            for (final TopicPartition partition : restoringChangelogs) {
                ...
                try {
                    // StreamThread의 StateStore 복원
                    if (restoreChangelog(changelogs.get(partition))) {
                   ...
                    }
                } ...
    
    		// StandbyThread의 StateStore 복원 
            maybeUpdateLimitOffsetsForStandbyChangelogs(tasks);
    
            maybeLogRestorationProgress();
        }
    }

    StoreChangeLogReader의 maybeUpdateLimitOffsetsForStandbyChangelogs() 메서드를 호출한다. 이 메서드에서 현재 StoreChangeLogReader가 가지고 있는 ChangeLog 토픽들 중 Task 타입이 "StandBy"인 녀석들만 필터링 한다. 그리고 이 필터링 결과를 바탕으로 updateLimitOffsetsForStandbyChangelogs() 메서드를 호출해서 Standby Thread의 ChangeLog 토픽을 업데이트 한다. 

    // StoreChangelogReader.java
    
    private void maybeUpdateLimitOffsetsForStandbyChangelogs(final Map<TaskId, Task> tasks) {
        // we only consider updating the limit offset for standbys if we are not restoring active tasks
        if (state == ChangelogReaderState.STANDBY_UPDATING &&
            updateOffsetIntervalMs < time.milliseconds() - lastUpdateOffsetTime) {
    
            // Task가 STANDBY 타입인 changeLogOffset을 찾아와서
            final Set<TopicPartition> changelogsWithLimitOffsets = changelogs.entrySet().stream()
                .filter(entry -> entry.getValue().stateManager.taskType() == Task.TaskType.STANDBY &&
                    entry.getValue().stateManager.changelogAsSource(entry.getKey()))
                .map(Map.Entry::getKey).collect(Collectors.toSet());
    
    		// StateStore를 복구한다.
            for (final TopicPartition partition : changelogsWithLimitOffsets) {
                if (!changelogs.get(partition).bufferedRecords().isEmpty()) {
                    updateLimitOffsetsForStandbyChangelogs(committedOffsetForChangelogs(tasks, changelogsWithLimitOffsets));
                    lastUpdateOffsetTime = time.milliseconds();
                    break;
                }
            }
        }
    }

     

     

     

    참고

    https://gunju-ko.github.io/kafka/2018/06/27/StandbyTask.html

    'Kafka eco-system > KafkaStreams' 카테고리의 다른 글

    Kafka Streams : 프로세서 API  (0) 2022.11.14
    Kafka Streams : Punctuator  (0) 2022.11.13
    Kafka Streams : State의 복구  (0) 2022.11.13
    Kafka Streams : CheckPoint  (0) 2022.11.13
    KafkaStreams : GlobalKTable  (0) 2022.11.12

    댓글

    Designed by JB FACTORY