Kafka Producer Transaction 관련 정리
- Kafka eco-system/Kafka
- 2025. 12. 31.
들어가기 전
이 글은 Kafka Producer Transaction이 동작하는 방식에 대해 Producer Client의 내부 코드를 확인하며 어떻게 동작하는지를 정리한 글입니다. 틀린 부분은 언제나 조언 환영합니다.
이 글에서 사용된 코드는 apach Kafka 4.3.0-SNAPSHOT 버전의 trunk 브랜치를 기준으로 작성되었습니다.
Producer Transaction 관련 API
- initTransactions()
- beginTransaction()
- commitTransaction()
- abortTransaction()
- completeTransaction()
- prepareTransaction()
- sendOffsetsToTransaction()
- Consume한 메세지를 Producer가 트랜잭션을 열고, 다른 토픽으로 Produce 했을 때 Offset Commit까지도 하나의 트랜잭션으로 묶기 위한 메서드다.
- 이 메서드를 직접 호출해서 __consumer_offsets 토픽도 트랜잭션에 참여하는 토픽이 되도록 한다. 이 메서드를 직접 호출하는 경우 commitSync(), commitAsync(), autoCommit() 등은 호출하지 않는다.
Sender
먼저 Sender Thread에 대해서 이해를 해야한다.

- Sender
- NetworkClient
- Selector를 Poll함.
- Selector가 Receive를 완료한 것이 있는 경우, selector에게 completedReceive를 요청해서 후속 작업을 마무리한다.
- handleCompletedSend(...)를 호출해서 NetworkSend를 ClientResponse 객체로 변환.
- completeResponses(...) 등을 호출해서 ClientResponse 객체의 콜백을 호출. 주로 콜백은 RequestCompletionHandler.onComplete(...)를 호출해서 처리됨.
- RequestCompletionHandler.onComplete(...) -> RequestCompletionHandler.handleResponse(...)를 호출하도록 설정되어있음.
- Selector
- 실질적인 네트워크 관리자.
- SelectionKey (Broker의 Host, Port 기반)을 바탕으로 Iteration을 돌면서 각 KafkaChannel에 Read/Write를 함.
- 특이사항은 일정 Memory 사용량이 증가할 경우, Selection Key 이터레이션 방식을 Shuffling하여 Read starvation을 방지함.
- 완료된 요청은 completedSends 필드에 추가함. List<NetworkSend>
- KafkaChannel
- Producer <-> Broker 사이의 TCP 연결이 KafkaChannel로 추상화 됨. 여기서 실제 I/O와 관련된 Read/Write 버퍼를 가지고 있음.
1. InitTransacation
- 이 때 Producer / Sender에는 TransactionManager가 존재해야 함.
producer.initTransaction(...)
// *** TransactionManager Context...
-> transactionManager.initializeTransactions(...) // Returns TransactionalrequestResult
-> initializeTransactions(...) // produceId = -1, epoch = -1
-> handleCachedTransactionRequestResult(...)
-> transactionalRequestResultSupplier.get() // callback
-> transitionTo(State.INITIALIZING); // isEpochBump = false이기 때문. state: Uninitialized -> Initializing.
-> new InitProducerIdRequestData(); // Transaction Init request 객체 생성.
// producerId, epoch, enable2PC, keepPreparedTxn 등을 함께 정리
-> new InitProducerIdHandler(...);
-> enqueueRequest(handler);
-> pendingRequests.add(requestHandler); // TransactionManager field에 추가.
-> pendingTransition = new PendingStateTransition(result, nextState, operation);
<-
<-
<-
-> sender.wakeup();
// ********* Sender Context
-> this.client.wakeup();
-> result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
- Producer Thread쪽은 다음과 같이 동작한다.
- 초기 ProducerIdAndEpoch는 각각 -1, -1이된다. Producer는 InitProducerIdRequest를 보내도록 TransactionManager의 pendingRequest에 추가해둔다.
- TransactionManager의 상태는 INITIALIZING이 된다.
// *** Sender Context
sender.run()
-> runOnce()
-> transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); // Producer Id를 새로 발급받아야 할 경우.
-> maybesendAndPollTransactionalRequest()
// 여기서 InitProducerIdHandler 객체를 받아옴.
// 여기서 transactionManager.pendingRequests에 있던 InitProducerIdHandler가 제거됨.
-> TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
// 현재 시점에는 CoordinatorType이 Transactional, 그러나 TargetNode가 null임.
-> FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
targetNode = coordinatorType != null ?
transactionManager.coordinator(coordinatorType) :
client.leastLoadedNode(time.milliseconds()).node();
-> maybeFindCoordinatorAndRetry(nextRequestHandler);
// nextRequestHandler = InitProducerIdHandler
-> transactionManager.lookupCoordinator(nextRequestHandler);
// ********** TransactionManager Context
-> FindCoordinatorRequestData data = new FindCoordinatorRequestData()
-> enqueueRequest(new FindCoordinatorHandler(builder));
// FindCoordinator Request를 enque한다.
-> pendingRequests.add(requestHandler);
<-
<-
-> transactionManager.retry(nextRequestHandler);
// ********** TransactionManager Context
-> pendingRequests.add(requestHandler); // In pending request [0]: FindCoodrinator, [1]: InitProducerId
<-
<-
<-
<-
<-
-> runOnce()
-> maybesendAndPollTransactionalRequest()
// 여기서 FindCoordinator가 나옴.
// CordinatorType = null
-> TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
...
-> ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), requestBuilder, currentTimeMs,
true, requestTimeoutMs, nextRequestHandler);
-> client.send(clientRequest, currentTimeMs);
// ************ NetworkClient Context
// RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-ash-2, correlationId=3, headerVersion=2)
-> doSend(...)
-> InFlightRequest inFlightRequest = new InFlightRequest(...)
// NetworkClient가 가진 inFlightRequest Queue에 추가.
-> this.inFlightRequests.add(inFlightRequest);
// selector를 통해서 'send'를 전송. 이건 byterize 되어있음.
-> selector.send(new NetworkSend(clientRequest.destination(), send));
// *********** Selector Context
-> KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
-> channel.setSend(send);
<-
<-
-> transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
-> client.poll(retryBackoffMs, time.milliseconds()); // Poll 시에 read / write가 진행됨.
-> this.selector.poll()
Sender Thread는 다음 작업을 처리한다.
- TransactionManager에서 InitProducerId Request를 만들어 transactionManager.pendingRequest에 저장하고 Sender를 wake 시킨다.
- Sender는 transactionManager의 InitProducerId Request를 처리하려고 했으나 Coordinator Node를 모르기 때문에 FindCoordinator 요청을 하나 추가해서 다음 순서로 transcationManager.pendingRequest 순서를 조절한다.
- FindCoordinator Request
- InitProducerId Request
모든 요청은 RequestData를 만들고, 그걸 Builder와 함께 Handler를 만든다. 그리고 Handler가 Callback 형태로 처리하는 구조를 가진다.
2. FindCoordinator Response 처리
- 다음 두 가지 메서드를 이용해 Broker가 응답한 ClientResponse를 Handling함.
- TransactionManager.TxnRequestHandler.onComplete()
- TransactionManager.FindCoordinator.handleResponse()
- 아래 코드를 통해서 찾게 된 노드를 Coordinator로 등록함.
// TransactionManager.FindCoordinatorHandler
...
if (error == Errors.NONE) {
Node node = new Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port());
switch (coordinatorType) {
case GROUP:
consumerGroupCoordinator = node;
break;
case TRANSACTION:
transactionCoordinator = node;
break;
default:
log.error("Group coordinator lookup failed: Unexpected coordinator type in response");
fatalError(new IllegalStateException("Group coordinator lookup failed: Unexpected coordinator type in response"));
}
result.done();
log.info("Discovered {} coordinator {}", coordinatorType.toString().toLowerCase(Locale.ROOT), node);
...
- 확인해야 할 부분. Coordinator를 찾을 때, 어떤 값을 포함해서 보냈는지 확인해보자.
- 그리고 이건 트랜잭션 코디네이터인지 확인이 필요하고, 트랜잭션 코디네이터는 트랜잭션 ID의 해쉬값에 대응되는 파티션의 리더인지도 확인이 필요함.


