Kafka Streams : Source 토픽에서 메세지 가져오기

    Kafka Stream의 전체 흐름

    Kafka Streams는 전체적으로 다음 순서대로 동작한다. 

    • poll : KafkaConsumer를 이용해 Broker에서 메세지를 가져옴.
    • process : 가져온 메세지를 처리함. 
    • commit : 처리 완료한 후, Broker에게 처리 완료 Commit함. 

    이번 포스팅에서는 poll을 했을 때 어떻게 데이터를 가져오고, 이 데이터는 어디에 저장되어 Process를 할 수 있게 되는지를 살펴보고자 한다. 

     

    Kafka Streams가 토픽에서 메세지를 가져오는 방법

    위의 과정이 Kafka Streams가 토픽에서 메세지를 가져오는 방법이다. 간략히 정리하면 다음과 같다.

    1. KafkaStreams Server는 메세지를 가져올 것을 Stream Thread에게 명령한다.
    2. StreamThread는 KafkaConsumer를 이용해 Broker로부터 메세지를 가져온다. Consumer가 가져온 메세지는 Stream 쓰레드에게 전달된다. 
    3. Stream 쓰레드는 메세지를 TaskManager에게 전달한다.
    4. TaskManager는 메세지의 TopicPartition을 확인하고 적당한 StreamTask에게 전달한다.

    이 때, 메세지가 저장되는 종착지가 Stream Task라는 것을 확인했다. Stream Task는 서로 다른 TopicPartition을 담당하고 있으며, TopicPartition 단위로 StreamTask가 처리한다는 것을 의미한다. TopicPartition 단위로 처리하기 때문에 서로 같은 데이터를 건드리는 동시성 이슈가 발생하지 않고, 따라서 Kafka Streams의 Application은 확장하기 쉽다는 것을 의미한다. 

     

    코드 따라가보기

    void runOnce() {
        
        ...
        final long pollLatency = pollPhase();
        ...
        
        if (state == State.RUNNING) {
            do {
                ...
                final int processed = taskManager.process(numIterations, time);
                ...
                final int committed = maybeCommit();
    			...            
        }
    }

    위는 StreamThread의 runOnce()라는 메서드다. 이 메서드에서 Kafka의 전체 흐름이 동작하고 있다. pollPhase(), process(), maybeCommit() 메서드에서 각각 동작하게 된다.

    private long pollPhase() {
    
    	...
        
        } else if (state == State.RUNNING || state == State.STARTING) {
            // try to fetch some records with normal poll time
            // in order to get long polling
            records = pollRequests(pollTime);
        } 
        
        ...
        
        if (!records.isEmpty()) {
            taskManager.addRecordsToTasks(records);
        }
    
    	...
        
        return pollLatency;
    }

    위는 StreamThread의 pollPhase() 메서드다. pollRequest()를 호출해서 KafkaConsumer를 이용해 Broker에서 구독하고 있는 토픽의 메세지를 읽어온다. 읽어온 메세지들은 records에 저장되게 된다. records를 taskManager에게 넘겨주면서 addRecordsToTasks()를 호출한다.

    void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
        for (final TopicPartition partition : records.partitions()) {
            final Task activeTask = tasks.activeTasksForInputPartition(partition);
    
            if (activeTask == null) {
                log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
                    partition, toString(">"));
                throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
            }
    
            activeTask.addRecords(partition, records.records(partition));
        }
    }

    위는 TaskManager의 addRecordsToTask() 메서드다. 여기서는 TaskManager에게서 현재 레코드의 TopicPartition을 처리하는 StreamTask를 찾아온다. 그리고 StreamTask에 현재 TopicPartition에 해당되는 레코드만 가져와서 activetask.addRecords()를 통해서 Record를 추가한다. 

    이렇게 동작할 수 있는 이유는 TaskManager가 현재 활성화 된 StreamTask를 TopicPartition을 Key로 관리하고 있기 때문이다. TaskManager에게는 activeTasksPerPartition이라는 HashMap이 있고, 이 HashMap은 Key가 TopicPartition이고 Value가 StreamTask다. 따라서 특정 TopicPartition을 처리해주고 있는 StreamTask를 가져올 수 있는 것이다. 

    @Override
    public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
    	...
    }

    위 코드는 StreamTask의 addRecords() 메서드다. 여기서는 streamTask의 partitionGroup에 addRawRecords()를 호출해서 파티션을 추가하는 동작을 한다. addRawRecords() 메서드는 partitionGroup의 recordQue라는 곳에 불러온 메세지를 추가하는 동작을 한다.

    recordQue는 내부적으로 여러 필드를 가지고 있다. recordQue의 fifoQue라는 필드에 불러온 레코드를 하나씩 추가하는 동작을 한다. 

     

    정리

    Kafka Streams는 KafkaConsumer를 통해서 메세지를 가져온다. 가져온 메세지는 taskManager를 통해서 각 StreamTask에게 분배해준다. 이 때, 각 StreamTask는 자신이 담당하는 TopicPartition이 있는데, taskManager는 이것을 확인하고 해당 TopicPartition을 담당하는 StreamTask에게 불러온 메세지를 전달해준다. 실제로 레코드가 저장되는 위치는StreamTask.partitionGroup.recordQueue.fifoQueue 이다.

    댓글

    Designed by JB FACTORY