728x90
반응형
SMALL

메시지를 전송하는 Delivery Semantics는 크게 세가지가 있다.

  • At-Most-Once Semantics (최대 한 번)
  • At-Least-Once Semantics (최소 한 번)
  • Exactly-Once Semantics (정확히 한 번)

At-Most-Once Semantics

  • 확인 시간이 초과되거나, 오류가 반환될 때 Producer가 재시도하지 않으면 메시지가 Kafka Topic에 기록되지 않아 Consumer에게 전달되지 않을 수 있다.
  • 중복 가능성을 피하기 위해 때때로 메시지가 전달되지 않을 수 있음을 허용
  • acks = 0, 1 인 경우

At-Least-Once Semantics

  • Producer가 Kafka Broker로부터 ack를 수신하고 acks=all이면 메시지가 Kafka Topic에 최소 한 번 작성되었음을 의미한다.
  • 그러나, ack가 시간 초과되거나 오류를 수신하면 메시지가 Kafka Topic에 기록되지 않았다고 판단하고 메시지 전송을 다시 시도할 수 있다.
  • Broker가 ack를 보내기 직전에 실패했지만 메시지가 Kafka Topic에 성공적으로 기록된 후에 이 재시도를 수행하면 메시지가 두 번 기록되어 최종 Consumer에게 두 번 이상 전달되어 중복 작업과 같은 잘못된 결과로 이어질 수 있다.
  • acks = all 인 경우

Exactly-Once Semantics

  • Producer가 메시지 전송을 다시 시도하더라도, 메시지가 최종 Consumer에게 정확히 한 번 전달된다.
  • 메시징 시스템 자체와 메시지를 생성하고 소비하는 애플리케이션 간의 협력이 반드시 필요하다.
  • 예를 들어, 메시지를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 메시지를 다시 수신하게 된다.

 

Exactly-Once Semantics (EOS)의 필요성

중복 메시지로 인한 중복 처리 방지를 위함이다. 데이터가 "정확히 한 번" 처리되도록 보장해야 하는 실시간 미션 크리티컬 스트리밍 애플리케이션에서 중요하다.

  • 클라이언트(Idempotent Producer)에서 생성되는 중복 메시지 방지
  • Transaction 기능을 사용하여, 하나의 트랜잭션 내의 모든 메시지가 모두 Write 되었는지 또는 전혀 Write 되지 않았는지 확인
  • 사용 사례: 금융 거래 처리(송금, 카드 결제 등), 과금 정산을 위한 광고 조회수 추적

 

이전 포스팅에서 메시지를 중복으로 보내지 않게 하기 위해 반드시 처리해야 하는 것이 acks = all, enable.idempotence = true 옵션이었다. 여기에 더 나아가서 트랜잭션 기능까지 추가를 하면 그게 Exactly-Once Semantics 라고 생각하면 좋을 것 같다. 

 

Transaction Coordinator 사용

  • 특별한 Transaction Log를 관리하는 Broker Thread
  • 일련의 ID 번호 (Producer ID, Sequence Number, Transaction ID)를 할당하고 클라이언트가 이 정보를 메시지 Header에 포함하여 메시지를 고유하게 식별
  • Sequence Number는 Broker가 중복된 메시지를 skip할 수 있게 한다.

관련 파라미터

Idempotent Producer

  • Producer의 파라미터 중 enable.idempotence = true 설정
  • Producer가 재시도를 하더라도, 메시지 중복을 방지
  • 성능에 영향이 별로 없음

Transaction

  • 각 Producer에 고유한 transaction.id를 설정
  • Producer를 Transaction API를 사용하여 개발
  • Consumer에서 isolation.levelread_committed로 설정

 

Idempotent Producer 메시지 전송 프로세스

1. 각 Producer는 고유한 Producer ID를 사용하여 메시지 송신, 메시지는 Sequence Number와 고유한 Producer ID를 가지고 있다.

 

2. Broker의 메모리에 map 정보가 저장된다. map { Producer ID : Sequence Number } 를 저장. 이 map은 *.snapshot 파일로 저장된다.

 

3. Broker가 ack를 못 보낸 경우를 가정해보면,

 

4. Producer는 재시도를 수행한다. Producer는 ack를 받지 못했으므로, 동일한 메시지에 대한 재시도를 당연히 수행한다. 이 경우, enable.idempotence = true를 설정하지 않았다면, Broker의 메시지 중복 수신이 불가피하다.

 

5. 하지만, 중복 메시지를 방지하는 옵션을 설정했으므로, Broker는 DUP 응답을 리턴한다. 즉, Broker가 체크하여 메시지가 중복된 것을 확인하고 메시지를 저장하지 않고 Producer에게 DUP Response를 리턴한다.

 

Transaction

Transaction을 구현하기 위해 몇 가지 새로운 개념들이 도입한다.

  • Transaction Coordinator: Consumer Group Coordinator와 비슷하게, 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 로직을 수행한다.
  • Transaction Log: 새로운 Internal Kafka Topic으로써, Consumer Offset Topic과 유사하게 모든 Transaction의 영구적이고 복제된 Record를 저장하는 Transaction Coordinator의 상태 저장소
  • TransactionalId: Producer를 고유하게 식별하기 위해 사용되며, 동일한 TransactionalId를 가진 Producer의 다른 인스턴스들은 이전 인스턴스에 의해 만들어진 모든 Transaction을 재개 또는 중단 할 수 있다.

Transaction 관련 파라미터

Broker Configs

Parameter 설명 Default 값
transactional.id.expiration.ms Transaction Coordinator가 Producer TransactionalId로부터 Transaction 상태 업데이트를 수신하지 않고 사전에 만료되기 전에 대기하는 최대 시간 (ms) 604800000 (7 days)
transaction.max.timeout.ms - Transaction에 허용되는 최대 timeout 시간
- Client가 요청한 Transaction 시간이 이 시간을 초과하면 Broker는 InitPidRequest에서 InvalidTransactionTimeout 오류를 반환
- Producer가 Transaction에 포함된 Topic에서 읽는 Consumer를 지연시킬 수 있는 너무 큰 시간 초과를 방지
900000 (15 min)
transation.state.log.replication.factor Transaction State Topic의 Replication Factor 3
transaction.state.log.num.partitions Transaction State Topic의 Partition 개수 50
transaction.state.log.min.isr Transaction State Topic의 min ISR 개수 2
transaction.state.log.segment.bytes Transaction State Topic의 Segment 크기 104857600 bytes

 

Producer Configs

Parameter 설명 Default 값
enable.idempotence - 비활성화된 경우 Transaction 기능을 사용할 수 없음
- 활성화하고, acks = all, retries > 1, max.inflight.requests.per.connection = 1을 같이 사용해야 한다.
false
transaction.timeout.ms - Transaction Coordinator가 진행중인 Transaction을 사전에 중단하기 전에 Producer의 Transaction 상태 업데이트를 기다리는 최대 시간 (ms)
- 이 구성 값은 InitPidRequest와 함께 Transaction Coordinator에게 전송
- 이 값이 Broker의 max.transaction.timeout.ms 설정보다 크면 "InvalidTransactionTimeout" 오류와 함께 요청이 실패
60000 (60 sec)
transactional.id - Transaction 전달에 사용할 TransactionalId
- 이를 통해 클라이언트는 새로운 Transaction을 시작하기 전에 동일한 TransactionalId를 사용하는 Transaction이 완료되었음을 보장할 수 있으므로 여러 Producer session에 걸쳐 있는 안정성 의미 체계를 사용할 수 있음
- TransactionalId가 비어있으면(default), Producer는 Idempotent Delivery 만으로 제한
- TransactionalId가 구성된 경우, 반드시 enable.idempotence를 활성화해야 함
없음

 

Consumer Configs

Parameter 설명 Default 값
isolation.level - read_uncommitted: Offset 순서로 Commit된 메시지와 Commit되지 않은 메시지를 모두 사용
- read_committed: Non-Transaction 메시지 또는 Commit된 Transaction 메시지만 Offset 순서로 사용
read_uncommitted
enable.auto.commit - false: Consumer Offset에 대한 Auto Commit을 Off true
  • Consumer가 중복해서 데이터 처리하는 것에 대해 보장하지 않으므로, Consumer의 중복 처리는 따로 로직을 작성해야 한다.
  • 예를 들어, 메시지를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 메시지를 다시 수신하게 됨

 

Transaction Data Flow 관련 예제 소스 코드

  • Consume하고 Produce하는 과정을 Transaction으로 처리
  • initTransactions()으로 시작
  • poll로 Source Topic에서 Record를 가져옴
  • beginTransaction()으로 Transaction을 시작
  • record로 비즈니스 로직 수행 후, 결과 record를 Target Topic으로 send()
  • sendOffsetsToTransaction을 호출하여 consume한 Source Topic에 consumer offset을 commit
  • commitTransaction 또는 abortTransaction으로 Transaction Commit 또는 Rollback 수행

Transaction Data Flow 처리 프로세스

