Kafka Streams : Repartition

    들어가기 전

    Kafka Streams에는 repartition() 메서드가 존재한다. 이 메서드가 어떤 역할을 하는지 확인하고자 한다.

     

    Kafka Streams의 Repartition의 의미

    KStream<K, V> repartition();
    KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

    카프카 스트림즈에는 repartition() 메서드가 존재하고, 이 메서드는 2가지 형태로 사용할 수 있다. 하나는 매개변수를 전달하지 않은 것이고, 하나는 매개변수를 전달해서 하는 것이다. 결론부터 정리하면 다음과 같다. 

    • repartition() : 이전 스트림과 동일한 형태로 리파티션한다. ChangeLog 토픽을 생성하고 새롭게 리파티셔닝 함.
      • 이 때, 동일한 파티션 갯수, 파티셔닝 전략을 사용하기 때문에 같은 키가 새로운 파티션으로 배정되지는 않음.
    • repartition(Repartitioned) :  파티션 갯수, 파티션 전략을 이용해 새롭게 리파티셔닝 할 수 있다. ChangeLog 토픽 생성

    repartition() 메서드가 호출되기 전에는 Stream이 있을 것이다. 이 Stream은 Stream Thread Consumer에 의해서 데이터를 읽어와서, Depth First로 처리되고 있을 것이다. 이 때, 처음 Source에서 읽어온 Partition이 일반적으로 그대로 유지되고 있을 것이다. 이 때 repartition()을 호출할 경우 카프카 스트림즈와 브로커에서는 두 가지 일이 발생할 것이다. 

     

    카프카 스트림즈 관점

    리파티셔닝 되었다는 것은 새로운 ChangeLog 토픽이 생성된다는 것을 의미한다. ChangeLog 토픽에게 메세지를 보내야하기 때문에 카프카 스트림즈 토폴로지는 내부적으로 한번 끊어지게 된다. 정확한 내용은 ChangeLog 토픽을 Sink 노드로 해서 데이터를 한번 보낸다. 그리고 이 이후에 오는 스트림은 ChangeLog 토픽을 Source 노드를 이용해서 데이터를 불러오게 된다. 

     

    브로커 관점

    브로커에는 새로운 ChangeLog 토픽이 생성된다. repartition()을 하게 될 경우, 현재 스트림에 전달된 레코드의 파티션을 참고해서, 그 파티션의 갯수만큼 ChangeLog 토픽의 파티션을 설정하고 생성한다. repartition()에 repartitioned를 전달하는 경우, 파티셔닝 전략을 바꿀 수도 있고 파티션의 갯수를 그만큼 생성할 수도 있다. 

    예를 들어 파티션이 5개인 경우, 파티션을 20개로 늘리는 것을 상상해보자. 카프카 스트림즈는 브로커에 파티션 20개짜리 ChangeLog 토픽을 생성한다. 그리고 repartitioned에 전달된 파티셔너를 이용해서 레코드를 파티셔닝해서 해당 브로커에게 메세지를 보내주게 된다. 

     

    테스트 코드

    Producer 테스트코드

    데이터를 Broker로 보내준다.

    @Slf4j
    public class MyBroker {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    
    
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
    
            for (int i = 1000 ; i < 1100; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-topic", String.valueOf(i), String.valueOf(i));
                Future<RecordMetadata> send = kafkaProducer.send(producerRecord);
                RecordMetadata recordMetadata = send.get(1000, TimeUnit.SECONDS);
                log.info("topic = {}, partition = {}, key = {}", recordMetadata.topic(), recordMetadata.partition(), producerRecord.key());
            }
        }
    }

     

    Consumer 테스트코드

    리파티셔닝 된 녀석의 값들을 확인한다.

    @Slf4j
    public class MyConsumer {
    
        public static void main(String[] args) {
    
    
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
            kafkaConsumer.subscribe(List.of("repartition-topic1", "repartition-topic2"));
    
            while (true) {
                ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : poll) {
                    log.info("topic = {}, partition = {}, key = {}", record.topic(), record.partition(), record.key());
                }
            }
        }
    }

     

    카프카 스트림즈 코드

    • repartition()을 하는 녀석이 repartition1이고, 이 녀석은 repartition-topic1에 데이터가 제공된다.
    • repartition(Repartitioned)을 하는 녀석이 repartition2이고, 이 녀석은 repartition-topic2에 데이터가 제공된다.
    public class playground1 {
    
        public static void main(String[] args) {
    
    
            Serde<String> stringSerde = Serdes.String();
    
            Properties props = new Properties();
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "application-1");
            props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,stringSerde.getClass().getName());
            props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,stringSerde.getClass().getName());
    
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            KStream<String, String> stream = streamsBuilder.stream("my-topic",
                    Consumed.with(stringSerde, stringSerde).withOffsetResetPolicy(EARLIEST));
    
            KStream<String, String> repartition1 = stream.repartition();
            KStream<String, String> repartition2 = stream.repartition(Repartitioned.with(stringSerde, stringSerde).withNumberOfPartitions(20));
    
            repartition1.to("repartition-topic1",Produced.with(stringSerde, stringSerde));
            repartition2.to("repartition-topic2",Produced.with(stringSerde, stringSerde));
    
    
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
            kafkaStreams.start();
        }
    
    
    
    
    }

     

    테스트 결과

    파티션의 갯수를 5개로 유지했을 때, 각 Key에 따른 파티션은 다음과 같다. 결론은 Kafka Producer / Streams ChangeLog / StreamsSink 모두 같은 파티션 갯수를 유지하고, 파티셔닝 전략이 동일한 경우에는 똑같은 파티션을 유지한다. 

    key Producer
    (파티션 5)
    Streams Repartition
    (Change Log / 파티션 5)
    Streams Sink
    (파티션 5)
    1101 0 0 0
    1102 0 0 0
    1103 0 0 0
    1115 0 0 0
    1118 0 0 0
    1120 0 0 0
    1144 2 2 2
    1146 2 2 2

    파티션의 갯수를 5개 → 20개로 나눈 경우, 각 Key에 따른 파티션은 다음과 같다. 바뀌는 것을 확인할 수 있다. 

    key Producer
    (파티션 5)
    Streams Sink
    (파티션 20)
    1101 0 5
    1102 0 5
    1103 0 15
    1115 0 5
    1118 0 0
    1120 0 10
    1144 2 17
    1146 2 7

     

    댓글

    Designed by JB FACTORY