본문 바로가기

Kafka

[Kafka] commit 이해하기 (4)

Kafka는 다른 메시지큐와 다른 가장 대표적인 특징을 떠올리면 consumer 가 메시지를 가져가더라도 메시지큐에서 바로 사라지지 않는다는 점이다. 그 덕분에 하나의 로그 메시지를 여러 컨슈머가 다양한 용도로 사용할 수 있다.

 

이때 문득 고민이 들었다.

 

메시지큐에서 바로 사라지지 않는다면 메모리는 어떻게 관리되고 있을까, consumer는 자신이 읽은 파티션을 어떻게 구분할까.

 

 


 

 

Commit

깃허브를 다루면서 가장 많이 듣게 되는 용어, commit을 kafka에서도 만나게 된다.

  • 깃허브에서 commit 은 add 이후로 깃에 저장하는 단계에서 사용된다.
  • 카프카에서 commit 은 메시지 소비 여부를 구분하는 데 사용된다.

 

컨슈머가 브로커에서 메시지를 poll() 하고 나면 컨슈머 그룹은 카프카에서 불러온 메시지를 구분하여 저장한다. 각 파티션에서 자신이 가져간 메시지의 위치정보, offset오프셋을 기록한다. 이렇게 파티션에 대한 위치 정보를 기록하는 동작을 commit이라고 부른다.

 

poll에 앞서 commit 이 이뤄지기 때문에 consumer는 offset 기반으로 읽지 않은 메시지를 구분할 수 있다. 그렇다면 commit은 언제, 어떻게 이뤄지는 것일까. 그 방법은 크게 두 가지로 나뉜다.

 

 

자동 커밋

setting desc default

enable.auto.commit Auto-Commit 사용 여부 true
auto.commit.interval.ms Auto-Commit 사용 시 Commit을 수행할 시간 (ms) 5000

 

오프셋을 매뉴얼로 관리하지 않고 특정 주기로 컨슈머가 poll을 호출할 때 가장 마지막 오프셋을 커밋한다.

💡 manual commit매뉴얼 커밋: 수동 커밋
- 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안 되는 경우에 사용한다.

 

auto.commit.interval.ms로 조정 가능하며 consumer가 poll을 호출 할 때마다 커밋할 주기가 됐는지 확인하여 커밋을 수행한다.

 

 

 

만약 auto.commit.interval.ms 로 설정한 주기 전 리밸런스가 발생하면, 아래 그림에서 메시지 6은 새로운 컨슈머에 의해 중복으로 처리가 될 것이다.

 

auto.commit.interval.ms를 줄이면 확률은 줄어들겠지만 완벽한 해결방법이라 볼 수는 없다.

 

 

 

 

또한 아래와 같이 메시지 5, 6을 poll 한 뒤, 자동 커밋 된 상황이라 하자. 만약 컨슈머가 가져온 데이터 5,6을 처리하는데 에러가 발생했다면 메시지 5, 6은 손실이 된다.

 

 

이런 메시지 손실을 방지하기 위해 poll 하자마자 commit 하지 않고 DB에 온전히 저장한 뒤 commit 하는 수동 커밋이 안전하다.

 

 

 

수동 커밋

consumer 가 직접 커밋을 호출하는 방법이다. 메시지 처리가 완료 될 때까지 메시지를 가져온 것으로 간주하면 안 되는 경우에 사용한다.

 

수동 커밋의 경우 메시지의 성공 처리 후 커밋하도록 하여 손실을 방지할 수 있다. 자동 commit과 마찬가지로 메시지 처리 완료 후 어떤 작업에 의한 에러로 commit을 못했을 때 중복으로 메시지를 가져올 수 있는 가능성이 있다.

 

예를 들어 메시지들을 DB에 저장하던 도중 실패하게 된다면 마지막 commit 된 오프셋부터 불러와야 하기 때문에 일부 메시지는 DB에 중복 저장될 수 있다.

 

컨슈머가 토픽을 구독하면 카프카는 컨슈머 그룹의 컨슈머들에게 파티션을 공평하게 분배한다. 그러나 특정 파티션을 할당받기를 원한다면 TopicPartition을 통해 컨슈머 인스턴스마다 명세하여 할당받을 수 있다. 만약 특정 파티션 0, 1만 할당받고 싶다면 별개의 컨슈머 그룹을 만들고 파티션을 매핑하면 된다.

 

 

 

 

Spring Kafka AckMode

AckMode desc
BATCH 이전에 poll()된 record가 listener에 의해 모두 처리된 후 커밋
COUNT ackCount 값을 넘어설 때 커밋
이 옵션을 사용할 경우에는 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다.
COUNT_TIME ackCount 혹은 ackTime 을 넘어갈 때 커밋
MANUAL Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 때 커밋
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용
MANUAL_IMMEDIATE AcknowledgingMessageListener를 호출한 즉시 커밋
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용
RECORD 한번에 하나의 처리된 레코드 커밋
TIME ackTime 을 넘어갈 때 커밋

 

 

ex.

    factory.getContainerProperties().setAckMode(AckMode.RECORD);

'Kafka' 카테고리의 다른 글

[Kafka] Listener 기본 설정 (6)  (0) 2024.07.26
[Kafka] Spring Kafka commit (5)  (0) 2024.07.17
[Kafka] Partition & Replication (3)  (1) 2024.07.12
[Kafka] kafka 내부 구조 (2)  (0) 2024.07.12
[Kafka] 탄생 배경 (1)  (0) 2024.07.08