위 코드의 일련의 흐름을 자세히 살펴보자.

 

  • 1. Transactions Coordinator 찾기
  • Producer가 initTransactions()를 호출하여 Broker에게 FindCoordinatorRequest를 보내서 Transaction Coordinator의 위치를 찾는다.
  • Transaction Coordinator는 PID를 할당

  • 2. Producer ID 얻기
  • Producer가 Transaction Coordinator에게 InitPidRequest를 보내서 (TransactionalId를 전달) Producer의 PID를 가져온다.
  • PID의 Epoch를 높여 Producer의 이전 Zombie 인스턴스가 차단되고 Transaction을 진행할 수 없도록 한다.
  • 해당 PID에 대한 매핑이 2a 단계에서 Transaction Log에 기록된다.

  • 3. Transaction 시작
  • Producer가 beginTransactions()를 호출하여 새 Transaction의 시작을 알린다.
  • Producer는 Transaction이 시작되었음을 나타내는 로컬 상태를 기록한다.
  • 첫번째 Record가 전송될 때까지 Transaction Coordinator의 관점에서는 Transaction이 시작되지 않는다.

  • 4.1. AddPartitionsToTxnRequest
  • Producer는 Transaction의 일부로 새 TopicPartition이 처음 기록될 때 이 요청을 Transaction Coordinator에게 보낸다.
  • 이 TopicPartition을 Transaction에 추가하면 Transaction Coordinator가 4.1a 단계에서 기록한다.
  • Transaction에 추가된 첫번째 Partition인 경우 Transaction Coordinator는 Transaction Timer도 시작한다.

  • 4.2 ProduceRequest
  • Producer는 하나 이상의 ProduceRequests(Producer의 send()에서 시작됨)를 통해 User Topic Partitions에 메시지를 Write.
  • 이러한 요청에는 4.2a에 표시된 대로 PID, Epoch, Sequence Number가 포함된다.

  • 4.3. AddOffsetComitsToTxnRequest
  • Producer에는 Consume되거나 Produce되는 메시지를 Batch 처리할 수 있는 sendOffsetsToTransaction()이 있다.
  • sendOffsetsToTransaction 메서드는 groupId가 있는 AddOffsetCommitsToTxnRequests를 Transaction Coordinator에게 보낸다.
  • 여기서 Transaction Coordinator는 내부 __consumer_offsets Topic에서 이 Consumer Group에 대한 TopicPartition을 추론한다.
  • Transaction Coordinator는 4.3a 단계에서 Transaction Log에 이 Topic Partition의 추가를 기록한다.

  • 4.4. TxnOffsetCommitRequest
  • Producer는 __consumer_offsets Topic에서 Offset을 유지하기 위해 TxnOffsetCommitRequest를 Consumer Coordinator에게 보낸다.
  • Consumer Coordinator는 전송되는 PID 및 Producer Epoch를 사용하여 Producer가 이 요청을 할 수 있는지(Zombie가 아닌지)를 확인한다.
  • Transaction이 Commit 될 때까지 해당 Offset은 외부에서 볼 수 없다.

  • 5.1. EndTxnRequest
  • Producer는 Transaction을 완료하기 위해 commitTransaction() 또는 abortTransaction()을 호출한다.
  • Producer는 Commit되거나 Abort되는지를 나타내는 데이터와 함께 Transaction Coordinator에게 EndTxnRequest를 보낸다.
  • Transaction Log에 PREPARE_COMMIT 또는 PREPARE_ABORT 메시지를 write

  • 5.2. WriteTxnMarkerRequest
  • Transaction Coordinator가 Transaction에 포함된 각 TopicPartition의 Leader에게 이 요청을 보낸다.
  • 이 요청을 받은 각 Broker는 COMMIT(PID) 또는 ABORT(PID) 제어 메시지를 로그에 기록한다.
  • __consumer_offsets Topic에도 Commit 또는 Abort를 로그에 기록
  • Consumer Coordinator는 Commit의 경우 이러한 오프셋을 구체화하거나 Abort의 경우 무시해야 한다는 알림을 받는다.

  • 5.3. Writing the final Commit or Abort Message
  • Transaction Coordinator는 Transaction이 완료되었음을 나타내는 최종 COMMITTED 또는 ABORTED를 Transaction Log에 기록한다.
  • 이 시점에서 Transaction Log에 있는 Transaction과 관련된 대부분의 메시지를 제거할 수 있다.
  • Timestamp와 함께 완료된 Transaction의 PID만 유지하면 되므로 결국 Producer에 대한 TransactionalId -> PID 매핑을 제거할 수 있다. 

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p12. Kafka Log File  (0) 2025.03.16
p11. Partition Assignment Strategy  (0) 2025.03.16
p10. Consumer Rebalance  (0) 2025.03.16
p9. Replica Recovery  (0) 2025.03.16
p8. Replica Failure  (0) 2025.03.15
728x90
반응형
SMALL

Partition은 Broker들에 분산되며, 각 Partition은 Segment File들로 구성된다고 했다.

  • Segment File은 정해진 기준에 의해 Rolling되는데, 그 전략은 다음과 같다.
  • Rolling Strategy: log.segment.bytes (default : 1GB), log.roll.hours (default 168 hours)

 

Kafka Log Segment File Directory

Segment File은 Log Segment File이라고 부르기도 하며, Data File이라고 부르기도 한다. 이 파일들이 어디에 저장되는지는 각 Broker의 server.properties 파일 안에 log.dirs 파라미터로 정의한다. 아래와 같이 말이다. (Comma로 구분하여 여러 디렉토리 지정이 가능하다)

log.dirs=/data/kafka/kafka-log-a,/data/kafka/kafka-log-b,/data/kafka/kafka-log-c

 

각 Topic과 그 Partition은 log.dirs 아래에 하위 디렉토리로 구성된다. 예를 들어, test_topic의 Partition 0은 /data/kafka/kafka-log-a/test_topic-0 디렉토리로 생성된다.

 

Partition 디렉토리 안의 Log File들

파일명은 각자 의미를 가지는데, 다음과 같은 파일들이 있다고 해보자.

test_topic의 Partition 0 디렉토리

$ ls /data/kafka/kafka-log-a/test_topic-0
00000000000000123453.index
00000000000000123453.timeindex
00000000000000123453.log
00000000000007735204.index
00000000000007735204.timeindex
00000000000007735204.log
leader-epoch-checkpoint
  • 00000000000000123453.* 파일은 00000000000000123453 offset부터 시작한다는 의미이다.
  • 00000000000007735204.* 파일은 00000000000007735204 offset부터 시작한다는 의미이다.
  • 그러니까, 00000000000000123453 이 파일은 00000000000000123453 offset 부터 00000000000007735203 offset 까지의 메시지를 저장 및 관리하고 있다.

Partition 디렉토리에 생성되는 File Types는 최소 4가지이다.

  • Log Segment File - 메시지와 metadata를 저장 : .log
  • Index File - 각 메시지의 Offset을 Log Segment 파일의 Byte 위치에 매핑 : .index
  • Time-based Index File - 각 메시지의 timestamp를 기반으로 메시지를 검색하는 데 사용 : .timeindex
  • Leader Epoch Checkpoint File - Leader Epoch와 관련 Offset 정보를 저장 : leader-epoch-checkpoint

특별한 Producer 파라미터를 사용하면 Partition 디렉토리에 생기는 File Types

  • Idempotent Producer를 사용하면 : .snapshot
  • Transactional Producer를 사용하면 : .txnindex

 

Log Segment File의 특징

이 부분은 이미 알고 있는 내용이다. 

  • 첫번째로 저장되는 메시지의 Offset이 파일명이 된다.
  • Partition은 하나 이상의 Segment File로 구성된다.

Log Segment File을 Rolling 하는 기준을 적용하는 여러 파라미터가 있다.

아래의 파라미터 중 하나라도 해당되면 새로운 Segment File로 Rolling

  • log.segment.bytes (default : 1GB)
  • log.roll.ms (default : 168 시간)
  • log.index.size.max.bytes (default : 10 MB)

그리고, 내부적으로 관리하는 Topic인 __consumer_offsets 의 Segment File Rolling 파라미터는 별도로 존재한다.

  • offsets.topic.segment.bytes (default : 100 MB)

 

Checkpoint File

  • 각 Broker에는 2개의 Checkpoint File이 존재한다.
  • log.dirs 디렉토리에 위치한다.
  • replication-offset-checkpoint : 마지막으로 Commit된 메시지의 ID인 High Water Mark. 시작 시 Follower가 이를 사용하여 Commit되지 않은 메시지를 Truncate한다.
  • recovery-point-offset-checkpoint : 데이터가 디스크로 flush된 지점. 복구 중 Broker는 이 시점 이후의 메시지가 손실되었는지 여부를 확인할 수 있다.

 

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p13. Exactly Once Semantics(EOS)  (0) 2025.03.23
p11. Partition Assignment Strategy  (0) 2025.03.16
p10. Consumer Rebalance  (0) 2025.03.16
p9. Replica Recovery  (0) 2025.03.16
p8. Replica Failure  (0) 2025.03.15
728x90
반응형
SMALL

Partition을 Consumer에게 할당하는 전략들

  • RangeAssignor : Topic 별로 작동하는 Default Assignor
  • RoundRobinAssignor : Round Robin 방식으로 Consumer에게 Partition 할당
  • StickyAssignor : 최대한 많은 기존 Partition 할당을 유지하면서 최대 균형을 이루는 할당을 보장
  • CooperativeStickyAssignor : 동일한 StickyAssignor 논리를 따르지만 협력적인 Rebalance를 허용
  • ConsumerPartitionAssignor : 인터페이스를 구현하면 사용자 지정 할당 전략을 사용할 수 있다.

 

