Kafka Streams와 ksqlDB 정복 : Processor API (7장)

    들어가기 전

    이 글은 카프카 스트림즈와 ksqlDB 정복 7장을 공부하며 작성한 글입니다. 

     


    7.0 Processor API

    카프카 스트림즈는 DSL / Processor API로 구성되어있다. DSL은 상위 수준의 추상화를 가져와서 개발 생산성을 확보해주지만 상대적으로 자유도는 떨어진다. Processor API는 자바 코드를 직접 작성해야하기 때문에 하위 수준의 추상화를 보여주지만 DSL에 비해서 넓은 자유도를 보장한다. 카프카 스트림즈에서는 DSL / Processor API를 각각 쓰거나 섞어서 사용하면서 어플리케이션을 개발한다. 이 곳에서는 Processor API를 언제 사용하는지에 대해 살펴보고자 한다.

     


    7.1 Processor API는 언제 사용해야 할까?

    스트림 처리 어플리케이션을 구축할 때 사용할 추상화 수준을 결정하는 것은 매우 중요하다. 보통 프로젝트에 복잡성이 추가될 때는 합리적인 이유가 있어야한다. 즉, Processor API를 사용해야 할 근거를 알아야 한다는 것이다. 일반적으로 다음 특징 때문에 Processor API를 사용한다.

    • 레코드의 Metadata(Topic, Partition, Offset, Header) 정보가 필요할 때
    • 주기적인 함수를 스케쥴링 할 때 (Punctuator)
    • 레코드를 하위 스트림 프로세서로 넘길 때 사용 가능한 세세한 제어 
    • 상태 저장소에 대한 좀 더 세분화 된 접근
    • DSL을 사용할 때 마주치는 제약을 뛰어넘을 수 있는 기능

    반면 Processor API를 사용하면서 단점 또한 발생할 수 있다. 어떤 단점이 발생할까?

    • 코드가 길어진다. 따라서 유지 보수 비용이 증가한다.
    • 다른 프로젝트 관리자가 진입하기 어려운 높은 장벽이 생길 수 있음. 
    • DSL의 추상화 수준을 재창조하거나, 성능 함정 같은 문제를 일으킬 가능성이 높다.
      • 예를 들어 지나치게 많은 커밋, 지나치게 많은 상태 저장소 접근은 성능 문제를 일으킨다.
         

    카프카 스트림즈는 DSL / Processor API만 사용하거나 둘다 혼합해서 사용하는 기능을 제공한다. 따라서 어떤 것이 언제 필요한지 잘 알고, 적절한 추상화 수준을 유지할 수 있도록 하는 것이 좋다. 

     


    7.2 Processor 구현하기

    Processor API를 이용하기 위해서는 Processor 인터페이스를 구현해서 사용해야한다. Processor API는 다양한 형태로 사용할 수 있다. 

    org.apache.kafka.streams.processor.api.Processor
    • StateStore를 불러와서 상태가 있는 처리하기.
    • Punctuator 설정해서 스케쥴링하기
    • 1:N의 레코드를 생성해서 Downstreaming 하기
    • 1개의 레코드를 특정 하위 Node로만 보내기
    • 1개의 레코드를 모든 하위 Node에게 보내기

    이 곳에서는 1:N의 레코드를 생성해서 DownStream으로 보내는 것을 예시로 들려고 한다.

    public class DigitalTwinProcessor implements Processor<String, TurbineState, String, DigitalTwin> {
    
        private ProcessorContext<String, DigitalTwin> context;
        }
    
        @Override
        public void init(ProcessorContext<String, DigitalTwin> context) {
            Processor.super.init(context);
            this.context = context;
        }
    
        @Override
        public void process(Record<String, TurbineState> record) {
            this.store.put(key, digitalTwin);
            Record<String, DigitalTwin> newRecord = new Record<>(key, digitalTwin, record.timestamp());
            // child1에게 보내기
            this.context.forward(newRecord, "child1");
            // child2에게 보내기
            this.context.forward(newRecord, "child2");
        }
    
        @Override
        public void close() {
        }
    }

    위는 Processor를 구현한 녀석이다. 1개의 레코드를 생성해서 각각 child1, child2 노드로 downstream 하는 것을 볼 수 있다. 기본적으로 Processor API의 메세지 DownStream 전달은 context.forward()로 한다. context.forward()를 할 때, 자식 노드의 이름을 명시하면 특정 노드로만 전달이 되고 명시하지 않을 경우 전체 자식 노드로 브로드캐스팅 된다. 

     


    7.3 상태가 있는 프로세서 생성하기

    DSL에서는 StateStore를 추가하지 않아도 자동적으로 StateStore가 생성된다. 그렇지만 Processor API에서는 반드시 StateStore를 추가해야한다. 따라서 Processor API를 생성할 때는 StoreBuilder를 전달해주는 작업도 필요하다. Processor에는 ProcessorContext가 전달되는데 이곳에서 StateStore를 찾아와서 필요한 상태 처리를 해줄 수 있다. 정리하면 다음과 같다.

    • ProcessorContext에서 StateStore를 찾아와서 상태 처리하기
    • Topology에 StateStoreBuilder 추가하기 

    위에 대해서 각각을 살펴보고자 한다.

     

    Topology에 StateStoreBuilder 추가하기 

    // StateStore 생성 및 Topology 추가하기
    StoreBuilder<KeyValueStore<String, DigitalTwin>> digitalTwinStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("digital-twin-store"),
            Serdes.String(), digitalTwinSerde);
    topology.addStateStore(digitalTwinStoreBuilder, "digital-twin-store-node");
    
    topology.addProcessor("digital-twin-store-node", () -> new DigitalTwinProcessor("digital-twin-store"),
            "high-winds-node", "desired-state-event-node");

    위 코드에서 StateStore Builder를 Topology에 추가하는 방법을 알 수 있다.

    1. StateStoreSupplier를 생성하고 이것을 StateStoreBuilder에게 전달해준다.
    2. addStateStore() 메서드를 이용해서 StateStoreBuilder를 전달할 때, 이 녀석이 어떤 노드에서 사용될지 명시한다.
    3. Processor에서는 StateStore의 이름을 전달받아서 이 녀석을 찾아쓸 수 있도록 한다. 

     

    ProcessorContext에서 StateStore를 찾아와서 상태 처리하기

    public class DigitalTwinProcessor implements Processor<String, TurbineState, String, DigitalTwin> {
    
        private ProcessorContext<String, DigitalTwin> context;
        private KeyValueStore<String, DigitalTwin> store;
        private String storeName;
    
        public DigitalTwinProcessor(String storeName) {
            this.storeName = storeName;
        }
    
        @Override
        public void init(ProcessorContext<String, DigitalTwin> context) {
            this.context = context;
            this.store = context.getStateStore(this.storeName);
        }
    
        @Override
        public void process(Record<String, TurbineState> record) {
    
            String key = record.key();
            TurbineState value = record.value();
            DigitalTwin digitalTwin = this.store.get(key);
            if (digitalTwin == null) {
                digitalTwin = new DigitalTwin();
            }
    
            if (value.getType() == Type.DESIRED) {
                digitalTwin.setDesired(value);
            } else {
                digitalTwin.setReported(value);
            }
    
            this.store.put(key, digitalTwin);
            Record<String, DigitalTwin> newRecord = new Record<>(key, digitalTwin, record.timestamp());
            this.context.forward(newRecord, "child1");
        }
    }

    위 코드에서 Processor가 StateStore를 이용해서 어떻게 상태 처리를 하는지 살펴볼 수 있다. 

    1. Processor가 초기화 될 때, ProcessorContext가 전달된다. ProcessorContext는 StateStore를 가지고 있으므로, 이름으로 이녀석을 찾아와서 필드에 저장한다. 
    2. 필드 변수로 저장된 StateStore를 가져와서 필요한 상태 처리를 할 수 있다.
    3. 상태 처리 후 필요한 레코드를 Downstream 하는 것은 processorContext.forward()로 처리한다.

     


    7.4 Punctuator로 주기적인 함수 호출하기

    카프카 스트림즈에서 주기적으로 특정 함수 호출이 필요하다면 Punctuator 기능을 이용하면 된다. 이것은 ProcessorContext.schedule() 메서드를 이용해서 손쉽게 예약할 수 있다. 그런데 한 가지 주의해야 할 점이 있다. 카프카 스트림즈에서는 항상 시간을 고려해야하는데 Punctuator도 시간을 고려해야한다. Punctuator가 어떤 시간 타입을 사용할지를 결정해야한다.

    Type 설명
    StreamTime 스트림 시간은 특정 토픽 파티션에서 관찰한 타임스탬프 중 가장 최신 시간이다.
    초기에는 알 수 없고 증가하거나 현재 시간에 머물러 있기만 한다.
    스트림 시간은 메세지가 들어와야 증가하므로, 이 녀석을 사용할 때는 데이터가 지속적으로 들어와야한다. 그렇지 않으면 함수 실행이 안된다.
    WallClockTime 로컬 시스템 시간으로 컨슈머의 poll()을 호출할 때 마다 증가한다.
    시간을 얼마나 자주 업데이트 할지는 POLL_MS_CONFIG로 결정한다. 
    이 녀석은 새 메세지들이 도착하는 지의 여부와 상관없이 주기적으로 함수를 실행할 수 있다.

    코드는 아래에서 살펴볼 수 있다.

    • ProcessorContext.schedule() 메서드를 이용해서 어떤 Time으로 Punctuator를 설정할 수 있을지를 결정한다.
    • 실행할 Punctuator 함수를 하나 생성해서 shedule() 메서드에 넘겨준다. 
    public class DigitalTwinProcessor implements Processor<String, TurbineState, String, DigitalTwin> {
    
        private ProcessorContext<String, DigitalTwin> context;
    
        @Override
        public void init(ProcessorContext<String, DigitalTwin> context) {
            this.context = context;
            // Punctuator 설정
            context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
        }
    
        ...
    
    	// Puncuator 함수
        private void enforceTtl(long timestamp) {
            try (KeyValueIterator<String, DigitalTwin> keyValueiterator = this.store.all()) {
    
                while (keyValueiterator.hasNext()) {
                    KeyValue<String, DigitalTwin> entry = keyValueiterator.next();
                    TurbineState lastReportedTurbineState = entry.value.getReported();
                    if (lastReportedTurbineState == null) {
                        continue;
                    }
                    Instant lastUpdate = Instant.parse(lastReportedTurbineState.getTimestamp());
                    long daysSinceLastUpdate = Duration.between(lastUpdate, Instant.now()).toDays();
                    if (daysSinceLastUpdate >= 7) {
                        this.store.delete(entry.key);
                    }
                }
            }
        }
    }

     


    7.5 Record 메타데이터 접근하기

    Processor API는 Key / Value가 아닌 Record를 전달해준다. 따라서 Record가 가지고 있는 메타 데이터에 직접 접근할 수 있다. Record의 메타 데이터가 필요한 경우에는 Processor API를 사용해볼 법 하다.

    메타 데이터 예제
    레코드 헤더 record.headers()
    레코드 오프셋 record.offset()
    레코드 파티션 record.partition()
    레코드 타임스탬프 record.timstamp()
    레코드 토픽 record.topic() // Source 토픽을 알려줌.

     


    7.6 Merge 기능 구현

    Processor API는 DSL의 merge 기능을 아주 손쉽게 지원해준다. addProcessor()에 부모 노드가 무엇인지를 알려줘야하는데, 이 때 원하는 부모 노드를 모두 적어주면 된다. 그러면 이 노드에는 그 부모 노드로부터 모든 메세지가 들어오면서 그 데이터를 Processing하게 된다. 즉 Merge + 특정한 Processing이 함께 진행된다.

    topology.addProcessor("digital-twin-store-node", () -> new DigitalTwinProcessor("digital-twin-store"),
            // 부모 노드 2개 설정
            "high-winds-node", "desired-state-event-node");

    예를 들어서 위 코드를 보면 high-winds-node / desired-state-event-node라는 두 개의 부모 노드를 설정했다. 이렇게 작성해두면 두 개의 부모 노드에서 레코드가 'digital-twin-store-node'라는 현재 노드로 전달되게 된다. 그리고 이 노드는 레코드를 전달받으면 DigitalTwinProcessor()를 이용해서 프로세싱을 하는 형태로 구현되어있다. 

     


    7.7 Processor API와 DSL 결합

    Processor API만 사용하다보면 필요 이상으로 저수준 추상화를 사용했는지 점검을 해봐야한다. DSL을 사용하면 코드 유지 및 로직을 이해하기도 편리하기 때문에 Processor API를 사용하고 있는 것을 DSL로 바꿀 수 있다면 DSL로 바꾸는 것(리팩토링)이 좋다. 바꿔 이야기하면 꼭 Processor API를 사용해야만 할 때 사용하도록 하는 것이 좋다. 기본적인 전략은 DSL로 구현을 하되, 필요한 경우 Processor API를 구현해서 DSL과 Processor API를 함께 혼합해서 사용하는 방법이다. 

     

    7.7.1 DSL에서 Processor API 호출하기

    이 책에는 process() 연산자, transform() 연산자를 사용할 것을 권고한다. 하지만 아래에서 볼 수 있듯이 transform() 메서드는 카프카 스트림즈 3.3 이후에는 deprecated 되었다. 대신에 process() 연산자를 사용하라고 한다. 책에는 transform() 메서드에서 사용할 수 있는 많은 인터페이스가 소개되어있다. 대표적으로 Transformer, ValueTransformer, ValueTransformerWithKey 등이 제공되는데 이 녀석들은 transform() 메서드에서만 사용할 수 있다. 

    /**
     * @deprecated Since 3.3. Use {@link KStream#process(ProcessorSupplier, String...)} instead.
     */
    @Deprecated
    <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
                                       final String... stateStoreNames);

    따라서 책에 기록된 내용들은 무시하고 process() 메서드를 어떻게 사용해서 Processor API와 DSL을 조합하는지를 살펴보면 될 것 같다. 

     

    7.7.2 Process() 메서드

    <KOut, VOut> KStream<KOut, VOut> process(
        final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
        final String... stateStoreNames
    );

    process() 메서드는 반드시 ProcessorSupplier를 제공해야한다. 

    public interface ProcessorSupplier<KIn, VIn, KOut, VOut> extends ConnectedStoreProvider, Supplier<Processor<KIn, VIn, KOut, VOut>> {
        Processor<KIn, VIn, KOut, VOut> get();
    }

    ProcessorSupplier는 get() 메서드를 가지고 있으므로 람다 형태로 호출될 수 있다. 람다 형태로 호출된 결과로 Processor<KIn, VIn, KOut, VOut>을 반환해야한다. org.apache.kafka.streams.processor.api.Processor 인터페이스를 구현해야한다. 앞서 말했던 것처럼 Transformer 객체를 생성하는 것은 아무 의미가 없다. 아래에 Processor를 구현한 예시를 작성했다. 앞서서 Processor API를 개발했던 것과 동일하다.

    1. ProcessorContext.getStateStore()를 이용해서 Stateful한 연산을 할 수 있다.
    2. Downstream으로 메세지를 보내고 싶을 때는 ProcessorContext.forward()를 이용하면 된다.
    3. ProcessorContext.forward()를 여러 번 사용하면 FlatMap() 같은 함수를 구현하는 것과 동일하다.
    4. ProcessorContext.schedule()을 이용해서 Punctuator를 사용할 수 있다.
    5. ProcessorContext.commit()을 이용해서 Cache Flush 등을 할 수 있다.
    6. Processor API가 Stateful하다면, Processor API를 Topology에 추가하기 전에 반드시 StateStore를 Topology에 추가해야한다.
    public class DigitalTwinProcessor implements Processor<String, TurbineState, String, DigitalTwin> {
    
        private ProcessorContext<String, DigitalTwin> context;
        private KeyValueStore<String, DigitalTwin> store;
        private String storeName;
    
        public DigitalTwinProcessor(String storeName) {
            this.storeName = storeName;
        }
    
        @Override
        public void init(ProcessorContext<String, DigitalTwin> context) {
            Processor.super.init(context);
            this.context = context;
            this.store = context.getStateStore(this.storeName);
            context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
        }
    
        @Override
        public void process(Record<String, TurbineState> record) {
    
            String key = record.key();
            TurbineState value = record.value();
            DigitalTwin digitalTwin = this.store.get(key);
            if (digitalTwin == null) {
                digitalTwin = new DigitalTwin();
            }
    
            if (value.getType() == Type.DESIRED) {
                digitalTwin.setDesired(value);
            } else {
                digitalTwin.setReported(value);
            }
    
            this.store.put(key, digitalTwin);
            Record<String, DigitalTwin> newRecord = new Record<>(key, digitalTwin, record.timestamp());
            this.context.forward(newRecord, "child1");
        }
    
        @Override
        public void close() {
        }
    
        private void enforceTtl(long timestamp) {
            try (KeyValueIterator<String, DigitalTwin> keyValueiterator = this.store.all()) {
    
                while (keyValueiterator.hasNext()) {
                    KeyValue<String, DigitalTwin> entry = keyValueiterator.next();
                    TurbineState lastReportedTurbineState = entry.value.getReported();
                    if (lastReportedTurbineState == null) {
                        continue;
                    }
                    Instant lastUpdate = Instant.parse(lastReportedTurbineState.getTimestamp());
                    long daysSinceLastUpdate = Duration.between(lastUpdate, Instant.now()).toDays();
                    if (daysSinceLastUpdate >= 7) {
                        this.store.delete(entry.key);
                    }
                }
            }
        }
    }

     

     

    7.7.3 하이브리드 DSL + Processor API의 이점

    하이브리드 DSL + Processor API를 사용할 때는 어떤 이점이 있을까?

    • 노드 이름과 부모 이름으로 관계를 정의하는 대신 연산자들을 연결하는 것이 데이터 플로우를 구현할 때 더 쉽다.
    • DSL에서는 대부분의 연산자에서 람다를 지원한다. 람다는 간결한 표현으로 변환할 때 유리하다.
    • Processor API에서는 키 재생성 구현에 많은 코드가 필요하다. 이 경우 repartition() DSL을 이용해서 손쉽게 할 수 있다.
    • DSL 연산자들은 어떤 스트림 처리 단계에서 벌어지는 일을 정의하는 표준 용어들을 제공한다. 예를 들어 FlatMap 연산자는 계산 로직을 모르더라도 입력보다 많은 수의 레코드를 생상한다는 것을 알 수 있따. 

    이런 이점들이 존재하기 때문에 하위 수준의 접근이 필요할 때 마다 Processor API로 순수하게 구현하는 것보다는 PRocessor API 사용을 위한 DSL의 특별한 연산자들을 사용하자. 

     

     

    알게된 점

    FlatMapValues

    FlatMap은 1개의 레코드에서 N개의 레코드를 생성하는 것과 동일하다. 뭔가 하나의 객체에 엮여있는 수많은 필드들을 하나씩 떼내서 레코드로 생성하는 느낌. Processor API에서 1개의 레코드를 받아서 여러 레코드를 생성하는 것은 FlatMap 형태와 유사하다고 볼 수 있겠다. 

     

     

    관련 코드

    https://github.com/chickenchickenlove/kafkastudy/tree/master/src/main/java/ksqldbstudy/chapter7

     

     

    댓글

    Designed by JB FACTORY