거북이 developer

[Kafka] Multi Threaded Message Consumption with the Apache Kafka Consumer 본문

Kafka

[Kafka] Multi Threaded Message Consumption with the Apache Kafka Consumer

흥부가귀막혀 2022. 3. 14. 11:27

Multi-Threaded Message Consumption with the Apache Kafka Consumer

  • 토픽에서 레코드는 파티션이라는 더 작은 단위로 묶인다. 이 단위는 결과의 정확성을 훼손하지 않고 독립적으로 처리할 수 있으며, 병렬 처리를 위한 토대가 된다. 병렬 처리는 일반적으로 스케일링을 통해서 가능하다. 같은 그룹 내에 여러 컨슈머를 두고, 각 컨슈머는 토픽 파티션들의 서브셋으로 부터 데이터를 가져가 처리하고 단일 쓰레드에서 실행된다.
  • 대부분의 유즈 케이스에서 단일 쓰레드에서 메시지를 읽고 처리하는 것은 문제가 없다. 따라서, 쓰레드당 컨슈머 모델은 아파치 카프카 컨슈머에서 흔히 사용된다. I/O 연산이 들어가지 않는 처리는 일반적으로 매우 빠르기 때문에, poll 루프도 잘 동작한다. 이 모델이 클라이언트 코드의 간단함과 관련하여 많은 이점이 있지만, 일부 유즈 케이스에서 한계점들도 있다.
  • 카프카 컨슈머의 내부를 이해하는 것은 이런 한계점을 극복하는 성공적인 멀티 쓰레드 솔루션을 구현하는데 중요하다.

Thread per consumer model

  • 멀티 쓰레드 컨슈머 아키텍쳐를 구현할 때, 카프카 컨슈머가 쓰레드 세이프하지 않다는 것을 알고있는 것이 매우 중요하다. 멀티 쓰레드의 접근은 반드시 적절한 동기화가 필요한데, 이 부분이 까다로울 수 있고, 이러한 이유 때문에 단일 쓰레드 모델이 일반적으로 사용된다.
  • 일반적인 단일 쓰레드 구현은 poll loop를 중심으로 감싸져 있다. 기본적으로, 두가지 액션을 수행하는 무한 루프이다.
while (true) {
        // 1. `poll()` 메소드를 호출하여 레코드를 가져온다.
        ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
        // 2. 가져온 레코드를 처리한다.
        // Handle fetched records
}
  • 컨슈머는 기본 설정으로 카프카에 오프셋을 자동으로 저장한다. 이 빈도를 조절하기 위해 auto.commit.interval.ms를 조절할 수 있다.
  • 카프카에 처음인 사람들은 백그라운드 쓰레드에서 이 설정 값대로 오프셋이 커밋된다고 생각할 수 있다. 실제로 오프셋은 컨슈머의 poll 메소드 수행동안 커밋되고, auto.commit.interval.ms는 커밋 간의 최소 딜레이를 정의할 뿐이다. 이전 poll 호출을 통해 받은 레코드의 오프셋만이 커밋된다. 레코드의 처리는 poll 호출간 일어나기 때문에 처리되지 않은 레코드의 오프셋은 절대 커밋되지 않는다. 이것은 at-least-once delivery semantics을 보장한다.
  • 자동 오프셋 커밋은 애플리케이션에서 수종으로 처리한 레코드의 오프셋을 커밋하는 경우 disable 할 수 있다.
  • 레코드는 한 쓰레드에 의해서 fetch 되고 처리되기 때문에, 파티션에 쓰여진 순서와 동일한 순서로 처리된다. 이는 처리 순서를 보장한다.

