메시지를 전송하는 Delivery Semantics는 크게 세가지가 있다.
- At-Most-Once Semantics (최대 한 번)
- At-Least-Once Semantics (최소 한 번)
- Exactly-Once Semantics (정확히 한 번)
At-Most-Once Semantics
- 확인 시간이 초과되거나, 오류가 반환될 때 Producer가 재시도하지 않으면 메시지가 Kafka Topic에 기록되지 않아 Consumer에게 전달되지 않을 수 있다.
- 중복 가능성을 피하기 위해 때때로 메시지가 전달되지 않을 수 있음을 허용
- acks = 0, 1 인 경우
At-Least-Once Semantics
- Producer가 Kafka Broker로부터 ack를 수신하고 acks=all이면 메시지가 Kafka Topic에 최소 한 번 작성되었음을 의미한다.
- 그러나, ack가 시간 초과되거나 오류를 수신하면 메시지가 Kafka Topic에 기록되지 않았다고 판단하고 메시지 전송을 다시 시도할 수 있다.
- Broker가 ack를 보내기 직전에 실패했지만 메시지가 Kafka Topic에 성공적으로 기록된 후에 이 재시도를 수행하면 메시지가 두 번 기록되어 최종 Consumer에게 두 번 이상 전달되어 중복 작업과 같은 잘못된 결과로 이어질 수 있다.
- acks = all 인 경우
Exactly-Once Semantics
- Producer가 메시지 전송을 다시 시도하더라도, 메시지가 최종 Consumer에게 정확히 한 번 전달된다.
- 메시징 시스템 자체와 메시지를 생성하고 소비하는 애플리케이션 간의 협력이 반드시 필요하다.
- 예를 들어, 메시지를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 메시지를 다시 수신하게 된다.
Exactly-Once Semantics (EOS)의 필요성
중복 메시지로 인한 중복 처리 방지를 위함이다. 데이터가 "정확히 한 번" 처리되도록 보장해야 하는 실시간 미션 크리티컬 스트리밍 애플리케이션에서 중요하다.
- 클라이언트(Idempotent Producer)에서 생성되는 중복 메시지 방지
- Transaction 기능을 사용하여, 하나의 트랜잭션 내의 모든 메시지가 모두 Write 되었는지 또는 전혀 Write 되지 않았는지 확인
- 사용 사례: 금융 거래 처리(송금, 카드 결제 등), 과금 정산을 위한 광고 조회수 추적
이전 포스팅에서 메시지를 중복으로 보내지 않게 하기 위해 반드시 처리해야 하는 것이 acks = all, enable.idempotence = true 옵션이었다. 여기에 더 나아가서 트랜잭션 기능까지 추가를 하면 그게 Exactly-Once Semantics 라고 생각하면 좋을 것 같다.
Transaction Coordinator 사용
- 특별한 Transaction Log를 관리하는 Broker Thread
- 일련의 ID 번호 (Producer ID, Sequence Number, Transaction ID)를 할당하고 클라이언트가 이 정보를 메시지 Header에 포함하여 메시지를 고유하게 식별
- Sequence Number는 Broker가 중복된 메시지를 skip할 수 있게 한다.
관련 파라미터
Idempotent Producer
- Producer의 파라미터 중 enable.idempotence = true 설정
- Producer가 재시도를 하더라도, 메시지 중복을 방지
- 성능에 영향이 별로 없음
Transaction
- 각 Producer에 고유한 transaction.id를 설정
- Producer를 Transaction API를 사용하여 개발
- Consumer에서 isolation.level을 read_committed로 설정
Idempotent Producer 메시지 전송 프로세스
1. 각 Producer는 고유한 Producer ID를 사용하여 메시지 송신, 메시지는 Sequence Number와 고유한 Producer ID를 가지고 있다.

2. Broker의 메모리에 map 정보가 저장된다. map { Producer ID : Sequence Number } 를 저장. 이 map은 *.snapshot 파일로 저장된다.

3. Broker가 ack를 못 보낸 경우를 가정해보면,

4. Producer는 재시도를 수행한다. Producer는 ack를 받지 못했으므로, 동일한 메시지에 대한 재시도를 당연히 수행한다. 이 경우, enable.idempotence = true를 설정하지 않았다면, Broker의 메시지 중복 수신이 불가피하다.

5. 하지만, 중복 메시지를 방지하는 옵션을 설정했으므로, Broker는 DUP 응답을 리턴한다. 즉, Broker가 체크하여 메시지가 중복된 것을 확인하고 메시지를 저장하지 않고 Producer에게 DUP Response를 리턴한다.

