Kafka Streams : StreamTask가 생성되는 과정

    들어가기 전

    Kafka Streams에서는 특정 TopicPartition은 하나의 StreamTask에게 할당된다. StreamTask는 본인에게 할당된 TopicPartition만 가지고 작업을 하기 때문에 데이터는 격리된다. 따라서 StreamTask의 데이터 격리성 때문에 동시성 문제를 걱정할 필요는 없다. 그렇다면 StreamTask는 어떻게 만들어질까?

     

     

    StreamTask 생성방식

    StreamTask가 생성되는 과정을 살펴보면 다음과 같다.

    1. Consumer가 현재 구독하고 있는 TopicPartition 정보를 Broker에게서 받아온다.
    2. 받아온 TopicPartition 정보를 ConsumerCoordinator에게 전달한다.
    3. ConsumerCoordinator는 전달받은 TopicPartition 정보를 바탕으로 Assign 되어야 할 assignment 객체를 생성하고 StreamPartitionAssignor에게 전달한다.
    4. StreamPartitionAssignor는 전달받은 assignment(TopicPartition 정보를 담은 객체)를 바탕으로 생성되어야 할 StreamTask의 ID를 생성한다.
    5. StreamPartitionAssignor는 StreamTask ID를 Key로 가지고 Value로는 구독해야 하는 TopicPartition 정보들을 넣어준 activeTasks 인스턴스를 생성한다.
    6. StreamPartitionAssignor는 생성한 activeTasks를 TaskManager 객체에게 전달한다.
    7. TaskManager 객체는 전달받은 activeTasks의 정보를 바탕으로 StreamTask를 생성한다. 

    StreamTask는 위와 같은 방식으로 생성된다. 이 때, 3~4번 과정에서 TopicPartition 정보는 Partition 번호순으로 정렬되는 과정을 거친다. 그리고 TaskID는 [Task 번호_파티션 번호]로 생성되고 정렬된다. 이 정보를 바탕으로 activeTasks(TaskID와 담당하는 TopicPartition 정보를 가짐) 생성하는데, 둘다 Partition 번호순으로 정렬되어있다. 이런 이유 때문에 어떤 토픽이라도 같은 번호를 가지는 TopicPartition은 동일한 TaskID를 가지는 StreamTask에게 배정된다. 

    4번 과정에서는 TaskID를 생성하는데, 이 때 Partition의 갯수가 가장 큰 Topic을 기준으로 StreamTaskID를 생성한다. 예를 들어서 토픽 A,B가 각각 파티션을 1,5개를 가지고 있는 경우에 TaskID는 5개가 생성된다. 그렇기 때문에 StreamTask는 5개가 생성된다. 

     

    코드 따라가보기

    @Override
    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
    
    	...
        ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        ...
        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
        ...
        firstException.compareAndSet(null, invokeOnAssignment(assignor, assignment));
        ...
        }
    }

    위는 ConsumerCoordinator의 onJoinComplete() 메서드 코드다. ConsumerCoordinator는 ConsumerGroup을 관리하는 클래스다. 이 클래스에서는 구독하고 있는 TopicPartition의 메타 정보를 Broker에게 받아와서 파티션을 분배하는 역할을 한다. deserializeAssignment() 메서드를 이용해서 Broker에게 받아온 Byte 형태의 Meta 정보를 Decoding 한 후 assignment 객체에 넣어둔다. 그리고 invokeOnAssignment()를 실행한다. 

    읽어온 정보는 assignment 객체에 다음과 같이 저장된다. assignment의 partitions 필드에 TopicPartition 형태로 저장된다. 

    private Exception invokeOnAssignment(final ConsumerPartitionAssignor assignor, final Assignment assignment) {
        log.info("Notifying assignor about the new {}", assignment);
    
        try {
            assignor.onAssignment(assignment, groupMetadata);
        } catch (Exception e) {
            return e;
        }
    
        return null;
    }

    위는 ConsumerCoordinator의 invokeOnAssignment() 메서드다. 여기서는 전달받은 assign 정보와 assginor를 이용해서 onAssignment() 메서드를 호출한다. 

    @Override
    public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
        
        // partitions를 생성 + 정렬함.
        final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
        partitions.sort(PARTITION_COMPARATOR);
    
        final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        
        ...
        
            case 11:
                validateActiveTaskEncoding(partitions, info, logPrefix);
    
                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = info.standbyPartitionByHost();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                break;
            ...
        }
    
        maybeScheduleFollowupRebalance(
            encodedNextScheduledRebalanceMs,
            receivedAssignmentMetadataVersion,
            latestCommonlySupportedVersion,
            partitionsByHost.keySet()
        );
    
        final Cluster fakeCluster = Cluster.empty().withPartitions(topicToPartitionInfo);
        streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fakeCluster);
    
        // we do not capture any exceptions but just let the exception thrown from consumer.poll directly
        // since when stream thread captures it, either we close all tasks as dirty or we close thread
        taskManager.handleAssignment(activeTasks, info.standbyTasks());
    }

    위의 코드는 StreamsPartitionAssignor.onAssignment() 메서드다. 처음에는 전달받은 assignment 정보를 이용해서 TopicPartition들을 저장하는 partitions 객체를 생성하고, 이름 순으로 정렬한다. 그리고 AssignmentInfo.decod(assignment.userData())를 호출해서 Assignment 정보를 가져온다. 

    public static AssignmentInfo decode(final ByteBuffer data) {
        
        ...
        
                    commonlySupportedVersion = in.readInt();
                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                    
                    // ActiveTasks 생성
                    decodeActiveTasks(assignmentInfo, in);
                    decodeStandbyTasks(assignmentInfo, in);
                    decodeActiveAndStandbyHostPartitions(assignmentInfo, in);
                    assignmentInfo.errCode = in.readInt();
                    assignmentInfo.nextRebalanceMs = in.readLong();
                    break;
                
            return assignmentInfo;
        ...
    }

    위는 AssignmentInfo 클래스의 decode() 메서드의 코드다. 여기서 decodeActiveTasks()라는 메서드를 호출하면서 StreamTask()를 생성한다.

    private static void decodeActiveTasks(final AssignmentInfo assignmentInfo,
                                          final DataInputStream in) throws IOException {
        final int count = in.readInt();
        assignmentInfo.activeTasks = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            assignmentInfo.activeTasks.add(readTaskIdFrom(in, assignmentInfo.usedVersion));
        }
    }
    

    위는 decodeActiveTasks() 메서드의 코드를 보여준다. 매개변수로 전달된 assigmentInfo에는 activeTasks라는 필드가 존재한다. 이 activeTasks는 StreamThread가 사용할 StreamTask의 TaskId를 관리하는 곳이다. 여기서는 readTaskIdFrom() 이라는 메서드를 호출해서 StreamTask ID를 생성해서 넣어준다. 

    activeTasks에는 다음과 같이 TaskId 객체만 저장되는 것을 확인할 수 있다. "0_0" 같은 형식으로 생성되어있는데 뒷쪽에 있는 숫자는 파티션 번호를 의미한다. 아무튼 여기서는 TaskID만 생성하기 때문에 StreamTask는 생성되지 않는다. 따라서 다시 StreamsPartitionAssignor 클래스로 돌아가야한다.

    @Override
    public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
        ...
        final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        ...
                validateActiveTaskEncoding(partitions, info, logPrefix);
    
    			// 여기서 activeTasks를 생성함. 정확히는 TaskID
                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = info.standbyPartitionByHost();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                break;
    	...
    }

    다시 StreamsPartitionAssignor 클래스로 돌아온다. decode()를 통해서 AssignmentInfo를 생성했고, AssignmentInfo의 activeTasks 필드에 필요한 TaskID를 생성했었다. 이제 getActiveTasks() 메서드를 호출해서 실제로 StreamTask()를 생성한다. 

    protected static Map<TaskId, Set<TopicPartition>> getActiveTasks(final List<TopicPartition> partitions, final AssignmentInfo info) {
        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            final TopicPartition partition = partitions.get(i);
            final TaskId id = info.activeTasks().get(i);
            activeTasks.computeIfAbsent(id, k1 -> new HashSet<>()).add(partition);
        }
        return activeTasks;
    }

    위 코드는 getActiveTask() 메서드다. 먼저 activeTasks를 HashMap으로 하나 생성해준다. 여기서 info는 앞서 만들었던 StreamTask ID가 존재하는 인스턴스다. 앞에서 파티션과 StreamTask ID는 각각 정렬되었기 때문에 같은 인덱스에서 정보를 가져오기만 하면 된다. 

    위의 이미지에서 볼 수 있듯이 같은 인덱스에서 TopicPartition의 파티션 번호와 TaskId의 파티션 번호는 동일한 것을 볼 수 있다. 따라서 동일한 인덱스에서 각각 값을 가져와서 StreamTask를 만들어주기만 하면 된다. ID와 TopicPartition 객체를 가져와서 HashMap형태의 activeTasks에 추가해주는 동작을 한다. 

    activaTasks는 이제 taskId를 Key로 가지고, Value에는 담당하는 TopicPartition을 가지게 된다. 여기서 주의해서 볼 점은 하나의 StreamTask는 Topic은 달라도 동일한 Partition을 관리한다는 것을 알 수 있다. 또한, StreamTask는 Partition 개수가 가장 큰 Topic을 기준으로 StreamTask가 생성되는 것을 볼 수 있다. 이렇게 activeTasks를 생성한 후 StreamsPartitionAssignor 클래스로 다시 돌아간다. 

    @Override
    public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
        ...
                activeTasks = getActiveTasks(partitions, info);
    	...
        taskManager.handleAssignment(activeTasks, info.standbyTasks());
    }

    StreamsPartitionAssignor에서 getActiveTasks()를 이용해서 현재 activeTasks를 생성했다. 여기에는 각 토픽 파티션을 맡고 있는 StreamTask들이 존재한다. 이 StreamTask는 taskManager에 의해서 관리가 되어야 하기 때문에 taskManager에게 전달된다. handleAssignment()를 호출하면서 taskManager는 activeTasks를 추가한다. 

    public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                                 final Map<TaskId, Set<TopicPartition>> standbyTasks) {
        ...
        tasks.handleNewAssignmentAndCreateTasks(activeTasksToCreate, standbyTasksToCreate, activeTasks.keySet(), standbyTasks.keySet());
    }

    TaskManager.handleAssignment() 코드를 의미한다. 앞에서 여러가지를 체크한 후, 추가할 준비가 되었다면 새로운 Task를 여기서 생성하고 추가한다

    .TaskManager는 StreamTask를 관리하기 위해 다양한 내부 필드를 가지고 있다. 여기서 대표적으로 activeTaskPerId를 볼 수 있는데 이녀석은 TreeMap의 자료구조를 가진다. Key는 TaskId가 되고, Value로 그에 대응되는 StreamTask를 가진다. 실제로 생성되는 위치는 이곳이 되는 것이다. 

     

    결론

    • StreamTask는 토픽들 중 가장 많은 파티션의 갯수만큼 생성된다.
    • 각 StreamTask는 서로 다른 TopicPartition을 관리한다. 따라서 StreamTask 간의 데이터는 격리된다. 
    • 한 StreamTask는 여러개의 TopicPartition을 관리할 수도 있다. 

     

     

     

     

    댓글

    Designed by JB FACTORY