Kafka Streams : 카프카 스트림즈 고급 어플리케이션

    들어가기 전

    이 글은 Kafka Streams In Action의 챕터9를 보고 공부한 글입니다. 

     

    9.1 카프카와 다른 데이터 소스 통합

    이 부분은 Kafka Connector를 공부하고 직접 해보려고 합니다.

     

     

    9.2 데이터베이스 제약 걷어내기 (대화식 쿼리 이용)

    카프카 스트림즈의 결과는 콘솔에서 결과를 검토하거나 Sink 토픽에서 그 메세지를 읽으면서 확인할 수 있었다. 그런데 이렇게 될 경우에는 내부적으로 사용하고 있는 StateStore에 대한 값은 확인을 할 수 없다는 단점이 있다. 이런 StateStore의 상태를 보려면, Process 과정에서 StateStore의 데이터를 DB에 밀어넣고 DB에서 확인을 해야한다. 이렇다면 구조는 복잡해지고, 트랜잭션 과정에서 성능이 저하될 수 있다는 단점이 있다. 

    Kafka Streams는 대화식 쿼리(Queryable Store)를 제공한다. 이녀석은 Kafka Streams 외부에서 StateStore에 접근할 수 있는 읽기 전용 접근을 제공한다. 즉, Restful API를 구현하고 이 Restful API에  대화식 쿼리를 참조해서 데이터를 응답으로 내려줄 수 있다. 

     

    9.2.1 대화식 쿼리 작동 방법

    Kafka Streams는 대화식 쿼리를 제공할 때, Wrapped된 StateStore를 전달한다. 이 Wrapped된 StateStore는 Local에 있는 RocksDB에 select 쿼리등은 날릴 수 있지만 업데이트와 관련된 쿼리는 전혀 날릴 수 없다. 즉, 읽기 전용 쿼리만 제공한다. 코드는 아래와 같이 작성한다.

    ReadOnlyKeyValueStore<String, Long> helloStore =
    	kafkaStreams.store(StoreQueryParameters
        .fromNameAndType("hello", QueryableStoreTypes.keyValueStore()));

    이 때 제공되는 Store의 값은 현재 이 요청을 받은 Kafka Streams 인스턴스에 있는 Key / Value  값만 가지고 있다. Kafka Streams가 클러스터로 구성 되어있다면, 각 파티션마다 서로 다른 Kafka Streams 인스턴스에 배정되어있을 것이다. 따라서 Kafka Streams 클러스터에서 REST API로 Queryable Store에서 값을 찾아오고 싶다면, Kafka Streams 클러스터 간에 rpc를 구현해야한다. 즉, 응답을 내려주기 위해서 다른 Kafka Streams 인스턴스에게서 모두 값을 가져와서 그 결과를 모아서 일을 처리해야한다. 

     

    9.2.2 분산 상태 저장소

    Kafka Streams 클러스터는 동일한 Application ID를 사용하는 Kafka Streams 인스턴스의 집합이다. 그리고 각 Kafka Streams 인스턴스는 StreamTask마다 StateStore를 배정한다. 그리고 각 Kafka Streams 클러스터는 그들이 가지고 있는 Consumer에 의해서 각각이 담당하는 TopicPartition이 나누어질 것이다. 

    위와 같은 경우를 가정해보자. 각 Kafka Streams는 HostName / Port로 구분되는 상태다. 이 때 각 Key, Value를 살펴보면 다음과 같다.

    • Kafka Streams A : Key가 Energy인 녀석을 가진다
    • Kafka Streams B : Key가 Finance인 녀석을 가진다. 

    예를 들어 Kafka StreamsA를 REST API에 노출시킨 후, Queryable Statestore를 읽었다고 가정해보자. 이 경우, Kafka Streams A의 Queryable StateStore는 오로지 Key가 Energy인 녀석만 전달해줄 수 있을 것이다.  그렇다면 각 Kafka Streams에 대해서 개별 웹서비스를 제공해야할까? 그렇지는 않다. 간단한 설정만으로 이 부분을 해결할 수 있다.

     

    9.2.3 분산 상태 저장소 설정 및 검색

    ReadOnlyKeyValueStore<String, Long> helloStore =
    	kafkaStreams.store(StoreQueryParameters
        .fromNameAndType("hello", QueryableStoreTypes.keyValueStore()));

    위에서 이야기 했듯이 kafkaStreams.store를 이용하면 이 요청을 받은 카프카 스트림즈 인스턴스가 가지고 있는 StateStore를 반환한다. Kafka Streams Cluster가 가진 모든 StateStore에서 값을 조회해야하기 때문에 사실 모든 KafkaStreams Cluster에게서 데이터를 받아와야한다. 우선은 Kafka Streams Cluster의 어떤 인스턴스가 어떤 StateStore를 가지고 있는지를 확인해야하는데, 이 부분은 Kafka Streams가 제공해준다. 

    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:30002");

    APPLCATION_SERVER_CONFIG에 다른 Kafka Streams 인스턴스로부터 요청을 받을 호스트와 포트를 알려준다. k8s를 이용한다고 하면, 특정 Pod의 이름 + Port 번호가 될 것이다. 이 값을 설정해주면 Kafka Streams가 기동할 때, Kafka Stream는 내부적으로 작은 서버를 띄우고, 이 요청이 오면 응답을 내려준다. 응답은 Streams Metadata를 내려준다. 아래에서 볼 수 있듯이 KafkaStreams 클러스터의 각 인스턴스가 가지고 있는 정보를 전달해준다.

    위처럼 동작하기 위해서는 APPLICATION_SERVER_CONFIG를 반드시 설정해줘야한다. 이 값을 설정하지 않아도 Kafka Streams Cluster처럼 동작한다. 그렇지만 이 요청을 받을 작은 서버가 뜨지 않기 때문에 같은 Kafka Streams 클러스터지만 이렇게 요청을 했을 때 메타 데이터를 주고 받지는 않는다. 위의 값을 요청하면 현재 Kafka Streams의 해당 StateStore를 가지고 있는 Kafka Streams 인스턴스의 호스트 정보를 알 수 있다. 다음으로 해야할 일은 해당 Host에게 실제 데이터를 받아오는 rpc()를 구현하고 받아와야한다. 

     

    9.2.4 대화식 쿼리 작성

    대화식 쿼리를 작성하는 것은 이전의 Kafka Streams를 작성하는 것과 동일하다. 그렇지만 아래 두 가지를 추가는 해주어야 한다.

    1. StreamsConfig.APPLICATION_SERVER_CONFIG 등록
    2. StreamsMetaData를 불러오기
      • KafkaStreams는 StreamsMetaData 전송을 위한 작은 웹서버를 내장하고 있다. 이 포트로 요청을 보내야 Store 정보를 알 수 있다.
    3. RPC 구현하기
      • 웹 서버를 이용해서 직접 RPC를 구현한다. RPC를 구현할 때 StreamsMetaData에서 전송받은 HostInfo를 이용해서 특정 Host에게 요청해서 값을 받아온다. 받아온 값을 적재해서 처리한다. 

    또한 마지막으로 주의해야할 점은 Kafka Streams가 Running이 아닌 상태에서는 StateStore의 값을 읽어올 수 없다. 왜냐하면 Running이 아니라는 말은 Rebalancing을 하고 있는 과정일 수 있고, Rebalancing 과정에서는 StateStore가 Restore 되는 과정일 수 있기 때문에 이것을 지원해주지는 않는다. 따라서 Kafka Streams가 Running 상태인 것을 반드시 확인하고 해야한다. 이를 위해서 StateListener를 추가해줄 수 있다. 

     

    9.2.5 대화식 쿼리 웹서버 구현

    나는 Spring을 이용해서 대화식 쿼리를 구현했다. 따라서 교재에 있는 코드와는 전혀 다르고, 일부만 구현했다. 

    아래는 폴더 구조다

    C:.
    └─src
        ├─main
        │  ├─generated
        │  ├─java
        │  │  └─com
        │  │      └─example
        │  │          └─demo
        │  │              ├─domain
        │  │              ├─kafkastreamsconfig
        │  │              └─util
        │  └─resources
        │      ├─static
        │      └─templates
        └─test
            └─java
                └─com
                    └─example
                        └─demo

    먼저 KafkaStreamsTopologyConfig 클래스를 구현했다.

    1. KafkaStreamsConfig 빈을 생성한다.
    2. KafkaStreamsConfig 빈을 이용해서 KafkaStreams 빈을 생성해서 등록한다. 
      1. 이 때 KafkaStreams 빈안에서 Topology 전체를 구성한다.
      2. KafkaStreams를 시작하지는 않는다. 
    @Configuration
    public class KafkaStreamsTopologyConfig {
    
        @Bean
        public KafkaStreamsConfig kafkaStreamsConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ABCDEF");
            props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:30002");
    
            return new KafkaStreamsConfig(props);
        }
    
    
        @Bean
        public KafkaStreams kafkaStreams() {
            KafkaStreamsConfig kafkaStreamsConfig = kafkaStreamsConfig();
            Properties props = kafkaStreamsConfig.getProps();
    
            GsonSerializer<StockTransaction> stockTransactionGsonSerializer = new GsonSerializer<>();
            GsonDeserializer<StockTransaction> stockTransactionGsonDeserializer = new GsonDeserializer<>(StockTransaction.class);
    
            Serde<String> stringSerde = Serdes.String();
            Serde<StockTransaction> stockTransactionSerde = Serdes.serdeFrom(stockTransactionGsonSerializer, stockTransactionGsonDeserializer);
    
    
            String storeName = "hello";
    
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            // store 추가
            KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(storeName);
            StoreBuilder<KeyValueStore<String, Integer>> keyValueStoreStoreBuilder = Stores.keyValueStoreBuilder(storeSupplier, stringSerde, Serdes.Integer());
            streamsBuilder.addStateStore(keyValueStoreStoreBuilder);
    
    
            KStream<String, StockTransaction> stream = streamsBuilder.stream("stock-transactions",
                    Consumed.with(stringSerde, stockTransactionSerde).withOffsetResetPolicy(EARLIEST));
    
            KStream<String, StockTransaction> stringObjectKStream = stream.transformValues(
                    () -> new CountValueTransformer(storeName),
                    "hello");
    
    
            final Topology topology = streamsBuilder.build();
            return new KafkaStreams(topology, props);
        }
    
    }

    이벤트 리스너를 이용해서 Spring 컨테이너가 모두 시작되고 난 다음에 Kafka Streams의 추가 설정을 한다. 이제부터 KafkaStreams는 Kafka Broker와 통신하며 데이터를 가져올 것이다. 

    1. KaffkaStremas에 StateListener를 등록한다. 
    2. KafkaStreams를 시작한다. 
    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaInjectListener {
    
        private final ObjectProvider<MyController> provider;
        private final KafkaStreams kafkaStreams;
    
        @EventListener
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            MyController myController = provider.getObject();
    
            // set stateListener
            kafkaStreams.setStateListener((newState, oldState) -> {
                log.info("kafka Streams State is changed. {} -> {} ", oldState, newState);
                myController.changeKafkaStreamsState(newState);
            });
            myController.configKafkaStreams(kafkaStreams);
    
            // kafka streams start
            kafkaStreams.start();
        }
    }

    아래는 Controller의 일부분이다. Controller는 Http 요청을 받고 처리해주는 역할을 한다.

    1. streamMetadataForStore() 메서드를 이용해서 KafkaStreams 클러스터 내의 인스턴스에서 해당 StoreName을 가진 KafkaStreams의 메타 정보를 가져온다.
    2. 가져온 정보를 응답해준다. 
    @GetMapping("/show/{storeName}")
    public String showKafkaStreamMetaData(@PathVariable String storeName) {
        HashMap<String, String> ret = new HashMap<>();
        Collection<StreamsMetadata> hello = kafkaStreams.streamsMetadataForStore(storeName);
        for (StreamsMetadata streamsMetadata : hello) {
            String value = gson.toJson(streamsMetadata.topicPartitions());
            ret.put(streamsMetadata.host(), value);
        }
    
        return gson.toJson(ret);
    }

    Controller의 다른 메서드들은 이 StreamMetaData 정보를 바탕으로 다른 KafkaStreams 클러스터에 요청을 하고, 그 요청에 대한 응답을 취합해서 StateStore의 Key / Value 값을 리턴할 수 있게 된다. 메타 정보에는 Host 정보가 전달되는데, 만약 Host가 DNS에 등록되어있다면 이 Host에 요청해서 데이터를 받아오고 취합하는 형태가 될 수 있다. 기본적으로 k8s는 Pod를 DNS에 등록해주기 때문에 이것을 이용해서 손쉽게 대화식 쿼리를 구현할 수 있다.

    또한, StateListner를 통해서 MyController에는 KafkaStreams의 State를 계속 기록해두고 있다. State가 Running일 때만 값을 가져올 수 있고, 아닌 경우에는 에러가 발생한다. 따라서 이 부분을 잘 처리하기 위해서 while 문을 이용해서 State가 Running 일 때까지 기다리도록 했다. 

    @RestController
    @Slf4j
    public class MyController {
    
    	// 초기값 셋팅
        private KafkaStreams.State kafkaStreamsState = KafkaStreams.State.NOT_RUNNING;
    
    	...
    
        @GetMapping("/fetch/{storeName}/{key}")
        public String fetchKeyValue(@PathVariable String key, @PathVariable String storeName) {
    
            while (!isKafkaStreamsRunning()) {
                log.info("Kafka Streams state is {}", this.kafkaStreamsState);
                try {
                    Thread.sleep(1000L);
                } catch (Exception e) {
                    log.error("e.getMessage {}", e.getMessage());
                }
            }
            ...
            
        }

     

    대화식 쿼리 구현 코드 

    아래에서 내가 작성한 코드를 볼 수 있다. Spring을 이용해서 간략히 대화식 쿼리를 찍먹해 볼 수 있다.

    댓글

    Designed by JB FACTORY