RangeAssignor

  • partition.assignment.strategy 파라미터의 기본 Assignor이다.
  • 동일한 Key를 가지고 있는 메시지들에 대한 Topic들 간에 "co-partitioning"하기 유리

  • 0번은 0번으로, 1번은 1번으로 간다고 생각하면 된다.

 

RoundRobinAssignor

  • Range 방식보다는 효율적으로 분배하여 할당, 왜냐하면 위 기본 Assignor는 남는 Consumer가 있을 수가 있다.
  • Round Robin 방식으로 Partition들과 Consumer들을 분배하여 할당
  • Reassign(재할당) 후 Consumer가 동일한 Partition을 유지한다고 보장하지 않는다. 예를 들어, Consumer 0이 지금 Topic 0의 Partition 0에 할당되어 있다고 가정할 때, 재할당이 발생하면 Topic 0의 Partition 0이 다른 Consumer에게 할당될 수 있다.

 

근데 이 방식은 할당 불균형이 발생할 가능성이 존재한다. 

  • Consumer간 Subscribe 해오는 Topic이 다른 경우, 할당 불균형이 발생할 가능성이 있다.
  • 3개의 Consumer C0, C1, C2와 3개의 Topic T0, T1, T2가 있다.
  • T0은 Partition 1개, T1은 Partition 2개, T2는 Partition 3개가 있다.
  • C0은 T0만, C1은 T0과 T1만, C2는 T0, T1, T2를 Subscribe 한다고 가정하면 위 그림과 같은 현상이 일어난다.
  • 라운드 로빈 방식으로 순서대로 Consumer에 Partition이 할당되긴 하지만 T2를 Subscribe하는 Consumer는 C2밖에 없기 때문에 저쪽에 쏠리게 된다.

 

StickyAssignor

  • 가능한 한 균형적으로 할당을 보장한다.
  • 특정 ConsumerA가 다른 ConsumerB에 비해 2개 이상 더 적은 Topic Partition이 할당된 경우, A에 할당된 Topic의 나머지 Partition들은 B에 할당될 수 없다.
  • 재할당이 발생했을 때, 기존 할당을 최대한 많이 보존하여 유지. 따라서, Rebalancing 오버헤드를 줄임

  • Round Robin 방식과 비슷하지만 다른 방식으로 동작하는데, 
  • 3개의 Consumer C0, C1, C2와 4개의 Topic T0, T1, T2, T3가 있다고 하자.
  • T0, T1, T2, T3는 모두 Partition 2개가 있다고 하자.
  • C0, C1, C2 모두 T0, T1, T2, T3를 Subscribe 한다고 하자.
  • 위 그림을 보면 얼핏 라운드 로빈 방식과 다를게 없어 보인다.

 

만약, 여기서 C1이 제거되어 재할당하면 어떻게 될까?

  • 라운드 로빈의 경우 전체를 다시 재할당한다. (왼쪽)
  • Sticky Assignor의 경우, 기존에 되어 있던 할당들은 유지하고 C1이 가지고 있던 Partition만 재할당한다. (오른쪽)

 

Round Robin에서 발생할 수 있었던 할당 불균형이 발생할 가능성을 줄인다는 건 어떤걸까?

  • Round Robin 방식에서 설명했던 할당 불균형이 발생했던 시나리오는 다음과 같다.
  • 3개의 Consumer C0, C1, C2와 3개의 Topic T0, T1, T2가 있다.
  • T0은 Partition 1개, T1은 Partition 2개, T2는 Partition 3개가 있다.
  • C0은 T0만, C1은 T0, T1만, C2는 T0, T1, T2를 Subscribe한다.
  • 따라서, C2만 많은 Partition이 몰리게 된다.

근데 이 문제를 Sticky는 이렇게 해결한다.

  • C2가 T1-P1, T2-P0, T2-P1, T2-P2 4개를 할당하고 있으니, T1을 할당할 수 있는 C1, C2 중에서 C2말고 C1에게 T1-P1을 할당시키는 것이다. 
  • 기본적으로 라운드 로빈으로 할당을 하는데, 그렇게 할당하고 나서 보니 특정 Consumer가 너무 많은 할당을 받으면 부담을 덜 수 있게 할당 받을 수 있는 Consumer를 찾아서 불균형을 해소한다.

 

CooperativeStickyAssignor

이 내용을 설명하기 앞서, 시간 흐름에 따른 Consumer Rebalancing 과정을 먼저 알아보기로 한다.

  • 1. Consumer들이 JoinGroup 요청을 Group Coordinator에 보내면서 리밸런싱이 시작된다.
  • 2. JoinGroup의 응답이 Consumer들에 전송된다. (Group Leader는 Consumer들 정보를 수신)
  • 3. 모든 구성원은 Broker에 SyncGroup 요청을 보내야 한다. (Group Leader는 각 Consumer의 Partition 할당을 계산해서 Group Coordinator에게 전송)
  • 4. Broker는 SyncGroup 응답에서 각 Consumer 별 Partition 할당을 보낸다.

Eager Rebalancing 프로토콜

지금까지 사용됐던 방식이고, 이 프로토콜은 최대한 단순하게 유지하기 위해 만들어졌다.

  • 각 구성원은 JoinGroup 요청을 보내고 재조정에 참여하기 전에 소유한 모든 Partition을 취소해야 한다. 재조정을 하기 위해 취소하고 다시 할당을 받는건데, 이때 원래 소유하던 Partition을 다시 할당받을 수도 있고 아닐수도 있다.
  • 안전면에서는 좋지만, 이 경우에 재조정 기간동안 작업을 할 수 없다는 단점이 있다. 

그런데, 생각해보면 재조정 시 Revoke할 Partition만 Revoke하면 되지 않을까? 

이 문제를 해결하기 위해 나온 것이 Cooperative Rebalancing Protocol이다.

 

이상적인 Consumer Rebalancing 프로토콜

  • Consumer A, B가 Consume 하고 있는 상태에서 처리량을 높이기 위해 Consumer C를 추가한다고 가정해보자.
  • Consumer A에 할당된 Partition 중 하나만 Consumer C로 이동하는 것이 가장 이상적일 것이다.
  • 전체 재조정동안 모두 정지 상태로 있는 대신, Consumer A의 하나의 Partition을 취소하는 동안만 해당 Partition으로 데이터 처리하는 작업만 가동 중지하는 것이다.

  • 보면, Consumer A의 3번만 떼어내서, Consumer C로 이동시키는 것이다.
  • 3번을 revoke하면서 재조정이 필요해지고, 모든 Consumer 들이 다시 Group Coordinator에 JoinGroup 요청을 보내고 응답을 받고 ... 이 과정을 하면서 3번만 그 시간동안 중지하고 Consumer C에 붙은 후부터 다시 3번도 동작하도록 말이다.

그런데 이 과정에 문제가 하나 있는데, 바로 Consumer는 자신의 Partition 중 어느것이 다른곳으로 재할당되어야 하는지 알지 못한다는 것이다. 이 부분은 어떻게 해결할 수 있을까?

CooperativeStickyAssignor는 Rebalancing을 2회에 걸쳐 수행한다.

  • 첫번째 rebalance 과정에서는, 모든 Consumer들이 JoinGroup 요청을 보내면서 시작하지만, 소유한 모든 Partition을 보유하고 그 정보를 Group Coordinator에게 보낸다. 즉, 계속 처리할 수 있는 상태로 진행한다는 것이다.
  • 그럼 Group Coordinator가 JoinGroup 요청을 받고 응답을 준 다음에 SyncGroup 요청을 받고 SyncGroup 요청에 대한 응답을 보낼 때, 어떤 Partition을 떼어내야 할 지 해당 Consumer에게 알려준다!
  • 여기까지가 1st rebalance 과정이다.
  • 그 다음 어떤 걸 떼어내야 하는지 알게됐으니 해당 Partition을 떼어내고 다시 JoinGroup 요청을 보낸다. 그리고 쭉 과정이 진행되면서 3번 Partition이 Consumer C로 이전되는 것이다.
  • 자 이 과정에서 Consumer A는 3번만 중간에 잠깐 떼어내고 나머지들은 작업을 계속 진행할 수 있었다. 이렇게 나이스한 Rebalancing 작업을 수행할 수 있게 해주는 것이 CooperativeStickyAssignor이다.

 

정리를 하자면

  • CooperativeStickyAssignor는 2회에 걸쳐 Rebalancing을 진행한다.
  • 빈번하게 Rebalancing 되는 상황이거나, 스케일 인/아웃으로 인한 다운타임이 우려가 된다면, CooperativeStickyAssignor를 사용하는 것을 권장한다.

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p13. Exactly Once Semantics(EOS)  (0) 2025.03.23
p12. Kafka Log File  (0) 2025.03.16
p10. Consumer Rebalance  (0) 2025.03.16
p9. Replica Recovery  (0) 2025.03.16
p8. Replica Failure  (0) 2025.03.15
728x90
반응형
SMALL

Consumer의 동작 방식

  • Consumer는 메시지를 가져오기 위해서 Partition에 연속적으로 poll 한다.
  • 가져온 위치를 나타내는 offset 정보를 __consumer_offsets Topic에 저장하여 관리한다.

 

Consumer Group Id

  • 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성한다. 
  • Consumer Group의 Consumer들은 작업량을 어느 정도 균등하게 분할한다.
  • 동일한 Topic에서 consume하는 여러 Consumer Group이 있을 수 있다.

 

Partition Assignment

