Kafka Streams : 부록
- Kafka eco-system/KafkaStreams
- 2022. 11. 22.
들어가기 전
이 책은 Kafka Stremas in Action의 부록A~B를 공부하며 작성한 글입니다.
시작 시 리밸런싱 수 제한하기 (group.initial.rebalance.delay.ms)
카프카 스트림즈가 처음 기동될 때 카프카 스트림즈는 Consumer와 동일한 형태로 동작한다. 다음 동작으로 정리할 수 있다.
- 첫번째로 기동되는 카프카 스트림즈는 브로커의 Group Coordinator에서 모든 TopicPartition을 가져온다. 그리고 본인은 컨슈머 리더가 된다.
- 두번째로 기동되는 카프카 스트림즈는 브로커의 Group Coordinator에게 자신이 들어온 것을 알린다. 그리고 리밸런싱이 발생한다.
- 세번째로 기동되는 카프카 스트림즈는 브로커의 Group Coordinator에게 자신이 들어온 것을 알린다. 그리고 리밸런싱이 발생한다.
이것은 카프카 스트림즈의 정상적인 동작인데, 파티션 리밸런싱이 일어나는 동안 일반적으로는 레코드의 Consume이 중지된다. 즉, 카프카 스트림즈의 동작이 오랫동안 멈추게 된다. 한 가지 문제점이 될 것은 리밸런싱 이후에 정상적으로 동작하려는 순간 또 다시 리밸런싱이 계속 일어나는 경우가 있다. 이 부분에 도움을 주기 위해 group.initial.rebalance.delay.ms가 도입되었다.
group.initial.rebalance.delay.ms는 새로운 카프카 컨슈머가 조인했을 때 설정된 시간만큼 기다렸다가 리밸런싱을 시작한다. 기다리는 도중에 새로운 카프카 컨슈머가 조인한다면 기다리는 시간은 다시 초기화되어 기다리게 된다. 이 설정을 잘 활용하면 새로운 카프카 인스턴스를 시작할 때 모든 인스턴스가 온라인 상태가 될 때까지 리밸런싱이 지연될 수 있다.
브로커 중단에 대한 카프카 스트림즈 회복력
항목 | 내용 |
ProducerConfig.NUM_RETRIES | Producer가 브로커에 메세지를 보내고 Error or 응답이 없는 경우 재전송 할 수 있는 횟수 |
ProducerConfig.REQUEST_TIMEOUT | 브로커로부터 Error 또는 응답이 없는 경우, request.timeout.ms만큼 기다림. 이 시간이 지나면 Retyr를 하너간 Timeout Exception이 발생함. |
ProducerConfig.BLOCK_MS_CONFIG | Record Accumulator에 데이터가 꽉 차 있는 경우, 더 이상 데이터는 적재되지 않음. 데이터가 들어가지 않을 때 이 설정 값만큼 Record Accumulator가 비기를 기다림. |
ConsumerConfig.MAX_POLL_CONFIG | 이전 poll() 호출 후, 다음 poll() 호출까지 브로커가 기다리는 시간. 이 시간 내에 호출이 발생하지 않으면, Consumer는 리밸런싱 대상이 됨. |
브로커 오류가 발생하면 카프카 스트림즈는 영향을 받는다. 이 때, 카프카 스트림즈에 영향을 최소화 하기 위해서 다음 설정값을 이용할 수 있다. 이 값은 카프카 스트림즈가 사용하는 카프카 프로듀서, 컨슈머에 대한 설정값이다.
역직렬화 오류 처리
카프카는 메세지를 보낼 때 직렬화 및 역직렬화를 이용한다. 스키마 레지스트리 등을 이용할 경우에는 기존에 정해둔 역직렬화 클래스를 이용할 때 에러가 발생하진 않지만, 그렇지 않을 경우에는 에러가 발생할 수도 있다. 카프카 스트림즈는 역직렬화 오류가 발생했을 때, 이것을 처리하는 방법을 지정하기 위해 Deserialization Exception Handler 클래스를 제공하고, 설정값에서 이것을 사용할 수 있다.
Properties props = new Properties();
// 역직렬화 실패 시, 카프카 스트림즈 종료
props.setProperty(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class.getName());
// 역직렬화 실패해도 카프카 스트림즈 계속 진행
props.setProperty(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class.getName());
위 설정값을 추가해서 역직렬화 실패 시, 카프카 스트림즈를 종료할지 혹은 카프카 스트림즈를 계속 진행할지를 결정할 수 있다. 만약 필요하다면, DeserializationExceptionHandler 인터페이스를 구현한 클래스를 만들어서 사용할 수도 있다.
어플리케이션 스케일업
카프카 스트림즈는 Source 토픽의 파티션마다 StreamTask를 생성한다. 생성되는 StreamTask의 숫자 기준은 Soruce Topic들 중에 가장 많은 파티션을 가진 녀석이 된다. 예를 들어 아래와 같은 상황에서는 12개의 StreamTask가 생성된다.
- 토픽 + 파티션 갯수
- TopicA → 파티션 2개
- TopicB → 파티션 2개
- TopicC → 파티션 6개
- 스트림 테스크
- 0-0 : A,B,C의 파티션 0
- 0-1 : A,B,C의 파티션 1
- 0-2 : A,B,C의 파티션 2
- 0-3 : C의 파티션 3
- 0-4 : C의 파티션 4
- 0-5 : C의 파티션 5
생성된 StreamTask는 스트림 쓰레드가 사이좋게 나누어 가진다. 위의 상황을 두 가지로 나누어서 생각해보자.
1개의 카프카 스트림즈 인스턴스
- 스트림 쓰레드가 1개 있는 경우, 이 녀석이 총 6개의 스트림 테스크를 처리한다.
- 스트림 쓰레드가 6개 있는 경우, 하나의 스트림 쓰레드당 하나의 스트림 테크스를 처리한다.
1개의 카프카 스트림즈 인스턴스만을 사용하는 경우, 해당 서버가 중단될 경우 모든 카프카 스트림즈 인스턴스가 죽는다는 단점이 있다. 따라서 고가용성에 있어서 큰 문제가 발생한다.
3개의 카프카 스트림즈 인스턴스
3개의 카프카 스트림즈 인스턴스를 생성하고, 카프카 스트림즈 클러스터를 구성한 경우를 가정해보자. 클러스터가 구성되면 카프카 스트림즈 인스턴스는 Broker의 Group Coordinator를 이용해서 구독하고 있는 토픽 파티션이 나누어지고, 각각 리밸런싱된 상태가 된다. 가장 먼저 아래 상황을 고려해보자.
- 각 카프카 스트림즈 인스턴스마다 스트림 쓰레드가 1개 있는 경우, 각각의 스트림 쓰레드는 각각 2개의 스트림 테스크를 처리한다.
위 상황에서 하나의 카프카 스트림즈 인스턴스가 죽는 경우를 가정해보자. 그러면 전체 카프카 스트림즈 인스턴스는 2개가 되고, 이것을 Broker가 알아채게 된다. 그리고 카프카 스트림즈 인스턴스는 리밸런싱이 발생한다. 이 경우, 각각의 카프카 스트림즈 인스턴스는 3개의 스트림 테스크를 맡게 된다.
이후에 새로운 카프카 스트림즈 인스턴스가 카프카 스트림즈 클러스터에 들어오는 경우 다시 리밸런싱이 발생한다. 카프카 스트림즈 인스턴스가 총 3개가 되면서, 다시 각각의 카프카 스트림즈는 2개의 스트림 테스크를 맡게 된다.
테스크 수를 초과하는 스트림 쓰레드
스트림 쓰레드 갯수를 과하게 설정하면 스트림 테스크보다 더 많은 스트림 쓰레드가 생성될 수 있다. 각 스트림 테스크는 하나의 스트림 쓰레드에만 배정되기 때문에 스트림 테스크를 배정하지 못한 스트림 쓰레드가 생길 수 있다. 이 경우 스트림 쓰레드는 개점휴업 쓰레드가 되어 아무런 일도 하지 않게 된다.
내부 토픽 설정
카프카 스트림즈에서 리파티셔닝이 발생하거나, StateStore를 사용하는 경우 이를 위한 Internal 토픽으로 ChangeLog 토픽이 생성된다. ChangeLog 토픽은 기본적으로 clean-up 정책이 compact이지만, 만약 유일한 키의 종류가 많을 경우 토픽의 크기가 증가할 수 있다. 이런 이유 때문에 내부 토픽마다 다양하게 관리할 수 있다. 내부 토픽에 대한 ChangeLog를 관리하는 방법은 두 가지가 존재한다.
StateStore를 생성하며 설정하기
HashMap<String, String> stateStoreProps = new HashMap<>();
keyValueStoreStoreBuilder.withLoggingEnabled(stateStoreProps);
StateStore를 생성하며 설정하는 경우 withLoggingEnabled를 이용해서 처리할 수 있다. 이 때, HashMap에 설정값을 넣어서 KeyValueStore를 생성하면 된다.
전체적인 설정하기
Properties props = new Properties();
props.setProperty(StreamsConfig.topicPrefix("retention.bytes"), 1024 * 1024);
props.setProperty(StreamsConfig.topicPrefix("retenetion.ms"), 3600000);
위의 명령을 이용해서 설정도 할 수 있다. 이렇게 설정을 할 경우, 제공된 설정이 모든 내부 토픽에 전역적으로 적용된다.
설정 적용 순서
StateStore를 생성하며 설정한 값이 가장 우선적으로 적용된다. 그리고 전체적인 설정이 적용된다.
로컬 상태 클린업
카프카 스트림즈는 Local Storage에 RocksDB를 이용해서 데이터를 저장한다. 만약 카프카 스트림즈를 시작할 때 이 부분을 깔끔하게 시작하고 싶다면, kafkaStreams.cleanUp() 메서드를 이용해서 처리할 수 있다. claneUp() 메서드는 카프카 스트림즈가 동작하지 않을 때 사용할 수 있다. 즉, Start() 전 / Stop() 후에 호출할 수 있다.
EOS (Exactly Once Semantic)
프로듀서 관점
카프카 프로듀서는 아래와 같은 데이터 전송 Semantic을 제공한다. 카프카 스트림즈는 카프카 프로듀서를 이용하기 때문에 다음 Semantic을 이용할 수 있다.
- at-most-once : 최대 한번만 전송. 데이터 누락될 수 있음. → MAX_RETRIES = 0 or acks = 0
- at-least-once : 적어도 한번 이상 전송. 중복으로 데이터가 전송될 수 있음. (retyr, acks = 1,all)
- exactly-once : 정확히 한번 전송 (idempotence, transaction)
프로듀서에서 트랜잭션을 이용해서 EOS를 구현하기 위해서는 두 가지 작업을 하면 된다.
- 설정값 추가하기
- 트랜잭션 시작하고 닫기
먼저 아래와 같은 설정값을 추가한다.
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id");
트랜잭션 코드는 아래와 같이 작성할 수 있다.
- 트랜잭션을 초기화한다.
- 트랜잭션을 시작하고 데이터를 전송한다.
- 끝났으면 트랜잭션을 커믹한다.
kafkaProducer.initTransactions();
try {
kafkaProducer.beginTransaction();
kafkaProducer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
kafkaProducer.close();
} catch (KafkaException e) {
kafkaProducer.abortTransaction();
}
컨슈머 관점
트랜잭션과 함께 컨슈머를 이용하기 위해서는 isolation.level을 변경하면 된다. 기본적으로 Kafka Consumer의 Isolation Level은 read_uncommitted다. READ UNCOMIT은 모든 메세지를 반환한다. 이 Isolation Level을 READ_COMITTED로 바꿔주게 되면 컨슈머는 커밋된 트랜잭션 메세지만 성공적으로 읽는다.
트랜잭션을 이용하지 않을 경우, 컨슈머는 READ_UNCOMMIT / READ_COMMIT을 둘다 읽을 수 있게 된다.
카프카 스트림즈에서 사용
props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
카프카 스트림즈에서 사용할 때는 다음 옵션을 설정해주기만 하면 된다.
Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>
그런데 EOS를 사용하기 위해서는 다음과 같은 주의 사항이 있다. EOS를 사용하기 위해서는 적어도 3개 이상의 브로커가 있는 클러스터가 필요하고, 이 설정은 프로덕션 레벨에서 사용될 녀석이라는 것이다.
알아볼 부분
- 리밸런싱하면, 스트림 테스크는 새로 생성하게 될까?
'Kafka eco-system > KafkaStreams' 카테고리의 다른 글
Kafka Streams : pollPhase() (0) | 2022.12.01 |
---|---|
Kafka Streams : Repartition (0) | 2022.11.22 |
Kafka Streams : 카프카 스트림즈 어플리케이션 테스트 (0) | 2022.11.21 |
Kafka Streams : Kafka Streams와 Spring 함께 사용하기 (0) | 2022.11.20 |
Kafka Streams : 카프카 스트림즈 고급 어플리케이션 (0) | 2022.11.20 |