KafkaStreams : StateStore

    이 글은 코드를 따라가보면서 뇌피셜로 작성한 글이기 때문에 틀릴 수도 있습니다. 틀릴 경우 댓글을 남겨주시면 수정하겠습니다. 

    StateStore

    StateStore는 StreamTask별로 고유하게 주어진다. 그리고 StreamTask는 고유한 파티션을 가진다. 따라서 StateStore는 고유한 파티션에 대응되는 상태 저장소라고 볼 수 있다. 또한 StateStore는 이름으로 관리되는데, 동일한 이름을 가진 StateStore를 사용할 수 없기 때문에 각 StreamTask가 가지고 있는 노드끼리는 StateStore를 공유할 수 없는 것으로 보인다. 

     

    StreamTask의 StateStore 관리

    StreamTask는 내부적으로 stateDirectory라는 필드를 가지고 있다. 이 stateDirectory는 StateStore가 저장되는 stateDir를 가지고 있다. stateDir에는  lock / stream-process-metadata 뿐만 아니라 StreamTask에 대한 stateDir 정보가 있다. 

    이 때 StreamTask의 stateDir 정보는 0_0 / 0_1 / 0_2 ... 형태로 표현이 된다. 그리고 StreamTask에는 각각 stateStore가 존재하고 그 이름을 확인할 수 있다. 여기서 KSTREAM-JOINOTHER-000008-store는 실제로 생성된 stateStore의 이름이다. 

    정리해보면 StreamTask는 각 StreamTask가 가지고 있는 StateStore의 이름이 있다. 그리고 그 stateStore의 이름을 바탕으로 런타임에서 stateStore를 찾아와서 동작하는 형태다.

     

    addStateStore()로 동일한 stateStore를 여러 노드에서 쓸 수 있을까?

    결론부터 이야기하면 동작하지 않는 것 같다. 

    public final void addStateStore(final StoreBuilder<?> storeBuilder,
                                    final boolean allowOverride,
                                    final String... processorNames) {
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        final StateStoreFactory<?> stateFactory = stateFactories.get(storeBuilder.name());
        if (!allowOverride && stateFactory != null && !stateFactory.builder.equals(storeBuilder)) {
            throw new TopologyException("A different StateStore has already been added with the name " + storeBuilder.name());
        }
    
    	stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder));
    
        if (processorNames != null) {
            for (final String processorName : processorNames) {
                Objects.requireNonNull(processorName, "processor name must not be null");
                connectProcessorAndStateStore(processorName, storeBuilder.name());
            }
        }
        nodeGroups = null;
    }

    Topology를 build할 때 addStateStore()를 이용해서 topology에 stateStore를 추가한다. 이 때, 이미 해당 StateStore 이름으로 만들어진 것이 있는 경우에는 "A different StateStore has already been added with the name"이라는 Exception이 발생한다. 

     

     

    댓글

    Designed by JB FACTORY