이번에 카프카를 구현하면서 오랜만에 막연함을 느꼈던 것 같다. 구현을 하고, 원리 같은 걸 공부해보기는 했는데 붕 뜬 지식을 잡으려 손을 뻗는 기분이었다.
왜 그런 기분을 느낄까 생각해 보면 답은 하나밖에 나오지 않았다.
- 잘 모르기 때문에.
나름대로 정리도 해보고 구현도 해봤는데 왜 잘 모를까.
- 여러 가지 블로그를 기반으로 공부하다 보니까 카프카에 대해서 중구난방으로 공부하게 된 것 같았다.
내가 헷갈렸던 내용은 아래와 같다.
BatchListner와 RecordListener
BatchListener를 ‘한 번에 여러 레코드를 처리한다’고 정리된 글을 읽고 BatchListener는 병렬처리를 하는 리스너라고 생각을 했다.
하지만 코드를 보다보면 Concurrent… 가 붙은 용어( ex. ConcurrentKafkaListenerContainerFactory )를 심심치 않게 볼 수 있었다.
그리고 누군가에게 카프카에 대해 공부한 내용을 잠깐 보여주다가 이런 질문도 받았다.
RecordListener와 데이터 유실
‘그러면 RecordListener 는 한 번에 하나의 레코드만 처리하니까 데이터 누락이 발생하지 않는 건가요?’
이때의 나는 바로 답하지 못했다. 데이터 누락 혹은 중복이 발생하는 건 커밋 시점 때문이라는 건 진작 알고 있었으나 확신하지 못했다.
RecordListener에서 poll 할 때 데이터를 가져온다면 누락은 발생할 수 있겠지.
- poll 한 데이터를 커밋하고 처리하는 도중 오류 발생 → 이미 커밋한 메시지는 처리할 수 없음 → 데이터 누락
그렇다면 RecordListener 에서 데이터를 처리한 뒤 commit을 하게 된다면?
완전히 누락을 방지할 수 없을 것 같다는 직관적인 추측밖에 할 수 없었다.
그동안 공부했다고 생각했는데 공부한 것 같지가 않은 기분이 들었다. 그래서 공식 문서를 펼쳤다.
이제부터 내가 이해한 내용을 토대로 설명을 해볼까 한다.
interceptor 니 뭐니 transaction 이 뭐니 다 제끼고 가장 기본적인 내용부터, 나를 출발 선상에 세울 수 있게 했던 수준까지의 내용만 다룰 예정이다. (리얼 초심자 수준임)
앞서 이야기하자면 나는 코드 구현을 어느정도 해 둔 상태에서 공식문서를 읽으며 내용을 정리했다.
Kafka 구성하기
Spring 에서 Kafka를 구현할 때 MessageListenerContainer라는 인터페이스를 상속받은 리스너를 사용한다.
MessageListenerContainer를 상속받은 리스너
- KafkaMessageListenerContainer
- 싱글 스레드를 사용하며 모든 topic 혹은 partition 메시지를 받는다.
public KafkaMessageListnerContainerContainer ( ConsumerFactory<K, V> consumerFactory, ContainerProperties contianerproperties)
- ConcurrentMessageListenerContainer
- 하나 이상의 KafkaMessageListenerContainer 인스턴스를 생성하여 멀티 스레드로 메시지를 수신한다.
public ConcurrentMessageListenerContainer ( ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
ContainerProperties
ContainerProperties 가 갖고 있는 정보
- topic과 partitions에 대한 정보
- 그 외 다른 configuration 에 대한 정보
MessageListener를 컨테이너에 할당하기 위해서는 이 정보들이 앞서 정의되어야 한다.
ContainerProps.setMessageListener를 사용해서 컨테이너를 생성할 수 있다.
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
💡 Starting with version 2.2, a new container property called missingTopicsFatal has been added (default: false since 2.3.4). This prevents the container from starting if any of the configured topics are not present on the broker. </aside>
ConcurrentMessageListenerContainer
public ConcurrentMessageListenerContainer( ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
container.setConcurrency(3) 를 호출하면 KafkaMessageListenerContainer 인스턴스 3개가 생성된다.
Committing Offsets
enable.auto.commit
- true 카프카 설정에 맞춰 offset 을 auto commit
- flase 수동 커밋으로 설정 (default: AckMode.BATCH)
💡 2.3 버전 이후 디폴트 값은 flase 이나 이전에는 true였다.
AckMode( 트랜잭션이 적용되지 않을 때)
AckMode | desc |
RECORD | 리스너가 레코드를 처리한 후 한번에 하나의 레코드 커밋 |
BATCH | 이전에 poll()되어 처리된 모든 record 가 listener에 의해 모두 처리된 후 커밋 |
TIME | BATCH 특성 + ackTime 을 넘어갈 때 커밋이 옵션을 사용할 경우 시간 간격을 선언하는 AckTime 옵션을 설정해야 한다. |
COUNT | BATCH 특성 + ackCount 값을 넘어설 때 커밋이 옵션을 사용할 경우에는 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다. |
COUNT_TIME | ackCount 혹은 ackTime 을 넘어갈 때 커밋 |
MANUAL | Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 때 처리된 모든 레코드를 커밋 Acknowledgment 에 대한 책임이 있다. |
MANUAL_IMMEDIATE | AcknowledgingMessageListener를 호출한 즉시 커밋AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용 |
RECORD 설정은 단건 조회, BATCH 설정은 다건 조회에 가깝다.
💡 AckMode.MANUAL
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용
💡 ListenerContainer 자체는 offset commit에 대한 메커니즘이 없기 때문에 auto commit 가능 여부는 false로 설정하기를 권장한다.
'Kafka' 카테고리의 다른 글
[Kafka] Spring Kafka commit (5) (0) | 2024.07.17 |
---|---|
[Kafka] commit 이해하기 (4) (0) | 2024.07.17 |
[Kafka] Partition & Replication (3) (1) | 2024.07.12 |
[Kafka] kafka 내부 구조 (2) (0) | 2024.07.12 |
[Kafka] 탄생 배경 (1) (0) | 2024.07.08 |