Kafka Streams와 ksqlDb 정복 : 상태가 없는 처리 (3장)

    들어가기 전

    이 글은 카프카 스트림즈와 ksqlDB 정복의 3장을 공부하며 작성한 글입니다.


    3.0 상태가 없는 처리(Stateless Process)

    스트림 처리의 가장 단순한 형태는 이전에 처리했던 메세지가 어떤 녀석이었는지를 기억도 하지 않는 것이다. 이 형태의 스트림 처리를 Stateless Process라고 한다. 이번 포스팅에서는 Stateless Process들을 이용해서 Stateless한 Kafka Streams Instance를 생성해보려고 한다. Kafka Streams는 아래 Stateless Processor를 제공한다.

    • 레코드 필터링(filter)
    • 필드 추가 및 삭제
    • 레코드의 키 변경(rekeying)
    • 스트림 가지치기(branching)
    • 스트림 병합(merging)
    • 레코드의 보강(enriching)

     


    3.1 Stateless Process vs Stateful Process

    카프카 스트림즈를 개발할 때 중요하게 봐야하는 것은 Stateless인지 Stateful한지를 살펴봐야한다. Stateful 할 경우에는 고려해야 할 부분이 더욱 많아진다. 

    • Stateless : Stateless한 경우 Kafka Streams는 KStream으로만 작업해도 충분하다. 각 어플리케이션은 메세지를 독립적으로 Append하고, 이전에 처리한 이벤트에 대해서는 어떠한 기억도 하지 않는다.
    • Stateful : Windowing, Aggreagting 같은 녀석들은 상태를 가진다. 이 어플리케이션은 추가적인 데이터 또는 상태를 추적해야하기 때문에 좀 더 복잡하다. 

    일반적으로 카프카 스트림즈에서 상태가 있는 Processor가 하나라도 사용된다면 Stateful Kafka Streams가 된다. 상태가 있는 Processor가 하나도 사용되지 않으면 Stateless Kafka Streams가 된다. Stateful한 Kafka Streams는 Scale out, High availiability를 고려하기 위해서는 더욱 더 복잡해진다. 따라서 이 부분은 좀 더 고려해야한다. 

     

    3.2 Serde

    카프카 스트림즈는 컨슈머를 이용해서 데이터를 읽어오고 프로듀서를 이용해서 브로커에 데이터를 공급한다. 따라서 카프카 스트림즈는 데이터를 읽어올 때는 데이터의 역직렬화가 필요하고, 데이터를 공급할 때는 데이터의 직렬화가 필요하다. 카프카 스트림즈에서 이 역할을 하는 클래스는 Serde이고, 이 클래스는 Serializer / Deserializer의 Wrapper 클래스로 동작한다. Broker와 통신을 하는 녀석들(Source, Sink, ChangeLog)은 반드시 이 Serde 클래스를 가져야한다. 

     

    기본적으로 Serde 클래스는 여러 곳에서 사용될 수 있기 때문에 Generic을 이용해서 구현하는 것이 좋다. 보통 클래스를 JSON으로 변경해서 보내거나 받기 때문에 각 클래스에 대한 Serde를 구현하면 된다. JSON을 Parsing 해주는 클래스는 JacksonBind, Gson 등이 있으며 Gson을 이용해서 직렬화 / 역직렬화를 구현한다. 

     

    만약 들어온 JSON 객체 중에서 맵핑하고 싶지 않은 필드가 존재한다면 데이터 클래스에서 해당 필드를 생략하면 된다. 그렇게 생성하면 Gson이 자동으로 해당 필드를 제외한다. 접근할 필드의 수를 줄이는 것은 Projection이라고 하고, SQL에서 관심 있는 컬럼들만 선택하는 SELECT 문 사용과 유사하다.

     

    Json 맵핑 클래스 구현

    @Data
    public class Tweet {
    
        private Long createdAt;
        private Long id;
        private String lang;
        private Boolean retweet;
        private String text;
    
    }

     

    Deserializer 구현

    public class CustomGsonDeserializer<T> implements Deserializer<T> {
    
        private Gson gson;
        private Class<T> type;
    
        public CustomGsonDeserializer(Class<T> type) {
            this.gson = new GsonBuilder()
                    .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
                    .create();
            this.type = type;
        }
    
        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            return this.gson.fromJson(new String(data, StandardCharsets.UTF_8), type);
        }
    }

    Serializer 구현

    public class CustomGsonSerializer<T> implements Serializer<T> {
    
        private Gson gson;
    
        public CustomGsonSerializer() {
            this.gson = new GsonBuilder()
                    .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
                    .create();
        }
    
        @Override
        public byte[] serialize(String topic, T data) {
    
            if (data == null) {
                return null;
            }
    
            return this.gson.toJson(data).getBytes(StandardCharsets.UTF_8);
        }
    }

    Serdes 구현

    public class MyCustomSerdes {
    
        public static Serde<Tweet> tweetSerdes() {
            CustomGsonSerializer<Tweet> tweetCustomGsonSerializer = new CustomGsonSerializer<>();
            CustomGsonDeserializer<Tweet> tweetCustomGsonDesrializer = new CustomGsonDeserializer<>(Tweet.class);
            Serde<Tweet> tweetSerde = Serdes.serdeFrom(tweetCustomGsonSerializer, tweetCustomGsonDesrializer);
            return tweetSerde;
        }
    }

     

     

     

    3.3 전체적인 Topology

    구현할 ProcessorTopology는 아래 그림에서 정리할 수 있다. 박스는 어떤 형태로 진행되는지를 의미하고 왼쪽에 있는 것은 각각 어떤 Processor를 이용해서 작업하는지를 보여주는 것이다. 여기서는 상태를 가지고 있는 Processor가 아무것도 없기 때문에 Stateless인 카프카 스트림즈라고 볼 수 있다. 

     

     

     

    3.4 각각의 Processor

    Stateless한 Processor가 어떤 역할을 하는지, 어떻게 사용되는지를 코드로 작성했다.

     

    3.4 : filter, filternot Processor

    • 평가식을 만족하면 해당 메세지는 필터링 된다. 필터링 된 메세지는 DownStream 되지 않는다.
    KStream<String, Tweet> filteredStream = tweetStream.filter((key, value) -> value.getRetweet());

     

    3.4 : filternot Processor

    • 평가식을 만족하지 않으면 필터한다. 필터링 된 메세지는 DownStream 되지 않는다. 
    KStream<String, Tweet> filteredStream = tweetStream.filterNot((key, value) -> value.getRetweet());

     

    3.4 : map

    • 전달된 메세지를 다른 형태로 변경한다. Key, Value를 모두 바꿀 수 있다. Key가 바뀌면 Repartition이 일어날 수 있다.
    nonEnglishStream.map((key, value) -> 
            KeyValue.pair(value.getLang(), value.getText()))

     

    3.4 : mapValues

    • 전달된 메세지를 다른 형태로 변경한다. 이 때, Key는 바꿀 수 없고 Value만 다른 형태로 바꿀 수 있다.  Key가 바뀌지 않기 때문에 Repartition은 일어나지 않는다.
    KStream<String, Tweet> translatedEnglishStream = nonEnglishStream.mapValues((readOnlyKey, value) ->
            {
                value.setText("[TRANSLATED]" + value.getText());
                return value;
            }
    );

     

    3.4 : split

    • 하나의 스트림을 조건에 맞는 여러 스트림으로 분리해서 사용할 수 있다.
    Map<String, KStream<String, Tweet>> branches = filteredStream.split(Named.as("my-"))
            .branch((key, value) -> value.getLang().equals("en"), Branched.as("english"))
            .branch((key, value) -> !value.getLang().equals("en"), Branched.as("non-english"))
             .noDefaultBranch();
    
    KStream<String, Tweet> englishStream = branches.get("my-english");
    KStream<String, Tweet> nonEnglishStream = branches.get("my-non-english");

     

    3.4 : merge

    •  여러 스트림을 하나의 스트림으로 병합할 수 있다. 
    KStream<String, Tweet> mergedStream = englishStream.merge(translatedEnglishStream);

     

    3.4 : flatMap

    • 하나의 메세지를 이용해서 여러 개의 메세지를 생성해서 내려줄 수 있다. 이 때, Key, Value가 모두 바뀔 수 있다. 이 Processor에서는 Return 값을 Iterable한 녀석으로 반환해야한다. 예를 들면 List<> 같은 녀석이다.
    KStream<String, EntitySentimentJava> flattedStream = mergedStream.flatMap((key, tweet) ->
            {
                List<EntitySentimentJava> results = languageClient.getEntitySentiment(tweet);
                results.removeIf(entitySentiment -> !currencies.contains(entitySentiment.getEntity()));
                List<KeyValue<String, EntitySentimentJava>> ret =
                        results.stream().map(entitySentimentJava -> new KeyValue<>(key, entitySentimentJava)).collect(Collectors.toList());
                return ret;
            }
    );

     

    3.4 : flatMapValues

    • 하나의 메세지를 이용해서 여러 개의 메세지를 생성해서 내려줄 수 있다. 이 때, Value만 다른 객체로 바뀔 수 있다. 이 Processor에서는 Return 값을 Iterable한 녀석으로 반환해야한다. 예를 들면 List<> 같은 녀석이다.
    mergedStream.flatMap()
    KStream<String, EntitySentimentJava> flattedStream = mergedStream.flatMapValues(tweet ->
            {
                List<EntitySentimentJava> results = languageClient.getEntitySentiment(tweet);
                results.removeIf(entitySentiment -> !currencies.contains(entitySentiment.getEntity()));
                return results;
            }
    );

    위의 코드에서 하나의 메세지가 들어왔고 이 때 반환되는 results가 3개인 경우를 생각해보자. 3개의 메세지가 생성되는 것이고 3개의 메세지는 for문을 이용해서 하나씩 depth-first로 처리하게 된다. 

     

    3.5 전체적인 코드 구현

    전체적인 코드는 아래와 같이 구현할 수 있다. 

    public class chapter3Study {
    
        private static final List<String> currencies = Arrays.asList("bitcoin", "ethereum");
    
        public static void main(String[] args) {
    
            LanguageClient languageClient = new DummyClient();
    
    
            StreamsBuilder builder = new StreamsBuilder();
    
            KStream<String, Tweet> tweetStream = builder.stream("tweets", Consumed.with(
                    Serdes.String(), MyCustomSerdes.tweetSerdes()));
    
            tweetStream.print(Printed.<String, Tweet>toSysOut().withLabel("[HELLO]"));
    
            KStream<String, Tweet> filteredStream = tweetStream.filterNot((key, value) -> value.getRetweet());
    
            Map<String, KStream<String, Tweet>> branches = filteredStream.split(Named.as("my-"))
                    .branch((key, value) -> value.getLang().equals("en"), Branched.as("english"))
                    .branch((key, value) -> !value.getLang().equals("en"), Branched.as("non-english"))
                     .noDefaultBranch();
    
            KStream<String, Tweet> englishStream = branches.get("my-english");
            KStream<String, Tweet> nonEnglishStream = branches.get("my-non-english");
    
            KStream<String, Tweet> translatedEnglishStream = nonEnglishStream.mapValues((readOnlyKey, value) ->
                    {
                        value.setText("[TRANSLATED]" + value.getText());
                        return value;
                    }
            );
    
            KStream<String, Tweet> mergedStream = englishStream.merge(translatedEnglishStream);
    
            KStream<String, EntitySentimentJava> flattedStream = mergedStream.flatMapValues(tweet ->
                    {
                        List<EntitySentimentJava> results = languageClient.getEntitySentiment(tweet);
                        results.removeIf(entitySentiment -> !currencies.contains(entitySentiment.getEntity()));
                        return results;
                    }
            );
    
    
    
            flattedStream.print(Printed.<String, EntitySentimentJava>toSysOut().withLabel("[FLATTED]"));
            flattedStream.to("output-topic",
                    Produced.with(Serdes.String(), MyCustomSerdes.entitySentimentJavaSerde()));
    
    
            Properties props = CustomKafkaStreamsProperty.createCustomKafkaStreamsProperty();
            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
    
            kafkaStreams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaStreams.close()));
        }
    
    }

     

    3.6 요약

    3장에서는 Stateless 어플리케이션을 구축할 때, 카프카 스트림즈의 여러 기능들을 어떻게 사용하는지를 공부했다.  이번 포스팅에서는 아래 Processor를 이용해보았다. 다음 게시글에서는 Join, Windowing, Aggregating을 해보려고 한다.

    • filter, filterNot
    • split, branch, get
    • merge
    • maxp, mapValues
    • flatMap, flatMapValues
    • to, thorugh, repartition
    • Serde 구현 

    댓글

    Designed by JB FACTORY