Kafka Streams : Aggregate 동작 방식

    들어가기 전

    이 글은 Kafka Streams에서 Aggregate의 동작 방식을 코드를 보고 정리한 글입니다.

     

    정리

    Aggregate() 메서드를 이용하게 되면 Init, Add, Remove 함수를 각각 선언해줘야한다. 여기서 궁금한 점은 Init, Add, Remove 함수는 각각 언제 호출되느냐이다. 정리하면 다음과 같다.

    1. 레코드의 Key로 StateStore에서 값을 찾아왔을 때 없다면 Init Value가 호출된다.
    2. 레코드의 Key로 StateStore에서 값을 찾아왔을 때 존재한다면, Aggreagator의 Remove 메서드를 호출한다.
    3. 도착한 레코드에 새로운 값이 존재한다면, Aggregator의 Add 메서드를 호출한다. 

    Aggregator 클래스에서 전체적으로 메서드가 호출되는 순서는 아래와 같다.

    Remove → Init → Add 

    이처럼 호출되는 이유는 도착한 메세지의 Key값을 기준으로 StateStore부터 검사하기 때문이다.

    Init, Add, Remove 메서드는 어떤 값을 집계하는지에 따라 다르게 구현되어야 한다. 예를 들어 누적합을 구하는 경우 Init의 결과는 return 0의 형태가 될 것이다. 또한 Add() 메서드는 return aggreagtor + value 형태가 될 것이다. 그렇지만 예를 들어 상위 5개의 값을 구하는 형태라면 Init의 리턴값은 Priority Que가 될 것이고, Add() 메서드는 Priority Que.add() 메서드가 될 것이다.

     

    코드 따라가보기

        @Override
        public void process(final Record<KIn, Change<VIn>> record) {
            
            ...
    
            final ValueAndTimestamp<VAgg> oldAggAndTimestamp = store.get(record.key());
            final VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
            final VAgg intermediateAgg;
            long newTimestamp = record.timestamp();
    
            // first try to remove the old value
            // 메세지에 값이 있고, StateStore에 저장된 값이 있을 때
            if (record.value().oldValue != null && oldAgg != null) {
                // remove를 호출한다.
                intermediateAgg = remove.apply(record.key(), record.value().oldValue, oldAgg);
                newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
            } else {
                intermediateAgg = oldAgg;
            }
    
            // then try to add the new value
            final VAgg newAgg;
            if (record.value().newValue != null) {
                final VAgg initializedAgg;
                if (intermediateAgg == null) {
                    initializedAgg = initializer.apply();
                } else {
                    initializedAgg = intermediateAgg;
                }
    
                newAgg = add.apply(record.key(), record.value().newValue, initializedAgg);
                if (oldAggAndTimestamp != null) {
                    newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
                }
            } else {
                newAgg = intermediateAgg;
            }
    
            // update the store with the new value
            store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
            tupleForwarder.maybeForward(
                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
                    .withTimestamp(newTimestamp));
        }
    
    }

    Aggreator 클래스다.  위 클래스는 다음과 같이 동작한다. 

    1. 현재 메세지의 Key로 StateStore에서 저장되어있는 Aggregated Value를 가져와 OldAggVal에 저장한다.
    2. StateStore에 메세지의 Key로 저장된 값이 있는 경우, Aggregator의 Remove 메서드를 호출하고, 결과값을 intermidateAgg에 저장한다.
    3. 메세지에 새로운 값이 존재하는 경우 아래 작업을 진행한다
      • intermidateAgg 값이 없는 경우 Aggregator의 Initializer를 호출하고, 그 값을 initAgg에 저장한다.
      • IntermidateAgg 값이 있는 경우에는 이미 초기화 되었기 때문에 initAgg에 intermidateAgg값을 저장한다.
      • Aggregator의 add 메서드에 intermidateAgg와 레코드의 Value를 넘겨주어서 연산한 후 newAgg에 저장한다.
    4. 새롭게 집계된 newAgg 값을 stateStore에 저장하고, 다음 프로세서를 호출한다. 

    댓글

    Designed by JB FACTORY