Partition을 Consumer에게 Assign할 때,

  • 하나의 Partition은 지정된 Consumer Group 내의 하나의 Consumer만 사용
  • 동일한 Key를 가진 메시지는 동일한 Consumer가 사용 (Partition의 수를 변경하지 않는 한)
  • Consumer의 설정 파라미터 중에서 partition.assignment.strategy 로 할당 방식 조정
  • Consumer Group은 Group Coordinator라는 프로세스에 의해 관리 된다.

 

Consumer Group Coordination

  • Group Coordinator (하나의 Broker)와 Group Leader(하나의 Consumer)가 상호작용

 

 

Consumer Group을 등록하고 Consumer 전체에 Partition을 할당하는 프로세스

1. Consumer 등록 및 Group Coordinator 선택

  • 각 Consumer는 group.id로 Kafka 클러스터에 자신을 등록
  • Kafka는 Consumer Group을 만들고 Consumer의 모든 Offset은 __consumer_offsets Topic의 하나의 Partition에 저장
    • hash(group.id) % offsets.topic.num.partitions 수식을 사용하여 group.id가 저장될 __consumer_offsets의 Partition을 결정
  • 이 Partition의 Leader Broker는 Consumer Group의 Group Coordinator로 선택 (여기서는 Broker 102에 Leader Partition이 있고 따라서 Group Coordinator로 선택됐다고 가정)

2. JoinGroup 요청 순서에 따라 Consumer 나열

  • Group Coordinator는 Group의 Consumers 카탈로그를 생성하기 전에 Consumers의 JoinGroup 요청에 대해 group.initial.rebalance.delay.ms (기본값 3초)를 대기
  • Consumer들이 Consume할 최대 Partition 수까지 JoinGroup 요청을 수신하는 순서대로 Consumer를 나열

3. Group Leader 결정 및 Partition 할당

  • JoinGroup 요청을 보내는 최초 Consumer는 Group Leader로 지정되며, Group Coordinator로부터 Consumer 목록을 받음.
  • Group Leader는 구성된 partition.assignment.strategy를 사용하여 각 Consumer에게 Partition을 할당
  • 위 그림에서는 Partition보다 더 많은 Consumer가 있으므로, 각 Consumer는 Consume할 Partition이 최대 1개 있다.

4. Consumer -> Partition 매핑 정보를 Group Coordinator에게 전송

  • Group Leader는 Consumer -> Partition 매핑 정보를 Group Coordinator에게 다시 보냄.
  • Group Coordinator는 매핑정보를 메모리에 캐시하고 보관

5. 각 Consumer에게 할당된 Partition 정보를 보냄

  • Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보냄
  • 각 Consumer는 할당된 Partition에서 Consume을 시작
왜 Group Coordinator(Broker)가 직접 Partition을 할당하지 않는가?
- Kafka의 한가지 원칙은 가능한 한 많은 계산을 클라이언트에 수행하도록 하여, Broker의 부담을 줄이는 것
- 많은 Consumer Group과 Consumer들이 있고 Broker 혼자서 Rebalance를 위한 계산을 한다고 생각해 보면 Broker에 엄청난 부담이 간다.

 

Consumer Rebalancing Trigger

Rebalancing Trigger

  • Consumer가 Consumer Group에서 탈퇴
  • 신규 Consumer가 Consumer Group에 합류
  • Consumer가 Topic 구독을 변경
  • Consumer Group이 Topic 메타데이터의 변경 사항을 인지 (예: Partition 증가)

Rebalancing Process

  • Group Coordinator는 heartbeats의 플래그를 사용하여 Consumer에게 Rebalance 신호를 보냄
  • Consumer가 일시 중지하고 Offset을 Commit
  • Consumer는 Consumer Group의 새로운 Generation에 다시 합류
  • Partition 재할당
  • Consumer는 새 Partition에서 다시 Consume을 시작

위 과정을 살펴보면, Consumer Rebalancing 시, Consumer 들은 메시지를 Consume하지 못한다. 따라서 불필요한 Rebalancing은 반드시 피해야 한다.

 

Consumer Heartbeats
- Consumer 장애를 인지하기 위함.Consumer는 poll()과 별도로 백그라운드 Thread에서 heartbeats를 보냄 (heartbeat.interval.ms 기본값 3초)
- session.timeout.ms (기본값 10초) 동안 heartbeats가 수신되지 않으면 해당 Consumer는 Consumer Group에서 삭제
- poll()은 heartbeats와 상관없이 주기적으로 호출된다. (max.poll.interval.ms 기본값 5분)

 

과도한 Rebalancing을 피하는 방법

Consumer Group 멤버 고정

  • Group의 각 Consumer에게 고유한 group.instance.id 를 할당
  • Consumer는 LeaveGroupRequest를 사용하지 않아야 한다. LeaveGroupRequest는 Consumer가 빠질때 "나 빠져요~"하고 알려주는 건데 이걸 하면 해당 group의 고유한 Consumer의 group.instance.id가 날라가게 된다.
  • Rejoin은 알려진 group.instance.id에 대한 Rebalance를 trigger하지 않는다.

session.timeout.ms 튜닝

  • heartbeat.interval.ms를 session.timeout.ms의 1/3로 설정
  • group.min.session.timeout.ms(default : 6 sec)와 group.max.session.timeout.ms(default : 5 min)의 사이값으로 설정
  • 장점: Consumer가 Rejoin할 수 있는 더 많은 시간을 제공
  • 단점: Consumer 장애를 감지하는 데 시간이 더 오래걸림

max.poll.interval.ms 튜닝

  • Consumer에게 poll()한 데이터를 처리할 수 있는 충분한 시간 제공
  • 너무 크게 해도 안된다.

 

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p12. Kafka Log File  (0) 2025.03.16
p11. Partition Assignment Strategy  (0) 2025.03.16
p9. Replica Recovery  (0) 2025.03.16
p8. Replica Failure  (0) 2025.03.15
p7. Acks, Batch, Page Cache, Flush  (0) 2025.03.15
728x90
반응형
SMALL

이번 포스팅은 복제본들을 복구하는 과정을 설명하는 내용이다. 아래와 같은 예시를 들어보자.

  • 3개의 Replica로 구성된 하나의 Partition
  • Producer가 4개의 메시지 (M1, M2, M3, M4)를 보낸 상황

  • Producer는 acks = all, retries = MAX_INT, enable_idempotence = false 옵션을 가지고 있다.
  • 메시지 M1, M2가 ISR 리스트 전체에 복제되었고 Commit(High Water Mark)된 상태이다. 
  • Follower Y는 M3를 복제했지만, Commit은 아직 못한 상태
  • Follower Z는 M2까지만 복제했다.

 

이 상태에서 Leader X를 포함한 Broker가 장애가 나면? 새로운 Leader가 선출된다.

  • Controller Broker가 Y를 Leader로 재선출했다고 가정하자.
  • 현재 상태는 M3, M4가 Commit되기 전에 Leader X에서 장애가 발생한 상태이다.
  • Y가 Leader로 선출되고, Leader Epoch가 0에서 1로 증가됐다.
  • Z는 Y로부터 이제 fetch를 하는데 Y에 M3가 있으니 M3를 fetch한다.
  • Y는 High Water Mark를 진행한다.
  • Z는 fetch를 다시 수행하고 High Water Mark를 수신하고 진행한다.
  • 그런데, M3는 Commit되기 전에 Leader X가 고장났는데 어떻게 Y는 M3를 가지고 있을 수 있을까? 
    • 카프카는 기본적으로 Leader의 데이터를 지우지 않는다. 비록 Y가 Follower일때 M3를 복제한 상태이고 커밋을 하지는 못했더라도 이 Y가 Leader가 된 상태라면 데이터를 지우지 않는다.
    • Leader Epoch가 일어나면, 새로운 Leader의 시대가 열렸다고 간주하고, 해당 리더의 데이터부터 시작한다.

 

그렇지만, acks = all인 상태에서 Leader X는 Producer에게 M3, M4에 대한 ack를 보내지 못한 채 죽어버렸다. 그러면 Producer는 재시도에 의해 M3, M4를 다시 보내게 된다. 누구한테? 새로운 Leader Y에게.

  • 이제 새로운 Leader Y에게 Producer는 M3, M4를 다시 보내게 된다. M3는 새로운 Leader Y가 이미 복제를 한 상태이기 때문에 중복으로 저장된다.
  • 만약, enable.idempotence 옵션이 true라면 M3를 이미 보냈다는 것을 알기 때문에 중복 저장하지 않게 된다. 

enable.idempotence

enable.idempotence 옵션은 Kafka 프로듀서에서 중복 메시지 전송을 방지하는 기능을 활성화하는 설정이다.이 옵션을 활성화하면 Kafka 프로듀서는 Producer ID (PID)와 Sequence Number를 사용하여 메시지를 추적한다.
즉, 동일한 프로듀서에서 동일한 시퀀스 번호를 가진 메시지가 중복으로 전송되면, 브로커가 이를 감지하고 중복 메시지를 무시한다.

효과

중복 방지: 같은 메시지가 여러 번 전송되더라도 Kafka가 자동으로 중복을 제거
메시지 순서 보장: 동일한 파티션에 전송되는 메시지의 순서를 유지
ACK 설정 제한: acks=all로 자동 설정됨 (데이터 손실 방지)
max.in.flight.requests.per.connection ≤ 5 제한됨 (순서 보장을 위해)

