Kafka Streams : 모니터링과 성능

    이 글은 Kafka Streams In Action을 보고 공부하며 정리한 글입니다. 

    7. 모니터링과 성능

    • Kafka Streams의 모니터링
    • Kafka Stremas의 성능
    • Kafka Streams의 수집한 메트릭 관찰
    • Kafka Streams의 토폴로지를 보여주는 기술

    이번 글에서는 다음을 알아보려고 한다.

     

    7.1 기본적인 카프카 모니터링

    Kafka Streams는 카프카 컨슈머와 프로듀서를 사용한다. 따라서 카프카 컨슈머와 프로듀서 모니터링을 하는 방법에 대해서 알아본다.

    7.1.1 컨슈머와 프로듀서 성능 측정

    프로듀서는 메세지를 공급하는 주체이다. 따라서 프로듀서의 성능은 '얼마나 빨리 메세지를 공급하는지'로 결정된다. 컨슈머는 메세지를 소비하는 주체인데 'Consumer Lag'으로 컨슈머의 성능을 평가한다. Consumer Lag은 프로듀서가 메세지를 공급하는 속도보다 컨슈머가 메세지를 흡수하는 속도가 느려서 발행된 메세지의 전달이 지연되는 것을 의미한다.

    일반적으로 Consumer Lag은 당연히 발생할 수 밖에 없는데, 문제가 되는 경우는 Consumer Lag이 점점 커지는 경우가 될 것이다. 이상적으로는 Consumer Lag이 존재하지 않거나, 일정한 Consumer Lag을 유지하는 상태일 것이다.

     

    7.1.2 Consumer 지연 확인하기

    // 조회 가능한 Consumer Group 확인
    $ kafka-consumer-group.sh --bootstrap-server localhost:9092 --list
    
    // Consumer Lag 확인
    $ kafka-consumer-group.sh --bootstrap-server localhost:9092 --group <GROUP-NAME> -- describe

    먼저 리스트 명령어를 이용해서 조회 가능한 Consumer Group을 확인한다. 그리고 찾아온 Consumer Group에 대해 describe 명령어를 이용해서 조회를 하면 해당 Consumer가 구독하고 있는 TopicPartition에 대한 정보를 알려준다.

    명령어를 실행하면 다음 결과를 확인할 수 있다. 여기서 중요한 부분은 이것이다

    • CURRENT-OFFSET : 현재 Consumer Group의 Consumer가 현재까지 읽은 Offset
    • LOG-END-OFFSET : 현재 Producer가 TopicPartition에 제공한 Offset
    • LAG : Consumer Lag를 의미함. LOG-END-OFFSET - CURRENT-OFFSET

    기본적으로 약간의 Consumer Lag은 존재할 수 밖에 없다. Consumer는 메세지를 배치 단위로 읽어와서 배치 단위로 처리한다. 즉, 일정한 크기가 되지 않는 이상 Consumer는 Broker에서 메세지를 가져오지 않기 때문에 소비되지 않은 레코드가 쌓인다. 따라서 약간의 Consumer Lag이 발생할 수 있다. 

    만약 Consumer Lag이 계속 증가하는 상태라면 컨슈머에게 더 많은 리소스를 제공해야한다. 이 때는 Topic의 Partition을 늘려서 더 많은 Consumer를 제공해야한다. 혹은, Consumer가 메세지를 처리하는 속도가 느릴 수 있기 때문에 이 부분을 비동기 큐에 전달해 다른 스레드가 메세지를 처리하도록 해야할 수도 있다. 

    7.1.3 프로듀서와 컨슈머 가로채기

    Kafka Producer / Consumer는 Interceptor 클래스를 제공한다. Interceptor 클래스는 Producer / Consumer가 동작하는 가운데 특정 부분에서 현재 상태를 알아볼 수 있는 도움을 준다. Kafka Streams에서 한 가지 좋은 활용처는 카프카 스트림즈 어플리케이션이 카프카 토픽으로 다시 생상하는 메세지 오프셋을 추적하는데 사용하는 것이다. 예를 들어 Repartition을 해서 Internal Topic을 만드는데 사용해볼 수 있다. 

    Consumer Interceptor

    Kafka는 ConsumerInterceptor 인터페이스를 제공한다. 사용자는 이 인터페이스를 구현하고, 등록해서 사용하기만 하면 된다. 이렇게 하면 Kafka는 특정 메서드가 호출되는 시점에 등록된 ConsumerInterceptor를 호출해준다. 

    • onConsume() : Consumer가 poll() 한 후, 메세지를 반환하기 전에 호출됨. 전달되는 레코드의 수정이 가능함.
    • onCommit() : Consumer가 commit()하면 commit에 대한 결과로 메타 정보를 응답받는다. 이 메타 정보를 넘겨주면서 호출된다.

    ConsumerInterceptor는 다음 메서드가 호출될 때 호출된다.

    @Slf4j
    public class StockTransactionConsumerInterceptor implements ConsumerInterceptor<String, StockTransaction> {
        @Override
        public ConsumerRecords<String, StockTransaction> onConsume(ConsumerRecords<String, StockTransaction> records) {
            ...
        }
    
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            ...
        }
    	...
    }

     ConsumerInterceptor는 기본적으로 여러 개 등록할 수 있고, ConsumerInterceptor는 ConsumerInterceptors<K, V> 클래스에 의해서 관리된다. ConsumerInterceptors 클래스는 내부적으로 List 형태로 등록된 ConsumerInterceptor 들을 관리한다. 이 객체를 통해 ConsumerInterceptor는 체이닝될 수 있다. 아래는 KafkaConsumer 클래스의 poll() 메서드다.

    아래에서 확인할 수 있듯이, interceptors(인터셉터들)을 onConsume() 메서드를 이용해서 계속 호출하는 것을 볼 수 있다. 체이닝 형태로 호출된다는 것이다.

    // KafkaConsumer.java
    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        ...
        	return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
                }
    	..
    }

    체이닝에서 알아둬야 할 점은 다음과 같다. 

    1. onConsume() 도중에 에러가 발생하면 Catch()에 의해서 예외는 처리되고, 다음 인터셉터는 실행된다.
    2. 인터셉터가 처리한 레코드가 다음 레코드에 전달되는 방식이다. 따라서 하나의 인터셉터에서 예외가 발생하는 경우, 다음 인터셉터는 정상적으로 동작하지 않을 수 있다. 

    2번과 같은 문제가 있기 때문에 인터셉터끼리는 서로가 생성하는 레코드에 의존관계를 가지지 않도록 하는 것이 좋다. 즉, 레코드를 수정하지 않는 것이 좋다. 

    // ConsumerInterceptors.java
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }

    인터셉터의 onCommit()은 ConsumerCoordinator의 doCommitOffsetAsync()가 호출될 때 사용된다. interceptors.onCommit()을 확인할 수 있다.

    // ConsumerCoordinator.java
    private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
        future.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);
                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
            }
    
            @Override
            public void onFailure(RuntimeException e) {
                Exception commitException = e;
    
                if (e instanceof RetriableException) {
                    commitException = new RetriableCommitFailedException(e);
                }
                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
                if (commitException instanceof FencedInstanceIdException) {
                    asyncCommitFenced.set(true);
                }
            }
        });
        return future;
    }

    interceptors(인터셉터들은) onCommit() 메서드에 의해서 체이닝된다. 이 때도 마찬가지로 for 문을 돌면서 처리가 되는데 정리하면 다음과 같다.

    1. 인터셉터의 onCommit() 메서드를 수행하는 과정에서 발생하는 예외는 catch() 된다.
    2. 인터셉터의 onCommit() 메서드는 메세지를 생성하지 않기 때문에 인터셉터 간의 메세지 의존성은 존재하지 않음. 
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onCommit(offsets);
            } catch (Exception e) {
                // do not propagate interceptor exception, just log
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }

     

    ProducerInterceptor

    Kafka는 프로듀서 인터셉터를 위한 ProducerInterceptors 인터페이스를 제공한다. 사용자는 이 인터페이스를 구현 및 등록해서 사용하기만 하면 된다. 인터셉터는 다음 메서드를 지원하고, 이 메서드가 호출될 때 인터셉터가 호출되어 자신의 일을 하게 될 것이다

    • onSend() : 프로듀서가 메세지를 보낼 때 onSend() 메서드가 호출된다. 프로듀서가 메세지를 보내기 직전에 프로듀서 인터셉터가 호출된다.
    • onAcknowledgement() : 브로커가 레코드를 받으면 ACK 신호를 프로듀서에게 전달한다. 이 때, onAcknowledgement() 메서드가 호출된다. 
    @Slf4j
    public class StockTransactionProducerInterceptor implements ProducerInterceptor<String, StockTransaction> {
    
        @Override
        public ProducerRecord<String, StockTransaction> onSend(ProducerRecord<String, StockTransaction> record) {
        	...
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    		...
    	}
    }

    아래는 Kafka Producer의 send() 메서드다. 이곳에서 Kafka Producer는 메세지를 보내는데, 메세지를 보내기 직전에 인터셉터를 호출한다. 따라서 메세지가 브로커로 가기 전에 인터셉터는 그 내용을 가로채서 확인해볼 수 있다. 

    // KafkaProducer.java
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    인터셉터는 ProducerInterceptors.java 클래스에 의해서 관리된다. 이 녀석 역시 내부적으로 List<> 형태로 Interceptor를 가지고 있다. 

    1. for 문으로 반복되고, 인터셉터에서 처리된 레코드가 전달된다.
    2. catch 문으로 예외는 잡아진다. 

    여기서 살펴볼 것은 인터셉터에서 처리된 레코드가 다음 인터셉터로 전달되고, 결국에는 KafkaProducer의 doSend() 메서드에까지 전달된다. 만약 인터셉터에서 레코드가 변경된다면 Kafka Broker까지 변경된 레코드가 전달된다는 것을 의미한다. 따라서 이 부분을 유의하고 사용해야한다. 

    // ProducerInterceptors.java
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }

    인터셉터의 onAcknowledgemnt()는 KafkaProducer의 onCompletion() 메서드에서 호출된다. 이 메서드는 브로커가 메세지를 잘 받았고 ACK를 보내주면, Callback에 의해서 호출되는 메서드다. 브로커에게 응답 받은 결과에 대해서 모니터링 해볼 수 있는 여지가 존재한다. 

    // KafkaProducer.java
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (metadata == null) {
            metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
        }
        this.interceptors.onAcknowledgement(metadata, exception);
        if (this.userCallback != null)
            this.userCallback.onCompletion(metadata, exception);
    }

    마찬가지로 for문을 돌면서 체이닝된다. 주의해야할 점은 다음과 같다.

    1. 여기서 발생한 예외는 Catch 되어서 먹어진다.
    2. 생성된 레코드가 없고 전달되지 않기 때문에 의존성 문제는 없다. 
    // ProducerInterceptos.java
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }

     

    Kafka Streams에서 Producer / Interceptor 사용하기

    Kafka Streams에서 Producer / Interceptor를 사용하기 위해서는 Property 파일에 이것을 사용하겠다는 것을 등록해줘야한다. 그런데 Kafka Streams는 어플리케이션 이름이 있기 때문에 설정명에 반드시 Prefix를 붙여줘야한다. 이것들을 고려하면 사용하는 방법은 다음과 같다.

    props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
            List.of(StockTransactionProducerInterceptor.class.getName(),ClicksProducerInterceptor.class.getName()));
    1. setProperty()가 아닌 put()으로 넣어줘야한다.
    2. 설정명에 procuerPrefix() / consumerPrefix()를 이용해서 설정명을 한번 감싸야한다.
      • 이렇게 하면 interceptor.classes → producer.interceptor.classes로 바뀌게 된다. 이 설정값을 넣어줘야한다. 
    3. 여러 Producer / Interceptor를 사용하기 위해 리스트 형식으로 사용할 인터셉터 클래스 이름을 넣어준다.

     

     

    7.2 어플리케이션 메트릭

    카프카 스트림즈는 성능 메트릭을 수집하는 메커니즘을 제공한다. 대부분의 경우 일부 설정값을 제공하면 된다. 그렇지만 메트릭 수집에는 성능 비용이 발생하기 때문에 INFO / DEBUG 레벨 중 필요한 것을 사용해야한다. 일반적으로 프로덕션 환경에서는 성능 비용이 높기 때문에 DEBUG는 사용하지 않고, 주로 INFO 레벨을 사용한다고 한다. 메트릭을 사용하기 위해서는 다음 정보를 넣어주면 된다. 

    // 메트릭 설정
    props.setProperty(StreamsConfig.CLIENT_ID_CONFIG,"metrics-client-id");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"metrics-group-id");
    props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

    다음 정보를 설정해주면 DEBUG 레벨의 메트릭 측정 항목을 수집하고 기록할 수 있께 된다. 

    수집한 메트릭 확인

    JCONSOLE은 JDK가 설치될 때 함께 설치된다. 따라서 이걸 이용해서 간단히 수집된 메트릭을 확인해볼 수 있다. JAVA가 설치된 폴더의 bin 디렉토리로 가면 jconsole이 존재한다. 이것을 더블 클릭한다. 

    1. New Connection을 클릭한다.
    2. 현재 돌아가고 있는 Kafka Streams를 클릭한다.

    기본적으로는 위와 같은 화면이 나온다. 여기서 MBEANS 차트를 클릭하면 이곳에서 Kafka Streams 어플리케이션 성능에 대한 수집된 통계가 포함되어있다. 

    MBEANS > kafka.streams로 들어가면 Kafka Stream에서 제공하고 있는 Task들을 모두 확인해볼 수 있다.

     

     

    7.3 추가적인 카프카 스트림즈 디버깅 기술

    Metric과 Interceptor를 이용하면 카프카 모니터링을 할 수 있다. 그렇지만 좀 더 다양한 디버깅 방법을 알고 있으면 좋기 때문에 추가로 아래 부분을 살펴보고자 한다. 

     

    7.3.1 어플리케이션 구조 조회

    kafka Streams는 토폴로지 형식으로 구현된다. Kafka Streams를 실행하기 전에 Topology에 대한 정보, Topology의 StateStore 정보들을 미리 출력해볼 수 있다. 

    • Kafka Streams의 Topology는 여러 SubTopology로 구현된다. (내부 토픽 등) 따라서 다음과 같이 하나의 Topology에 여러 SubTopology가 들어간다.
    • Kafka Streams는 여러 GlobalStateStore를 가진다. 따라서 다음과 같이 하나의 Topology에 여러 GlobalStateStore를 가진다. 

    이 때 주의해야할 점은 GlobalStore()는 GlobalKTable과 관련된 요소들만 볼 수 있다는 점이다. 지엽적으로 사용되는 StateStore에 대한 정보는 알 수 없다. 

    // Topology 설명 객체 생성
    TopologyDescription describe = topology.describe();
    
    // Subtopology 모두 출력
    Set<TopologyDescription.Subtopology> subtopologies = describe.subtopologies();
    for (TopologyDescription.Subtopology subtopology : subtopologies) {
        System.out.println("subtopology = " + subtopology);
    }
    
    // StateStore 정보 모두 출력 
    Set<TopologyDescription.GlobalStore> globalStores = describe.globalStores();
    for (TopologyDescription.GlobalStore globalStore : globalStores) {
        System.out.println("globalStore = " + globalStore);            
    }

    위 명령어를 이용해서 출력해보면 다음과 같이 나오는 것을 볼 수 있다.

    1. Subtopology의 번호는 StreamTask의 [번호-번호] 이름 중에 앞쪽 번호를 의미하는 것으로 볼 수 있다. 뒷쪽 번호는 파티션과 관련된 번호다.
    2. GlobalStateStore만 출력되는 것을 볼 수 있다. 
    subtopology = Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [stock-transactions])
          --> KSTREAM-KEY-SELECT-0000000001
        Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
          --> KSTREAM-FILTER-0000000005
          <-- KSTREAM-SOURCE-0000000000
        Processor: KSTREAM-FILTER-0000000005 (stores: [])
          --> KSTREAM-SINK-0000000004
          <-- KSTREAM-KEY-SELECT-0000000001
        Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)
          <-- KSTREAM-FILTER-0000000005
    
    
    subtopology = Sub-topology: 1
        Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])
          --> KSTREAM-AGGREGATE-0000000003
        Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
          --> KTABLE-TOSTREAM-0000000007
          <-- KSTREAM-SOURCE-0000000006
        Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
          --> KSTREAM-MAP-0000000008
          <-- KSTREAM-AGGREGATE-0000000003
        Processor: KSTREAM-MAP-0000000008 (stores: [])
          --> KSTREAM-LEFTJOIN-0000000015
          <-- KTABLE-TOSTREAM-0000000007
        Processor: KSTREAM-LEFTJOIN-0000000015 (stores: [])
          --> KSTREAM-LEFTJOIN-0000000016
          <-- KSTREAM-MAP-0000000008
        Processor: KSTREAM-LEFTJOIN-0000000016 (stores: [])
          --> KSTREAM-PRINTER-0000000017
          <-- KSTREAM-LEFTJOIN-0000000015
        Processor: KSTREAM-PRINTER-0000000017 (stores: [])
          --> none
          <-- KSTREAM-LEFTJOIN-0000000016
    
    
    globalStore = Sub-topology: 2 for global store (will not generate tasks)
        Source: KSTREAM-SOURCE-0000000010 (topics: [companies])
          --> KTABLE-SOURCE-0000000011
        Processor: KTABLE-SOURCE-0000000011 (stores: [companies-STATE-STORE-0000000009])
          --> none
          <-- KSTREAM-SOURCE-0000000010
    
    globalStore = Sub-topology: 3 for global store (will not generate tasks)
        Source: KSTREAM-SOURCE-0000000013 (topics: [clients])
          --> KTABLE-SOURCE-0000000014
        Processor: KTABLE-SOURCE-0000000014 (stores: [clients-STATE-STORE-0000000012])
          --> none
          <-- KSTREAM-SOURCE-0000000013

     

     

    7.3.2 StateListner 사용

    Kafka Streams는 다음과 같은 상태를 가지고 있다. Kafka Streams는 동작하는 도중에 언제든지 위의 상태 사이에서 왔다갔다 할 수가 있다. 이런 상태 변경 상황이 발생했는지를 확인하고 싶을 때가 있다. 이 때는 StateListener를 사용할 수 있다. 

    Kafka Streams는 State의 변화를 모니터링 하기 위한 StateListener 인터페이스를 지원한다. 이 인터페이스를 구현하고, Kafka Streams에 등록해주면 Kafka Streams의 상태가 변할 때 StateListener가 호출된다. StateListener가 호출되면 StateListener 내부에 구현된 로직을 따라서 로깅을 할 수 있게 된다.

    아래 StateListenr 인터페이스의 onChange 메서드만 호출하면 된다. 

    • newState / OldState가 인자로 넘어온다.
      • 상태가 변할 때 StateListener가 호출되기 때문에 위와 같은 인자가 넘어온다
      • 위의 인자를 활용해서 어떨 때 어떤 로그를 출력할지 설정할 수 있다. 
    public interface StateListener {
        void onChange(final State newState, final State oldState);
    }

    StateListener를 사용할 때는 다음과 같이 사용할 수 있다. 아래 순서대로 작성하면 되고, 아래에 사용 예시가 있다.

    1. StateListener 인터페이스를 구현하고, 객체를 생성한다.
    2. KafkaStreams.seStateListener()에 생성한 객체를 넘겨서 셋팅한다. 
    // StateListener 구현
    KafkaStreams.StateListener stateListener = new KafkaStreams.StateListener() {
        @Override
        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            if (newState == CREATED) {
                log.info("ASHASHASHASH KAFKA STREAMS ");
            }
    
            if (newState == RUNNING) {
                log.info("ASHASHASHASH KAFKA STREAMS RUNNING");
            }
    
        }
    };
    
    // StateListener 셋팅
    kafkaStreams.setStateListener(stateListener);

     

    7.3.3 StateStoreListener 사용

    Kafka Streams는 StateStore를 이용해서 집계 처리를 한다. 이 때 StateStore를 백업하기 위해서 Broker에 관련된 ChangeTopic을 생성한다. 만약 로컬 디스크에 파일이 완전히 삭제된 경우 ChangeLog로부터 데이터를 읽어와서 천천히 복구를 시작하는데, 레코드의 양에 따라서 오랜 시간이 걸릴 수도 있다. StateStore가 모두 Restore 되어야 모든 StreamTask가 Acitve 상태가 되어 Kafka Streams가 동작할 수 있게 된다. 

    StateStore의 복원에는 오랜 시간이 걸릴 수 있기 때문에 복원 상태를 모니터링 할 수 있어야 한다. 그렇지 않으면 복원하고 있는 상황인지 알 수 없기 때문이다. Kafka Streams는 이런 동작을 위해서 StateRestoreLiestener 인터페이스를 제공한다.그리고 이 인터페이스는 세 가지 기능을 제공한다. 주의해야 할 점은 StateStoreListener는 Global하게 사용되고 있고 기본적으로 Stateless하다. 따라서 어떤 StateStore를 어디까지 복구했는지에 대한 상태 정보를 가지고 싶다면 StateStoreListener에 ConcurrentHashMap 같은 녀석들을 선언해서 정보를 보관하는 것을 추천한다. 

    • onRestoreStart() : StateStore 복구가 시작될 때마다 호출된다.
    • onBatchRestored() : restoreConsumer에서 읽어온 레코드가 StateStore에 적재(복구)된 이후에 바로 호출된다.(여러번 호출된다)
    • onRestoreEnd() : StateSTore의 복구가 완료되면 호출된다. 
    public interface StateRestoreListener {
    
        /**
         * Method called at the very beginning of {@link StateStore} restoration.
         */
        void onRestoreStart(final TopicPartition topicPartition,
                            final String storeName,
                            final long startingOffset,
                            final long endingOffset);
    
        /**
         * Method called after restoring a batch of records.  In this case the maximum size of the batch is whatever
      
         */
        void onBatchRestored(final TopicPartition topicPartition,
                             final String storeName,
                             final long batchEndOffset,
                             final long numRestored);
    
        /**
         * Method called when restoring the {@link StateStore} is complete.
         */
        void onRestoreEnd(final TopicPartition topicPartition,
                          final String storeName,
                          final long totalRestored);
    
    }

    사용하고자 할 때는 다음과 같이 사용하면 된다.

    1. StateRestoreListener 구현 및 객체를 생성한다.
    2. KafkaStreams.setGlobalStateRestoreListener()에 생성된 객체를 넘겨서 셋팅한다.
    StateRestoreListener stateRestoreListener = new StateRestoreListener() {
        // 각 토픽 파티션 별로 복구해야할 offset을 기록한다. 
        private Map<TopicPartition, Long> totalToRestore = new ConcurrentHashMap<>();
    
        // 각 토픽 파티션 별로 최종 복구된 offset를 기록한다.
        private Map<TopicPartition, Long> restoredSoFar = new ConcurrentHashMap<>();
    
        @Override
        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            long toRestore = endingOffset - startingOffset;
            totalToRestore.put(topicPartition, toRestore);
            log.info("start restore. restore target : {}, topicPartition : {}, record to Restore : {}", storeName, topicPartition, toRestore);
        }
    
        @Override
        public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
    
            // 복원된 전체 레코드 수 계산
            long currentProgress = batchEndOffset + restoredSoFar.getOrDefault(topicPartition, 0L);
            double percentComplete = (double) currentProgress / totalToRestore.get(topicPartition);
            log.info("restore progress : {}, progress Percent : {}, Target store : {}, topicPartition : {}",
                    currentProgress, percentComplete, storeName, topicPartition );
    
            // 복원된 레코드 수를 저장한다. 
            restoredSoFar.put(topicPartition, currentProgress);
        }
    
        @Override
        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            log.info("Restore Completed : {}, topicPartition : {}", storeName, topicPartition);
        }
    };
    
    // 설정
    kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);

     

     

    7.3.4 StreamsUncaughtExceptionHandler

    예상하지 못한 에러가 발생할 수 있다. 이 경우에는 그냥 바로 꺼지는 것보다는 로그를 한 줄이라도 남기고 꺼지는 것이 좋다. 그런데 예상하지 못한 에러가 발생했을 때 어떻게 이런 부분을 남길 수 있을까? Kafka Streams는 이것을 위해서 StreamsUncaughtExceptionHandler 인터페이스를 제공한다. 

    이 인터페이스를 구현하고, Kafka Streams에 값을 설정해준다. Kafka Streams에 값을 설정해주면, Kafka Streams는 값을 설정하면서 GlobalStreamThread와 StreamThread 모두에 Exception Handler를 등록해준다. 설정한 Exception Handler는 각각 StreamThread에서 사용할 수 있게 된다. 

     

    Exception Handler 생성 및 등록

    생성 및 등록은 다음과 같이 작성할 수 있다. 최근에는 Response 객체까지 생성해서 반환할 수 있다고 한다. 

    // ExceptionHandler 생성
    StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler = exception -> {
        log.info("exception raise");
        return null;
    };
    
    // ExceptionHandler 등록
    kafkaStreams.setUncaughtExceptionHandler(streamsUncaughtExceptionHandler);

    KafkaStreams.setUncaughtExceptionHandler() 메서드를 호출하면 아래 메서드가 호출된다. 이 메서드에서는 주로 두 가지 행동이 있는 것을 볼 수 있다.

    1. StreamThread에 User가 설정한 ExceptionHandler를 등록한다.
    2. GlobalStreamThread에 User가 설정한 ExceptionHandler를 등록한다. 

    이 때 주의해야 할 점은 ExceptionHandler가 리스트 형태로 관리되는 것이기 아니기 때문에 사용자 정의 Exception Handler는 하나만 등록할 수 있다는 점이다. 

    // KafkaStreams.java
    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler) {
        synchronized (stateLock) {
            if (state.hasNotStarted()) {
                ...
                // StreamThread에 설정
                processStreamThread(thread -> thread.setStreamsUncaughtExceptionHandler(streamsUncaughtExceptionHandler));
                if (globalStreamThread != null) {
                    // GlobalStreamThread에 설정
                    globalStreamThread.setUncaughtExceptionHandler(
                        exception -> handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, false)
                    );
                }
    	...
    }

    이렇게 등록된 녀석은 StreamThread의 아래 메서드 내부의 Catch 절에서 호출된다. 기본적으로 큼지막한 동작들에서 Catch 절로 등록되기 때문에 Kafka Streams에서 예기치 않은 예외가 발생하면 비명횡사를 하기보다는 로그 한 줄이라도 남겨볼 수 있을 것 같다. 

    • StreamThread.run()
    • StreamThread.pollPhase()
    • StreamThread.runLoop()

     

    요약

    • Kafka Streams를 모니터링하려면 Kafka Broker도 살펴봐야한다. 
    • 어플리케이션 성능을 확인하고 싶은 경우 메트릭 리포팅을 수시로 활성화 해야한다.
    • 가끔 jstack(스레드 덤프), jmap/jhat(힙 덤프) 같은 자바에 포함된 명령줄 도구를 사용해 좀 더 저수준에서 어플리케이션 동작을 이해해야한다. 

    댓글

    Designed by JB FACTORY