Transaction
Transaction을 구현하기 위해 몇 가지 새로운 개념들이 도입한다.
- Transaction Coordinator: Consumer Group Coordinator와 비슷하게, 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 로직을 수행한다.
- Transaction Log: 새로운 Internal Kafka Topic으로써, Consumer Offset Topic과 유사하게 모든 Transaction의 영구적이고 복제된 Record를 저장하는 Transaction Coordinator의 상태 저장소
- TransactionalId: Producer를 고유하게 식별하기 위해 사용되며, 동일한 TransactionalId를 가진 Producer의 다른 인스턴스들은 이전 인스턴스에 의해 만들어진 모든 Transaction을 재개 또는 중단 할 수 있다.
Transaction 관련 파라미터
Broker Configs
Parameter | 설명 | Default 값 |
transactional.id.expiration.ms | Transaction Coordinator가 Producer TransactionalId로부터 Transaction 상태 업데이트를 수신하지 않고 사전에 만료되기 전에 대기하는 최대 시간 (ms) | 604800000 (7 days) |
transaction.max.timeout.ms | - Transaction에 허용되는 최대 timeout 시간 - Client가 요청한 Transaction 시간이 이 시간을 초과하면 Broker는 InitPidRequest에서 InvalidTransactionTimeout 오류를 반환 - Producer가 Transaction에 포함된 Topic에서 읽는 Consumer를 지연시킬 수 있는 너무 큰 시간 초과를 방지 |
900000 (15 min) |
transation.state.log.replication.factor | Transaction State Topic의 Replication Factor | 3 |
transaction.state.log.num.partitions | Transaction State Topic의 Partition 개수 | 50 |
transaction.state.log.min.isr | Transaction State Topic의 min ISR 개수 | 2 |
transaction.state.log.segment.bytes | Transaction State Topic의 Segment 크기 | 104857600 bytes |
Producer Configs
Parameter | 설명 | Default 값 |
enable.idempotence | - 비활성화된 경우 Transaction 기능을 사용할 수 없음 - 활성화하고, acks = all, retries > 1, max.inflight.requests.per.connection = 1을 같이 사용해야 한다. |
false |
transaction.timeout.ms | - Transaction Coordinator가 진행중인 Transaction을 사전에 중단하기 전에 Producer의 Transaction 상태 업데이트를 기다리는 최대 시간 (ms) - 이 구성 값은 InitPidRequest와 함께 Transaction Coordinator에게 전송 - 이 값이 Broker의 max.transaction.timeout.ms 설정보다 크면 "InvalidTransactionTimeout" 오류와 함께 요청이 실패 |
60000 (60 sec) |
transactional.id | - Transaction 전달에 사용할 TransactionalId - 이를 통해 클라이언트는 새로운 Transaction을 시작하기 전에 동일한 TransactionalId를 사용하는 Transaction이 완료되었음을 보장할 수 있으므로 여러 Producer session에 걸쳐 있는 안정성 의미 체계를 사용할 수 있음 - TransactionalId가 비어있으면(default), Producer는 Idempotent Delivery 만으로 제한 - TransactionalId가 구성된 경우, 반드시 enable.idempotence를 활성화해야 함 |
없음 |
Consumer Configs
Parameter | 설명 | Default 값 |
isolation.level | - read_uncommitted: Offset 순서로 Commit된 메시지와 Commit되지 않은 메시지를 모두 사용 - read_committed: Non-Transaction 메시지 또는 Commit된 Transaction 메시지만 Offset 순서로 사용 |
read_uncommitted |
enable.auto.commit | - false: Consumer Offset에 대한 Auto Commit을 Off | true |
- Consumer가 중복해서 데이터 처리하는 것에 대해 보장하지 않으므로, Consumer의 중복 처리는 따로 로직을 작성해야 한다.
- 예를 들어, 메시지를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 메시지를 다시 수신하게 됨
Transaction Data Flow 관련 예제 소스 코드

- Consume하고 Produce하는 과정을 Transaction으로 처리
- initTransactions()으로 시작
- poll로 Source Topic에서 Record를 가져옴
- beginTransaction()으로 Transaction을 시작
- record로 비즈니스 로직 수행 후, 결과 record를 Target Topic으로 send()
- sendOffsetsToTransaction을 호출하여 consume한 Source Topic에 consumer offset을 commit
- commitTransaction 또는 abortTransaction으로 Transaction Commit 또는 Rollback 수행
Transaction Data Flow 처리 프로세스
위 코드의 일련의 흐름을 자세히 살펴보자.

- 1. Transactions Coordinator 찾기
- Producer가 initTransactions()를 호출하여 Broker에게 FindCoordinatorRequest를 보내서 Transaction Coordinator의 위치를 찾는다.
- Transaction Coordinator는 PID를 할당

