Kafka Streams : Join 스트림의 내부동작

    들어가기 전

    Kafka Streams에서 스트림의 레코드 처리는 하나의 레코드씩 깊이 우선으로 처리를 하고 있다. 생성된 Topology가 다양한 형태로 되어있다고 하더라도 하나의 레코드를 기준으로 깊이 우선으로 처리를 한 후에 다음 레코드를 처리하는 형태다. 이번 포스팅에서는 간단한 Join 메서드를 이용해서 Topology를 구현하고, 해당 Topology에서 실제 구현은 어떻게 되는지를 살펴보고자 한다. 

     

    각 스트림의 Join 실행 방식 

    왼쪽 그림처럼 Topology를 생성하고 스트림 프로세스를 시작한다고 가정해보자. 오른쪽 그림은 Topology를 만족하기 위해서 실제로 생성되는 Stream들을 의미한다. 실제로는 각각의 Stream으로 나누어져서 동작하는 구조를 가진다. 그리고 서두에서 이야기한 것처럼 Depth First로 동작을 한다. 따라서 하나의 레코드에 대해서 Map Stream부터 시작해서 가장 끝에 있는 Print Stream까지 끝을 낸 다음에 다시 우측의 Map Stream으로 돌아와서 우측의 Print Stream까지 처리를 한다. 

    이 때 WindowStream은 각각 자신의 상태를 보관하는 StateStore를 가지고, 이 StateStore는 rocksDB로 동작한다. Window Stream은 먼저 Join Stream을 호출하고, Join Stream을 처리한 다음 자신이 지금 가지고 있는 Record를 자신의 WindowStateStore에 저장한다. 

    한편 Join Stream은 먼저 현재 레코드의 Timestamp에서 Before / After를 계산한 값을 각각 From / To로 계산을 한다. 그리고 반대편에 있는(Join과 관련된) Window Stream의 StateStore를 불러온다. StateStore에서 앞서 계산한 From / To 인자를 넘겨줘서 시간에 맞는 녀석들을 가지고 있는 Iterator 객체를 생성한다. Iterator 객체에서 값을 하나씩 찾아와서 Join을 한다. 

    >a:b
    >a:c
    >a:d

    예를 들어 Kafka Console Producer를 이용해서 다음과 같은 메세지를 전송했다고 가정해보자. 그러면 실제 Print는 어떻게 찍힐까? 

    [topology-1]: a, b +++ topology-1
    [kafka-producer-network-thread | bb6ae556-d6b1-48b8-8187-d267d74fc0f6-765b7cfb-db76-45f0-be1d-7e6e606a815f-StreamThread-1-producer] INFO org.apache.kafka.clients.Metadata - [Producer clientId=bb6ae556-d6b1-48b8-8187-d267d74fc0f6-765b7cfb-db76-45f0-be1d-7e6e606a815f-StreamThread-1-producer] Resetting the last seen epoch of partition bb6ae556-d6b1-48b8-8187-d267d74fc0f6-KSTREAM-JOINOTHER-0000000008-store-changelog-0 to 0 since the associated topicId changed from null to D0jSppDwRs2TzhfGemITJA
    [topology-2]: a, b ---  topology-2
    [JOINED]: a, joined b ---  topology-2 ||||||||||||||| b +++ topology-1
    [kafka-producer-network-thread | bb6ae556-d6b1-48b8-8187-d267d74fc0f6-765b7cfb-db76-45f0-be1d-7e6e606a815f-StreamThread-1-producer] INFO org.apache.kafka.clients.Metadata - [Producer clientId=bb6ae556-d6b1-48b8-8187-d267d74fc0f6-765b7cfb-db76-45f0-be1d-7e6e606a815f-StreamThread-1-producer] Resetting the last seen epoch of partition bb6ae556-d6b1-48b8-8187-d267d74fc0f6-KSTREAM-JOINTHIS-0000000007-store-changelog-0 to 0 since the associated topicId changed from null to W1e-FHKlRFCogdKKGDmmlA
    [topology-1]: a, c +++ topology-1
    [JOINED]: a, joined b ---  topology-2 ||||||||||||||| c +++ topology-1
    [topology-2]: a, c ---  topology-2
    [JOINED]: a, joined c ---  topology-2 ||||||||||||||| b +++ topology-1
    [JOINED]: a, joined c ---  topology-2 ||||||||||||||| c +++ topology-1
    [topology-1]: a, d +++ topology-1
    [JOINED]: a, joined b ---  topology-2 ||||||||||||||| d +++ topology-1
    [JOINED]: a, joined c ---  topology-2 ||||||||||||||| d +++ topology-1
    [topology-2]: a, d ---  topology-2
    [JOINED]: a, joined d ---  topology-2 ||||||||||||||| b +++ topology-1
    [JOINED]: a, joined d ---  topology-2 ||||||||||||||| c +++ topology-1
    [JOINED]: a, joined d ---  topology-2 ||||||||||||||| d +++ topology-1
    1. 메세지 {a,b}가 들어왔다. 이 메세지는 TaskManager에게 전달된다. 
    2. TaskManager는 전달받은 메세지를 좌측의 Map Stream에 전달된다. 이 때 b는 b +++ topology-1이 된다. (파이썬의 딕셔너리 방식으로 Key / Value를 표현)
    3. 2번에서 생성된 레코드는 좌측 PrintStream으로 전달된다. 그리고 좌측 Print Stream은 a, b +++ topology-1을 출력한다.
    4. 2번에서 생성된 레코드가 좌측 WindowSream에 전달된다. 그리고 바로 좌측 JoinStream을 호출한다.
    5. 좌측 JoinStream에서 우측 StateStore를 조회했지만 우측 StateStore는 비어있다. 따라서 좌측 Join Stream에서는 Join된 레코드는 없다.
    6. 좌측은 Join된 레코드 없이 Merge Stream, Print Stream까지 넘어간다. 그렇지만 Join된 레코드가 없기 때문에 출력될 것은 없다. 
    7. Print Stream에서 재귀 방식으로 좌측 Join Stream으로 돌아온다. 이 때, 현재 레코드를 좌측 Window의 StateStore에 보관한다. 이 때, 좌측 StateStore에는 {a : b +++ topology-1}가 보관된다.  이 레코드는 앞서 사용한 Map Stream에 의해서 변형된 레코드다.
    8. TaskManager는 전달받은 메시지를 우측 Map Stream에 전달한다. 이 때 {a,b}는 {a,b --- topology-2}로 변환되고 우측 WindowStream에 전달된다.
    9. 우측 PrintStream은 호출된다. 따라서 이 때, {a : b --- topology-2}가 출력된다.
    10. 우측 WindowStream은 우측 JoinStream을 호출한다.
    11. 우측 JoinStream은 좌측 WindowStateStore를 조회한다. 이 때, {a, b +++ topology-1}이 저장되어있어서 이것을 꺼내온다.
      1. 현재 메세지인 {a, b --- topology-2}와 {a, b +++ topology-1}은 동일한 Key인 A를 가진다. 따라서 Join된 메세지가 생성된다.
    12. 우측 JoinStream은 생성한 메세지 {a, joined b ---  topology-2 ||||||||||||||| b +++ topology-1}를 우측 Merge Stream에 전달한다.
    13. Merge Stream은 전달받은 메세지를 PrintStream으로 전달한다.
    14. PrintStream은 메세지 {a, joined b ---  topology-2 ||||||||||||||| b +++ topology-1}를 출력한다. 
    15. 호출은 재귀방식으로 우측 WindowStream으로 돌아오며, 우측 WindowStream의 StateStroe에는 {a: b --- topology-2}가 저장된다. 

    Join Stream은 위와 같은 방식으로 처리된다. 마치 병렬적으로 물이 양쪽으로 쫙 퍼져서 한번에 Join이 될 것으로 생각이 되었지만, 실제로는 전달받은 메세지 한 건을 처리할 때마다 그 값을 WindowStream의 StateStore에 저장하고 반대편에서 이 StateStore를 확인해서 처리하는 방식이다. 

     

     

     

    코드 따라가보기

    void runOnce() {
        
        ...
        
            do {
                final int processed = taskManager.process(numIterations, time);
                }
                
    	...
    }

    StreamThread의 runOnce() 메서드다. runOnce()에서 taskManager.process()를 이용해서 Stream Process를 처리하기 시작한다. 

    int process(final int maxNumRecords, final Time time) {
        
        ...
        
        for (final Task task : tasks.activeTasks()) {
            ...
            try {
                if (taskExecutionMetadata.canProcessTask(task, now)) {
                    ...
                    totalProcessed += processTask(task, maxNumRecords, now, time);
                }
            ...
    }

    TaskExecutor() 클래스에서 tasks.activeTasks()에서 현재 Stream 쓰레드가 가지고 있는 모든 StreamTask를 하나씩 실행시켜준다. processtask()를 통해서 StreamTask가 실행된다. 

    public boolean process(final long wallClockTime) {
        ...
            // get the next record to process
            record = partitionGroup.nextRecord(recordInfo, wallClockTime);
    	...
                doProcess(wallClockTime);
        
        ...
    }

    StreamTask 클래스의  process()가 호출된다. 먼저 여기서는 두 가지 주요 동작이 있다. 

    1. partitionGroup에 recordInfo, wallClockTime을 전달해서 현재 처리해야 할 record를 읽어와서 StreamTask에 저장한다.
    2. doProcess() 메서드를 호출해서 레코드를 처리한다. 

    한 StreamTask는 하나의 쓰레드에 의해서만 동작하기 때문에 동시성 이슈는 없고, 멤버 변수로서 처리해야 할 레코드를 관리한다. 

    private void doProcess(final long wallClockTime) {
        ...
        final ProcessorNode<Object, Object, Object, Object> currNode = (ProcessorNode<Object, Object, Object, Object>) recordInfo.node();
        ...
    
        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
            ...
        );
        
        ...
        final Record<Object, Object> toProcess = new Record<>(
            record.key(),
            record.value(),
            processorContext.timestamp(),
            processorContext.headers()
        );
        
        maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
    	...    
    }

    StreamTask의 doProcess() 메서드로 넘어온다. 여기서는 4가지 주요한 동작을 한다.

    1. 현재 Topology에서 실행해야 할 Node를 가져온다. 이 Node는 기본적으로는 시작부분이니 Source Node가 된다.
    2. 해당 Topology에서 전체적으로 사용할 ProcessorRecordContext()를 생성한다. 이 녀석은 재귀적으로 DownStream을 실행해가면서 현재 Node의 문맥을 알려주는 역할을 한다. 주로 ProcessContext에서 재귀적으로 동작하는데, 이 때 이전 Context를 기억하면서 처리된다. 
    3. 처리해야할 Record인 toProcess 레코드를 생성한다.
    4. 생성한 레코드를 현재 Node에 전달해서 작업을 진행한다. 
    @Override
    public void process(final Record<KIn, VIn> record) {
        context.forward(record);
        processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
    }

    SourceNode 클래스의 process() 메서드다. 이 메서드에서는 ProcessorContext에 record를 전달하면서 forward() 메서드를 호출한다. 다른 말로 이야기 하면 DownStream Node로 레코드를 전달하면서 작업을 처리하라는 이야기다. 

    @SuppressWarnings("unchecked")
    @Override
    public <K, V> void forward(final Record<K, V> record, final String childName) {
    
        final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
        final ProcessorRecordContext previousContext = recordContext;
    
        try {
            if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
                recordContext = new ProcessorRecordContext(
                    record.timestamp(),
                    recordContext.offset(),
                    recordContext.partition(),
                    recordContext.topic(),
                    record.headers());
            }
    
            if (childName == null) {
                final List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
                for (final ProcessorNode<?, ?, ?, ?> child : children) {
                    forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
                }
            } else{
            	...
            }
            ...
        } finally {
            recordContext = previousContext;
            setCurrentNode(previousNode);
        }
    }

    ProcessorContextImpl의 forward() 메서드가 호출된다.

    1. 이전 Node와 Context 정보를 Previous에 저장해둔다.
    2. 현재 Node의 Processor Context를 생성한다.
    3. for문을 이용해서 현재 노드의 자식 노드들을 forwardInternal()로 호출한다. 이 때, 매개변수로 전달받은 record를 전달하면서 처리가 된다. 
      • 이전 노드에서 전달받은 record를 전달받은 이유는 Process 된 상태를 계속 유지하기 위함이다.
    4. forwardInternal()로 처리가 완료되면 Previous에 저장해두었던 이전 Node와 이전 ProcessorRecordContext를 현재 상태로 되돌리고 리턴한다. 
    private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
                                        final Record<K, V> record) {
        setCurrentNode(child);
    
        child.process(record);
    
        if (child.isTerminalNode()) {
            streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
        }
    }

    forwardIntenal()을 호출하면 ProcessorContextImpl 클래스의 forwardInternal()로 넘어오게 된다. 여기서는 주로 두 가지 동작을 한다.

    1. 현재 Node를 child 노드로 등록한다.
      • 위의 예시에서는 현재 노드가 Source, 자식 노드가 MapStream 노드였다. 이 메서드를 통해서 현재 노드는 MapStream이 된다.
    2. child.process()를 통해서 자식 노드에 의해서 메세지가 처리된다. 
    public void process(final Record<KIn, VIn> record) {
    
        try {
            if (processor != null) {
                processor.process(record);
            } else if (fixedKeyProcessor != null) {
                fixedKeyProcessor.process(
                    InternalFixedKeyRecordFactory.create(record)
                );
            } else {
    		...
        }
    }

    ProcessNode 클래스의 process() 메서드로 넘어오게 된다. 이 때 Key가 있는 Processor인 경우 fixedKeyProcessor가 되어서 fixedKeyProcessor.process()로 다음 스트림이 처리가 된다. 

    이 때 각 Node는 다음과 같은 필드를 가지고 있는 것을 볼 수 있다.

    1. 자신의 자식 Node들. Topology상 Processor가 된다.
    2. 현재 노드의 Processor. 현재 노드에서 처리해야 할 작업들이다.
    3. StateStores는 현재 노드의 상태 값을 보관하는 곳이다. 
    private class KStreamMapProcessor extends ContextualFixedKeyProcessor<KIn, VIn, VOut> {
        @Override
        public void process(final FixedKeyRecord<KIn, VIn> record) {
            final VOut newValue = mapper.apply(record.key(), record.value());
            context().forward(record.withValue(newValue));
        }
    }

    MapValues() 스트림을 처리하는 것이기 때문에 KStreamMapValues의 Process()로 넘어왓다. 여기서는 값을 처리한 후, 현재 노드의 ProcessorContext를 호출해서 다음 프로세스 작업을 진행한다. 

    public <K, V> void forward(final Record<K, V> record, final String childName) {
        final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
        final ProcessorRecordContext previousContext = recordContext;
            if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
                recordContext = new ProcessorRecordContext(...}
    
            if (childName == null) {
                final List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
                for (final ProcessorNode<?, ?, ?, ?> child : children) {
                    forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
                }
            } 
            ...
            
        } finally {
            recordContext = previousContext;
            setCurrentNode(previousNode);
        }
    }

    다시 한번 ProcessContext()로 넘어오게 된다. ProcessContext()는 재귀적으로 DownStream을 호출해나가는 방식이 된다. 이 때도 동일하게 현재 Node의 자식 Node들을 불러와서 forwardInternal()을 한다. 

    private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
                                        final Record<K, V> record) {
        setCurrentNode(child);
    
        child.process(record);
    
        if (child.isTerminalNode()) {
            streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
        }
    }

    ProcessorContextImpl로 넘어와서 child.process()를 호출한다.Print Node가 먼저 호출되지만, 유사하기 때문에 다음 노드는 WindowStreamNode라고 가정한다.

    ProcessorNode로 넘어와서 현재 ProcessorNode 인스턴스의 상태값을 확인해보면 다음과 같다. 이 때 한가지 주의해서 볼 점은 다음과 같다. ProcessNode가 가지고 있는 StateStores에 "KSTREAM-JOINOTHER-00000000008-store"라는 stateStore가 생긴 것을 볼 수 있다. 이 StateStore는 애초에 코드를 작성할 때 추가해준 적이 없다. Join을 하게 되면 자동으로 생기는 Join을 위한 stateStore다. 

    그리고 이 stateStore는 RocksDB를 사용하기 때문에 로컬 디렉토리에 StateStore까지 생성되는 것을 볼 수 있다.

    @Override
    public void process(final Record<K, V> record) {
        // if the key is null, we do not need to put the record into window store
        // since it will never be considered for join operations
        if (record.key() != null) {
            context().forward(record);
            // Every record basically starts a new window. We're using a window store mostly for the retention.
            window.put(record.key(), record.value(), record.timestamp());
        }
    }

    KStreamJoinWindow 프로세서의 process() 메서드로 넘어온다. 여기서 봐야할 점은 세 가지다.

    1. 메세지의 Key가 null이면 다음으로 넘어가지 않는다. 즉, Join으로 넘어가지 않는다.
      • Join은 Key를 바탕으로 하기 때문에 Key가 없으면 DownStream 동작은 하지 않는다는 것이다.
    2. ProcessorContext()를 이용해서 다음 노드로 넘어간다.
    3. 다음 노드의 작업이 끝나고 다시 돌아왔을 때, window.put() 메서드 호출한다.
      • 이것은 현재 WindowStream 노드가 가지고 있는 StateStore에 현재 레코드를 넣는다는 것을 의미한다. 

    아무튼 context().forward()를 이용해서 Join 노드를 불러와서 다음 작업을 진행해준다. 

    현재 노드는 JoinOther이라는 스트림인데, JoinThis쪽 StateStore를 가지고 있는 것을 볼 수 있다. 현재 Join Stream 노드에서 Processor을 불러와서 Join을 실행한다. 

    @Override
    public void process(final Record<K, V1> record) {
        
        ...
    
        boolean needOuterJoin = outer;
    
        final long inputRecordTimestamp = record.timestamp();
        final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
        final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
    
        ...
    
        try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
            while (iter.hasNext()) {
                needOuterJoin = false;
                final KeyValue<Long, V2> otherRecord = iter.next();
                final long otherRecordTimestamp = otherRecord.key;
    
                ...
                
                context().forward(
                    record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value))
                           .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
            }
    
            ...
            }
        }
    }

    스트림과 스트림을 조인하기 때문에 KStreamKStreamJoin 클래스의 process()로 넘어온다. 이 메서드에서는 다음과 같은 일을 한다.

    1. Join Window를 결정한다.
      • 레코드의 timestamp를 기준으로 BeforeMs / AfterMS를 이용해서 JoinWindow를 결정한다.
    2. 반대편 WindowStream의 WindowStore에서 Join 대상을 찾는다.
      • Join 대상을 찾을 때, Key와 Window의 시작 / 종료 시간을 전달해서 Iterator 객체를 찾아온다.
    3. 찾아온 조인 대상 Iterator 객체를 이용해서 조인한 레코드를 생성하고 다음 노드로 진행한다. 
    KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                          final long from,
                                          final long to,
                                          final boolean forward) {
        final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
    
        final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
        final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
    
        return new SegmentIterator<>(
                searchSpace.iterator(),
                keySchema.hasNextCondition(key, key, from, to, forward),
                binaryFrom,
                binaryTo,
                forward);
    }

    Join 대상을 WindowStore에서 찾아올 때는 위와 같은 함수가 호출된다. 유닉스 타임으로 시간이 변경되어서 그 기준에 맞는 녀석들의 바이너리상의 위치를 가져온다. 그리고 그것을 바탕으로 RocksDB에서 값을 불러와서 Iterator를 만들어주는 형태가 되는 것 같다. 

     

     

    각 Stream Node의 구성

    • StreamNode는 DownStream을 Children 변수에 가지고 있다. 메세지를 처리할 때, Depth-First로 Recursive하게 처리한다.
    • 하나의 Join Processor를 생성하면 그에 대한 결과로 WindowStream / JoinStream / MergeStream이 생기게 된다.
    • WindowStream은 내부적으로 StateStore가 생긴다. 이 StateStore는 RocksDB를 이용한다. 
    • 각 JoinStream은 반대편에 있는 JoinStream의 StateStore를 가진다. 그리고 이 StateStore에서 Key에 대한 값을 찾아와서 Join을 수행한다.

     

    한 Node의 매개변수

    • Topology의 Node는 다음과 같이 표현된다.
      • 내부적으로 Processor / ProcessorContext / StateStore를 가진다. .

     

    JoinWindow의 설정

    JoinWindows joinWindows = JoinWindows.
    	ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30)).
        after(Duration.ofMinutes(1));
    • JoinWindow는 다음과 같이 설정할 수 있다. 이 JoinWindow는 Record의 Timestamp를 기준으로 정해진다. 
    • 기본적으로 TimeDiffence만 설정하게 되면 Before / After Time은 TimeDiffernce로 설정된다.
    • 만약에 Window를 조절하고 싶다면 after / before 메서드를 이용해서 조절할 수 있다. 
    final long inputRecordTimestamp = record.timestamp();
    final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
    final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
    
    final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo
    • 해당 설정값은 Join 클래스(KStreamKStreamJoin.java)에서 설정값이 적용된다. 여기서 joinBeforeMs / joinAfterMS에 설정값이 저장된다.
    • TimeDiffernce만 설정하면 JoinBeforeMs / JoinAfterMs는 모두 TimeDifference와 동일하다. 
    • after / before를 설정하게 되면 joinBeforeMs / joinAfterMs의 값이 변경된다. 
    • 여기서 구한 timeFrom / timeTo 값을 이용해서 상대방의 WindowStateStore에서 값을 불러와서 Windowing을 구현한다.

    • 위의 코드에서 after를 1분으로 설정했는데, joinBeforeMS에 60초의 값이 설정된 것을 볼 수 있다.

     

     

    댓글

    Designed by JB FACTORY