Kafka Streams : State의 복구

    State의 복구와 ChangeLog

    Kafka Streams는 집계 등이 필요한 경우에는 StateStore를 이용해서 상태를 기억하는 작업을 한다. StateStore는 In-Memory 혹은 Local 스토리지에 기록되는데 공통적인 부분은 서버가 죽었을 때, 모두 휘발된다는 점이다. Kafka Streams는 이 부분을 해결하기 위해 StateStore를 기록하는 ChangeLog 토픽을 Kafka Broker에 생성하는 작업을 한다. 그리고 StateStore의 변경점을 ChangeLog에 저장하고, 서버가 불시에 죽고 재기동될 때 ChangeLog 토픽에서 값을 읽어와서 StateStore를 복구하는 작업을 한다. 

     

    ChangeLog에서 State는 어떤 경로로 불러와지는가?

    각 노드의 StateStore의 값은 이에 대응되는 Change Log 토픽이 브로커에 생성되어서 상태가 기록된다. 재기동 될 때는 ChangeLog 토픽이 존재하는 경우에 이 값을 읽어와서 StateStore가 죽기 전의 상태로 복구하는 작업을 한다. StreamThread는 initializedAndResotrePhase() 메서드를 호출한다. 이후 StoreChangeLogReader.restore()가 호출된다. restore()가 호출되면 Kafka Streams가 가지고 있는 ChangeLog Topic들에 대해서 consumer를 이용해서 레코드를 불러온다. StoreChangeLogReader는 불러온 record를 StateManager에게 전달해주고, StateManager는 전달받은 레코드를 바탕으로 StateStore를 재기동한다. 

    정리하면 Kafka Streams가 시작하면 Stream Thread가 초기화하는데, 이 때 ChangeLogReader를 통해서 각 Changelog 토픽에 대한 레코드를 읽어오고, StateManager는 이것을 이용해서 StateStore를 모두 복구하는 작업을 한다. 

     

    코드 따라가보기

    StreamThread는 runOnce() 메서드를 이용해서 Topology 명세대로 작업을 수행한다. 이 때 initiliazedAndRestorePhase() 메서드를 호출해서 각 노드의 StateStore를 복구하는 작업을 한다. 

    // StreamThread.java
    void runOnce() {
    
    	...
        final long pollLatency = pollPhase();
        ...
        // 복구 시작
        initializeAndRestorePhase();
        ...
                final int processed = taskManager.process(numIterations, time);
    	...
        }

    StreamThread의 initializedAndRestorePhase() 메서드가 호출된다. 여기에서 이런저런 작업을 한 다음에 task 정보를 넘겨주면서 changelogReader.restore() 메서드를 호출한다. 

    // StreamThread.java
    private void initializeAndRestorePhase() {
        ..
        // restore 시작
        changelogReader.restore(taskManager.notPausedTasks());
        log.debug("Idempotent restore call done. Thread state has not changed.");
    }

    StoreChangelogReader.restore()가 호출된다. 다음 작업을 순서대로 진행한다.

    1. restoreConsumer 객체를 통해서 changeLog 토픽에 대한 레코드만 poll 한다.
    2. bufferChangelogRecords() 메서드를 이용해서 읽어온 레코드를 changelog 데이터에 잘 저장해둔다.
    3. restoreChangelog() 메서드를 이용해서 State를 복구하기 시작한다. 

    restoreConsumer 객체는 따로 생성되는 consumer 객체기 때문에 changeLog 토픽만 구독한다. 그리고 이 consumer가 가져온 changeLog 메세지를 bufferChangeLogRecords()를 이용해서 changelog 필드에 잘 저장해둔다. restoreChangelog()는 이 changelog 필드에서 복원이 필요한 레코드를 읽어와 stateManager에게 전달해주는 역할을 한다. 

    // StoreChangelogReader.java
    @Override
    public void restore(final Map<TaskId, Task> tasks) {
        ...
                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
    	...
            for (final TopicPartition partition : polledRecords.partitions()) {
                bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition));
            }
    
            for (final TopicPartition partition : restoringChangelogs) {
                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                try {
                // 여기서 복원 시작
                    if (restoreChangelog(changelogs.get(partition))) {
                        final Task task = tasks.get(taskId);
                        if (task != null) {
                            task.clearTaskTimeout();
                        }
                    }
                } 
        ...
    }

    StoreChangelogReader.restoreChangeLog() 메서드가 호출되면 이제 changeLogReader는 현재의 상태를 파악한 후 StateStore를 복구하기 시작한다. 

    1. buffer된 record가 존재한다면, stateManager에게 버퍼된 레코드를 전달해서 StateStore를 복원하기 시작한다.
    2. 이후에 복원이 끝났거나, 현재 동작 중인 Task라면 changeLog 상태를 Completed로 전환하고, RestoreConsumer를 통해서 데이터를 읽어오는 것을 멈춘다. 

    여기서 알 수 있는 점은 restoreConsumer는 초기에 한번 동작하고 복구가 완료되면 다시 동작하지는 않는다는 점이다. 

    // StoreChangelogReader.java
    private boolean restoreChangelog(final ChangelogMetadata changelogMetadata) {
        
        ...
        
        if (numRecords != 0) {
            ...
    
            final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);
            // StateStore 복원
            stateManager.restore(storeMetadata, records);
    		...
        }
    
    	// 이미 동작하고 있는 Task이거나, 복원이 끝난 Task라면
        if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE && hasRestoredToEnd(changelogMetadata)) {
            ...
            changelogMetadata.transitTo(ChangelogState.COMPLETED);
            pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
    
            try {
                stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
            } 
            ...
    }

    'Kafka eco-system > KafkaStreams' 카테고리의 다른 글

    Kafka Streams : Punctuator  (0) 2022.11.13
    Kafka Streams : StandBy Task  (0) 2022.11.13
    Kafka Streams : CheckPoint  (0) 2022.11.13
    KafkaStreams : GlobalKTable  (0) 2022.11.12
    Kafka Streams : Session Window 동작  (0) 2022.11.11

    댓글

    Designed by JB FACTORY