어떻게 중복을 감지하는가?

  1. Producer는 각 메시지를 보낼 때 고유한 (PID, Sequence Number)를 함께 보냄.
    • PID(Producer ID): 프로듀서가 재시작하지 않는 한 고유한 값.
    • Sequence Number: 같은 PID를 가진 메시지들에 대해 연속적으로 증가하는 숫자.
  2. Kafka의 Broker(Leader)는 (PID, Last Sequence Number)를 저장함.
    • 즉, 특정 Producer의 마지막으로 받은 메시지의 Sequence Number를 기억하고 있음.
  3. 새로운 Leader(Y)가 선출된 후에도, 이전 Leader(X)의 마지막 Sequence Number를 유지함.
    • enable.idempotence=false일 때는 Leader가 단순히 메시지를 받아들이지만,
      enable.idempotence=true이면 이미 처리한 Sequence Number 이후의 메시지만 수락함.
  4. Producer가 M3(재전송)를 보낼 때, Leader(Y)는 해당 (PID, Sequence Number)를 확인하고 이미 처리된 M3를 거부함.

 

 

다시 돌아가서, 위의 경우는 enable.idempotence 옵션이 꺼져있어서 중복으로 데이터가 저장됐다고 한들 데이터의 유실은 없었다. 그런데 만약, acks = 1 이었다면 어떻게 됐을까?

  • 새로운 Leader Y가 선출된 시점에서 M3까지는 커밋은 아니더라도 복제가 된 상태이기 때문에 M3는 날라가지 않는다. Follower Z는 새로운 Leader로부터 계속 fetch를 진행하고 M3를 받고 High Water Mark를 진행한다.
  • 그런데, Producer는 이미 M3,M4는 보내고 acks = 1 이기 때문에 기존의 Leader 였던 X에게 ack를 받은 상태다. 그럼 Producer는 M4를 보내지 않게 된다. 즉, M4라는 데이터는 영원히 잃어버리게 되는 결과를 초래한다.

여기까지 내용으로 중간 결론을 내려보면, Producer의 acks = all, enable.idempotence = true 옵션은 매우 중요하다는 것. 

 

다시 위 시나리오로 돌아가서, 죽었던 X가 복구되면 어떻게 될까?

  • X는 M1, M2까지 Commit이 된 상태였고, M3, M4를 받긴 했지만 커밋하지 못했다. 그래서 X가 복구가 된다면 M1, M2부터 다시 시작하게 된다.
  • 그리고 X는 이제 Follower 이므로, Leader로부터 데이터를 복제한다.

 

 

가용성과 내구성 중 선택?

Topic 파라미터에는 다음과 같은 옵션들이 있다.

 

⭕️ unclean.leader.election.enable

  • ISR 리스트에 없는 Replica를 Leader로 선출할 것인지에 대한 옵션 (default : false)
  • ISR 리스트에 Replica가 하나도 없으면 Leader 선출을 안 함 - 서비스 중단
  • ISR 리스트에 없는 Replica를 Leader로 선출함 - 데이터 유실

생각해보자, ISR 리스트는 High Water Mark 까지 데이터를 온전히 가지고 있는 집합이다. 물론 여기서 새로운 리더를 선출하면 더할 나위 없이 좋겠지만, 만약 ISR 리스트에 Leader를 제외한 복제본이 하나도 없으면 Leader가 고장났을 때 새 Leader를 아예 안 뽑을 것인가? 그럼 서비스가 중단될텐데? 즉 가용성 측면에서 굉장히 좋지 못할 것이다.

 

그러나, ISR 리스트에 없는 Replica는 굉장히 느린 복제본일 것이다. High Water Mark 까지 데이터를 가지고 있지 못하니까. 그런데 이 복제본으로 새 Leader를 선출하면 데이터의 유실이 생길 수 밖에 없다. 즉, 내구성이 떨어진다. 

 

서비스를 아예 중단할 것인가? 약간의 데이터 유실을 감안하고 서비스를 진행할 것인가?

 

⭕️ min.insync.replicas

  • 최소 요구되는 ISR의 개수에 대한 옵션 (default : 1)
  • 즉, 기본값으로 설정해서 사용하면 Leader만 ISR에 포함되도 문제 없다는 의미
  • ISR이 min.insync.replicas 보다 적은 경우, Producer는 NotEnoughReplicas 예외를 수신

기본적으로 이 값은 2 이상으로 설정해서 사용하면 좋다. 장애는 언제든 일어날 수 있고, 그 경우에 기본값인 1로 사용했다면 ISR 리스트에 복제본이 없어 적절한 Leader 선출이 어려워 질 것이다.

 

그리고 이 값은 Producer에서 acks = all과 함께 사용할 때 더 강력한 보장을 해준다. 무슨 말이냐면, acks = all 옵션은 모든 Followers들이 전부 다 데이터를 복제 해야 ack를 Producer에게 전달하는 옵션이다. 근데 여기서 min.insync.replicas가 2로 설정되어 있고, replication factor가 4로 설정되어 있다면 Leader 하나에 Followers 3개가 있을 것이다. 이때 Leader 포함 ISR 리스트에 Follower 중 하나만이라도 Producer가 준 데이터를 복제하면 ack를 보낼 수 있다. min.insync.replicas가 2 이므로 최소 요구되는 ISR의 개수를 만족하니 말이다. 즉, 4개 모두가 다 복제되지 않아도 ISR리스트를 만족하는 최소 요구 개수만 해당 데이터를 복제하면 ack를 보낼 수 있다는 뜻이다. 성능 면에서 더 유리함을 가져갈 수 있다. 

 

그리고, 이 옵션을 사용하면 n개의 Replica가 있고 min.insync.replicas = 2 인 경우, n-2개의 장애를 허용할 수 있다. 생각해보자. 4개의 복제본이 있는데 ISR 최소 요구 개수가 2라면, 2개까지 장애를 허용할 수 있다는 의미이다. 

 

 

정리를 하자면

 

데이터 유실과 중복이 없게 하려면, 

  • Topic : replication.factor는 최소 3 이상으로 설정, min.insync.replicas는 최소 2 이상으로 설정 
  • Producer: acks = all, enable.idempotence = true 설정

데이터 유실이 다소 있더라도 가용성을 높게 하려면,

  • Topic: unclean.leader.election.enable을 true로 설정

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p11. Partition Assignment Strategy  (0) 2025.03.16
p10. Consumer Rebalance  (0) 2025.03.16
p8. Replica Failure  (0) 2025.03.15
p7. Acks, Batch, Page Cache, Flush  (0) 2025.03.15
p6. Replication  (0) 2025.03.15
728x90
반응형
SMALL

In-Sync Replicas (ISR) 리스트 관리

n 개의 Replica가 있는 경우 n - 1 개의 장애를 허용할 수 있다.

 

만약, Follower가 문제가 발생한다면

  • Leader에 의해 ISR 리스트에서 삭제된다.
  • Leader는 새로운 ISR을 사용하여 계속 진행한다.

만약, Leader가 문제가 발생한다면

  • Controller는 Follower 중에서 새로운 Leader를 선출한다.
  • Controller는 새 Leader와 ISR 정보를 모든 Broker에 푸시한다.

  • Broker에 장애가 발생한다. 그 Broker는 Leader Partition을 가지고 있다.
  • Controller가 이를 감지하여 새로운 Leader를 선출한다. 그리고 새로운 ISR 리스트를 만들어낸다.
  • 모든 Broker에게 새로운 ISR을 Push한다.
  • 클라이언트들은 메타데이터를 요청하여 새로운 Leader 정보를 받은 후, 해당 Leader에 메시지를 읽고, 쓴다.

 

  • Broker 4대,Partition 4, Replication Factor 3(Leader, Follower의 총 수가 3)인 경우,
  • Partition 생성 시 Broker들 사이에서 Partition들이 분산되어 배치된다.
  • 이때, Broker 4에 장애가 발생하면?

  • Broker 4에 있는 Follower의 경우, Leader들이 해당 Follower를 제거하고 새로운 ISR을 만들어 계속해서 아무일 없다는 듯이 진행한다.
  • 그런데 Partition 3의 경우 Leader가 고장난 상태가 된다. Leader가 새로 선출되어야 한다.
  • Partition에 Leader가 없으면 Leader가 선출될 때까지 해당 Partition을 사용할 수 없게 된다.
  • Producer의 send()는 retries 파라미터가 설정되어 있으면 재시도하고, 만약 retries = 0이면, NetworkException이 발생한다.
  • 즉, Leader가 장애가 발생하면 해당 Partition을 사용할 수 없게 되므로 크나큰 문제가 될 수 있다.

 

정리를 하자면

  • Follower가 실패하는 경우, Leader에 의해 ISR 리스트에서 삭제되고, Leader는 새로운 ISR을 사용하여 Commit한다.
  • Leader가 실패하는 경우, Controller는 Follower중에서 새로운 Leader를 선출하고, Controller는 새 Leader와 ISR 정보를 모든 Broker들에게 로컬 캐싱을 위해 알린다.
  • Leader가 장애가 나면, 새로운 Leader가 선출될 때까지 해당 Partition을 사용할 수 없게 된다.

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p10. Consumer Rebalance  (0) 2025.03.16
p9. Replica Recovery  (0) 2025.03.16
p7. Acks, Batch, Page Cache, Flush  (0) 2025.03.15
p6. Replication  (0) 2025.03.15
p5. Consumer  (0) 2025.03.15
728x90
반응형
SMALL

