KafkaStreams : GlobalKTable

     

    GlobalKTable vs KTable

    GlobalKTable은 Kafka Streams DSL에서 지원하는 KTable이다. 일반적으로 제공되는 KTable과는 다음 차이가 존재한다. 

    • KTable은 해당 노드에 할당된 파티션의 데이터만 읽어와서 StateStore에 보관한다.
    • GlobalKTable은 모든 파티션의 데이터를 읽어와서 StateStore에 보관한다. 

    이런 특징 때문에 GlobalKTable은 파티셔닝에 대한 신경을 쓰지 않아도 된다는 장점이 있다. 반면, 카프카 스트림즈의 모든 클러스터는 각각 토픽의 전체 파티션을 모두 읽어와서 각 노드에 각각의 StateStore를 가지고 있게 된다. 

    GlobalKTable은 읽어온 Table을 Local 스토리지를 이용해서 StateStore에 보관하게 된다. 따라서 모든 노드가 GlobalKTable을 이용해서 같은 데이터를 모두 보관하고 있다는 단점이 있고, 모든 파티셔닝을 읽기 때문에 각 노드가 보관하고 있는 데이터의 용량이 증가한다. 따라서 GlobalKTable을 사용해야 할 때는 한 노드가 가져도 문제 없을 정도로 가벼운 데이터일 경우에만 사용한다. 

     

    GlobalKTable은 누가 만들어줄까?

    Kafka Streams에는 Stream 쓰레드와 GlobalStream 쓰레드가 존재한다. GlobalKTable은 GlobalStream 쓰레드가 생성해준다. GlobalKTable은 GlobalStream 쓰레드에 의해서 생성되고 KTable이라고는 하지만 실제로는 StateStore의 형태로 존재하고 가져다 쓴다. 

     

    GlobalStreamThread의 GlobalKTable 업데이트 코드 따라가기

    // KafkaStreams.java
    public synchronized void start() throws IllegalStateException, StreamsException {
        if (setState(State.REBALANCING)) {
            log.debug("Starting Streams client");
    
            if (globalStreamThread != null) {
                // GlobalStreamThread 시작
                globalStreamThread.start();
            }
    
    		// StreamThread 시작
            final int numThreads = processStreamThread(StreamThread::start);
    	
        ...
        
    }

    위의 코드는 KafkaStreams.start() 메서드이다. KafkaStreams 객체를 시작하게 되면 GlobalStream 쓰레드와 StreamThread가 시작하게 된다는 것을 알 수 있다. 여기서 GlobalStream 쓰레드가 시작되면서 GlobalKTable에 대한 값을 처리해준다. 

    @Override
    public void run() {
        
        final StateConsumer stateConsumer = initialize();
    
    	...
    
        boolean wipeStateStore = false;
        try {
            while (stillRunning()) {
                final long size = cacheSize.getAndSet(-1L);
                if (size != -1L) {
                    cache.resize(size);
                }
                // stateConsumer를 이용해서 poll()
                stateConsumer.pollAndUpdate();
            }
        } 
        
        ...
        
    }

    GlobalStreamThread의 Run() 메서드로 넘어온다. 이곳에서는 GlobalStreamThread가 Running 상태를 유지하는 동안 stateConsumer.pollAndUpdate() 메서드를 이용해서 데이터를 가져오고 업데이트 하는 것을 볼 수 있다.

    void pollAndUpdate() {
        final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime);
        for (final ConsumerRecord<byte[], byte[]> record : received) {
            stateMaintainer.update(record);
        }
        final long now = time.milliseconds();
        if (now - flushInterval >= lastFlush) {
            stateMaintainer.flushState();
            lastFlush = now;
        }
    }

    stateConsumer의 pollAndUpdate() 메서드로 넘어오게 된다. 이곳에서는 다음 작업을 한다.

    1. globalConsumer를 이용해서 레코드를 가져온다.
    2. 가져온 레코드를 stateMaintainer 객체에게 넘겨서 업데이트를 한다.
    3. FlushInterval 시간이 흘렀으면 State를 Flush()한다.

    여기서 알아야 할 점은 Global Stream Thread는 Stream Thread와 별개로 globalConsumer를 가지고 있고, 이 Consumer를 이용해서 데이터를 읽어온 후 stateMaintainer에게 넘겨서 GlobalStateStore를 업데이트한다는 점이다. 그리고 이 GlobalStateStore 자체가 GlobalKTable이 된다.

     

    파티션이 10개일 때의 읽어오는 데이터

    GlobalStream Thread는 모든 파티션의 데이터를 읽어온다고 했다. 위의 이미지는 stateConsumer의 GlobalConsumer가 poll() 해온 메세지다. 이 레코드를 확인해보면 동일한 토픽에서 모든 파티션을 읽어오는 것을 알 수 있다. 예를 들어 companies-0, companies-1, companies-2, ... 등등에서 데이터를 읽어오는 것을 볼 수 있다.

     

    카프카 스트림즈를 여러개 켰을 때

    아래 / 위는 서로 다른 카프카 스트림즈에서 읽어온 메세지를 보여준다. 각각의 ArrayList가 서로 다른 객체인 것은 서로 다른 카프카 스트림즈에서 돌아가고 있는 녀석들이기 때문이다. 현재 GlobalConsumer가 읽어오는 속도가 달라서 offset의 차이 때문에 서로 다른 데이터를 읽은 것처럼 보인다. 그렇지만 첫번째는 현재 3150 ~ 3153을 읽고 있는데, 조금 있으면 3400 ~ 3403을 읽을 것이다. 결국은 첫번째 / 두번째 카프카 스트림즈의 GlobalConsumer가 읽어오는 데이터는 모두 똑같다. 

    따라서 카프카 스트림즈에서 GlobalKTable을 생성하면 모든 노드는 동일한 데이터를 자신의 Local StateStore에 보관하게 된다. 

    댓글

    Designed by JB FACTORY