Kafka Streams : Kafka Streams의 리밸런싱 관련 내용 정리

    들어가기 전

    이 글은 Kafka Streams의 코드를 따라가며 작성한 글입니다. 틀린 내용을 발견하시면 댓글로 알려주시면 감사하겠습니다. 

     

    간단 정리

    카프카 스트림즈 클러스터에 새로운 카프카 스트림즈 인스턴스가 들어오는 경우, 카프카 스트림즈 클러스터는 전체적으로 리밸런싱을 한다. 그런데 카프카 스트림즈는 일반적인 Kafka Consumer의 리밸런싱과는 조금은 다른 방식으로 동작한다. 왜냐하면 카프카 스트림즈에는 StreamTask라는 개념이 존재하기 때문이다. StreamTask는 각 토픽의 파티션 별로 나누어진 녀석들이고, 각 카프카 스트림즈가 어떤 StreamTask를 가지고 있느냐에 의존적으로 카프카 Consumer가 결정되어야 한다. 따라서 일반적인 카프카 Consumer와는 다른 방식으로 리밸런싱 된다.  

    코드를 따라가며 도식화 한 카프카 스트림즈의 리밸런싱이다.

    1. ConsumerCoordinator는 리밸런싱 요청이 왔을 때, 본인이 Leader Consumer, 즉 Consumer Coordinator인 경우에 onLeaderElected() 메서드를 이용해서 StreamPartitionAssignor 클래스를 호출한다.
    2. StreamPartitionAssignor 클래스는 4가지 단계로 리밸런싱 작업을 진행한다.
      1. 먼저 브로커로부터 현재 컨슈머 그룹에 대한 메타 데이터를 받아온다. 이 메타 데이터에는 리밸런스 이전의 StreamTask의 배정 정보가 들어있다.
      2. Kafka Streams의 Topology에서 Parsing 한다. 주로 Internal Topic등을 Parsing 한다.
      3. assignTasksToClient 메서드를 이용해서 각 StreamTask를 각각의 KafkaStreams에 배정한다.
      4. computeNewAssignment 메서드를 이용해서 Kafka Streams에 배정된 StreamTask대로 Kafka Consumer를 배정하는 작업을 한다. 
    3. assignTasksToClients 메서드를 호출한다. 이 메서드를 호출하면 TaskAssignor 클래스가 호출되는데, 나의 경우 HighAvailabilityTaskAssignor가 호출되었다.
    4. HighavailiabilityTaskAssignor는 4가지 작업을 순서대로 진행한다.
      1. StatefulTask, StandbyTask를 RR 방식으로 각각의 Kafka Streams Client에 배정한다. (StreamThread 단위도 고려한다)
      2. Client들중에서 Stadnby, Stateful Task를 봤을 때 Lag이 있는 녀석이 있는지를 살펴보며 Catch up client를 구한다.
      3. AssignMovement 메서드를 호출해서, Standby, Stateful Task 중에 Lag이 너무 많이 발생한 Client에게 배정된 경우에는 Movement 객체를 생성해서 좀 더 잘 따라잡은 녀석쪽으로 재배정 Signal을 보내고, Probing Rebalancing을 하게 해준다.
      4. assignStatelessActiveTask() 메서드를 호출해서 상태가 없는 StreamTask를 RR 방식으로 각각의 카프카 스트림즈 인스턴스에 배정한다. (StreamThread 단위도 고려한다)
    5. 작업이 완료되었으면, StreamPartitionAssignor로 돌아온 다음에 computeNewAssignment를 호출한다. 앞선 작업에서는 각 카프카 스트림즈 인스턴스의 StreamThread까지 고려해서 StreamTask가 배정되었지만, 아직까지 Consumer는 배정되지 않았다. StreamTask에 맞는 Consumer는 이곳에서 배정된다.
    6. addClientAssignment 메서드를 호출해서 현재 카프카 스트림즈가 가지고 있는 StreamTask와 Consumer를 알맞게 매칭시킨다. 그렇게 되면 Assignment 정보가 완성되는데, 이것을 직렬화해서 Broker에게 보낼 준비를 한다.
    7. Listener.onAssignment()를 호출해서 리밸런싱이 추가로 필요한지 업데이트 한다. 4-3번에 Movement가 필요할 경우게 Probing Rebalancing이 False가 되는데, 이 경우에 Listener.onAssignment()를 이용해서 follow-up Rebalance를 예약해준다. 

    요약해보면 현재 토폴로지에 대한 Kafka Stremams Task를 모두 계산하고, 그 Task에 맞게 Kafka Streams - Stream Thread별로 Task를 잘 배정해준다. 배정한 결과를 바탕으로 Kafka Streams가 사용할 Consumer를 결정하게 되고, 이 Consumer 정보를 직렬화시켜서 브로커에게 보내줘서 리밸런싱이 일어날 수 있게 된다.

     

     

    코드 따라가보기

    카프카 스트림즈 클러스터에 새로운 인스턴스가 들어오면 callback 함수에 의해서 AbstractCoordinator의 handle 메서드가 호출된다. handle 메서드는 JoinGroupResponse라는 인자를 전달받는데 이름에서 알 수 있듯이 새로운 카프카 스트림즈가 클러스터에 Join 요청을 한 것을 의미한다. 이 때 자신이 리더인 경우라면, onLeaderElected()라는 메서드를 호출한다.

    // AbstractCoordinator.java
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
    	...
                        if (joinResponse.isLeader()) {
                            onLeaderElected(joinResponse).chain(future);
                        } else {
                            onJoinFollower().chain(future);
                        }
                    }
                }
            }

    onLeaderElected 메서드에서는 다시 onLeaderElected() 메서드를 호출하고 groupAssignment를 받는다. onLeaderElected() 메서드를 호출해서 카프카 스트림즈 인스턴스가 새로 들어왔을 때의 리밸런싱을 수행한다는 것을 의미한다. 

    // AbstractCoordinator.java
    private RequestFuture<ByteBuffer> onLeaderElected(JoinGroupResponse joinResponse) {
        try {
            // 만약 본인이 ConsumerLeader 일 때
            Map<String, ByteBuffer> groupAssignment = onLeaderElected(
                joinResponse.data().leader(),
                joinResponse.data().protocolName(),
                joinResponse.data().members(),
                joinResponse.data().skipAssignment()
            );
            ...
    }

    ConsumerCoordinator의 onLeaderElected() 메서드가 호출되면 다음 작업을 수행한다.

    1. 리밸런싱 Strategy에 따라 적절한 Assignor를 받아온다. Kafka Streams는 기본이 StreamPartitionAssignor다.
    2. 브로커에서 읽어온 메타정보를 assignor에게 전달해서 리밸런싱한다.
    3. 리밸런싱 결과를 직렬화한 후 반환한다. 

    이곳에서 볼 수 있듯이 ConsumerCoordinator가 직접 리밸런싱을 하는 것은 아니고 Assignor에게 메타 정보를 넘겨줘서 리밸런싱된 정보를 가져다 쓰는 형태가 된다. 그런데 한 가지 중요하게 봐야할 점은 assignmentStragety 매개변수다. 이 녀석은 이전 메서드의 joinResponse.data().protocolName()이다. 그런데 이 녀석은 "stream"으로 전달된다. 따라서 "stream"에 해당되는 녀석은 StreamsPartitionAssignor 밖에 존재하지 않는다. 따라서 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG에 Cooperative Stickcy를 넣어도 정상적으로 동작하지 않는 것 같다. 

    // ConsumerCoordinator.java
    @Override
    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
                                                      String assignmentStrategy,
                                                      List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions,
                                                      boolean skipAssignment) {
        ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        
        ...
        // 파티션 리밸런싱 완료
        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
    
    	// Cooperative Sticky인 경우, 문제없이 배정되었는지 한번 더 확인
        if (protocol == RebalanceProtocol.COOPERATIVE && !assignorName.equals(COOPERATIVE_STICKY_ASSIGNOR_NAME)) {
            validateCooperativeAssignment(ownedPartitions, assignments);
        }
    
    
    	...
    
    	// 리밸런싱 정보를 직렬화.
        for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }
    
    	// 리턴
        return groupAssignment;
    }

    assignor.assign() 메서드를 호출하면 StreamPartitionAssignor 클래스로 넘어오게 된다. 여기는 정말 긴 코드가 있어서 대부분은 아직 이해할 수 없지만, 주석에 달린 것을 바탕으로 이해해보면 크게 네 단계로 진행된다.

    1. 브로커에서 받아온 바이트 데이터를 역직렬화해서 각 카프카 스트림즈에 대한 메타 정보를 획득한다.
    2. 현재 카프카 스트림즈의 토폴로지를 파싱해서 리밸런싱 할 토픽들을 찾아보고, 서브 토폴로지의 소스 토픽의 파티션 수가 최대로 생성되었는지 확인한다.
    3. 1+2 정보를 바탕으로 각 StreamTask, Stateful Task, StandbyTask를 카프카 스트림즈 인스턴스에 리밸런싱하는 작업을 진행한다.
      1. 이 때 만약 Task가 불균형하게 설정된 경우 probinRebalanceNeeded가 False로 반환된다. 이 값이 False면, 4번 과정 - computeNewAssignment - 메서드를 호출할 때, Broker에 보내는 Info 메시지에 다시 한번 리밸런싱이 필요하다고 알려준다.
    4. 3에서 각 카프카 스트림즈 인스턴스가 가져야 할 StreamTask 분배가 Stream Thread별로 완료되었으나, 카프카 스트림즈가 가지고 있는 StreamThread의 Consumer들에 대해서는 분배가 되지 않았다. 4번에서는 카프카 스트림즈에 분배된 StreamTask에 알맞게 Consumer를 설정한다.

    3,4 번이 중요한 작업이 될 것인데 3번은 assignTaskToClients() 메서드를 호출하면서 처리된다. 4번 작업은 computeNewAssignment 메서드를 이용해서 처리된다.

    // StreamPartitionAssignor.java
    public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
        final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
    
        // ---------------- Step Zero ---------------- //
        // construct the client metadata from the decoded subscription info
    	...
        
        try {
            ...
    
            // ---------------- Step One ---------------- //
    
            // parse the topology to determine the repartition source topics,
            // making sure they are created with the number of partitions as
            // the maximum of the depending sub-topologies source topics' number of partitions
            ...
    
            // ---------------- Step Two ---------------- //
            // construct the assignment of tasks to clients
    
            ...
    		final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
    
            // ---------------- Step Three ---------------- //
            // construct the global partition assignment per host map
    
            ...
    
            // ---------------- Step Four ---------------- //
            // compute the assignment of tasks to threads within each client and build the final group assignment
    
            final Map<String, Assignment> assignment = computeNewAssignment(
                statefulTasks,
                clientMetadataMap,
                partitionsForTask,
                partitionsByHost,
                standbyPartitionsByHost,
                allOwnedPartitions,
                minReceivedMetadataVersion,
                minSupportedMetadataVersion,
                versionProbing,
                probingRebalanceNeeded
            );
    
            return new GroupAssignment(assignment);
        ...
    }

    StreamPartitionAssignor의 assignTasksToClients() 메서드가 호출된다. 이 메서드에서는 카프카 스트림즈가 가지고 있는 내부 토픽, 현재 각 클라이언트의 상태, 그리고 전체 Task를 가져와서 이것을 taskAssignor.assign() 메서드에 넘겨주고 여기서 각 카프카 스트림즈에 대한 Task 리밸런싱이 진행된다. 

    1. createTaskAssignor()를 이용해서 적절한 TaskAssignor를 생성한다.
    2. taskAssignor.assign()을 이용해서 Task를 리밸런싱한다. 
    // StreamPartitionAssignor.java
    private boolean assignTasksToClients(final Cluster fullMetadata,
                                         final Set<String> allSourceTopics,
                                         final Map<Subtopology, TopicsInfo> topicGroups,
                                         final Map<UUID, ClientMetadata> clientMetadataMap,
                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                         final Set<TaskId> statefulTasks) {
    
        final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
        final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
        populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);
    
        final ChangelogTopics changelogTopics = new ChangelogTopics(...);
    
    	...
        
        final Set<TaskId> allTasks = partitionsForTask.keySet();
        statefulTasks.addAll(changelogTopics.statefulTaskIds());
    
        ...
        // TaskAssignor 생성
        final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
    	// 리밸런싱 처리
        final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
                                                                   allTasks,
                                                                   statefulTasks,
                                                                   assignmentConfigs);
    	...
    }

    createTaskAssignor() 메서드를 이용해서 Task를 리밸런싱할 TaskAssignor를 생성하고 반환한다. 아래 코드에서 살펴볼 수 있듯이 카프카 스트림즈는 현재 5개의 TaskAssignor를 지원한다. 전체적으로 어떻게 구현되어있는지 살펴보기는 어려워서, 우선은 HighAvailiabilityTaskAssignor를 기준으로 코드를 살펴보려고 한다.

    1. StickyTaskAssignor
    2. ClientTagAwareStandbyTaskAssignor
    3. DefaultStandbyTaskAssignor
    4. FallbackPriorTaskAssignor
    5. HighAvailabilityTaskAssignor
    // StreamPartitionAssignor.java
    private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
        final TaskAssignor taskAssignor = taskAssignorSupplier.get();
        if (taskAssignor instanceof StickyTaskAssignor) {
            // special case: to preserve pre-existing behavior, we invoke the StickyTaskAssignor
            // whether or not lag computation failed.
            return taskAssignor;
        } else if (lagComputationSuccessful) {
            return taskAssignor;
        } else {
            log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
                         + "trigger another rebalance to retry.");
            return new FallbackPriorTaskAssignor();
        }
    }

     

    HighAvailabilityTaskAssigonor.assign()

    • assignActiveStatefulTasks() : 각 클라이언트마다 RR 방식으로 Stateful Task를 배정한다. 
    • assignStandbyReplicaTasks () : 각 클라이언트마다 Standby Replicas를 배정한다. 
    • taskToCaughtUpClients() : 각 카프카 스트림즈 인스턴스에 배정된 Stateful Task의 Lag이 수용가능한 수준인지를 확인한다. Catchup Client라는 개념은 Stateful Task의 이동이 필요한지 확인할 때 한번 더 사용된다. 
    • assignActivetaskMovements() : 배정된 Stateful Task 중에서 Lag이 너무 많이 발생해서 교체가 필요한 Stateful Task가 있는 경우 변경해준다. 
    • assignStandbyTaskMovements() :  배정된 Standby Task 중에서 Lag이 너무 많이 발생해서 교체가 필요한 Standby Task 가 있는 경우 변경해준다.
    • assignStatelessActiveTasks : Stateless한 StreamTask를 배정한다. 

    assignStatelessAcitvetasks를 이용해서 상태가 없는 StreamTask들을 각각의 컨슈머에게 배정하는 작업을 한다. Stateful, StandbyTask는 이전의 메서드에서 모두 배정되었다. 그리고 neededActiveTaskMovements + neededStandbyTaskMovements의 합이 0보다 크면 리밸런싱이 필요한 것으로 값을 반환하게 된다.

    // HighAvailabilityTaskAssignor.java
    public boolean assign(...) {
        final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
        final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);
    
        assignActiveStatefulTasks(...);
        assignStandbyReplicaTasks(...);
    
        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
    
        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
            statefulTasks,
            clientStates,
            configs.acceptableRecoveryLag
        );
    
        final Map<TaskId, SortedSet<UUID>> tasksToClientByLag = tasksToClientByLag(statefulTasks, clientStates);
    
    	
        final int neededActiveTaskMovements = assignActiveTaskMovements(...);
        final int neededStandbyTaskMovements = assignStandbyTaskMovements(...);
        assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
    
    	// 리밸런싱 필요한지 확인 
        final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;
        return probingRebalanceNeeded;
    }

     

    assign → assignActiveStatefulTasks()

    이 메서드에서는 ClientState가 이전에 가지고 있었던 TaskId가 어떤 것이었는지 상관하지 않고, 새로운 StreamTask를 우선 배정하고 리밸런싱 사전 작업을 진행한다.

    1. For문을 돌면서 assignActive() 메서드를 수행한다. 이 때, ClientState는 번갈아가면서 StreamTask를 하나씩 배정받는다.
    2. balanceTasksOverThreads() 메서드를 호출해서 각 Kafka Streams 인스턴스의 StreamThread를 고려한 밸런싱을 한번 더 진행한다. 

    아래에서는 balanceTasksOverThreads() 호출 내용을 함께 살펴본다.

    // HighAvailiabilityTaskAssignor.java
    private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState> clientStates,
                                                  final SortedSet<TaskId> statefulTasks) {
        Iterator<ClientState> clientStateIterator = null;
        for (final TaskId task : statefulTasks) {
            if (clientStateIterator == null || !clientStateIterator.hasNext()) {
                clientStateIterator = clientStates.values().iterator();
            }
            // 번갈아가면서 ClientState에 StreamTask 배정
            clientStateIterator.next().assignActive(task);
        }
    
    	// StreamThread 갯수를 고려한 리밸런싱
        balanceTasksOverThreads(...);
    }

    assign → assignActiveStatefulTasks() → balanceTasksOverThreads()

    balanceTasksOverThreads() 메서드가 호출된다. 위의 메서드에서는 각 카프카 스트림즈 인스턴스가 가지고 있는 StreamThread의 개수가 고려가 되지 않았으나, 이 메서드에서는 배정된 상태에서 StreamThread 개수를 고려해서 한번 더 밸런싱 하는 작업을 처리한다. 

    1. Source / Destination Client에 대해서 이중 for문을 돌면서 Source → Destination으로 StreamTask 1개를 옮겨도 될지 확인한다.
    2. 옮겼을 때 더 좋을 경우, accept() 메서드를 이용해서 Source Client에 있는 StreamTask를 삭제처리한다. 그리고 Destination Client에 있는 StreamTask를 하나 추가한다.
    // HighAvailiabilityTaskAssignor.java
    private static void balanceTasksOverThreads(...) {
        boolean keepBalancing = true;
        while (keepBalancing) {
            keepBalancing = false;
            
            /*
            	이중 For문을 진행함
                1. 각각의 Source Client마다 Destination Client를 살펴본다.
                2. 살펴볼 때, Source → Destination으로 StreamTask 이동이 의미 있을지를 살핀다.
            */
            for (final Map.Entry<UUID, ClientState> sourceEntry : clientStates.entrySet()) {
                final UUID sourceClient = sourceEntry.getKey();
                final ClientState sourceClientState = sourceEntry.getValue();
    
                for (final Map.Entry<UUID, ClientState> destinationEntry : clientStates.entrySet()) {
                    final UUID destinationClient = destinationEntry.getKey();
                    final ClientState destinationClientState = destinationEntry.getValue();
                    if (sourceClient.equals(destinationClient)) {
                        continue;
                    }
    
    				// Source Client !〓 Destination Client인 경우
                    final Set<TaskId> sourceTasks = new TreeSet<>(currentAssignmentAccessor.apply(sourceClientState));
                    final Iterator<TaskId> sourceIterator = sourceTasks.iterator();
                    
                    // Source → Destination 이동의미 있는지 확인
                    while (shouldMoveATask(sourceClientState, destinationClientState) && sourceIterator.hasNext()) {
                        final TaskId taskToMove = sourceIterator.next();
                        final boolean canMove = !destinationClientState.hasAssignedTask(taskToMove)
                                                // When ClientTagAwareStandbyTaskAssignor is used, we need to make sure that
                                                // sourceClient tags matches destinationClient tags.
                                                && taskMovementAttemptPredicate.test(sourceClientState, destinationClientState);
                        
                        // 의미 있는 경우,
                        if (canMove) {
                            
                            /*
                            	Source에서 StreamTask 삭제
                                Destination에 StreamTask 추가
                            */
                            taskUnassignor.accept(sourceClientState, taskToMove);
                            taskAssignor.accept(destinationClientState, taskToMove);
                            keepBalancing = true;
                        }
                    }
                }
            }
        }
    }

    assign → assignActiveStatefulTasks → balanceTasksOverThreads() → shouldMoveATask()

    이 메서드에서는 Task를 이동시키는게 괜찮을지 확인하는 메서드다. SourceClientState는 StreamTask를 공급해줄 녀석의 상태, DestinationClientState는 공급받아서 이동된 후의 상태다. 이 값들을 비교해서 이동이 괜찮을지를 살펴본다. 이 때, capacity는 각 Client마다 존재하는 StreamThread의 값이다.

    1. 현재 sourceClient, destinationClient에 Assign된 Task의 수의 차이를 확인한다. sourceClient가 가진 StreamTask가 destination이 가진 녀석보다 같거나 많은 경우는 Move 할 필요가 없다. 
      • 같은 경우 이동하면 더 많은 비용이 발생한다. (StateStore 복원)
      • Source보다 Destination의 값이 더 크면, Source → Destination으로 옮겼을 때 오히려 불균형이 가속화 되기 때문이다. 
    2. 하나의 StreamTask를 옮겼다고 가정하고, Destination의 값을 1개 올리고 Source의 값을 1개 내린다. 그리고 Source가 Destination보다 큰지 확인한다. 그 값은 propsedSkew다. 옮겼을 때, Destination의 값이 훨씬 큰 경우에는 옮길 필요가 없다. 왜냐하면 기존에도 작던 Source의 크기는 더욱 더 작아지고, Destination의 크기만 더욱 커졌기 때문이다.
    3. 이 과정을 통과하면 처음에 계산했던 Skew, 변경한다고 가정했을 때의 ProposedSkew를 비교한다. Skew보다 ProposedSkew가 작아졌다면 리밸런싱의 의미가 있기 때문에 리밸런싱을 할 수 있게 된다. 
    private static boolean shouldMoveATask(final ClientState sourceClientState,
                                           final ClientState destinationClientState) {
        final double skew = sourceClientState.assignedTaskLoad() - destinationClientState.assignedTaskLoad();
    
        if (skew <= 0) {
            return false;
        }
    
        final double proposedAssignedTasksPerStreamThreadAtDestination =
            (destinationClientState.assignedTaskCount() + 1.0) / destinationClientState.capacity();
        final double proposedAssignedTasksPerStreamThreadAtSource =
            (sourceClientState.assignedTaskCount() - 1.0) / sourceClientState.capacity();
        final double proposedSkew = proposedAssignedTasksPerStreamThreadAtSource - proposedAssignedTasksPerStreamThreadAtDestination;
    
        if (proposedSkew < 0) {
            // then the move would only create an imbalance in the other direction.
            return false;
        }
        // we should only move a task if doing so would actually improve the skew.
        return proposedSkew < skew;
    }

    assign → assignActivetaskMovements 

    assignActiveTaskMovements를 호출하면 다음 작업을 진행한다.

    1. taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist() 메서드를 호출해서, 현재 client보다 Stateful Task를 더욱 잘 catch up한 Kafka Streams 인스턴스가 있을 경우 taskMovements에 넣어준다.
    2. taskMovements의 size가 0보다 크고, 이동이 가능한 상태라면 이동해야 할 갯수를 반환한다.
    // HighAvailiabilityTaskAssignor.java
    static int assignActiveTaskMovements(...) {
        ...
    
        for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
            ...
            for (final TaskId task : state.activeTasks()) {
                // 더 잘 따라잡은 Task가 있는 경우
                if (taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist(task, client, clientStates, tasksToCaughtUpClients, tasksToClientByLag)) {
                    taskMovements.add(new TaskMovement(task, client, tasksToCaughtUpClients.get(task)));
                }
            }
            // 우선순위 큐에 집어넣어준다.
            caughtUpClientsByTaskLoad.offer(client);
        }
    
    	// Move가 필요한 녀석 갯수
        final int movementsNeeded = taskMovements.size();
    
        while (!taskMovements.isEmpty()) {
            final TaskMovement movement = taskMovements.poll();
            final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
                    tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement) ||
                    tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement);
    
            if (!moved) {
                throw new IllegalStateException("Tried to move task to more caught-up client as scheduled before but none exist");
            }
        }
    	// Move가 필요하면 결국 갯수를 돌려준다.
        return movementsNeeded;
    }

    assign → assignStatelessActiveTasks

    이 메서드는 Stateless ActiveTask를 각각의 Consumer들에게 배정하는 작업을 한다. 

    1. ConstrainedPrioritySet을 생성한다. 이 객체는 내부적으로 ClientState를 가지고 있는 우선순위 큐, Unique 클라이언트 값들을 보관하고 있다. 
    2. 배정은 for문을 돌면서 처리된다. For문에서는 stateless task를 하나씩 가져오고, 우선순위 큐에서 현재 배정받은 stateless Task가 가장 적은 clientState를 가져와서 stateless StreamTask를 하나씩 배정하는 작업을 한다. 배정은 state.assignActive() 메서드를 이용해서 진행된다. 

    이 때 ClientState 객체의 상태는 아래에서 살펴볼 수 있다. assigned / previous Prefix가 각각 붙어 있는 것을 볼 수 있다. ClientState는 Previous를 통해서 리밸런싱이 일어나기 전의 상태를 알고 있다. 또한 assigend를 통해서 리밸런싱이 일어나고 있는 현재의 배정 상태를 기록하고 있다. 

    코드는 아래에서 살펴볼 수 있다. 

    // HighAvailabilityTaskAssignor
    private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState> clientStates,
                                                   final Iterable<TaskId> statelessTasks) {
        final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = new ConstrainedPrioritySet(
            (client, task) -> true,
            client -> clientStates.get(client).activeTaskLoad()
        );
        statelessActiveTaskClientsByTaskLoad.offerAll(clientStates.keySet());
    
        for (final TaskId task : statelessTasks) {
            final UUID client = statelessActiveTaskClientsByTaskLoad.poll(task);
            final ClientState state = clientStates.get(client);
            state.assignActive(task);
            statelessActiveTaskClientsByTaskLoad.offer(client);
        }
    }

     

     

    StreamPartitionAssignor.assign() → computeNewAssignment()

    앞서서 각 StreamThread 별로 StreamTask, Stateful Task, Standby Task가 배정되었다. 그런데 아직 한 가지 배정되지 않은 것이 있는데, 바로 각 카프카 스트림즈 인스턴스의 어떤 컨슈머가 어떤 파티션을 Consume 해올지다. compueNewAssignment()는 각 StreamThread에 배정된 StreamTask의 파티션에 알맞게 Consumer를 설정하는 작업을 진행한다. 즉, 실제로 어떤 파티션을 카프카 스트림즈가 불러올지는 이미 앞에서 이야기가 끝났다. 

    이곳에서는 실제로 배정이 완료된 녀석들을 Broker에게 전달하기 위해서 리밸런싱 정보를 직렬화한다. 그리고 크게 두 가지를 지켜봐야한다. 

    1. addClientAssignment() : 이곳에서는 각 카프카 스트림즈 인스턴스가 가진 Consumer에게 파티션을 배정해준다. 각 카프카 스트림즈는 여러 개의 StreamThread를 가질 수 있고, 여러 개의 StreamThread는 여러 개의 Consumer를 가진다. 각 StreamThread가 가진 TaskID를 적당한 Consumer에 배정해주는 작업을 한다.
    2. rebalnce : 앞서 처리한 리밸런싱이 Stateful Task, StandBy Task에서 Lag이 발생하면서 Probing Rebalance가 False로 될 경우, rebalance가 True 된다. 즉, 이 작업이 종료되었을 때 보내는 메세지에 '리밸런싱이 더 필요해요'라는 메세지를 넣어서 보낸다. 그리고 이후 다시 한번 Follow-up Rebalance가 발생한다. 
    // StreamsPartitionAssignor
    private Map<String, Assignment> computeNewAssignment(final Set<TaskId> statefulTasks,
                                                         final Map<UUID, ClientMetadata> clientsMetadata,
                                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
                                                         final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
                                                         final Set<TopicPartition> allOwnedPartitions,
                                                         final int minUserMetadataVersion,
                                                         final int minSupportedMetadataVersion,
                                                         final boolean versionProbing,
                                                         final boolean shouldTriggerProbingRebalance) {
        boolean rebalanceRequired = shouldTriggerProbingRebalance || versionProbing;
        final Map<String, Assignment> assignment = new HashMap<>();
    
        // within the client, distribute tasks to its owned consumers
        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientsMetadata.entrySet()) {
            final UUID clientId = clientEntry.getKey();
            final ClientMetadata clientMetadata = clientEntry.getValue();
            final ClientState state = clientMetadata.state;
            final SortedSet<String> consumers = clientMetadata.consumers;
            final Map<String, Integer> threadTaskCounts = new HashMap<>();
    
            final Map<String, List<TaskId>> activeTaskStatefulAssignment = assignTasksToThreads(...);
            final Map<String, List<TaskId>> standbyTaskAssignment = assignTasksToThreads(...);
            final Map<String, List<TaskId>> activeTaskStatelessAssignment = assignTasksToThreads(...);
    
            final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment;
            for (final Map.Entry<String, List<TaskId>> threadEntry : activeTaskStatelessAssignment.entrySet()) {
                activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue());
            }
            
            final boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId());
    
    		// StreamThread에 각 Consumer에 배정함. 
            final boolean tasksRevoked = addClientAssignments(...);
    
            if (tasksRevoked || encodeNextProbingRebalanceTime) {
                rebalanceRequired = true;
            }
    
    	...
    
        if (rebalanceRequired) {
            // 추가 리밸런스가 필요한 경우, followup 리밸런스를 발행함. (probingRebalance)
            assignmentListener.onAssignmentComplete(false);
            log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
        } else {
        	// 추가 리밸런스가 필요없는 경우, 여기서 끝남.
            assignmentListener.onAssignmentComplete(true);
            log.info("Finished stable assignment of tasks, no followup rebalances required.");
        }
    
        return assignment;
    }

     

     

     

    멀티 스트림 쓰레드

    • 멀티 스트림 쓰레드는 서로 다른 컨슈머, 프로듀서를 사용한다.
    • 하나의 카프카 스트림즈 인스턴스에 Stream Thread가 10개인 경우, 컨슈머, 프로듀서는 각각 10개가 생성된다. 

     

    정보

    • StandbyTaskAssignor는 따로 존재한다.
    • https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-ProbingRebalances

    댓글

    Designed by JB FACTORY