Producer는 Kafka가 Message를 잘 받았는지 어떻게 알까?

Producer Acks

acks 설정은 요청이 성공할 때를 정의하는 데 사용되는 Producer에 설정하는 파라미터.

 

acks = 0

  • acks가 필요하지 않음. 이 수준은 자주 사용되지 않음. 메시지 손실이 다소 있더라도 빠르게 메시지를 보내야 하는 경우에 사용
  • 그러니까, Producer들은 메시지를 보내고 잘 받았는지 안 받았는지 관심이 없다는 것.

 

acks = 1 (기본값)

  • Leader가 메시지를 수신하면 ack를 보냄. 그런데, Leader가 Producer에게 Ack를 보낸 후, Follower가 복제하기 전에 Leader에 장애가 발생하면 메시지가 손실. "At most once(최대 한 번)"전송을 보장.

 

acks = -1 또는 acks = all

  • 메시지를 Leader가 모든 Replica까지 Commit 되면 ack를 보냄. Leader를 잃어도 데이터가 살아남을 수 있도록 보장. 그러나 그만큼 대기시간이 더 길다. "At least once(최소 한 번)" 전송을 보장
  • 쉽게 말해 모든 Replica가 전부 다 해당 메시지가 복제되면, 그때 ack를 보낸다고 생각하면 된다.

 

Producer Retry

재전송을 위한 Parameters

Parameter 설명 Default 값
retries 메시지를 send 하기 위해 재시도하는 횟수 MAX_INT
retry.backoff.ms 재시도 사이에 추가되는 대기 시간 100
request.timeout.ms Producer가 응답을 기다리는 최대 시간 30,000 (30초)
delivery.timeout.ms send() 후 성공 또는 실패를 보고하는 시간의 상한 120,000 (2분)
  • acks = 0 에서는 retry는 무의미하다.
  • retries를 조정하는 대신에 delivery.timeout.ms 조정으로 재시도 동작을 제어

 

Producer Batch

메시지를 모아서 한번에 전송하는 것으로, Batch 처리는 RPC(Remote Procedure Call)수를 줄여서 Broker가 처리하는 작업이 줄어들기 때문에 더 나은 처리량을 제공한다.

  • 애플리케이션에서 send()를 호출하면 내부적으로, Serializer에 의해 데이터를 Serialize하고, Partitioner에 의해 어떤 파티션으로 보낼지가 결정된다. 그리고 그 이후, 메시지를 모아서 한번에 보내는 과정이 위 그림의 RecordAccumulator -> ProduceRequest -> Broker 이다.
  • 그럼 배치 처리를 하기 위한 옵션은 어떤게 있을까?
  • linger.ms (default : 0, 즉시 보냄) : 메시지가 함께 Batch 처리될 때까지 대기 시간
  • batch.size (default : 16KB) : 보내기 전 Batch의 최대 크기
  • 일반적으로, linger.ms = 100, batch.size = 1000000로 설정해서 배치 사이즈는 크게 잡고, 배치 대기 시간을 짧게 하여 이벤트가 너무 늦게 전송되지도 않고 너무 한번에 바로 바로 전송되지도 않게 한다.

Producer Delivery Timeout

send() 후 성공 또는 실패를 보고하는 시간의 상한

  • send()를 호출하는 동안에 영향을 받는 옵션은 max.block.ms 인데, 이는 메시지를 담기 위한 Buffer 할당 대기 시간이다.
  • 그 이후 단계부터 영향 받는 범위가 delivery.timeout.ms 이다.
  • linger.ms는 메시지가 함께 Batch 처리될 때까지 대기 시간을 의미한다. 너무 길어도 너무 짧아도 안된다.
  • retry.backoff.ms는 재시도를 하는데 재시도 사이의 대기 시간을 설정하는 것이다. 재시도를 바로 하는게 아니라 잠깐 기다렸다가 다시 재시도, 잠깐 기다렸다가 다시 재시도하는 것이다.

 

Message Send 순서 보장

진행 중인 여러 요청을 재시도하면 순서가 변경될 수 있다. 예를 들어, Producer가 메시지를 생성해서 Kafka에 보낼 때 하나의 브로커가 한 커넥션에 한번에 최대로 보낼 수 있는 Batch 메시지 개수를 설정하는 옵션이 있다. 그 옵션이 바로, max.in.flight.requests.per.connection 이고 기본값은 5이다. 그래서 아래 그림과 같이 전송한다고 해보자.

  • Topic A - Partition 0에 보낼 배치 메시지를 5개를 모아서 보내는 것이다. 근데 만약, 이 전송을 하는 과정 중에 뭔가 잘못되서 재시도를 했다고 해보자. 예를 들어, Batch 0번을 보내고 1, 2, 3번을 보내야 하는데 0번을 보내는 중에 문제가 발생해서 0번이 안들어가지면 순서가 꼬인다는 것이다. 
  • 1번이 먼저 들어와서 2,3,4번이 들어오고 이제 0번을 못 보냈으니 재시도를 해서 0번을 보내면 원래라면 0 ~ 4번까지 보내져야 할 것이 1 ~ ,4,0번 순서로 들어온 것이다.
  • 이런 경우가 발생할 수 있는데 이럴때 메시지 순서를 보장하려면, Producer에서 enable.idempotence를 true로 설정하면 된다.
  • 이 옵션을 키게 되면, 하나의 Batch가 실패하면 같은 Partition으로 들어오는 후속 Batch들도 모두 실패하게 한다.

 

Page Cache, Flush

  • 메시지는 Partition에 기록된다.
  • Partition은 Segment File로 구성되어 있다. (기본값: 1GB마다 새로운 Segment 파일 생성)
  • 성능을 위해 Segment는 OS Page Cache에 기록된다.
  • Segment 파일에 저장된 메시지의 데이터 형식은 Broker가 Producer로부터 수신한 것, 그리고 Consumer에게 보내는 것과 정확히 동일하므로 Zero-Copy가 가능하다. 
    • Zero-Copy란, 데이터가 네트워크를 통해서 들어올텐데 그 네트워크를 통해 들어오는 데이터는 Network Buffer에 담겨서 들어온다. 이 Buffer에서 Broker를 거쳐서 데이터를 복사해서 OS Page Cache에 저장되는 게 아니라, 다이렉트로 Network Buffer에서 OS Page Cache로 전송되는 것을 의미한다. 이러면, Broker Heap 메모리를 절약하고 CPU 개입이 필요없어 지기 때문에 처리량에서 매우 유리하다.
  • Page Cache는 다음과 같은 경우, 디스크로 Flush 된다.
    • Broker가 완전히 종료
    • OS Background "Flusher Thread" 실행 (OS 레벨에서 자동으로 해주는 작업)

 

Flush 되기 전에 Broker 장애가 발생하면?

  • OS가 데이터를 디스크로 Flush 하기 전에 Broker의 시스템에 장애가 발생하면 해당 데이터가 손실될 수 있다.
  • Partition이 Replication되어 있다면, Broker가 다시 온라인 상태가 되면 필요시 Leader Replica에서 데이터가 복구됨
  • Replication이 없다면, 데이터는 영구적으로 손실될 수 있다.

 

Kafka 자체 Flush 정책

  • 마지막 Flush 이후의 메시지 수(log.flush.interval.messages) 또는 시간(log.flush.interval.ms)으로 Flush(fsync)를 트리거하도록 설정할 수 있다.
  • 그러나, Kafka는 운영 체제의 background Flush 기능(예: pdflush)을 더 효율적으로 허용하는 것을 선호하기 때문에 이러한 설정은 기본적으로 무한(기본적으로 fsync 비활성화)으로 설정
  • 이러한 설정을 기본값으로 유지하는 것을 권장
  • *.log 파일을 보면 디스크로 Flush된 데이터와 아직 Flush 되지 않은 Page Cache (OS Buffer)에 있는 데이터가 모두 표시된다.
  • Flush된 항목과 Flush되지 않은 항목을 표시하는 Linux 도구(예: vmtouch)도 있다.

 

정리를 하자면

  • Producer Acks: 0, 1, all(-1)
  • Batch 처리를 위한 옵션 : linger.ms, batch.size
  • 메시지 순서를 보장하려면, Producer에서 enable.idempotence를 true로 설정
  • 성능을 위해 Segment는 OS Page Cache에 기록된다.

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p9. Replica Recovery  (0) 2025.03.16
p8. Replica Failure  (0) 2025.03.15
p6. Replication  (0) 2025.03.15
p5. Consumer  (0) 2025.03.15
p4. Producer  (0) 2025.03.15
728x90
반응형
SMALL

Broker에 장애가 발생하면 어떻게 될까? 장애가 발생한 Broker의 Partition들은 모두 사용할 수 없게 되는 문제가 발생할 것이다.

 

계속 하기 전에 다시 한번 복습해보자면, 

  • Producer가 Write하는 LOG-END-OFFSET과 Consumer Group의 Consumer가 Read하고 처리한 후에 Commit한 CURRENT-OFFSET과의 차이(Consumer Lag)가 발생할 수 있다고 했다.
  • 이 상태에서 장애가 나면?

 

다른 Broker에서 Partition을 새로 만들 수 있으면 장애는 해결될까? 다음과 같이 말이다.

  • 고장난 Broker에 있는 파티션들을 다른 멀쩡한 Broker에 새로 만들었다. 해결될 것 같지만 이러한 고민거리가 남는다.
  • 기존 메시지는 버릴 것인가? 기존 Offset 정보들을 버릴 것인가?

