Kafka : 프로듀서의 전송 방법 (idempotence, at least once, at most once, exactly once)

    들어가기 전

    이 글은 실전 카프카 개발부터 운영까지 5장을 공부하며 작성한 글입니다.

     

    Producer - Broker 통신

    Producer는 Broker에게 메세지를 보낸다. Broker는 메세지를 받고, 그것을 잘 기록한 경우에 Producer에게 ACK를 내려준다. Producer는 ACK 신호를 받을 수 있고, 받은 ACK 신호를 참조해서 이후에 다음 일을 할 수 있다. 예를 들어 ACK 신호와 상관없이 다음 메세지 배치를 보낼 수도 있고, ACK가 잘못된 경우 다시 재전송을 해볼 수도 있다. 

     

     

    프로듀서의 전송 방법

    프로듀서의 전송 Semantic은 다음 4가지가 존재한다. 그리고 아래에서 하나하나 어떻게 동작하는지를 살펴보고자 한다. 

    • at least once : 적어도 한번 전송. 중복 메세지 발생 가능
    • at most once : 최대 한번 전송. 메세지 누락 가능
    • exaclty once : 정확히 한번 전송. 성능 저하 있음.
    • idempotence : 중복 없이 전송. 성능 저하 있음. 

     

    at least once : 적어도 한번 전송

    선요약부터 하면 다음과 같다. 

    • 프로듀서는 ACK를 받지 못하면 재전송하는 방식으로 적어도 한번 전송을 구현함.
    • 중복 전송이 있을 수 있음.
    • ACK = 1, all을 이용해서 구현할 수 있음. 
    • delivery.timeout.ms를 초과하면 적어도 한번 전송은 지켜지지 않음.

     

    at least once는 적어도 한번 전송을 보장하기 위해 노력하는 전송 방식이다. 기본적으로는 적어도 한번 전송하는 것으로 이해를 해도 되지만, 이것은 전적으로 설정에 달려있다. at least once는 ACK 신호가 잘못되었을 경우 재전송을 하면서 '적어도 한번 발송'을 수행하려고 한다. 그렇지만 Producer는 한번 메세지 배치를 전송할 때 사용할 수 있는 전체 시간과 재전송 횟수가 정해져있기 때문에 이 범위 내에서 수행할 것이다. 따라서 적어도 한번 전송을 최대한 만족하려고 할 것이다. 

    at least once는 기본적으로 다음과 같은 형태로 적어도 한번 전송을 보장하려고 노력한다.

    • 프로듀서는 브로커에게 정상적으로 ACK를 받을 때까지 메세지를 재전송한다.

    그런데 브로커가 메세지를 가지고 있어서 ACK를 보냈는데, 그 ACK가 유실된 경우가 발생할 수 있다. 이 경우에 프로듀서는 ACK를 받지 못했기 때문에 다시 한번 메세지를 전송하고 브로커는 중복된 메세지를 갖게 된다. 

    아래는 재전송을 할 때의 로직이다. Sender 쓰레드는 canRetry()를 통해서 재전송할 수 있는 경우 다시 메세지 배치를 보내려고 한다. 이 때, canRetry()에서는 delivery.timeout.ms를 초과했는지를 살펴보는데, 초과한 경우 else문으로 들어가게 되어 failBatch()를 호출한다. 즉, at least once는 지켜지지 않을 수도 있다는 것을 의미한다. 

    // Sender.java
    
    if (canRetry(batch, response, now)) {
        // delivery.timeout.ms 이내인 경우 재시도함. 
        reenqueueBatch(batch, now);
    } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
        // 이 경우는 배치 완료
        completeBatch(batch, response);
    } else {
        // delivery.timeout.ms 초과한 경우에 메세지 배치는 실패함. 
        failBatch(batch, response, batch.attempts() < this.retries);
    }
    ...

     

     

    at most once (최대 한번 전송) 

    선요약부터 하면 다음과 같다. 

    • 프로듀서는 ACK를 기다리지 않고 다음 메세지 배치를 보낸다.
    • 메세지 전송이 누락될 수 있음. 
    • ACK = 0으로 구현할 수 있음. 
    • 일부 메세지 누락이 있어도 되는 경우 사용 → IOT 로그

    at most once는 최대 한번 전송을 보장하는 semantic이다. 이 모드를 선택할 경우, Producer는 딱 한번 메세지를 전송한다. 그리고 브로커가 어떤 ACK를 보내든지 무시하고 다음 메세지 배치를 보낸다. 이것은 브로커가 메세지를 잘 받지 못했었도, Producer는 다음 메세지 배치를 보낸다는 것을 의미한다. 

     

    중복 없는 전송 (idempotence)

    선요약을 하면 다음과 같다.

    • 프로듀서는 메세지를 보낼 때, PID(Producer ID) + 메세지 Seq.를 헤더에 포함해서 전송함.
    • 프로듀서는 브로커의 ACK를 기다림.     
    • 브로커는 PID(Producer ID), 메세지 Seq.를 메모리에 기록하고 ACK함. PID, 메세지 Seq.는 replication 로그에도 저장됨.
      • replication 로그에 저장되기 때문에 브로커의 장애로 리더가 변경되는 일이 발생해도 새로운 리더가 PID와 시퀀스 번호를 정확히 알 수 있으므로 중복없는 전송이 가능함.
    • 브로커는 메세지를 받았을 때, 메세지 헤더의 Seq = 브로커가 가지고 있는 Seq + 1인 경우에만 메세지를 저장함.
    • PID(Producer ID)는 브로커가 자동으로 생성해 줌. 개발자는 이것을 사용할 수 없음.

     

    Producer가 idempotence를 사용하겠다는 설정을 하고, 프로듀서가 브로커에게 요청을 하면 브로커는 프로듀서를 위한 PID(Producer ID)를 발급해준다. 이제부터 프로듀서는 브로커에게 메세지를 보낼 때 마다, 메세지의 헤더에 본인의 PID와 Message Seq.를 함께 넣어서 보내준다. Message Seq.는 0부터 시작하고, 보낼 때마다 1씩 올라간다. 

    한편 브로커는 메세지를 받으면 프로듀서가 보낸 메세지를 열어서 PID와 Message Seq.를 확인한다. 브로커는 이 두 값을 메모리와 특정 토픽에 저장해두고 있는데, 프로듀서가 보낸 Message Seq = 브로커가 가진 Message Seq + 1 일 때만 메세지를 로그파일로 저장한다. 그리고 브로커는 Message Seq 값을 갱신하고 ACK를 보내준다. 

     

    1. 프로듀서는 PID 0 / Seq 0을 브로커에게 보낸다.
    2. 브로커는 값을 저장하고 ACK를 보낸다.
    3. 프로듀서는 PID 0 / Seq 1을 브로커에게 보낸다.
    4. 브로커는 자신이 가지고 있는 Seq 0보다 1이 큰 값이 왔기 때문에 정상적으로 메세지가 온 것을 확인하고 ACK를 보내준다. 이 때, ACK는 유실된다.
    5.  프로듀서는 ACK를 받지 못했기 때문에 PID 0 / Seq 1을 브로커에게 다시 재전송한다. 

     

    만약 브로커가 가지고 있는 Message Seq.보다 프로듀서가 보낸 Message Seq.가 1보다 더 클 경우에는 데이터가 유실된 상태다. 따라서 Broker는 OutOfOrederSequenceException을 받게 될 것이고, Producer는 commitTransaction() 메서드 등에서 IllegalStateException을 호출하게 될 것이다. 또한, seq 번호가 같은 경우에는 중복 전송이므로 충분히 무시할 수 있기 때문에 프로듀서는 계속 메세지를 전송한다. 

    The OutOfOrderSequence Exception

    The Producer will raise an OutOfOrderSequenceException if the broker detects data loss. In other words, if it receives a sequence number which is greater than the sequence it expected. This exception will be returned in the Future and passed to the Callback, if any. This is a fatal exception, and future invocations of Producer methods like send, beginTransaction, commitTransaction, etc. will raise an IlegalStateException.

    Messages with a lower sequence number result in a duplicate error, which can be ignored by the producer. Messages with a higher number result in an out-of-sequence error, which indicates that some messages have been lost, and is fatal.

     

     

    중복없는 전송 설정

    프로듀서 옵션 설명
    enable.idempotence true 프로듀서가 중복 없는 전송을 허용할지 결정하는 옵션.
    기본 값은 false. true로 하는 경우, 아래의 옵션도 적절하게 설정해줘야 함.
    max.in.flight.request.per.connection 1~5 ACK를 받지 않은 상태에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수.
    기본값은 5. 1~5 범위로 설정
    acks all 프로듀서 ACK와 관련된 설정. all로 설정해야함. 
    retries 5 프로듀서 재전송과 관련된 설정. 0보다 큰 값으로 설정해야함. 

    중복없는 전송은 위 표대로 설정을 해야 사용할 수 있다. ACK를 받지 않은 상태에서 보낼 수 있는 최대 요청 수를 5개까지만 사용이 가능한데, 이것은 OutOfOrderException이 발생했을 때 다시 메세지를 보내줘야하기 때문이다. 너무 많은 Batch를 가지고 있을 경우에는 메모리 상으로 문제가 될 수 있기 때문이 아닐까 싶다.

     

     

    Idempotence 전송 실습

    멱등성 전송을 하기 위해서는 아래와 설정을 먼저 작성해야한다. Producer에 하나씩 넣어줄 수도 있겠지만, 이 경우에는 커맨드 라인이 길어진다. 따라서 아래와 같은 파일을 하나 작성하도록 한다. 

    # producer.config
    enable.idempotence=true
    max.in.flight.requests.per.connection=5
    retries=5
    acks=all

    파일을 작성한 후, 아래 명령어를 이용해서 Producer를 시작하고 메세지를 보낸다. 메시지를 여러개 보내면 된다. 

    $ kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic topic-4 \
    --producer.config ~/producer.config
    
    메세지 발송
    >> a
    >> b
    >> c
    >> d

    이후에 스냅샷 파일이 생성되는지를 확인한다. 스냅샷 파일은 브로커가 PID, SEQ 번호를 주기적으로 저장하는 파일이다. 즉, 브로커가 프로듀서의 상태를 저장하기 위해서 주기적으로 생성하는 파일이다. 만약 이 스냅샷 파일이 각 토픽 파티션에 들어있지 않다면, 리더 브로커를 강제로 종료해주면 스냅샷 파일이 생성된다. 

    리더 브로커를 강제로 종료해주면, 해당 리더 브로커에 다음과 같이 스냅샷이 생성된다. 스냅샷을 아래 명령어로 읽어주면 프로듀서 ID(PID)와 first / last Sequence 번호가 존재하는 것을 알 수 있다. 이 때 메세지를 처음 4번 SEQ 부터 받기 시작해서, 6번 SEQ까지 받고 저장했다는 것을 의미한다. 이 스냅샷 파일은 삭제하면 브로커가 복구할 때 오랜 시간이 걸린다고 한다.  (https://stackoverflow.com/questions/51443656/what-happens-if-you-delete-kafka-snapshot-files)

    $ kafka-dump-log.sh --files 00000000000000000007.snapshot --print-data-log
    
    >>> 실행 결과
    Dumping 00000000000000000007.snapshot
    producerId: 1002 
    producerEpoch: 0 
    coordinatorEpoch: -1 
    currentTxnFirstOffset: None 
    lastTimestamp: 1669689243854 
    firstSequence: 4 
    lastSequence: 6 
    lastOffset: 6 
    offsetDelta: 2 
    timestamp: 1669689243854

    사실 snapshot이 아니더라도 PID , SEQ 번호가 메세지로 넘어오는 것을 볼 수 있다. 그냥 기본적인 로그 파일들을 dump-log로 읽어도 sequence 번호가 어떻게 되는지를 확인할 수 있다. 

    $ kafka-dump-log.sh --files 00000000000000000000.log --print-data-log
    
    >>>>
    
    Dumping 00000000000000000000.log
    Log starting offset: 0
    baseOffset: 0 lastOffset: 3 count: 4 baseSequence: 0 lastSequence: 3 producerId: 1002 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1669689242590 size: 110 magic: 2 compresscodec: none crc: 3743604431 isvalid: true
    | offset: 0 CreateTime: 1669689241617 keySize: -1 valueSize: 12 sequence: 0 headerKeys: [] payload: exactly once
    | offset: 1 CreateTime: 1669689241998 keySize: -1 valueSize: 2 sequence: 1 headerKeys: [] payload: e1
    | offset: 2 CreateTime: 1669689242326 keySize: -1 valueSize: 2 sequence: 2 headerKeys: [] payload: e2
    | offset: 3 CreateTime: 1669689242590 keySize: -1 valueSize: 2 sequence: 3 headerKeys: [] payload: e3
    baseOffset: 4 lastOffset: 6 count: 3 baseSequence: 4 lastSequence: 6 producerId: 1002 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 110 CreateTime: 1669689243854 size: 90 magic: 2 compresscodec: none crc: 3174953030 isvalid: true
    | offset: 4 CreateTime: 1669689242926 keySize: -1 valueSize: 2 sequence: 4 headerKeys: [] payload: e4
    | offset: 5 CreateTime: 1669689243558 keySize: -1 valueSize: 2 sequence: 5 headerKeys: [] payload: e5
    | offset: 6 CreateTime: 1669689243854 keySize: -1 valueSize: 2 sequence: 6 headerKeys: [] payload: e6

     

     

     

    정확히 한번 전송(Exactly Once Semantic)

    많은 사람들은 이상적인 상황일 때 '정확히 한번만 처리 해주기'를 바란다. 카프카에서는 이러한 기능을 지원한다. 그런데 구분해서 알아두어야 할 점은 Idempotence 전송은 '정확히 한번 전송'의 일부 기능으로 이해를 할 수 있다. '정확히 한번 전송'은 트랜잭션 처리와 같이 전체적인 프로세스 처리를 의미한다.

     

    5.4.1 디자인

    프로듀서가 카프카로 '정확히 한번 전송' 방식으로 메세지를 보낸다면, 프로듀서가 보내는 메세지들은 원자적으로 처리되어 전송에 성공하거나 실패하게 된다. 이것은 브로커에 있는 '트랜잭션 코디네이터'라는 것이 도와준다. 트랜잭션 코디네이터는 다음 역할을 한다.

    1. Transaction ID, Producer ID를 맵핑해서 보관함.
    2. 프로듀서에 의해 전송된 메세지를 관리하며, 메세지의 상태를 표시한다. 
    3. __transaction_state에 트랜잭션 로그를 저장한다.

    트랜잭션 관리를 위해서 브로커는 컨슈머를 __consumer_offset 토픽에 관리하는 것처럼 트랜잭션 로그를 __transaction_state에 저장한다. __transaction_state 토픽은 기본값으로 50개의 파티션, 3개의 replication factor로 생성된다. 

     

    트랜잭션에 의해서 메세지가 공급되고 카프카 서버에 저장된다면, 카프카 데이터를 다루는 녀석들은 이 녀석이 트랜잭션이 완료된 녀석인지를 알아야만 한다. 카프카는 이것을 식별하기 위한 정보로 '컨트롤 메세지'라고 불리는 특별한 타입의 메세지를 토픽파티션의 세그먼트 파일에 기록해둔다. 

    | offset: 352 CreateTime: 1669771397134 keySize: -1 valueSize: 13 sequence: 346 headerKeys: [] payload: q = 6, i = 46
    | offset: 353 CreateTime: 1669771397134 keySize: -1 valueSize: 13 sequence: 347 headerKeys: [] payload: q = 6, i = 47
    | offset: 354 CreateTime: 1669771397134 keySize: -1 valueSize: 13 sequence: 348 headerKeys: [] payload: q = 6, i = 48
    | offset: 355 CreateTime: 1669771397134 keySize: -1 valueSize: 13 sequence: 349 headerKeys: [] payload: q = 6, i = 49
    // 컨트롤 메세지
    baseOffset: 356 lastOffset: 356 count: 1 baseSequence: -1 lastSequence: -1 producerId: 1003 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 7825 CreateTime: 1669771397624 size: 78 magic: 2 compresscodec: none crc: 2994166254 isvalid: true
    | offset: 356 CreateTime: 1669771397624 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 2
    baseOffset: 357 lastOffset: 406 count: 50 baseSequence: 350 lastSequence: 399 producerId: 1003 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 7903 CreateTime: 1669771397164 size: 1051 magic: 2 compresscodec: none crc: 1872591715 isvalid: true
    //
    | offset: 357 CreateTime: 1669771397161 keySize: -1 valueSize: 12 sequence: 350 headerKeys: [] payload: q = 7, i = 0
    | offset: 358 CreateTime: 1669771397162 keySize: -1 valueSize: 12 sequence: 351 headerKeys: [] payload: q = 7, i = 1
    | offset: 359 CreateTime: 1669771397162 keySize: -1 valueSize: 12 sequence: 352 headerKeys: [] payload: q = 7, i = 2
    | offset: 360 CreateTime: 1669771397162 keySize: -1 valueSize: 12 sequence: 353 headerKeys: [] payload: q = 7, i = 3

    컨트롤 메세지는 특이하게 Payload에 Value를 포함하지 않는다. 그리고 컨트롤 메세지는 어플리케이션에게 노출되지 않기 때문에 클라이언트 측에서는 이 메세지를 고려하지 않고 로직을 작성하면 된다. 컨트롤 메세지는 오로지 브로커 ↔ 클라이언트의 통신에서만 내부적으로 사용된다. 

     

    5.4.2 프로듀서 예제 코드

    아래 예제 코드를 작성했다. 중요하게 봐야할 부분은 다음 두 가지다.

    1. Idempotence를 활성화했다.
    2. Transactional_id_config에 값을 설정했다. 
    3. 트랜잭션을 초기화 / 시작 / 커밋하는 코드가 들어간다.

    이 두 가지 작업을 필수적으로 해야한다. 이 때, Transactional_ID는 각 프로듀서마다 고유한 값을 가져야 한다. 따라서 이 부분은 미리 잘 설정해두고 작성해야한다. 

     

    public class Main {
        public static void main(String[] args) {
    
    
            Properties props = new Properties();
    
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
            props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
            props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
            props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TEST-11");
    
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
            kafkaProducer.initTransactions();
    
            for (int q = 0; q < 10; q++) {
                kafkaProducer.beginTransaction();
    
                try {
                    for (int i = 0; i < 50; i++) {
                        String value = String.format("q = %d, i = %d", q, i);
                        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("my-test100", null, value);
                        kafkaProducer.send(stringStringProducerRecord);
                        System.out.println("stringStringProducerRecord = " + stringStringProducerRecord.value());
                    }
                } catch (Exception e) {
                    System.out.println("e = " + e);
                }finally {
                    kafkaProducer.commitTransaction();
                }
            }
        }
    }

     

     

    5.4.3 단계별 동작

     

    STEP1. FindCoordinatorRequest

    프로듀서가 가장 먼저 수행하는 작업은 Transaction Coordinator를 찾는 일이다. 프로듀서는 브로커에게 FindCoordinatorRequest를 보내서 트랜잭션 코디네이터의 위치를 찾는다. 트랜잭션 코디네이터는 브로커들 중에 하나가 맡게 된다. 트랜잭션 코디네이터의 주 역할 중 하나는 transactional.id와 Producer Id를 맵핑하고, 이 트랜잭션 ID에 대한 전체 트랜잭션을 관리하는 것이다. 만약 트랜잭션 코디네이터가 존재하지 않는다면, 트랜잭션 코디네이터가 하나 생성된다. 

    __transaction_state 토픽의 파티션은 기본적으로 총 50개가 생성되고, replication factor는 3으로 설정된다. 토픽 파티션 번호는 transaction.id를 기반으로 해시되어서 결정되고, 이 토픽 파티션의 리더 파티션이 있는 브로커가 이 transaction.id의 트랜잭션 코디네이터의 브로커로 선정된다. 즉, 하나의 transaction.id에 대응되는 트랜잭션 코디네이터는 하나만 존재한다. 

     

    STEP2. InitTransaction()

    프로듀서는 initTransaction() 메서드를 이용해서 트랜잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터에게 보낸다. 이 때, TID가 설정된 경우에 InitPidRequest와 함께 TID가 트랜잭션 코디네이터로 전송된다. 트랜잭션 코디네이터는 TID, PID를 매핑하고 해당 정보를 트랜잭션 로그에 기록한다. 그런 다음 PID epoch를 한 단계 올리는 동작을 하고, PID epoch가 올라가면서 이전 PID epoch로 오는 쓰기 요청은 무시된다. 

     

     

    STEP3. beginTransaction()

    프로듀서는 beginTransaction() 메서드를 이용해서 새로운 트랜잭션의 시작을 알린다. 프로듀서는 내부적으로 트랜잭션이 시작된 것을 알리지만 브로커 관점에서는 트랜잭션이 시작되지 않았다. 브로커 관점에서는 첫 번째 레코드가 전송될 때 트랜잭션이 시작된다. 

     

    STEP4. TID / 파티션 정보 넘겨주기

    프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달하고, 트랜잭션 코디네이터는 이 정보를 트랜잭션 로그에 기록한다. 트랜잭션 코디네이터는 TID와 파티션의 정보가 함께 기록되고, 트랜잭션의 현재 상태를 'OnGoing'으로 표시한다. 만약 트랜잭션 로그에 추가되는 첫번째 파티션이라면 트랜잭션 코디네이터는 해당 트랜잭션에 대한 타이머를 시작한다. 기본값으로 1분 동안 트랜잭션 상태에 대한 업데이트가 없다면, 이 트랜잭션은 실패로 처리된다.

     

     

    STEP5. 메세지 전송

    프로듀서는 브로커로 메세지를 전송한다. 

     

     

    STEP6. 트랜잭션 종료 요청

    메세지 전송을 완료한 프로듀서는 commitTransaction() / abortTransaction() 메서드 중 하나를 반드시 호출해야한다. 그리고 이 메서드를 호출하게 되면서 트랜잭션이 완료됨을 트랜잭션 코디네이터에게 알리게 된다. 이 메세지를 받은 트랜잭션 코디네이터는 두 단계의 커밋 과정을 시작한다. 

    1. 트랜잭션 코디네이터는 트랜잭션 로그에 해당 트랜잭션에 대한 상태를 PrepareCommit 또는 PrepareAbort로 기록한다.
    2. 트랜잭션 코디네이터는 사용자 토픽에 커밋 표시를 '컨트롤 메세지'를 이용해서 기록한다.
    3. 트랜잭션 코디네이터는 트랜잭션 로그의 상태를 Commit으로 바꿔 기록하고, 프로듀서에게 해당 트랜잭션이 완료되었음을 알린다.

    첫번째 단계로 commiTransaction()을 하면 프로듀서는 이 트랜잭션이 종료되었음을 트랜잭션 코디네이터에게 알려준다. 트랜잭션 코디네이터는 이 신호를 받고, 해당 TID의 파티션의 상태를 'Prepare'로 바꾸고 그것을 __transaction_state에 기록한다.

    두번째 단계로 브로커는 이 파티션을 가지고 있는 브로커에게 WriteTxnMarkerRequest 요청을 보낸다. 이 요청을 받게 되면, 해당 브로커의 파티션의 세그먼트 파일에는 커밋되었다는 컨트롤 메세지가 기록되게 된다. 컨트롤 메세지는 컨슈머가 메세지를 가져갈 때, 트랜잭션이 끝났는지를 알려주는 용도로 사용된다. 트랜잭션 커밋이 끝나지 않은 메세지는 컨슈머에게 반환할 수 없고, 오프셋의 순서 보장을 위해 트랜잭션 성공 또는 실패를 나타내는 LSO(Last Stable Offset)이라는 오프셋을 유지하게 된다. 

    세번째 단계로 브로커는 트랜잭션 코디네이터에 이 transaction.id의 파티션이 commit 된 것을 __transaction_state에 기록하고, 정상적으로 커밋된 것을 프로듀서에게 응답해준다. 

     

     

    5.4.4 예제 실습

    먼저 아래 명령어를 이용해서 토픽을 생성한다. 토픽이 생성되었으면, 앞서 작성했던 Producer 코드를 이용해서 메세지를 보낸다. 이제 카프카에서 메세지가 어떻게 처리되었는지를 확인해본다.

    $ kafka-topics --bootstrap-server localhost:9092 --topic my-topic-100 --create --partitions 1 --replication-factor 3

    아래 명령어를 이용하면 트랜잭션 상태를 읽어볼 수 있다. Transaction State는 내부 토픽이기 때문에 --proptery 명령어를 이용해서 내부 토픽도 읽을 수 있도록 설정해주고 포멧 역시 맞춰준다. 포멧을 맞춰주지 않으면 읽을 수 없는 외계어만 나오게 된다. 

    $ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic __transaction_state --property exclude.internal.topics-false \ 
    --from-beginning \ 
    --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

    __transaction_state의 메세지를 읽어보면 아래와 같다. 

    // 트랜잭션 초기화
    TEST-1::TransactionMetadata(transactionalId=TEST-1, producerId=2000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1669693174239)
    
    // 트랜잭션 시작
    TEST-1::TransactionMetadata(transactionalId=TEST-1, producerId=2000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(my-test-0), txnStartTimestamp=1669693174442, txnLastUpdateTimestamp=1669693174442)
    
    // 트랜잭션 커밋 준비
    TEST-1::TransactionMetadata(transactionalId=TEST-1, producerId=2000, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(my-test-0), txnStartTimestamp=1669693174442, txnLastUpdateTimestamp=1669693174499)
    
    // 트랜잭션 커밋 완료
    TEST-1::TransactionMetadata(transactionalId=TEST-1, producerId=2000, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1669693174442, txnLastUpdateTimestamp=1669693174505)

    트랜잭션 초기화

    트랜잭션 초기화에 대응되는 로그다. State = Empty로 표시되고, TopicPartion=set()에는 아무런 값도 존재하지 않는다.

    트랜잭션 시작 

    state = Ongoing으로 변경되었고, 이를 통해 트랜잭션이 시작된 것을 알 수 있다. 또한 topicPartition=set(my-test-0)로 업데이트 된 것을 볼 수 있다. 

    트랜잭션 종료 요청

    state = PrepareCommit으로 변경된 것을 볼 수 있다. 이 상태는 트랜잭션 프로듀서가 my-test 토픽의 0번 파티션으로 메세지 전송을 완료했으면, 트랜잭션 코디네이터에게 트랜잭션 종료 요청을 보낸 상태다.

    트랜잭선 완료

    state=CompleteCommit으로 변경되었고, 트랜잭션 프로듀서가 my-test 토픽으로 보낸 메세지에 대한 트랜잭션 단계가 최종적으로 완료되었음을 알 수 있다. 

     

    위에서는 __transaction_state의 상태를 읽어봤다. 그렇다면 실제 세그먼트 파일에서는 컨트롤 메세지가 어떻게 기록되어있을까? 이 부분도 함께 살펴보고자 한다. 먼저 다음 과정을 거친다.

    1. 트랜잭션 프로듀서를 총 2번 실행한다.
    2. 아래 코드를 실행해서 메세지 로그를 확인한다.
    $ kafka-dump-log.sh --files 00000000000000000000.log --print-data-log

    트랜잭션 프로듀서를 2번 실행하는 것은 TID에 의해서 동일한 PID가 맵핑되고, epoch가 올라가는지를 확인하기 위함이다. 아무튼 저렇게 한 후에 위 코드를 실행해서 실제 메세지 로그를 확인해보자. 

    baseOffset: 0 lastOffset: 4 count: 5 baseSequence: 0 lastSequence: 4 producerId: 3000 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1669776657486 size: 156 magic: 2 compresscodec: none crc: 3510235798 isvalid: true
    
    // 프로듀서 메세지
    | offset: 0 CreateTime: 1669776657471 keySize: -1 valueSize: 12 sequence: 0 headerKeys: [] payload: q = 0, i = 0
    | offset: 1 CreateTime: 1669776657486 keySize: -1 valueSize: 12 sequence: 1 headerKeys: [] payload: q = 0, i = 1
    | offset: 2 CreateTime: 1669776657486 keySize: -1 valueSize: 12 sequence: 2 headerKeys: [] payload: q = 0, i = 2
    | offset: 3 CreateTime: 1669776657486 keySize: -1 valueSize: 12 sequence: 3 headerKeys: [] payload: q = 0, i = 3
    | offset: 4 CreateTime: 1669776657486 keySize: -1 valueSize: 12 sequence: 4 headerKeys: [] payload: q = 0, i = 4
    
    baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 156 CreateTime: 1669776657913 size: 78 magic: 2 compresscodec: none crc: 4066700887 isvalid: true
    // 컨트롤 메세지 - 트랜잭션 종료
    | offset: 5 CreateTime: 1669776657913 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 2
    
    baseOffset: 6 lastOffset: 10 count: 5 baseSequence: 0 lastSequence: 4 producerId: 3000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 234 CreateTime: 1669776735055 size: 156 magic: 2 compresscodec: none crc: 1699489141 isvalid: true
    
    // 프로듀서 메세지
    | offset: 6 CreateTime: 1669776735039 keySize: -1 valueSize: 12 sequence: 0 headerKeys: [] payload: q = 0, i = 0
    | offset: 7 CreateTime: 1669776735054 keySize: -1 valueSize: 12 sequence: 1 headerKeys: [] payload: q = 0, i = 1
    | offset: 8 CreateTime: 1669776735055 keySize: -1 valueSize: 12 sequence: 2 headerKeys: [] payload: q = 0, i = 2
    | offset: 9 CreateTime: 1669776735055 keySize: -1 valueSize: 12 sequence: 3 headerKeys: [] payload: q = 0, i = 3
    | offset: 10 CreateTime: 1669776735055 keySize: -1 valueSize: 12 sequence: 4 headerKeys: [] payload: q = 0, i = 4
    
    baseOffset: 11 lastOffset: 11 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 390 CreateTime: 1669776735464 size: 78 magic: 2 compresscodec: none crc: 934547290 isvalid: true
    // 컨트롤 메세지- 트랜잭션 종료
    | offset: 11 CreateTime: 1669776735464 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 2

    트랜잭션 시작

    트랜잭션 시작은 따로 세그먼트에 저장되지는 않는다. 다만 baseOffset을 표시하는 곳을 바라봤을 때 isTranscation True인 것을 볼 수 있다. 그리고 PID는 3000, Producer epoch는 1인 것을 확인할 수 있다. 

    프로듀서 메세지

    일반적인 프로듀서 메세지가 공급된 것을 볼 수 있다.

    트랜잭션 종료

    컨트롤 메세지로 트랜잭션 종료가 온 것을 확인할 수 있다. 이 때 payload에는 아무런 값도 없는 것을 볼 수 있고, 특이하게 endTxnMarker는 Commit이 된 것을 볼 수 있다. 그리고 isControl = True라는 것을 볼 수 있는데, 컨트롤 메세지라는 것을 알려준다. 

    다음 프로듀서 메세지

    다음에 또 프로듀서 메세지가 온 것을 확인해보면 된다. 이 때, PID는 3000으로 유지되만, Producer epoch는 1에서 2로 바뀐 것을 볼 수 있다. 이 때 프로듀서가 한번 메세지를 다 전달한 후에, 다시 한번 전달했다. 즉, 두번에 나눠서 Producer가 메세지를 전송한 셈인데 (Run을 2번함), 이 때 PID는 유지되면서 Producer Epoch를 올려서 데이터를 나눠서 전송한 것을 볼 수 있다. 

     

    참고 

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

    https://github.com/dhkdn9192/data_engineer_should_know/blob/master/interview/hadoop/kafka_exactly_once.md

    https://bistros.tistory.com/166

     

     

    추가 생각해 볼 거

    SEQ 번호는 배치 단위로 전송되었을 때, 하나씩 보는 것일까? 

    LSO는 무엇일까?

    댓글

    Designed by JB FACTORY