Kafka

[Kafka] Consumer Group Offset Reset

흥부가귀막혀 2022. 3. 15. 14:56

필요한 상황

  • Consumer 에 이슈가 발생해서 특정 시점으로 offset 을 돌려서 다시 읽고 싶어요.
  • 지금까지 읽었던 메세지를 다시 읽게 하고 싶을 경우?

사전 확인 사항

- offset 을 어디까지로 reset 할지 확인

  • 특정 offset 으로 reset 을 하고 싶다면 Consumer 에서 남긴 로그를 통해 각 파티션별 reset 하고자 하는 offset 값을 알아야 한다.
  • 특정 시점으로 reset 을 하고 싶다면 reset 하고자 하는 시점을 'yyyy-MM-ddTHH:mm:SS.sss' 포맷으로 정의한다.

- 경우에 따라 메세지 처리가 중복으로 발생할 수 있는데 이때 이슈가 없는지 확인한다.

  • 이슈가 없다면 상관없지만 이슈가 있을 경우 Consumer 로직에서 중복처리를 하지 않도록 수정작업이 필요하다.

Kafka Command 수행 방법

- 신규 Consumer Group 생성 방법

  • offset 을 reset 하기 위해 /home1/irteam/apps/kafka/bin/kafka-consumer-groups.sh 로 --execute 요청을 하면 consumer group 이 없을경우 신규로 만들어진다.(즉, consumer group 생성을 위해 특별히 어떤 command 요청을 할 필요는 없고 offset reset command 만 실행하면 된다.)

- Consumer Group 삭제 방법

⚠️ consumer group 을 삭제하기 위해서는 해당 consumer group 상태가 inactive 여야 가능하다.

  • consumer group 을 삭제하고자 하는 kafka broker 장비에 접속한다.
  • $KAFKA_HOME/bin/kafka-consumer-groups.sh 을 아래와 같이 실행한다.
$ bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA --group $CONSUMER_GROUP --topic $TOPIC --delete --execute

⚠️ Kafka Broker 에 보안설정이 되어있을 경우 --command-config 옵션을 통해 보안 관련 설정을 반드시 추가해야 한다. 

특정 시점의 offset 으로 reset 할 경우

⚠️ consumer group 의 offset 을 reset 하기 위해서는 해당 consumer group 상태가 inactive 여야 가능하다. 신규로 정의되는 consumer group 일 경우 이부분은 신경쓰지 않아도 된다.

  • offset 을 변경하고자 하는 kafka broker 장비에 접속한다.
  • $KAFKA_HOME/bin/kafka-consumer-groups.sh 을 아래와 같이 실행한다.
$ bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA --group $CONSUMER_GROUP --topic $TOPIC --reset-offsets --to-datetime '2020-01-01 01:00:00.001' --execute

⚠️ Kafka Broker 에 보안설정이 되어있을 경우 --command-config 옵션을 통해 보안 관련 설정을 반드시 추가해야 한다. 

특정 offset 값으로 reset 할 경우

⚠️ consumer group 의 offset 을 reset 하기 위해서는 해당 consumer group 상태가 inactive 여야 가능하다. 신규로 정의되는 consumer group 일 경우 이부분은 신경쓰지 않아도 된다.

  • offset 을 변경하고자 하는 kafka broker 장비에 접속한다.
  • 아래와 같이 csv 포멧으로 offset 설정 파일을 만든다.(토픽, 파티션Id, offset 값 순으로 나열한다.)
example,0,16569
example,1,15715
example,2,13281
example,3,15252
...
  • $KAFKA_HOME/bin/kafka-consumer-groups.sh 을 아래와 같이 실행한다.
$ bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA --group $CONSUMER_GROUP --reset-offsets --from-file $CONSUMER_GROUP_offsets.csv --execute

⚠️ Kafka Broker 에 보안설정이 되어있을 경우 --command-config 옵션을 통해 보안 관련 설정을 반드시 추가해야 한다.