Replication

위와 같은 문제를 해결하기 위해, Partition을 복제(Replication)하여 다른 Broker상에서 복제물(Replicas)을 만들어서 장애를 미리 대비한다. 

 

Replicas - Leader Partition, Follower Partition

  • 위 그림처럼, Leader 1개, Follower 2개로 총 Replicas는 3개가 있다. 
  • Producer는 Leader에만 Write하고, Consumer는 Leader로부터만 Read한다.
  • Follower는 Broker 장애시 안정성을 제공하기 위해서만 존재한다.
  • Follower는 Leader의 Commit Log에서 데이터를 가져오기 요청(Fetch Request)으로 복제한다.

  • 물론, 2.4부터 Follower에도 Consumer가 Read할 수 있는 기능이 추가됐지만 기본적으로는 Leader에서만 Read한다.

 

Leader에 장애가 발생하면?

Kafka 클러스터는 Follower 중에서 새로운 Leader를 선출한다. Clients(Producer/Consumer)는 자동으로 새 Leader로 전환하여 Write/Read를 한다.

 

Partition Leader에 대한 자동 분산

하나의 Broker에만 Partition의 Leader들이 몰려 있다면? 여러가지 문제가 발생한다. 

  • 특정 Broker에만 Clients(Producer/Consumer)로 인해 부하 집중
  • 해당 Broker가 고장나면 모든 Partition에 대해서 Leader를 다 재선출

  • 이러한 문제가 있기 때문에 기본적으로 Leader를 골고루 분산하는 옵션이 켜져있다.
  • auto.leader.rebalance.enable (기본값 enable)
  • leader.imbalance.check.interval.seconds (기본값 300 sec)
  • leader.imbalance.per.broker.percentage (기본값 10)

  • 그래서 이와 같은 골고루 Leader를 분산하여 Broker에 저장된다.

Rack Awareness

  • 동일한 Rack 혹은 Available Zone 상의 Broker들에 동일한 "rack name" 지정
  • 복제본 (Replica-Leader/Follower)은 최대한 Rack 간에 균형을 유지하여 Rack 장애 대비
  • Topic 생성 시 또는 Auto Data Balancer/Self Balancing Cluster 동작 때만 실행

 

 

In-Sync Replicas (ISR)

Leader 장애시 Leader를 선출하는데 사용된다. In-Sync Replicas(ISR)는 High Water Mark라고 하는 지점까지 동일한 Replicas(Leader와 Follower 모두)의 목록이다. Leader에 장애가 발생하면 ISR중에서 새 Leader를 선출한다.

  • 보면, replica.lag.max.messages=4 라는 옵션으로 설정되면 4개 이상으로 메시지(데이터)가 차이가 나는 Replica는 OSR(Out-of-Sync Follower)이라고 간주한다. 
  • 그리고, 그 범위 안에 들어오는 Replica는 ISR이라고 간주한다. 보면 Broker102에 있는 Follower는 2개밖에 차이가 나지 않기 때문에 ISR로 간주된다.
  • 저 High Water Mark는 무엇이냐? Leader에 Producer가 데이터를 생산을 쭉 해나가면, Follower들은 Fetch Request를 통해 Leader의 데이터를 계속 복제하려 할 것이다. 이때, 모든 Follower들(ISR에 속한)이 전부 다 복제한 지점까지를 High Water Mark라고 한다.

 

그런데 이 replica.lag.max.messages로 ISR을 판단할 때 어떤 문제가 있을 수 있냐면,

  • 메시지가 항상 일정한 비율로 Kafka로 들어올 때는 지연되는 경우가 없을것이므로 ISR들이 정상적으로 동작한다.
  • 그런데 갑자기 메시지 유입량이 늘어난다면 예를 들어, 원래는 초당 3개의 메시지만 들어왔는데 갑자기 초당 20개의 메시지가 들어오면 20개의 차이가 갑자기 나니까 replica.lag.max.messages를 4로 설정하면 지연으로 판단하고 OSR로 상태를 변경시킨다.
  • 그런데 실제 Follower들은 정상적으로 동작하고 단지 잠깐 지연만 발생했을 뿐인데 OSR로 판단하게 되는 문제가 발생한다. 이로 인해 운영중에 불필요한 에러 발생 및 그로 인해 불필요한 Retry를 유발하므로써 좋지 않은 상태가 된다)

그래서, replica.lag.time.max.ms로 판단해야 한다.

  • Follower가 Leader로 Fetch 요청을 보내는 Interval을 체크
  • 예) replica.lag.time.max.ms = 10000 이라면 Follower가 Leader로 Fetch 요청을 10000ms 내에만 요청하면 정상으로 판단

 

ISR은 Leader가 관리한다.

  • Follower가 너무 느리면, Leader는 ISR에서 Follower를 제거하고, 변경 사항에 대한 메타데이터를 Controller가 받아 처리한다.
  • 여기서 Controller란?
    • Kafka Cluster 내의 Broker 중에 하나가 Controller가 된다.
    • Controller는 Leader와 Replica 정보를 Cluster내의 다른 Broker들에게 전달한다.
    • Controller가 Leader 장애 시 Leader Election을 수행
    • Controller가 장애가 나면 다른 Active Broker들 중에서 재선출

Consumer 관련 Position들

  • Last Committed Offset(Current Offset): Consumer가 최종 Commit한 Offset
  • Current Position: Consumer가 읽어간 위치(처리 중, Commit 전). 그러니까, 마지막으로 커밋한 지점은 2인데 Consumer가 데이터를 지금 현재 읽는중이고 2부터 6까지 읽고 있는 중이라고 생각하면 된다.
  • High Water Mark(Committed): ISR간에 복제된 Offset.
  • Log End Offset: Producer가 메시지를 보내서 저장된 로그의 맨 끝 Offset

High Water Mark(Committed)의 의미

  • ISR 목록의 모든 Replicas가 메시지를 성공적으로 가져오면 Committed.
  • Consumer는 Committed 메시지만 읽을 수 있다. (위 그림에서는 0 ~ 11번 메시지까지)
  • Leader는 메시지를 Commit할 시기를 결정한다.
  • Committed 메시지는 모든 Follower에서 동일한 Offset을 갖도록 보장한다. (모든 Replicas들이 0번 메시지는 0번 메시지를 가르키고, 10번 메시지는 10번 메시지를 가르키고, 11번 메시지는 11번 메시지를 가르킨다는 의미)
  • 즉, 어떤 Replica가 Leader인지에 관계없이 모든 Consumer는 해당 Offset에서 같은 데이터를 볼 수 있다. (Leader, Follower 상관없이 0번 메시지는 모두 다 동일한 0번 메시지라는 것을 말함)
  • Broker가 다시 시작할 때 Committed 메시지 목록을 유지하도록 하기 위해, Broker의 모든 Partition에 대한 마지막 Committed Offset은 replication-offset-checkpoint라는 파일에 기록됨

Message Commit 과정

  • Offset 5까지 복제가 완료된 상태에서, Producer가 메시지를 보내면 Leader가 offset 6에 새 메시지를 추가
  • Fetcher Thread는 모든 브로커에 존재하고, 각 브로커에 모든 Follwer들이 본인의 Leader에 메시지를 가져오기 요청을 보낼 때 사용

  • 각 Follower들의 Fetcher Thread가 독립적으로 fetch를 수행하고, 가져온 메시지를 offset 6에 메시지를 write

  • 이 상태에서 각 Follwer들의 Fetcher Thread가 또 독립적으로 fetch를 수행하고 null을 받는데, null을 Leader가 준다는 것은 Leader가 모든 Follwer들에게 현재 내가 가진 모든 데이터를 다 정상적으로 줬음을 의미하므로, High Water Mark를 이동시킨다.

  • 각 Follower들의 Fetcher Thread가 독립적으로 또 다시 fetch를 수행하고, High Water Mark를 받는다. 받은 High Water Mark로 이동한다.

 

정리하자면

  • Partition을 복제(Replication)하여 다른 Broker 상에서 복제물(Replicas)을 만들어서 장애를 미리 대비함
  • Replicas - Leader Partition, Follower Partition
  • Producer는 Leader에만 Write하고, Consumer는 Leader로부터만 Read한다.
  • Follower는 Leader의 Commit Log에서 데이터를 가져오기 요청(Fetch Request)으로 복제
  • 복제본(Replicas-Leader/Follower)은 최대한 Rack 간에 균형을 유지하여 Rack 장애 대비하는 Rack Awareness 기능이 있다.
  • In-Sync Replicas(ISR)는 High Water Mark 라고 하는 지점까지 동일한 Replicas(Leader, Follower)의 목록
  • High Water Mark(Committed)는 ISR(Leader-Follower)간에 복제된 Offset
  • Consumer는 Committed 메시지만 읽을 수 있다.
  • Kafka Cluster 내의 Broker 중 하나가 Controller가 된다.

 

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p8. Replica Failure  (0) 2025.03.15
p7. Acks, Batch, Page Cache, Flush  (0) 2025.03.15
p5. Consumer  (0) 2025.03.15
p4. Producer  (0) 2025.03.15
p3. Broker  (0) 2025.03.15
728x90
반응형
SMALL