- FindCoordinator를 보낼 때, CooridnatoryKey에는 ash-2 (Transaction ID)로 설정된다.
- Producer <-> 각 Broker 사이의 TCP 연결은 Kafka Channel로 추상화되는데, 처음에는 내가 알고있는 Bootstrap Server들 중에서 CoordinatorKey에 대응되는 Broker가 누군지 모른다. 따라서 아래와 같이 동작한다.
- 실제로는
- '-1'이라는 ID를 가진 Kafka Channel이 먼저 만들어져서 등록되고, Producer는 이곳으로 FindCoordinator Request를 전송한다.
- FindCoordinator Response가 오면 여기에는 Transaction Coordinator Node 관련 정보가 존재한다. handleResponse()에서 TransactionManager의 Transaction Coordinator Node가 설정된다.
- Sender.maybeSendAndPollTransactionalRequest() -> awaitNodeReady() 안에서 FindCoordinator로 찾은 노드에 연결하도록 된다.
- 이후 ID가 1인 Kafka Channel이 하나 더 만들어져서 등록된다. 이것은 Transaction Coordinator와 Producer 사이의 TCP를 추상화한 것이다.
- 처음에 생성된 Kafka Channel (ID = -1)은 이후에 사용이 안될 수도 있는데, connections.max.idle.ms 이후에 제거된다.
3. InitProducerIdRequest 전송
- 최초에는 InitProducerIdRequest를 전송하려 했으나 Coordinator를 모르는 상태였음. 따라서 다음과 같이 API 요청 순서를 조절했다
- 원래 - InitProducerIdRequest
- 이후 - FindCoordinatorRequest, InitProducerIdRequest
- 1번에서 FindCoordinatorRequest를 통해서 Coordinator를 확인했으므로, 이제서야 트랜잭션 코디네이터에게 InitProducerIdRequest를 전송할 수 있게 되었다. 아래 순서대로 호출한다.
// Sender
sender.run()
-> runOnce()
-> maybeSendAndPollTransactionalRequest()
-> ClientRequest clientRequest = client.newClientRequest(...);
-> client.send(clientRequest, currentTimeMs);
-> transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
-> client.poll(retryBackoffMs, time.milliseconds());
- 이 때, setInFlightCorrelationId(...)를 호출한다.
- setInFlightCorrelationId(...)가 유효한 동안은 프로듀서는 Data Buffer에 있는 Producer Batch를 전송하지 못한다. 그것은 Sender가 maybeSendAndPollTransactionalRequest() 메서드가 true인 경우 Data Buffer를 보지 않고 바로 Return하기 때문이다.
- Broker에 보낸 InitProducerIdRequest 요청에 대해 응답이 오면 다음 순서대로 호출된다.
- TransactionManager#TxnRequestHandler.onComplete(...)를 호출함.
- 여기서 InFlightCorrelationId를 Clear함.
- TransactionManager#InitProducerIdHandler.handleReponse(...)를 호출함.
- ProducerIdAndEpoch 객체를 파싱해서 TransactionManager에 셋팅함.
- TranscationManager의 State가 Initializing -> Ready로 Transition 됨.
- TransactionManager#TxnRequestHandler.onComplete(...)를 호출함.
이 때, Transaction Coordinator는 어떻게 동작할까?
- InitProducerId Request을 받으면 ProducerId와 Epoch를 발급한 후에 InitProducerId Response에 그 값을 포함해서 Producer에게 응답한다.
- 한편으로 __transaction_state에 ProducerId, Epoch와 함께 Transaction State를 EMPTY(0)로 업데이트한다.
$ ./kafka-console-consumer.sh \ ✘ INT 15:54:12
--bootstrap-server localhost:9092 \
--topic __transaction_state \
--from-beginning \
--consumer-property exclude.internal.topics=false \
--formatter org.apache.kafka.tools.consumer.TransactionLogMessageFormatter
>>>
{"key":{"type":0,"data":{"transactionalId":"ash-2"}},"value":{"version":0,"data":{"producerId":0,"producerEpoch":46,"transactionTimeoutMs":60000,"transactionStatus":0,"transactionPartitions":null,"transactionLastUpdateTimestampMs":1767075679951,"transactionStartTimestampMs":-1
4. beginTransaction() 호출.
- beginTransaction() 메서드 자체는 다음 작업만 해준다.
- TranscationManager의 State가 Ready -> IN_TRANSACTION으로 변경됨.
이 상태 변화 이후 딱히 내부적으로 동작하는 코드는 없는 것으로 보인다. 실제로 Transaction과 관련되어 다음 액션을 Trigger 할 것으로 기대되는 것은 Producer.send(...)를 호출하는 것이다.
또한, Transaction Coordinator, Broker Leader 관점에서도 일어나는 일은 없다.
5. Producer.send() 호출
// Main Thread
producer.send()
-> doSend(...)
-> AppendCallbacks appendCallbacks = new AppendCallbacks(...);
-> accumulator.append(...) // Batch 적재
-> transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
// *** TransactionManager Context
-> if (currentState != State.IN_TRANSACTION) { throw new IllegalStateException(...); }
// TransactionManager 내부의 필드에 새로운 TopicPartition을 추가함.
-> txnPartitionMap.getOrCreate(topicPartition);
-> newPartitionsInTransaction.add(topicPartition);
<-
->
- Main 쓰레드에서 Producer.send()를 호출한다.
- newPartitionsInTransaction.add(topicPartition)을 추가한다. 여기에 포함된 Topic Partition은 이후 Sender 쓰레드가 addPartitionsToTransactionHandler()를 호출할 때, pendingPartitionsInTransaction에 들어가도록 처리된다.
- 실제로 Main 쓰레드에서는 이 정도 작업만 하고, 트랜잭션과 관련된 실질적인 작업은 Network Thread(Sender class)에서 처리되기 시작한다.
// Sender
sender.run()
-> runOnce()
-> maybeSendAndPollTransactionalRequest()
-> transactionManager.nextRequest()
// *** TransactionManager Context ***
// newPartitionsInTransaction에 Producer Thread에서 Topic Partition을 추가했었음.
-> if (!newPartitionsInTransaction.isEmpty()) { enqueueRequest(addPartitionsToTransactionHandler()); }
-> addPartitionsToTransactionHandler();
-> pendingPartitionsInTransaction.addAll(newPartitionsInTranscation);
-> newPartitionsInTransaction.clear();
// AddPartitionsToTxnRequest API를 생성.
-> return new AddPartitionsToTxnHandler(builder);
<-
-> AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
-> ClientRequest clientRequest = client.newClientRequest(...);
-> client.send(clientRequest, currentTimeMs);
-> transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
-> client.poll(retryBackoffMs, time.milliseconds());
-> return true;
<-
<-
<-
<-
- Producer Batch에 레코드를 적재한다고 바로 요청을 보내지는 않는다.
- AddPartitionsInTxn Request를 보내고, 그것이 완료된 시점에 Producer Batch를 보낼 수 있도록 구성되어있음.
Race Condition 방지
1. Producer.send(...)에서 Accumulator에 적재하는 시점과 TransactionManager에 partitionsInTransactionMap에 추가하는 시점이 다르다.
2. Network Thread는 Batch를 보내기 전에 Transcation Request가 있으면 그걸 먼저 처리하고 배치를 보내는데, 1번에 의해서 partitionsInTransactionMap에 새로운 파티션을 추가하기 전에 배치만 추가된 상태로 이 코드가 호출되는 경우를 상상해볼 수 있다.
3. 이런 이유 때문에 AddPartitionsInTransaction Request API가 나가기 전에 Producer Batch가 공급되는 Race Condition을 상상해 볼 수 있다.
이 Race Condition은 다음 Logic에 의해서 차단된다.
1. sender.sendProducerData() -> this.acuumulator.drain(...) -> drainBatchesForOneNode(...) -> shouldStopDrainBatchesForPartition() -> if (!transactionManager.isSendToPartitionAllowed(tp))
위 코드에서 isSendToPartitionAllowed(...) 이 코드에 의해서 AddPartition 처리가 되지 않은 TopicPartition Batch가 먼저 Producer로 발송되는 Race Condition은 차단된다.
- AddPartitionsInTxn Request 결과를 기다린다.
// TranscationManager#AddPartitionsToTxnHandler
@Override
public void handleResponse(AbstractResponse response) {
AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);
boolean hasPartitionErrors = false;
Set<String> unauthorizedTopics = new HashSet<>();
...
pendingPartitionsInTransaction.removeAll(partitions);
if (!unauthorizedTopics.isEmpty()) {
...
} else {
log.debug("Successfully added partitions {} to transaction", partitions);
partitionsInTransaction.addAll(partitions);
transactionStarted = true;
result.done();
}
}
- AddPartitionToTxn Request의 결과를 Transaction Coordinator(Broker)로부터 받으면 AddPartitionsToTxnHandler.handleResponse()를 호출한다. 그리고 다음 두 가지 작업을 한다.
- pendingPartitionsInTransaction.removeAll(...)
- partitionsInTransaction.addAll(...)
- 위 두 작업을 통해서 Producer Batch는 트랜잭션 내에서 발송될 준비가 된다.
한편으로 Transaction Coordinator(Broker) 관점에서는 어떻게 동작할까?
- AddPartitionsInTxn Request에는 트랜잭션에 참여되는 Topic + Partition이 여러 개 포함될 수 있고, 하나의 트랜잭션에서 AddPartitionsInTxn Request를 여러번 받을 수 있다. AddPartitionsInTxn Request를 보낸 후, Producer에 새로운 Topic + Partition이 들어오는 경우도 존재하기 때문이다.
- 이 메세지를 받으면 Transaction Coordinator는 __transaction_state를 OnGoing(1)으로 기록하며, 동시에 현재 알고 있는 Topic + Partition Entry를 포함시킨다. 이것은 2 Phase Commit에서 장애가 발생했을 때, 다시 한번 WriteTxnRequest를 보내기 위함이다.
$ kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic __transaction_state \
--from-beginning \
--consumer-property exclude.internal.topics=false \
--formatter org.apache.kafka.tools.consumer.TransactionLogMessageFormatter
>>>
# 첫번째 AddPartitionsInTxnRequest
{"key":{"type":0,"data":{"transactionalId":"ash-2"}},"value":{"version":0,"data":{"producerId":0,"producerEpoch":46,"transactionTimeoutMs":60000,"transactionStatus":1,"transactionPartitions":[{"topic":"ash-1","partitionIds":[0]}],"transactionLastUpdateTimestampMs":1767075681310,"transactionStartTimestampMs":1767075681310
# 두번째 AddPartitionsInTxnRequest
{"key":{"type":0,"data":{"transactionalId":"ash-2"}},"value":{"version":0,"data":{"producerId":0,"producerEpoch":46,"transactionTimeoutMs":60000,"transactionStatus":1,"transactionPartitions":[{"topic":"sha-1","partitionIds":[0]},{"topic":"ash-1","partitionIds":[0]}],"transactionLastUpdateTimestampMs":1767075682881,"transactionStartTimestampMs":1767075681310
Topic Partition Leader Broker는 어떻게 동작할까?
- Producer Batch는 각 토픽 파티션 리더 브로커에게 전송된다. 브로커는 Producer Batch를 받더라도 트랜잭션과 관련된 마킹을 하지는 않는다.
6. producer.commitTransaction()
// KafkaProducer
producer.commitTransaction();
-> transactionManager.beginCommit();
// *** TransactionManager
-> handleCachedTransactionRequestResult(...);
-> beginCompletingTransaction(...);
-> EndTxnHandler handler = new EndTxnHandler(builder);
-> enqueueRequest(handler);
-> pendingRequests.add(requestHandler);
<- return handler.result;
<-
<-
<-
-> pendingTransition = new PendingStateTransition(result, nextState, operation);
<- return result;
-> sender.wakeup();
-> result.await(...);
- 여기서 EndTxnRequest를 브로커에 보낸 후, 응답이 오기를 기다린다.
// TransactionManager.EndTxnHandler
@Override
public void handleResponse(AbstractResponse response) {
EndTxnResponse endTxnResponse = (EndTxnResponse) response;
Errors error = endTxnResponse.error();
boolean isAbort = !builder.data.committed();
if (error == Errors.NONE) {
// For End Txn version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse.
// For versions lower than 5, the producer Id and epoch are set to -1 by default.
// When Transaction Version 2 is enabled, the end txn request 5+ is used,
// it mandates bumping the epoch after every transaction.
// If the epoch overflows, a new producerId is returned with epoch set to 0.
// Note, we still may see EndTxn TV1 (< 5) responses when the producer has upgraded to TV2 due to the upgrade
// occurring at the end of beginCompletingTransaction. The next transaction started should be TV2.
if (endTxnResponse.data().producerId() != -1) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
endTxnResponse.data().producerId(),
endTxnResponse.data().producerEpoch()
);
setProducerIdAndEpoch(producerIdAndEpoch);
resetSequenceNumbers();
}
resetTransactionState();
result.done();
}
...
}
- EndTxnHandlerResponse를 받으면 위 코드가 호출된다.
- 트랜잭션 Version에 따라 ProducerIdAndEpoch에 대해서 다른 동작을 한다.
- 트랜잭션 버전이 최신인 경우 : 응답에 ProducerId, Epoch가 포함되어 오는데 Epoch는 항상 단조 증가하는 형태가 되는데, 이는 Hanging Transaction을 해결하기 위함이다. 이 때는 ProducerIdAndEpoch를 새로 셋업한다.
- 트랜잭션 버전이 오래된 경우 : Producer ID가 -1로 응답이 온다. 이 때는 기존 ProducerIdAndEpoch를 그대로 사용한다.
- resetTransactionState()를 호출한다.
- 상태를 READY로 변경한다.
- 각종 변수들을 clear한다.
- newPartitionsInTransaction
- pendingPartitionsInTransaction
- partitionsInTransaction
- 트랜잭션 Version에 따라 ProducerIdAndEpoch에 대해서 다른 동작을 한다.
Transaction Coordinator(Broker) 관점에서는 어떻게 동작할까?
- EndTxnRequest를 받으면 해당 __transaction_state 토픽에 TransactionId에 대한 상태를 PREPARE_COMMIT(1)으로 업데이트하고, 바로 Producer에게 EndTxnResponse를 응답한다.
- 이후, Transaction Coordinator는 트랜잭션에 참여한 TopicPartition의 Leader Broker들에게 Marker를 찍으라는 요청을 전송한다. (WriteTxnMarkerRequest)
- WriteTxnMarker Request를 받은 Leader Broker들은 데이터 토픽에 Transaction과 관련된 마커를 기록하고, Transaction Coordinator에게 WriteTxnMarker Response를 응답한다.
- Transaction Coordinator는 트랜잭션에 참여한 TopicPartition의 모든 Leader Broker들에게 WriteTxnMarker Response 응답을 받으면 __transaction_state에 COMPLETE_COMMIT(4)으로 업데이트한다.
Leader Broker 관점에서는 어떻게 동작할까?
- WriteTxnMarker Request를 받은 Leader Broker들은 데이터 토픽에 Transaction과 관련된 마커를 기록한다.
$ ./kafka-dump-log.sh \
--files ~/broker/data/sha-1-0/*.log \
--print-data-log \
--deep-iteration
>>
baseOffset: 399 lastOffset: 399 count: 1 baseSequence: 398 lastSequence: 398 producerId: 0 producerEpoch: 46 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 32276 CreateTime: 1767075688671 size: 81 magic: 2 compresscodec: none crc: 2720694595 isvalid: true
| offset: 399 CreateTime: 1767075688671 keySize: -1 valueSize: 13 sequence: 398 headerKeys: [] payload: Hello World99
baseOffset: 400 lastOffset: 400 count: 1 baseSequence: 399 lastSequence: 399 producerId: 0 producerEpoch: 46 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 32357 CreateTime: 1767075688678 size: 81 magic: 2 compresscodec: none crc: 2996530180 isvalid: true
| offset: 400 CreateTime: 1767075688678 keySize: -1 valueSize: 13 sequence: 399 headerKeys: [] payload: Hello World99
# Commit Transaction 확인
baseOffset: 401 lastOffset: 401 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 46 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 32438 CreateTime: 1767075688688 size: 78 magic: 2 compresscodec: none crc: 1889618563 isvalid: true
| offset: 401 CreateTime: 1767075688688 keySize: 4 valueSize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
- 데이터 토픽에 남은 메세지를 읽어보면 다음과 같이 작성되는데, 읽어보자.
- 2줄이 1개의 Offset을 의미한다.
- baseOffset으로 시작하는 줄 : Header
- | offset으로 시작하는 줄 : payload
- Header 로그에는 ProducerID, Producer Epoch, isTransactional이 있다.
- 이 정보들을 활용하면 특정 레코드가 어떤 트랜잭션에 참여하고 있는지를 알 수 있다. (트랜잭션 ID 1개마다 Unique한 Producer ID + Producer Epoch가 있다!)
- 트랜잭션 종료 마커를 찍으면 Payload 로그에는 "endTxnMarker: COMMMIT" 이라는 것이 남는다.
Consumer는 메세지를 Consume 한 후, 위 Marker 정보를 이용해서 READ_UNCOMMIT / READ_COMMIT을 읽어서, Consumer 자체에서 트랜잭션 격리 수준에 따라서 레코드를 필터링해서 제공한다.
7. 다음 트랜잭션 - beginTransaction() 호출
정상적인 경우에는 이전 트랜잭션이 정상적으로 종료되었을 것이다. 이후 다시 트랜잭션을 시작하려면, beginTransaction()을 호출하면 된다. 이 메서드가 호출되면 TransactionManager의 State가 Ready -> IN_TRANSACTION으로 바뀌게 되고 위와 동일한 과정을 거치게 된다.
8. 중간 정리
- Producer Thread는 트랜잭션을 시작하고 끝내고, 레코드를 전송한다. 이 때, 트랜잭션과 관련된 것은 TransactionManager에 업데이트, 레코드를 전송하는 것은 RecordAccumulator를 통해서 처리된다.
- Sender Thread는 TransactionManager를 통해서 트랜잭션과 관련된 API를 브로커에 전송하고 응답을 비동기적으로 처리한다. 또한, TransactionManager가 존재하는 경우 트랜잭션과 관련된 API를 우선처리하고 트랜잭션이 정상적으로 Open 되었을 때 Producer Batch를 전송하도록 강제한다.
- Transaction 관련된 Request / Response가 존재하는 경우 Producer Batch는 절대 보내지 않는다고 했는데 이것은 TransactionManager에 있는 InFlightCorrelationId를 이용해서 제어된다. InFlight 중인 Transaction API가 있으면 그곳에 업데이트 하고, InFlight Transaction API가 있는 경우 Producer Batch를 보내는 함수가 호출되지 않고 Early return 되도록 통제한다.
- Sender Thread는 Broker에게 다음 API 요청을 보낸다.
- FindCoordinator
- InitProducerID
- AddPartitionsToTxn
- EndTxnRequest
- AbortTxnRequest
9. Happy 하지 않은 경우
모든 것이 잘풀리면 좋으나 그렇지 않은 경우가 있다. 예를 들어 트랜잭션 Commit이나 Abort가 정상적으로 수행되어서 Transaction Coordinator나 Topic Leader에게 Marker가 잘 찍히면 좋은데, 네트워크나 노드 장애 등의 문제로 인해 그렇게 동작하지 않는 경우가 존재한다.
이를테면
- __transaction_state에는 PREPARE_COMMIT을 기록하고, Producer에는 트랜잭션이 끝났다고 응답을 했다. 그러나 그 직후, Transaction Coordinator가 죽어서 실제 데이터 토픽에는 COMMIT Marker를 못 찍는 경우.
- 이 경우, 해당 데이터 토픽에는 트랜잭션이 끝나지 않는다. 결과적으로 LSO가 전진하지 않게 된다.
- Abort 이후, WriteTxnMarker Request를 보냈으나 Transaction Coordinator <-> Broker 사이의 네트워크 사정이 좋지 않아, 동일한 프로듀서의 다음 트랜잭션에 포함된 Record Batch나 WriteTxnMarker Request보다 먼저 도착하고 이후에 Abort Marker가 도착한 경우
- 새로운 트랜잭션에 포함된 배치 메세지가 이전 트랜잭션에 포함된 것처럼 되어서 Abort 되는 문제가 발생.
- Transaction Coordinator가 해당 트랜잭션을 이미 Abort 해서 Marker까지 데이터 토픽에 기록되었으나, 실제 Producer Batch는 나중에 도착해서 Append된 경우.
- Abort 된 트랜잭션이므로 Transaction Coordinator는 더 이상 해당 트랜잭션에 대해서 관여를 하지 않는다. 그러나 나중에 Producer Batch가 도착해서 Data Topic에 Transactional Record로 추가되어서 Transaction이 여전히 존재하는 것처럼 보인다. 즉, LSO가 진행되지 않는다.
이런 종류의 예기치 않은 동작들이 발생하면 데이터 토픽의 LSO(Last Stable Offset)이 전진하지 않게 된다. Consumer가 READ_COMMIT만 읽는 경우, LSO까지만 읽는데 LSO가 전진하지 않으므로 프로듀서가 새로운 트랜잭션을 열고 성공적으로 메세지를 제공하고 커밋을 했다고 하더라도 Consumer는 영원히 새로운 Record를 읽지 못하는 문제가 발생한다.
이런 문제는 Hanging Transaction이라고 하는데, 이 문제는 Transaction을 구별하는 구분자가 Rough하기 때문에 발생한다. 이 문제를 해결하기 위해 최신 버전의 트랜잭션에서는 트랜잭션이 끝날 때 마다 TransactionCoordinator가 Producer Epoch를 단조증가 (+1)하여 응답하고 Producer가 이 값을 매번 사용하는 방식으로 수정되었다.
이전에는 별 문제가 없는 이상 항상 Producer Epoch는 동일했는데, 이제는 트랜잭션마다 Producer Epoch를 올리면서 Producer Epoch를 트랜잭션의 구별자로 사용해서 이 문제를 해결한다는 것이다. 이것은 다른 글(https://ojt90902.tistory.com/1910)에서 다루겠다.
전체 다이어그램 without sendOffsetsToTxn

전체 다이어그램 with sendOffsetsToTxn()
- 만약 EOS (Exactly Once Semantic)으로 Consume + Producer를 하고 싶다면, Transaction Producer가 제공하는 sendOffsetsToTxn() API를 이용해서 __consumer_offset을 Commit하는 방법이 있다.
- 이 때 주의할 점은 sendOffsetsToTxn() API를 이용하게 되면 트랜잭션에 __consumer_offset 토픽이 참여하게 되고, 트랜잭션이 커밋되는 시점에 Offset이 커밋되는 방식이 되기 때문에 Consumer의 auto commit, syncCommit, asyncCommit 등을 사용하지 않아야 한다는 것이다.
전체적인 실행 순서는 다음과 같다.

RecordAccumulator의 계층 구조
- RecordAccumlator는 TopicInfoMap 객체를 가짐. TopicInfoMap은 ConcurrentHashMap임. 이 때, Key는 Topic, Value는 TopicInfo임.
- 각 TopicInfo 객체는 batches를 필드로 가짐. batches는 ConcurrentMap이며 Key는 파티션, Value는 Deque<ProducerBatch>임.

정리해보면 Topic-Partition 별로 여러 개의 Batch를 가질 수 있다는 것을 의미한다.
참고
https://strimzi.io/blog/2023/05/03/kafka-transactions/?utm_source=chatgpt.com
'Kafka eco-system > Kafka' 카테고리의 다른 글
| Kafka Broker : Replication (0) | 2022.11.29 |
|---|---|
| Kafka : 프로듀서의 전송 방법 (idempotence, at least once, at most once, exactly once) (0) | 2022.11.23 |
| Kafka Consumer에 대한 총 정리 및 코드 분석 (1) | 2022.09.29 |
| kafka-dump-log 명령어로 로그 파일의 메세지 내용 확인 (0) | 2022.09.05 |