Group rebalancing

  • 컨슈머 그룹 리밸런싱은 컨슈머 그룹의 컨슈머들에게 파티션 재할당이 필요한 경우 트리거 된다.
    • 새로운 컨슈머가 그룹에 조인한 경우
    • 존재하던 컨슈머가 그룹을 나가는 경우
    • 존재하던 컨슈머가 구독을 변경할 경우
    • 구독하던 토픽에 파티션이 추가된 경우
  • 리밸런싱은 그룹 코디네이터에 의해 진행되고, 그룹 내의 모든 컨슈머와의 커뮤니케이션을 한다.
  • 프로토콜 버전에 관계 없이, 파티션이 회수되려고 할 때, 컨슈머는 레코드 처리를 끝내고 해당 파티션의 오프셋을 커밋했다는 것을 그룹 코디네이터에게 알리기 전에 파티션이 안전하게 재할당 될 수 있도록 보장해야 한다.
  • 컨슈머당 쓰레드 모델에서 자동 커밋을 설정한 경우 리밸런싱에 대해 걱정할 필요 없다. 모든 것은 poll 메소드에서 자동으로 일어난다. 하지만 수동 커밋을 하는 경우 조인 그룹 요청이 호출되기 전 오프셋을 커밋하는 것은 당신의 책임이다. 이를 아래와 같은 두가지 방법으로 할 수 있다.
  1. 가져온 레코드의 처리가 완료되고 다음 poll 호출을 하기전 commitSync를 호출한다.
  2. ConsumerRebalanceListener를 구현하여 파티션이 회수될 예정이라는 알림을 받고, 해당 시점에 오프셋을 커밋한다.

A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes. This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer directly assigns partitions, those partitions will never be reassigned and this callback is not applicable. When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure. Partition re-assignments can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is administratively adjusted).

첫번째 방법이 더 쉽지만, 처리가 매우 빠른 경우 커밋이 너무 자주 발생할 수 있다.

두번째 방법은 더 효율적이고 메시지 소비와 처리를 완전히 분리하였다.

Motivation for a multi-threaded consumer architecture

  • 카프카의 기본 개념에 익숙하다면 컨슈머 그룹에 컨슈머를 추가하여 메시지 소비의 병렬화가 가능하다는 것을 알 것이다. 하지만 이 방법은 새로운 애플리케이션 노드(컨테이너, VM 등)가 추가되는 수평 스케일링에 더 적절하다.
  • 멀티 컨슈머 방식은 수직 스케일링에서도 사용될 수 있다. 하지만, 컨슈머 인스턴스 노드와 애플리케이션 코드에 수반하는 쓰레드의 추가적인 관리가 필요하다. 멀티 컨슈머 인스턴스를 사용하는 것은 네트워크 트래픽 뿐만 아니라 추가적인 컨슈머에 대한 관리가 필요하기 때문에 그룹 코디네이터에도 더 많은 일을 주게된다.
  • 이러한 문제들이 쓰레드당 컨슈머에서 멀티 쓰레드 모델로 전환하는 것에 대한 강력한 이유가 될 수 없을 수 있다. 멀티 쓰레드 모델이 장점을 가지는 더 많은 사례가 있다.

The problem of slow processing

poll method 호출 사이에서 허용되는 최대 지연 시간은 max.poll.interval.ms 설정에 의해 결정되는데 5분이 기본이다. 컨슈머가 그 인터벌 동안 poll 호출에 실패한다면, 해당 컨슈머는 죽은 것으로 간주되고 리밸런싱이 일어난다. 이러한 상황은 기본 설정을 사용하고 컨슈머당 쓰레드를 사용할 때 레코드 처리에 오랜 시간이 걸릴 경우 발생할 수 있다.

  • 컨슈머당 쓰레드 모델을 사용할 때 이 문제를 다음과 같이 설정 값을 변경해서 해결할 수 있다.
  1. max.poll.records를 더 작은 값으로
  2. max.poll.interval.ms를 높은 값으로
  3. 위 두 설정을 다

레코드별로 처리 소요 시간이 다를 때, 위 두 설정 값을 조정하는 것은 어려울 수 있다. 따라서 처리를 위한 쓰레드를 별도로 사용 하는 것을 추천한다.

Handling record processing exceptions

  • 에러 핸들링을 포함한 레코드 처리 로직은 애플리케이션에 따라 다르다. 에러를 처리하는 경우 다음과 같은 옵션 중 하나를 수행할 수 있다.
  1. 처리를 중단하고 컨슈머를 종료한다. (선택적으로 이전에 몇번 더 재시도한다)
  2. 레코드를 DLQ에 전송하고 다음 레코드로 계속한다(선택적으로 이전에 몇번 더 재시도한다)
  3. 레코드가 완전히 처리될 때까지 재시도한다(this might take forever)
  • 무한 재시도하는 세번째 옵션은 일부 유즈 케이스에서 바람직한 선택일 수 있다. 예를 들어, 현재 오프라인 상태인 외부 시스템에 쓰기가 필요한 경우 얼마가 걸리더라도 외부 시스템이 다시 정상화 될 때까지 재시도 하고 싶을 수 있다.
  • 컨슈머당 쓰레드 모델의 경우, 단일 레코드 처리는 반드시 주어진 시간안에 되어야 한다. 그렇지 않으면, 전체 처리시간이 max.poll.interval.ms를 넘을 수 있고, 이로 인해 컨슈머가 그룹에서 제외될 수 있다. 이러한 이유로 재시도를 위해 꽤나 복잡한 로직을 구현해야 한다.
  • 멀티 쓰레드 방안은 레코드를 처리하는데 걸리는 시간을 원하는 대로 할 수 있다. 따라서, 레코드 처리가 성공할 때까지 무한 재시도할 수 있다.