Consumer

  • Partition으로부터 Record를 가져온다. 
  • Consumer는 각각 고유의 속도로 Commit Log로부터 순서대로 Read를 수행
  • 다른 Consumer Group에 속한 Consumer들은 서로 관련이 없으며, Commit Log에 있는 Event(Message)를 동시에 다른 위치에서 Read할 수 있다.

 

Consumer Offset

  • Consumer Group이 읽은 위치를 표시한다. Consumer가 자동이나 수동으로 데이터를 읽은 위치를 commit하여 다시 읽음을 방지한다.
  • __consumer_offsets 이라는 Internal Topic에서 Consumer Offset을 저장하여 관리한다.

 

Multi-Partition with Single Consumer

Consumer가 하나인 경우, 모든 Partition에서 Consume한다.

예를 들어, 4개의 Partition으로 구성된 Topic의 데이터를 사용하는 Single Consumer가 있는 경우, 이 Consumer는 Topic의 모든 Partition에서 모든 Record를 Consume한다.

 

Consuming as a Group

동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성한다.

  • 4개의 파티션이 있는 Topic을 Consume하는 4개의 Consumer가 하나의 Consumer Group에 있다면, 각 Consumer는 정확히 하나의 Partition에서 Record를 Consume한다.
  • Partition은 항상 Consumer Group 내의 하나의 Consumer에 의해서만 사용된다.
  • Consumer는 주어진 Topic에서 0개 이상의 많은 Partition을 사용할 수 있다.

 

Multi Consumer Group

  • 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성한다.
  • Consumer Group의 Consumer들은 작업량을 어느정도 균등하게 분할한다.
  • 동일한 Topic에서 Consume하는 여러 Consumer Group이 있을 수 있다.

 

 

Key를 사용하면 Partition 별 동일한 Key를 가지는 메시지를 저장

같은 Key라면 같은 Partition에 저장된다는 의미이다. 다음 그림을 보자.

  • Key가 AAA인 경우 P0에 저장된다.

  • Key가 BBB인 경우 P1에 저장된다.

  • Key가 CCC인 경우 P2에 저장된다.

  • Key가 DDD인 경우 P3에 저장된다.

 

이렇듯, Partition이 여러개인 경우 모든 메시지에 대한 전체 순서는 보장이 불가능하다. Key가 무엇이냐에 따라 각각 다른 파티션에 저장될테니 말이다. 

  • 파티션 별로 순서는 보장이 되지만, 소비하는 Consumer 입장에서는 각 파티션에 있는 모든 데이터를 가져왔을 때 그 전체 순서를 보장할 수 없다.

 

파티션을 1개로 구성하면 모든 메시지에서도 전체 순서가 보장 가능하지만 이 경우, 처리량 저하가 일어난다. 근데 과연 1개로 구성해서 모든 메시지에서 전체 순서 보장을 해야하는 경우가 얼마나 많을까? 대부분의 경우, Key로 구분할 수 있는 메시지들의 순서 보장이 필요한 경우가 많다. 예를 들어, 주문 테이블에 주문 데이터를 넣을 때 이 주문 데이터를 카프카에 이벤트로 전송한다고 해보자. 주문 데이터에는 주문 시간이라는 명확한 순서를 알 수 있는 정보가 들어있는데 굳이 한 파티션에 모든 주문 데이터를 순서대로 넣을 필요가 있을까? 그렇지 않다. 

 

그래서, 대부분의 경우, 전체 순서를 보장할 필요가 없고 많은 데이터를 처리해야 한다면 멀티 파티션을 사용해서 처리량 증가에 초점을 맞추면 된다. 근데 만약, 운영중에 처리량 증가를 더 효율적으로 하겠다고 파티션 개수를 변경하면 어떻게 될까? 동일한 파티션은 순서가 보장이 됐었는데 이젠 동일한 파티션이어도 순서가 보장되지 않는다. 왜냐하면, 파티션이 늘어나면 해시값을 구할 때 사용하는 파티션의 수가 달라진다.

Hash(Key) % partition 개수

이런 공식으로 해시값을 구하는데, 3개였던 파티션이 4개로 늘어나면 같은 키라고 해도 파티션 수를 바꾼 이후부터 다른 파티션에 들어갈 수 있기 때문에, 운영중에는 파티션 개수를 변경하면 안된다. 

 

Consumer Failure

  • 4개의 파티션이 있는 Topic을 Consume하는 4개의 Consumer가 하나의 Consumer Group에 있다면, 각 Consumer는 정확히 하나의 Partition에서 Record를 Consume한다.
  • Partition은 항상 Consumer Group 내의 하나의 Consumer에 의해서만 사용된다.
  • Consumer는 주어진 Topic에서 0개 이상의 많은 Partition을 사용할 수 있다.

  • 위 그림과 같이 P3을 처리하는 Consumer 3번이 죽으면 어떻게 될까?

  • Consumer Group 내의 다른 Consumer가 실패한 Consumer를 대신하여 Partition에서 데이터를 가져와서 처리한다.
  • Partition은 항상 Consumer Group 내의 하나의 Consumer에 의해서만 사용된다.
  • Consumer는 주어진 Topic에서 0개 이상의 많은 Partition을 사용할 수 있다.

 

정리를 하자면

  • Consumer가 자동이나 수동으로 데이터를 읽은 위치를 Commit하여 다시 읽음을 방지한다.
  • __consumer_offsets 이라는 Internal Topic에서 Consumer Offset을 저장하여 관리한다.
  • 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성한다.
  • 다른 Consumer Group의 Consumer들은 분리되어 독립적으로 작동한다.
  • 동일한 Key를 가진 메시지는 동일한 파티션에만 전달되어 Key 레벨의 순서 보장이 가능하다.
  • Key 선택이 잘못되면 작업 부하가 고르지 않을 수 있다.
  • Consumer Group 내의 다른 Consumer가 실패한 Consumer를 대신하여 Partition에서 데이터를 가져와서 처리한다.

 

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p7. Acks, Batch, Page Cache, Flush  (0) 2025.03.15
p6. Replication  (0) 2025.03.15
p4. Producer  (0) 2025.03.15
p3. Broker  (0) 2025.03.15
p2. Topic, Partition, Segment  (0) 2025.03.15
728x90
반응형
SMALL

Producer

Producer는 메시지를 생산해서 Kafka의 Topic으로 메시지를 보내는 애플리케이션이다. Producer와 Consumer는 서로 알지 못하며, Producer와 Consumer는 각각 고유의 속도로 Commit Log에 Write 및 Read를 수행한다.

 

Message, Record, Event, Data

모두 같은 표현이며, 다음과 같은 구조를 가지고 있다.

  • Key, Value는 JSON, Avro 등 다양한 형태가 가능하다.

Serializer / Deserializer

  • Producer가 데이터를 생산해서 Kafka에 전송할 때 데이터를 Byte 형태로 Serialize를 수행한다.
  • Consumer는 데이터를 Kafka로부터 읽을 때 Byte 형태의 데이터를 원하는 형태로 Deserialize한다. 

그래서, 실제 애플리케이션에서는 Key와 Value용 Serializer를 각각 설정한다. 다음과 같이 말이다.

private Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, " broker101:9092,broker102:9092 ");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
KafkaProducer producer = new KafkaProducer(props);

 

Producing to Kafka

  • 개발자가 하는 것은 데이터를 만들어서 send()하는 것밖에 없다. 그 이후부터는 Kafka 라이브러리에서 전부 다 해준다.
  • 데이터를 send()하면 내부적으로 먼저 설정된 Serializer로 Serialize를 수행한다.
  • 그리고, Partitioner라는 녀석이 있는데 이 녀석은 어떤 파티션으로 데이터를 전송할지를 결정해주는 녀석이다. 그래서 이 녀석을 통해 특정 파티션이 선정된다.
  • 추가적으로 압축 설정을 했다면 압축 과정이 일어난 후 데이터를 Kafka로 전송한다.
  • 정상적으로 전송이 됐다면 전송이 잘 됐다는 메타데이터를 Producer에 보내고, 전송에 실패하면 재시도를 하거나 재시도를 할 수 없으면 예외를 반환한다. 

Partitioner

  • 위에서 Partitioner에 대해 말했는데, 이 녀석은 메시지를 Topic의 어떤 Partition으로 보낼지 결정한다.
  • 해시값을 구해서 특정 파티션으로 보내는 게 일반적이다. 단, Key가 null이 아닌 경우에만 이 알고리즘을 수행한다.
  • 다시 말해, 동일한 Key를 보내면 동일한 파티션으로만 데이터가 전송될 것이다.

그럼 Key가 null인 경우엔?

  • 라운드 로빈 방식으로 데이터를 보내는 게 2.4 이전의 방식이었는데, 이 방식을 사용하면 데이터가 6개면 6번의 네트워크 과정이 일어난다. 즉, 비효율적이라는 것. 그래서 2.4 이후의 방식은 하나의 Batch가 닫힐때까지 하나의 파티션에게만 데이터를 쭉 보낸다. 즉, 배치단위로 보내서 6번을 보낼것을 2번으로 줄일 수 있다는 의미이다.

 

728x90
반응형
LIST

'Apache Kafka' 카테고리의 다른 글

p6. Replication  (0) 2025.03.15
p5. Consumer  (0) 2025.03.15
p3. Broker  (0) 2025.03.15
p2. Topic, Partition, Segment  (0) 2025.03.15
p1. Apache Kafka란  (0) 2025.03.15

+ Recent posts