Kafka Streams : CheckPoint

    CheckPoint

    Kafka Streams가 시작되면, Kafka Streams는 GlobalStreamThread / StreamThread의 StateStore를 초기화한다. StateStore를 초기화 할 때 체크 포인트 파일이 존재하는지를 확인한다. 체크 포인트 파일은 StateStore에 플러쉬되고 change-log 토픽에 쓰여진 가장 최신의 레코드에 대한 Offset을 저장하고 있다. 

    따라서 Kafka Streams가 시작할 때 체크 포인트 파일이 존재하고, 특정 change log 토픽의 파티션에 대한 체크포인트가 존재한다면 StateStore는 체크포인트 파일에 존재하는 오프셋으로 설정되게 된다. 만약 체크포인트 파일이 존재하지 않으면 복구는 가장 처음 오프셋부터 시작된다.

     

    CheckPoint는 누가 가지고 있을까? 어떻게 동작할까?

    CheckPoint는 StateManager가 가지고 있다. StateManager는 각각의 StreamThread의 상태를 관리해주는 객체다. StateManager가 생성될 때 Check Point 파일이 생성되고, StateManager가 초기화 될 때 StateStore와 CheckPoint의 Offset이 동기화된다.

    private final Map<TopicPartition, Long> checkpointFileCache;
    
    public GlobalStateManagerImpl(...) {
        ...
        baseDir = stateDirectory.globalStateDir();
        storeToChangelogTopic = topology.storeToChangelogTopic();
        checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
        checkpointFileCache = new HashMap<>();
        ...
    }

    StateManager가 생성될 때 baseDir을 기준으로 CheckPointFile과 CheckPointFileCache가 생성된다. baseDir은 Kafka Streams가 저장되는 디렉토리 + Kafka Streams의 Application ID를 붙인 값으로 생성된다. CheckPointFileCache는 CheckPointFile을 읽었을 때 각 TopicPartion의 Offset이 어디인지를 알려주는 파일이다. 이 때 TopicPartition이기 때문에 내부 StateStore의 상태를 복구해주지는 않는다. 

    @Override
    public Set<String> initialize() {
        try {
        	// 현재 TopicPartition의 Offset 복원
            checkpointFileCache.putAll(checkpointFile.read());
        } catch (final IOException e) {
            throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
        }
    
        final Set<String> changelogTopics = new HashSet<>();
        for (final StateStore stateStore : topology.globalStateStores()) {
            final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
            changelogTopics.add(sourceTopic);
            stateStore.init((StateStoreContext) globalProcessorContext, stateStore);
        }
    	
        ...
        
    }

    StateManager.initialize()에서 checkpointFileCache.putAll() 메서드를 호출하면서 로컬 스토리지에 저장된 CheckPointFile을 읽어온다. 그리고 CheckPointFile에 저장된 TopicPartition의 Offset 값을 동기화시켜준다. 그리고 각 StateStore의 값도 CheckPointFile과 동기화하는 순간도 필요하다. 이것은 stateStore.init()를 호출하면서 실행된다.

    // RocksDBStore.java
    
    @Override
    public void init(final StateStoreContext context,
                     final StateStore root) {
        // open the DB dir
        metricsRecorder.init(getMetricsImpl(context), context.taskId());
        openDB(context.appConfigs(), context.stateDir());
    
        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
    
    	...
    }

    stateStore.init()가 호출되면 RocksDBStore.init()가 호출된다. 이것이 호출되면 위에서 CheckpointFile을 만들었던 것처럼 각각의 StateStore의 Position을 기억한 positionCheckPoint 파일을 생성한다. 그리고 positionCheckPoint 파일을 읽어서 현재 StateStore의 position을 설정해준다. 이 때, 이미 positionCheckPoint 파일이 생성되어있다면 이 파일의 Offset으로 현재 StateStore가 설정되게 된다. 

     

    CheckPoint는 언제 업데이트 될까?

    @Override
    public void checkpoint() {
        
        final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
        for (final StateStoreMetadata storeMetadata : stores.values()) {
            if (storeMetadata.commitCallback != null && !storeMetadata.corrupted) {
                try {
                    storeMetadata.commitCallback.onCommit();
                } catch (final IOException e) {
                    throw new ProcessorStateException(
                            format("%sException caught while trying to checkpoint store, " +
                                    "changelog partition %s", logPrefix, storeMetadata.changelogPartition),
                            e
                    );
                }
            }
    
            // store is logged, persistent, not corrupted, and has a valid current offset
            if (storeMetadata.changelogPartition != null &&
                storeMetadata.stateStore.persistent() &&
                !storeMetadata.corrupted) {
    
                final long checkpointableOffset = checkpointableOffsetFromChangelogOffset(storeMetadata.offset);
                checkpointingOffsets.put(storeMetadata.changelogPartition, checkpointableOffset);
            }
        }
    
        log.debug("Writing checkpoint: {}", checkpointingOffsets);
        try {
        	
            // CheckpointFile에 작성하기
            checkpointFile.write(checkpointingOffsets);
            }
        ...
    }

    위는 StateManagerImpl.java의 checkpoint() 메서드다. stateManager의 checkpoint() 메서드가 호출되면 checkpointFile.write() 메서드를 이용해서 CheckPoint 파일에 변경점을 작성해준다. 그렇다면 이 checkpoint() 메서드는 언제 호출될까? StreamTask.maybeCheckpoint()가 호출될 때 호출된다. 

    @Override
    public void maybeCheckpoint(final boolean enforceCheckpoint) {
        final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets();
        if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) {
            // the state's current offset would be used to checkpoint
            stateMgr.flush();
            stateMgr.checkpoint();
            offsetSnapshotSinceLastFlush = new HashMap<>(offsetSnapshot);
        }
    }

    StreamTask.maybeCheckpoint()가 호출되면, 이 블록에서 stateManager는 현재 state를 플러쉬하고 checkpoint를 업데이트 하는 작업을 한다. 

    • flush : 현재 StateStore가 가지고 있는 데이터를 DB에 보내서 업데이트 하는 작업을 한다.
    • checkpoint : 현재 StateStore가 읽고 있는 Offset을 로컬 스토리지의 Checkpoint 파일에 업데이트한다. 

    StreamTask.maybeCheckpoint()는 StreamTask의 postCommit()에서 호출된다. 그리고 StreamTask의 postCommit()은 TaskExecutor의 commitTaskAndMaybeUpdateCommitableOffsets()에서 호출된다.

    int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
                                                    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
        int committed = 0;
        for (final Task task : tasksToCommit) {
            // we need to call commitNeeded first since we need to update committable offsets
            if (task.commitNeeded()) {
                // Commit 준비
                final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
                if (!offsetAndMetadata.isEmpty()) {
                    consumedOffsetsAndMetadata.put(task, offsetAndMetadata);
                }
            }
        }
    
    	// Commit 진행
        commitOffsetsOrTransaction(consumedOffsetsAndMetadata);
    
        for (final Task task : tasksToCommit) {
            if (task.commitNeeded()) {
                task.clearTaskTimeout();
                ++committed;
                // PostCommit 호출
                task.postCommit(false);
            }
        }
        return committed;
    }

    TaskExecutor는 process()를 한 이후에 commitTaskAndMaybeUpdateCommitableOffsets()을 호출해서 작업 결과를 Commit 한다. Commit 전후로 PrepareCommit() / PostCommit()을 호출하는데 Commit 이후의 PostCommit() 메서드 호출에서 CheckPoint()의 업데이트가 진행된다. 

     

     

    참고

    https://gunju-ko.github.io/kafka/kafka-stream/2018/05/09/Check-Point.html

    댓글

    Designed by JB FACTORY