본문 바로가기

Kafka

[Kafka] Listener 기본 설정 (6)

이번에 카프카를 구현하면서 오랜만에 막연함을 느꼈던 것 같다. 구현을 하고, 원리 같은 걸 공부해보기는 했는데 붕 뜬 지식을 잡으려 손을 뻗는 기분이었다.

왜 그런 기분을 느낄까 생각해 보면 답은 하나밖에 나오지 않았다.

  • 잘 모르기 때문에.

나름대로 정리도 해보고 구현도 해봤는데 왜 잘 모를까.

  • 여러 가지 블로그를 기반으로 공부하다 보니까 카프카에 대해서 중구난방으로 공부하게 된 것 같았다.

 


 

내가 헷갈렸던 내용은 아래와 같다.

BatchListner와 RecordListener

BatchListener를 ‘한 번에 여러 레코드를 처리한다’고 정리된 글을 읽고 BatchListener는 병렬처리를 하는 리스너라고 생각을 했다.

하지만 코드를 보다보면 Concurrent… 가 붙은 용어( ex. ConcurrentKafkaListenerContainerFactory )를 심심치 않게 볼 수 있었다.

그리고 누군가에게 카프카에 대해 공부한 내용을 잠깐 보여주다가 이런 질문도 받았다.

 

RecordListener와 데이터 유실

‘그러면 RecordListener 는 한 번에 하나의 레코드만 처리하니까 데이터 누락이 발생하지 않는 건가요?’

이때의 나는 바로 답하지 못했다. 데이터 누락 혹은 중복이 발생하는 건 커밋 시점 때문이라는 건 진작 알고 있었으나 확신하지 못했다.

RecordListener에서 poll 할 때 데이터를 가져온다면 누락은 발생할 수 있겠지.

- poll 한 데이터를 커밋하고 처리하는 도중 오류 발생 → 이미 커밋한 메시지는 처리할 수 없음 → 데이터 누락

 

그렇다면 RecordListener 에서 데이터를 처리한 뒤 commit을 하게 된다면?

완전히 누락을 방지할 수 없을 것 같다는 직관적인 추측밖에 할 수 없었다.

그동안 공부했다고 생각했는데 공부한 것 같지가 않은 기분이 들었다. 그래서 공식 문서를 펼쳤다.

 

이제부터 내가 이해한 내용을 토대로 설명을 해볼까 한다.

interceptor 니 뭐니 transaction 이 뭐니 다 제끼고 가장 기본적인 내용부터, 나를 출발 선상에 세울 수 있게 했던 수준까지의 내용만 다룰 예정이다. (리얼 초심자 수준임)

앞서 이야기하자면 나는 코드 구현을 어느정도 해 둔 상태에서 공식문서를 읽으며 내용을 정리했다.

 


 

Kafka 구성하기

Spring 에서 Kafka를 구현할 때 MessageListenerContainer라는 인터페이스를 상속받은 리스너를 사용한다.

MessageListenerContainer를 상속받은 리스너

  1. KafkaMessageListenerContainer
    • 싱글 스레드를 사용하며 모든 topic 혹은 partition 메시지를 받는다.
    public KafkaMessageListnerContainerContainer ( ConsumerFactory<K, V> consumerFactory, ContainerProperties contianerproperties)
  2. ConcurrentMessageListenerContainer
    • 하나 이상의 KafkaMessageListenerContainer 인스턴스를 생성하여 멀티 스레드로 메시지를 수신한다.
    public ConcurrentMessageListenerContainer ( ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)

ContainerProperties

ContainerProperties 가 갖고 있는 정보

  1. topic과 partitions에 대한 정보
  2. 그 외 다른 configuration 에 대한 정보

MessageListener를 컨테이너에 할당하기 위해서는 이 정보들이 앞서 정의되어야 한다.

ContainerProps.setMessageListener를 사용해서 컨테이너를 생성할 수 있다.

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
💡 Starting with version 2.2, a new container property called missingTopicsFatal has been added (default: false since 2.3.4). This prevents the container from starting if any of the configured topics are not present on the broker. </aside>

ConcurrentMessageListenerContainer

public ConcurrentMessageListenerContainer( ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)

container.setConcurrency(3)  를 호출하면 KafkaMessageListenerContainer 인스턴스 3개가 생성된다.

Committing Offsets

enable.auto.commit

  • true 카프카 설정에 맞춰 offset 을 auto commit
  • flase 수동 커밋으로 설정 (default: AckMode.BATCH)
💡 2.3 버전 이후 디폴트 값은 flase 이나 이전에는 true였다.

 

AckMode( 트랜잭션이 적용되지 않을 때)

AckMode desc
RECORD 리스너가 레코드를 처리한 후 한번에 하나의 레코드 커밋
BATCH 이전에 poll()되어 처리된 모든 record 가 listener에 의해 모두 처리된 후 커밋
TIME BATCH 특성
+ ackTime 을 넘어갈 때 커밋이 옵션을 사용할 경우 시간 간격을 선언하는 AckTime 옵션을 설정해야 한다.
COUNT BATCH 특성
+ ackCount 값을 넘어설 때 커밋이 옵션을 사용할 경우에는 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다.
COUNT_TIME ackCount 혹은 ackTime 을 넘어갈 때 커밋
MANUAL Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 때 처리된 모든 레코드를 커밋
Acknowledgment 에 대한 책임이 있다.
MANUAL_IMMEDIATE AcknowledgingMessageListener를 호출한 즉시 커밋AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용

RECORD 설정은 단건 조회, BATCH 설정은 다건 조회에 가깝다.

💡 AckMode.MANUAL

AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용

 

💡 ListenerContainer 자체는 offset commit에 대한 메커니즘이 없기 때문에 auto commit 가능 여부는 false로 설정하기를 권장한다.

'Kafka' 카테고리의 다른 글