Kafka Streams : Process 과정에서의 레코드

    Kafka Streams의 레코드는 각 노드에서 바뀐다. 

    Kafka Streams는 Kafka Consumer를 이용해서 Kafka Broker에서 메세지를 받아온다. 그리고 받아온 메세지 원본을 각 StreamTask의 Partition Group에다가 각각 저장해둔다. 이 때 메세지는 각 노드의 프로세스를 지날 때 마다 변경되어서 전달된다. 

    final Record<Object, Object> toProcess = new Record<>(
        record.key(),
        record.value(),
        processorContext.timestamp(),
        processorContext.headers()
    );
    maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);

    StreamTask의 Process() 메서드에는 다음과 같은 흐름이 있다.

    1. 만들어진 record를 다시 한번 record를 만드는데, 이 때 toProcess 객체로 생성한다.
    2. 생성된 toProcess 객체를 currNode.process()를 호출할 때 넘겨준다.

    즉, 다음 노드에서 처리해야 할 record의 값이 현재 노드에서 새로 생성되서(toProcess) 전달되는 것을 볼 수 있다.

    예를 들어 이 때는, Record의 value 타입이 ShareVolume인 것을 알 수 있다. 

        store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
        tupleForwarder.maybeForward(
            record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
                .withTimestamp(newTimestamp));
    }

    이것은 KStreamReduce 객체의 process() 메서드다. 여기서는 새로운 형태의 레코드를 다시 한번 생성해서 전달하는 것을 볼 수 있다.

    1. tupleForwarder.maybeForward()를 이용해서 다음 노드로 넘어간다.
    2. 이 때 record.withValue().withTimeStamp() 등을 이용해서 새로운 레코드를 생성해서 전달한다. 

    여기서 record의 value는 Change 타입인 것을 확인할 수 있다.

     

    결론

    원본 레코드는 StreamTask에 PartitionGroup에 저장되어있지만, 재귀적으로 다음 노드에서 Process를 할 때에는 일반적으로 현재 노드에서 처리된 내용을 바탕으로 새로운 메세지 객체(예를 들면 toProcess)를 새로 생성해서 다음 Processor에게 넘기는 형태가 된다.

     

     

    댓글

    Designed by JB FACTORY