Kafka Streams : 카프카 스트림즈 어플리케이션 테스트

    들어가기 전

    이 글은 Kafka Streams in action 8장을 보고 공부하며 작성한 글입니다. 기존 책의 코드가 오래된 코드라 변경하면서 교재와는 다른 코드일 수 있습니다.

     

    8. 카프카 스트림즈 어플리케이션 테스트

    테스트 유형 목적 테스트 속도 사용 수준
    단위 격리된 부분 기능에 대한 개별 테스트 빠름 대다수
    통합 전체 시스템 사이의 통합 지점 테스트 실행 시간이 더 길어짐 극소수

    테스트에는 단위 / 통합 테스트가 있다. 이번 장에서는 Kafka Streams에서 사용할 수 있도록 단위 / 통합 테스트를 구축하는 방법을 알아보고자 한다. 또한 Kafka Streams의 리밸런싱 등을 테스트 하기 위해서는 Kafka Streams 클러스터가 필요하다. 그렇지만 외부 클러스터 설정에 의존하고 싶지 않기 때문에, 통합 테스트에 내장된 Kafka 및 주키퍼를 사용하는 방법도 알아볼 것이다. 

     

    8.1 토폴로지 테스트

    토폴로지 테스트는 하나의 레코드를 입력으로 넣었을 때, 그 토폴로지가 기대한 것처럼 동작하는지 살펴보는 테스트다. 이 테스트는 Kafka Streams에서 제공하는 TestDriver를 사용해서 해볼 수 있다. TestDriver를 사용하기 위해서는 아래 의존성을 추가해야한다.

    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.3.1'

    TestDrvier를 사용할 때는 다음과 같은 형식으로 구현한다. 

    1. TestDriver에게 테스트 할 Topology, Props를 전달한다.
    2. TestDriver를 통해 Input / output 토픽을 각각 생성한다.
    3. Input 토픽에 데이터를 넣고, Output 토픽에서 원하는 형태의 데이터가 생성되었는지를 확인한다. 

     

    8.1.1 테스트 만들기

    TestDriver를 이용해서 Input / output 토픽을 생성하고, Input 토픽에 레코드를 하나 집어넣는다. 그리고 output 토픽에서 topology에서 작성한 것처럼 동작했는지를 확인하는 코드를 구현한다.

     

    TestDriver 생성

    먼저 TestDriver를 생성하는 작업을 한다. 

    1. TestDriver에게 테스트 할 Topology를 전달한다.
    2. TestDriver에게 input / output 토픽을 생성한다.
    3. Input / ouput 토픽을 Serde가 각각 필요하다.
      1. 즉 Kafka를 직접 사용하지 않더라도, 카프카에 역직렬화 / 직렬화까지 테스트 하는 상황이다.

    이 때, input / output 토픽을 생성하기 위해 전달하는 토픽 이름은 테스트 할 Topology에 기입된 것을 작성해야한다. 이렇게 작성하면 TestDriver는 input / output 토픽의 엔트리 포인트를 하나씩 가져와서 데이터를 밀어넣고, 프로세싱 된 것을 output 토픽에서 받는 형태로 동작한다. 

     

    public class chapter3Test {
    
        private TopologyTestDriver topologyTestDriver;
        private TestInputTopic<String, Purchase> inputTopic;
        private TestOutputTopic<String, Purchase> outputTopicPurchase;
        private TestOutputTopic<String, RewardAccumulator> outputTopicRewardAccumulator;
        private TestOutputTopic<String, PurchasePattern> outputTopicPurchasePattern;
    
        @Before
        public void setUp() {
            Properties props = new Properties();
            props.setProperty(StreamsConfig.CLIENT_ID_CONFIG, "FirstZmart-Kafka-Streams-Client");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "zmart-purchases");
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-kafka-Streams-App");
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1");
    
            StreamsConfig streamsConfig = new StreamsConfig(props);
            Topology topology = ZMartTopology.build();
            topologyTestDriver = new TopologyTestDriver(topology, props);
    
    
            GsonSerializer<Purchase> purchaseGsonSerializer = new GsonSerializer<>();
            GsonDeserializer<Purchase> purchaseGsonDeserializer = new GsonDeserializer<>(Purchase.class);
    
            GsonSerializer<PurchasePattern> purchasePatternGsonSerializer = new GsonSerializer<>();
            GsonDeserializer<PurchasePattern> purchasePatternGsonDeserializer = new GsonDeserializer<>(PurchasePattern.class);
    
            GsonSerializer<RewardAccumulator> recordAccumulatorGsonSerializer = new GsonSerializer<>();
            GsonDeserializer<RewardAccumulator> recordAccumulatorGsonDeserializer = new GsonDeserializer<>(RewardAccumulator.class);
    
            Serde<RewardAccumulator> rewardAccumulator = Serdes.serdeFrom(recordAccumulatorGsonSerializer, recordAccumulatorGsonDeserializer);
            Serde<PurchasePattern> purchasePatternSerde = Serdes.serdeFrom(purchasePatternGsonSerializer, purchasePatternGsonDeserializer);
            Serde<Purchase> purchaseSerde = Serdes.serdeFrom(purchaseGsonSerializer, purchaseGsonDeserializer);
            Serde<String> stringSerde = Serdes.String();
    
            // input
            inputTopic = topologyTestDriver.createInputTopic("transactions", stringSerde.serializer(), purchaseSerde.serializer());
    
            // output
            outputTopicPurchase = topologyTestDriver.createOutputTopic("purchases",stringSerde.deserializer(), purchaseSerde.deserializer());
            outputTopicRewardAccumulator = topologyTestDriver.createOutputTopic("rewards",stringSerde.deserializer(), rewardAccumulator.deserializer());
            outputTopicPurchasePattern = topologyTestDriver.createOutputTopic("patterns",stringSerde.deserializer(), purchasePatternSerde.deserializer());
        }

     

    단위 테스트 생성

    @Before 어노테이션에서는 TestDriver를 생성하고, TestDriver를 통해 input / output 토픽을 생성했다. 실제 단위 테스트에서는 input / output 토픽을 이용해서 Topology를 테스트한다.

    1. input 토픽에 레코드를 전달한다. 이 레코드는 Topology를 타고 Depth First로 동작한다.
    2. input 토픽에 전달된 레코드는 topology대로 처리되서 각각 output 토픽에 저장된다. 

    따라서 아래와 같이 코드를 작성하면서 단위 테스트를 빠르게 작성해 볼 수 있게 된다.

    @Test
    @DisplayName("outputTopic Purchase Test")
    public void this1() {
        Purchase purchase = DataGenerator.generatePurchase();
        inputTopic.pipeInput(purchase);
        Purchase outputValue = outputTopicPurchase.readValue();
        Purchase expectedValue = Purchase.builder(purchase).maskCreditCard().build();
        assertThat(outputValue).isEqualTo(expectedValue);
    }
    
    @Test
    @DisplayName("outputTopic Record Accumulator Test")
    public void this2() {
        Purchase purchase = DataGenerator.generatePurchase();
        inputTopic.pipeInput(purchase);
        RewardAccumulator outputValue = outputTopicRewardAccumulator.readValue();
        RewardAccumulator expectedValue = RewardAccumulator.builder(purchase).build();
        assertThat(outputValue).isEqualTo(expectedValue);
    }
    
    @Test
    @DisplayName("Testing the Purchasepattern")
    public void this3() {
        Purchase purchase = DataGenerator.generatePurchase();
        inputTopic.pipeInput(purchase);
        PurchasePattern outputValue = outputTopicPurchasePattern.readValue();
        PurchasePattern expectedValue = PurchasePattern.builder(purchase).build();
        assertThat(outputValue).isEqualTo(expectedValue);
    }

     

     

    8.1.2 Topology에서 상태 저장소 테스트

    위에서 토폴로지의 input / output 토픽을 테스트 한 것처럼 input 토픽에 특정 메세지를 집어넣었을 때, Topology 내부에 있는 StateStore도 정상적으로 값을 받고 있는지를 테스트 해볼 수 있다. 테스트는 다음과 같이 작성한다.

    1. TestDriverTopology 클래스를 이용해서 input 토픽을 생성한다.
    2. TestDriverTopology 클래스를 이용해서 StateStore를 가져온다.
    3. input 데이터를 넣고, StateStore에서 기대값과 비교한다. 

    테스트를 구현한 코드는 아래와 같다. 

    • @Before 어노테이션에서는 TestDriver와 input / StateStore를 생성하는 작업을 한다.
    • @Test 어노테이션에서는 실제로 데이터를 넣어주고 StateStore에서 값을 확인하는 작업을 한다. 
    public class Chapter8test {
    
        private TestInputTopic<String, StockTransaction> inputTopic;
        private KeyValueStore<String, StockPerformance> keyValueStore;
    
        @Before
        public void setup() {
            Properties props = new Properties();
            props.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-papi-stock-analysis-client");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-papi-stock-analysis-group");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-stock-analysis-appid");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
    
            TopologyTestDriver topologyTestDriver = new TopologyTestDriver(StockPerformanceStreamsProcessorTopology.build(), props);
            inputTopic = topologyTestDriver.createInputTopic("stock-transactions",Serdes.String().serializer(), StreamsSerdes.StockTransactionSerde().serializer());
            keyValueStore = topologyTestDriver.getKeyValueStore("stock-performance-store");
    
        }
    
        @Test
        public void test1() {
            StockTransaction stockTransaction = DataGenerator.generateStockTransaction();
            inputTopic.pipeInput(stockTransaction.getSymbol(),stockTransaction);
            StockPerformance stockPerformance = keyValueStore.get(stockTransaction.getSymbol());
            assertThat(stockPerformance.getCurrentShareVolume()).isEqualTo(stockTransaction.getShares());
            assertThat(stockPerformance.getCurrentPrice()).isEqualTo(stockTransaction.getSharePrice());
        }
    
    
    
    }

     

     

    8.1.3. 프로세서와 트랜스포머 테스트

    앞서 수행했던 Topology 테스트는 TopologyTestDriver를 이용해서 Source 토픽에 데이터를 넣고 Output 토픽에서 데이터를 받아보는 형태로 테스트 해볼 수 있었다. 이 경우에는 전체적인 Topology에 대한 테스트를 할 수 있었던 것이다. 그렇지만 예를 들어 Processor 하나의 기능이나 Punctuator의 기능을 테스트해보고 싶은 경우가 있다. 이럴 때를 위해 Kafka Streams는 MockProcessorContext 객체를 제공한다.

    Kafka Streams는 각 Node마다 Processor를 가지고, 각 Processor는 각각의 ProcessorContext를 가진다. 이것과 마찬가지로 테스트 할 Processor를 하나 생성하고 이 Processor에 MockProcessorContext를 전달해주는 형태로 테스트를 해볼 수 있다. 전반적인 테스트 흐름은 다음과 같다.

    1. Processor.process()를 이용해서 레코드를 처리한다. 처리된 Record는 forwarded() 메서드로 얻을 수 있다. 이것은 큐의 형태로 처리된 메세지가 저장된다.
    2. MockProcessorContext 객체를 통해 forwarded() 리스트를 얻어서 테스트를 평가한다.
    3. 만약 punctuator 기능이 필요한 경우 MockProcessorContext.scheduledPunctuator() 기능을 이용해서 처리할 수 있다. 

     

    삽질 주의

    테스트 할 Processor가 Stateful한 경우에는 StateStore를 반드시 추가해줘야한다. 이 때 StateStore를 초기화 하는 과정도 필요한데, 초기화를 해주지 않으면 metric 관련된 에러가 발생한다. StateStore를 어떻게 초기화 해줘야할지 나는 삽질을 많이 했다. 초기화는 다음과 같이 해주면 된다. 

    stateStore.init(processorContext.getStateStoreContext(), stateStore);
    processorContext.addStateStore(stateStore);

     

    셋업 코드

    먼저 전체 테스트에서 계속 사용될 녀석들을 생성하고 필드로 등록해준다. 

    1. StateStore를 생성해서 ProcessorContext, Processor에 각각 등록해준다. StateStore를 초기화한다.
    2. MockProcessorContext를 생성한다.
    3. Processor를 생성하고 초기화한다. 
    public class MyStockTransactionProcessorWithPunctuatorTest {
    
        private final String STORE_NAME = "HELLO";
        private Processor myProcessor;
        private MockProcessorContext<String, StockTransaction> processorContext;
    
    
        @Before
        public void setUp() {
            Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
    		// MockProcessorContext 생성
            processorContext = new MockProcessorContext<>(props);
    
            // StateStore 생성
            KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(STORE_NAME);
            StoreBuilder<KeyValueStore<String, StockTransaction>> stateStoreBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), StreamsSerdes.StockTransactionSerde());
            KeyValueStore<String, StockTransaction> stateStore = stateStoreBuilder.withCachingDisabled().withLoggingDisabled().build();
    
    		// StateStore 초기화
            stateStore.init(processorContext.getStateStoreContext(), stateStore);
            processorContext.addStateStore(stateStore);
    
    		// Processor 생성
            myProcessor = new MyProcessorWithPunctuator(STORE_NAME);
            myProcessor.init(processorContext);
        }

     

     

    테스트 코드

    프로세서의 process 메서드와 punctuate 기능을 테스트 하는 코드를 두 개 작성했다. 

    1. process()를 하게 될 경우, process한 녀석은 processorContext의 forwarded() 큐에 저장되어있다. 이 값을 찾아와서 프로세스 된 결과를 기대값과 비교하는 테스트를 작성한다
    2. process()를 하게 될 경우, process한 녀석은 StateStore로 들어가있을 것이다. 그리고 forwarded() 큐에는 프로세스 된 녀석이 하나 들어있을 것이다.
      • 이 부분을 resetForwards() 명령어를 이용해서 큐를 비운 후에, punctuate() 메서드를 호출한다.
      • punctuate() 메서드는 context.forward()를 이용해서 downstream 처리를 해주기 때문에 다시 forwarded() 큐에 펑츄에이티드 된 값이 저장될 것이다.
      • 이 값을 기대값과 비교해본다. 
    @Test
    @DisplayName("process test")
    public void test1() {
        StockTransaction stockTransaction = DataGenerator.generateStockTransaction();
        myProcessor.process(new Record(stockTransaction.getSymbol(), stockTransaction, SystemTime.SYSTEM.milliseconds()));
        assertThat(processorContext.forwarded().size()).isEqualTo(1);
        assertThat(processorContext.forwarded().get(0).record().key()).isEqualTo("new-" + stockTransaction.getSymbol());
        assertThat(processorContext.forwarded().get(0).record().value().getCustomerId()).isEqualTo(stockTransaction.getCustomerId());
    }
    
    @Test
    @DisplayName("punctuate test")
    public void test2() {
        StockTransaction stockTransaction = DataGenerator.generateStockTransaction();
        myProcessor.process(new Record(stockTransaction.getSymbol(), stockTransaction, SystemTime.SYSTEM.milliseconds()));
    
        List<MockProcessorContext.CapturedPunctuator> capturedPunctuators = processorContext.scheduledPunctuators();
        MockProcessorContext.CapturedPunctuator capturedPunctuator = capturedPunctuators.get(0);
    
        processorContext.resetForwards();
    
        Duration interval = capturedPunctuator.getInterval();
        PunctuationType type = capturedPunctuator.getType();
        Punctuator punctuator = capturedPunctuator.getPunctuator();
        punctuator.punctuate(0);
    
        assertThat(interval).isEqualTo(Duration.ofSeconds(15));
        assertThat(type).isEqualTo(STREAM_TIME);
    
        MockProcessorContext.CapturedForward<? extends String, ? extends StockTransaction> capturedForward = processorContext.forwarded().get(0);
        Record<? extends String, ? extends StockTransaction> record = capturedForward.record();
    
        assertThat(processorContext.forwarded().size()).isEqualTo(1);
        assertThat(record.key()).isEqualTo(stockTransaction.getSymbol());
    }

     

     

    8.2 통합 테스트 구축

    이 책이 작성되었을 때는 Embedded Kafka Cluster를 생성할 수 있어서 테스트가 가능했다. 현재는 라이브러리에 등록되어있지 않아서 어떻게 해야할지 모르겠다. 

     

     

    pipeInput은 어떻게 동작할까?

    예를 들어 아래와 같이 pipeInput이 연속으로 두 개가 있으면 실제로 아래 테스트는 어떻게 동작할까? pipeInput() 함수는 호출되자마자 자신이 전달받은 레코드를 Topology 전체에 Depth-First로 작업을 한다. 이 과정에서 StateStore에 값을 업데이트 하기도 하고, 마지막에 생성한 값을 Sink 노드에 전달하기도 한다. 그렇게 한번 완료되면 pipeInput()이 끝난다. 그리고 두번째 pipeInpu()이 시작된다. 즉, 레코드 2개가 전체 아웃풋 처리가 되었다고 보는 것이 타당하다.

    @Test
    public void test1() {
        StockTransaction stockTransaction = DataGenerator.generateStockTransaction();
        StockTransaction stockTransaction1 = DataGenerator.generateStockTransaction();
        inputTopic.pipeInput(stockTransaction.getSymbol(),stockTransaction);
        inputTopic.pipeInput(stockTransaction.getSymbol(),stockTransaction1);
    
        StockPerformance stockPerformance = keyValueStore.get(stockTransaction.getSymbol());
        assertThat(stockPerformance.getCurrentShareVolume()).isEqualTo(stockTransaction.getShares());
        assertThat(stockPerformance.getCurrentPrice()).isEqualTo(stockTransaction.getSharePrice());
    }

     

     

    readValue()는 어떻게 동작할까?

    pipeInput()을 하게 되면 각 레코드는 output 토픽의 큐에 순서대로 적재된다. 이 때, readValue()를 이용해서 메세지를 읽어오면 output 토픽의 큐에서 먼저 들어온 레코드 순서대로 적출된다. 예를 들어 아래 코드에서 pipeInput()이 진행되면 아웃풋 토픽의 레코드 큐에는 총 2개의 메세지가 적재되어있다. 그리고 readValue()를 하면 먼저 들어온 purchase 데이터가 큐에서 빠져나오고, 아웃풋 큐에는 1개의 메세지만 남아있게 된다. 

    @Test
    @DisplayName("outputTopic Purchase Test")
    public void this1() {
        Purchase purchase = DataGenerator.generatePurchase();
        Purchase purchase1 = DataGenerator.generatePurchase();
        inputTopic.pipeInput(purchase);
        inputTopic.pipeInput(purchase1);
        Purchase outputValue = outputTopicPurchase.readValue();
        Purchase expectedValue = Purchase.builder(purchase).maskCreditCard().build();
        assertThat(outputValue).isEqualTo(expectedValue);
    }

    댓글

    Designed by JB FACTORY