- 2. Producer ID 얻기
- Producer가 Transaction Coordinator에게 InitPidRequest를 보내서 (TransactionalId를 전달) Producer의 PID를 가져온다.
- PID의 Epoch를 높여 Producer의 이전 Zombie 인스턴스가 차단되고 Transaction을 진행할 수 없도록 한다.
- 해당 PID에 대한 매핑이 2a 단계에서 Transaction Log에 기록된다.

- 3. Transaction 시작
- Producer가 beginTransactions()를 호출하여 새 Transaction의 시작을 알린다.
- Producer는 Transaction이 시작되었음을 나타내는 로컬 상태를 기록한다.
- 첫번째 Record가 전송될 때까지 Transaction Coordinator의 관점에서는 Transaction이 시작되지 않는다.

- 4.1. AddPartitionsToTxnRequest
- Producer는 Transaction의 일부로 새 TopicPartition이 처음 기록될 때 이 요청을 Transaction Coordinator에게 보낸다.
- 이 TopicPartition을 Transaction에 추가하면 Transaction Coordinator가 4.1a 단계에서 기록한다.
- Transaction에 추가된 첫번째 Partition인 경우 Transaction Coordinator는 Transaction Timer도 시작한다.

- 4.2 ProduceRequest
- Producer는 하나 이상의 ProduceRequests(Producer의 send()에서 시작됨)를 통해 User Topic Partitions에 메시지를 Write.
- 이러한 요청에는 4.2a에 표시된 대로 PID, Epoch, Sequence Number가 포함된다.

- 4.3. AddOffsetComitsToTxnRequest
- Producer에는 Consume되거나 Produce되는 메시지를 Batch 처리할 수 있는 sendOffsetsToTransaction()이 있다.
- sendOffsetsToTransaction 메서드는 groupId가 있는 AddOffsetCommitsToTxnRequests를 Transaction Coordinator에게 보낸다.
- 여기서 Transaction Coordinator는 내부 __consumer_offsets Topic에서 이 Consumer Group에 대한 TopicPartition을 추론한다.
- Transaction Coordinator는 4.3a 단계에서 Transaction Log에 이 Topic Partition의 추가를 기록한다.

- 4.4. TxnOffsetCommitRequest
- Producer는 __consumer_offsets Topic에서 Offset을 유지하기 위해 TxnOffsetCommitRequest를 Consumer Coordinator에게 보낸다.
- Consumer Coordinator는 전송되는 PID 및 Producer Epoch를 사용하여 Producer가 이 요청을 할 수 있는지(Zombie가 아닌지)를 확인한다.
- Transaction이 Commit 될 때까지 해당 Offset은 외부에서 볼 수 없다.

- 5.1. EndTxnRequest
- Producer는 Transaction을 완료하기 위해 commitTransaction() 또는 abortTransaction()을 호출한다.
- Producer는 Commit되거나 Abort되는지를 나타내는 데이터와 함께 Transaction Coordinator에게 EndTxnRequest를 보낸다.
- Transaction Log에 PREPARE_COMMIT 또는 PREPARE_ABORT 메시지를 write

- 5.2. WriteTxnMarkerRequest
- Transaction Coordinator가 Transaction에 포함된 각 TopicPartition의 Leader에게 이 요청을 보낸다.
- 이 요청을 받은 각 Broker는 COMMIT(PID) 또는 ABORT(PID) 제어 메시지를 로그에 기록한다.
- __consumer_offsets Topic에도 Commit 또는 Abort를 로그에 기록
- Consumer Coordinator는 Commit의 경우 이러한 오프셋을 구체화하거나 Abort의 경우 무시해야 한다는 알림을 받는다.

- 5.3. Writing the final Commit or Abort Message
- Transaction Coordinator는 Transaction이 완료되었음을 나타내는 최종 COMMITTED 또는 ABORTED를 Transaction Log에 기록한다.
- 이 시점에서 Transaction Log에 있는 Transaction과 관련된 대부분의 메시지를 제거할 수 있다.
- Timestamp와 함께 완료된 Transaction의 PID만 유지하면 되므로 결국 Producer에 대한 TransactionalId -> PID 매핑을 제거할 수 있다.
'Apache Kafka' 카테고리의 다른 글
p12. Kafka Log File (0) | 2025.03.16 |
---|---|
p11. Partition Assignment Strategy (0) | 2025.03.16 |
p10. Consumer Rebalance (0) | 2025.03.16 |
p9. Replica Recovery (0) | 2025.03.16 |
p8. Replica Failure (0) | 2025.03.15 |