Multi-threaded Kafka Consumer

카프카 컨슈머의 멀티 쓰레드 모델을 설계에는 다양한 방법이 있다. 가장 원시적인 방법은 쓰레드 풀에서 꺼낸 개별 쓰레드에서 자동 오프셋 커밋을 사용하면서 메시지를 처리하는 것이다. 불행하게도 이것은 예기치 않은 현상을 야기한다.

  1. 오프셋이 레코드가 처리되기 전에 커밋될 수 있다. 
  2. 같은 파티션의 메시지가 병렬로 처리될 수 있어 메시지 처리 순서가 보장되지 않는다. 레코드 처리의 병렬화를 달성함과 단일 쓰레드 방안에서 가졌던 특징 : 파티션당 처리 순서 보장, at-least-once 을 보장 받고 싶을 것이다. 아래에 설명된 해결책은 쓰레드 풀에 의해 실행되는 runnabl task를 사용한다.

Decoupling consumption and processing

  • 멀티 쓰레드 구현에서, 메인 컨슈머 쓰레드는 레코드 처리를 다른 쓰레드들에게 위임한다.
  • 레코드는 단일 쓰레드 방식과 동일하게 poll 메소드를 통해서 가져온다. 레코드는 파티션 별로 그룹화되는데, 단일 파티션의 레코드를 가지고 있는 여러 컬렉션을 갖게 된다. 이 컬렉션을 처리하기 위해서는 runnable tasks가 각각 생성되고, 자바의 빌트인 쓰레드풀 구현으로 제출해야 한다.
public class Task implements Runnable {

   private final List records;

   public Task(List records) {
       this.records = records;
   }

   public void run() {
       for (ConsumerRecord record : records) {
            // do something with record
       }
    }
}

태스크의 구현은 꽤 심플하다.

파티션 그룹화에 의한 컬렉션의 수는 컨슈머에 할당된 파티션의 수, 처리량 그리고 max.poll.records와 max.partitions.fetch.bytes와 같은 컨슈머 일부 설정에 따라 다르다.

각 파티션의 레코드는 순차적으로 처리되기 때문에 적은 수의 파티션으로 CPU 활용도가 떨어질 수 있다. 파티션의 수가 매우 많은 경우 많은 수의 쓰레드가 필요할 것이기 때문에 모든 것을 병렬로 처리하고 싶지 않을 것이다. CPU를 효율적으로 활용하기 위해서는 고정된 갯수의 쓰레드가 사용된다. 이 갯수는 CPU 코어에 따라 결정된다.

여기서는 8개의 코어를 사용하고, 여덟개의 쓰레드를 설정했다고 가정했다.

private ExecutorService executor = Executors.newFixedThreadPool(8);
  • 태스크는 ExecutorService.submit()를 통해 제출된다. 이 함수는 쓰레드 풀의 모든 쓰레드가 현재 사용중이더라도 즉시 리턴한다. 이것은 쓰레드 풀의 내부 큐에 단순히 작업을 추가한다.
  • 모든 태스크가 ExecutorService에 제출된 후, 메인 컨슈머 쓰레드는 새로운 레코드를 poll하기 시작한다. poll 함수는 call 사이에 특별히 할 것이 없기 때문에 자주 호출 된다.
  • 즉, poll 호출 사이에 과도한 딜레이가 생기는 문제가 없을 것이고, 그룹 리밸런싱이 매우 빨라질 것이라는 것이다. 하지만, 처리 순서와 at-least-once 딜리버리를 보장하기 위해서는 추가적인 단계가 필요하다.
Comments