Kafka
[Kafka] Consumer 파티션 리밸런싱
흥부가귀막혀
2019. 8. 23. 07:53
파티션 리밸런싱이 발생하는 케이스
- KafkaConsumer.close 가 호출되어 코디네이터에게 consumer 가 종료 되는 것을 알렸을 때.
- heartbeat.interval.ms 간격으로 호출되는 하트비트가 session.timeout.ms 동안 오지 않았을 경우 코디네이터에서 해당 컨슈머를 컨슈머그룹에서 제외하고 파티션 리밸런싱 발생.
- max.poll.interval.ms 동안 poll 요청이 없을 경우 코디네이터에서 해당 컨슈머를 컨슈머그룹에서 제외하고 파티션 리밸런싱 발생.
- Consumer 가 신규로 추가되어 파티션 분배가 필요한 상황일 경우.
- 파티션 수를 증가 시켰을 때
코디네이터란❓
모든 브로커는 특정 컨슈머 그룹의 코디네이터 역할을 합니다. 코디네이터는 컨슈머 그룹의 멤버 변화가 생겼을 때 파티션 리밸런싱 작업을 조정하는 역할을 합니다.
파티션 리밸런싱이 일어나는 과정
- 그룹 내 남아있는 컨슈머들은 poll 메소드 호출 시 그룹에 다시 조인해야 하는지를 확인.
- 컨슈머 그룹에 새로운 컨슈머가 추가되었거나, 기존 컨슈머가 그룹에서 제외된 경우에 컨슈머는 그룹에 다시 조인해야 한다.
- 컨슈머가 그룹에 다시 조인해야 할 때에는 코디네이터에게 조인 요청을 보낸다.
- 컨슈머는 조인 요청을 보낼 때 메타데이터를 추가해서 보낼 수 있다.
- 코디네이터는 그룹 내의 모든 컨슈머로부터 조인 요청 받으면 그중 하나를 리더로 선정.
- 리더로 선정된 컨슈머는 조인 요청에 대한 응답으로 그룹 내의 모든 컨슈머 목록과 메타데이터를 받는다.
- 리더는 컨슈머들의 메타데이터 등을 참고해서 각 컨슈머에게 파티션을 어떻게 할당할지를 결정한 뒤, 코디네이터에게 전달한다.
- 팔로워는(리더가 아닌 컨슈머들) 조인 요청에 대한 응답으로 컨슈머 목록이나, 메타데이터를 받지 않는다. 자신이 리더가 아닌것을 응답을 통해 확인한 팔로워는 새롭게 할당된 파티션 목록을 얻기 위해 코디네이터에게 다시 요청을 보낸다. 코디네이터는 리더로부터 파티션 할당 정보를 받으면 팔로워의 요청에 대한 응답으로 새롭게 할당된 파티션 목록을 보내준다.
- 위 과정은 KafkaConsumer.poll 함수 내부에서 진행된다.
- KafkaConsumer.poll 함수 내용 중 리밸런싱과 관련된 코드(kafka-client v2.3.1)
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
리밸런싱이 발생하면?
- 파티션 리밸런싱이 발생하는 컨슈머 그룹에 속한 모든 컨슈머는 리밸런싱이 완료될때까지 메세지를 수신받지 못한다.
- 코디네이터는 그룹 내의 모든 컨슈머들로부터 조인 요청을 받은 이후에 리더를 선출한다. 따라서 그룹 내의 모든 컨슈머들이 poll 함수를 호출해야지만 파티션 리밸런싱이 진행될 수 있다.
- 그룹 내 특정 컨슈머가 poll 함수를 호출하지 않은 경우
- max.poll.interval.ms 시간이 지나면 해당 컨슈머를 그룹에서 제외시키고 리밸런싱을 진행한다.
리밸런싱 속도 향상을 위한 전략
💡 핵심은 각 컨슈머가 poll 함수를 빠르게 호출함으로써 리밸런싱 진행을 빠르게 하도록 하는 것이다.
1안. 비동기로 메세지를 처리(혹은 메세지 처리를 하는 다른 솔루션 적용)
- 비동기로 메세지를 처리함으로써 메세지 처리로 인해 지연되는 polling 시간을 단축시킨다.
- 장점
- 메세지 처리 지연이 발생하지 않는다.
- 단점
- kafka 의 메카니즘을 그대로 따를 수 없다.(메세지 처리 완료여부와 상관없이 offset commit 을 하기 때문)
- 메세지 처리 순서를 보장할 수 없다.
- 메세지를 처리할 thread pool 혹은 솔루션 관리를 추가로 해야한다.(thread pool 이나 솔루션에 여유가 없을 경우 지연 또는 메세지 유실이 발생할 수 있다.)
2안. 설정값 변경
- max.poll.interval.ms 를 짧게 잡아 poll 함수를 늦게 호출할 경우 빠르게 제거하거나, max.poll.records 를 짧게 하여 polling 주기가 짧아지도록 설정한다.
- 장점
- 간단한 설정 변경으로 적용해 볼 수 있다.
- kafka 의 메카니즘을 그대로 따를 수 있다.
- 단점
- 한계가 있다.(결국 중요한 건 message 1건에 대한 처리속도이기 때문)
3안. Consumer 수를 2개로 제한
- 1개의 consumer 가 종료되었을 경우 남은 consumer 는 1개이기 때문에 빠른 리밸런싱이 가능하다.
- 장점
- 리밸런싱이 빠르고, kafka 의 메카니즘을 그대로 따른다.
- 단점
- 전체적인 메세지 처리 속도에 한계가 생긴다.
- 장애가 발생할 가능성이 커진다.