Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- ProjectLoom
- Zookeeper
- restdocs
- Mirror
- raft
- tsdb
- 비동기
- OFFSET
- kafka
- Rebalance
- spring
- Vmagent
- Vmalert
- OpenJDK
- webflux
- Reassign
- JVM
- broker
- API문서
- consumer
- java
- Brooklin
- springboot
- ExecutableJar
- JDK
- Reactive
- NoClassDefFoundError
- swagger
- VictoriaMetrics
Archives
- Today
- Total
거북이 developer
[Kafka] Consumer Group Offset Reset 본문
필요한 상황
- Consumer 에 이슈가 발생해서 특정 시점으로 offset 을 돌려서 다시 읽고 싶어요.
- 지금까지 읽었던 메세지를 다시 읽게 하고 싶을 경우?
사전 확인 사항
- offset 을 어디까지로 reset 할지 확인
- 특정 offset 으로 reset 을 하고 싶다면 Consumer 에서 남긴 로그를 통해 각 파티션별 reset 하고자 하는 offset 값을 알아야 한다.
- 특정 시점으로 reset 을 하고 싶다면 reset 하고자 하는 시점을 'yyyy-MM-ddTHH:mm:SS.sss' 포맷으로 정의한다.
- 경우에 따라 메세지 처리가 중복으로 발생할 수 있는데 이때 이슈가 없는지 확인한다.
- 이슈가 없다면 상관없지만 이슈가 있을 경우 Consumer 로직에서 중복처리를 하지 않도록 수정작업이 필요하다.
Kafka Command 수행 방법
- 신규 Consumer Group 생성 방법
- offset 을 reset 하기 위해 /home1/irteam/apps/kafka/bin/kafka-consumer-groups.sh 로 --execute 요청을 하면 consumer group 이 없을경우 신규로 만들어진다.(즉, consumer group 생성을 위해 특별히 어떤 command 요청을 할 필요는 없고 offset reset command 만 실행하면 된다.)
- Consumer Group 삭제 방법
⚠️ consumer group 을 삭제하기 위해서는 해당 consumer group 상태가 inactive 여야 가능하다.
- consumer group 을 삭제하고자 하는 kafka broker 장비에 접속한다.
- $KAFKA_HOME/bin/kafka-consumer-groups.sh 을 아래와 같이 실행한다.
$ bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA --group $CONSUMER_GROUP --topic $TOPIC --delete --execute
⚠️ Kafka Broker 에 보안설정이 되어있을 경우 --command-config 옵션을 통해 보안 관련 설정을 반드시 추가해야 한다.
특정 시점의 offset 으로 reset 할 경우
⚠️ consumer group 의 offset 을 reset 하기 위해서는 해당 consumer group 상태가 inactive 여야 가능하다. 신규로 정의되는 consumer group 일 경우 이부분은 신경쓰지 않아도 된다.
- offset 을 변경하고자 하는 kafka broker 장비에 접속한다.
- $KAFKA_HOME/bin/kafka-consumer-groups.sh 을 아래와 같이 실행한다.
$ bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA --group $CONSUMER_GROUP --topic $TOPIC --reset-offsets --to-datetime '2020-01-01 01:00:00.001' --execute
⚠️ Kafka Broker 에 보안설정이 되어있을 경우 --command-config 옵션을 통해 보안 관련 설정을 반드시 추가해야 한다.
특정 offset 값으로 reset 할 경우
⚠️ consumer group 의 offset 을 reset 하기 위해서는 해당 consumer group 상태가 inactive 여야 가능하다. 신규로 정의되는 consumer group 일 경우 이부분은 신경쓰지 않아도 된다.
- offset 을 변경하고자 하는 kafka broker 장비에 접속한다.
- 아래와 같이 csv 포멧으로 offset 설정 파일을 만든다.(토픽, 파티션Id, offset 값 순으로 나열한다.)
example,0,16569
example,1,15715
example,2,13281
example,3,15252
...
- $KAFKA_HOME/bin/kafka-consumer-groups.sh 을 아래와 같이 실행한다.
$ bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA --group $CONSUMER_GROUP --reset-offsets --from-file $CONSUMER_GROUP_offsets.csv --execute
⚠️ Kafka Broker 에 보안설정이 되어있을 경우 --command-config 옵션을 통해 보안 관련 설정을 반드시 추가해야 한다.
'Kafka' 카테고리의 다른 글
LinkedIn 의 brooklin 활용 (0) | 2022.08.26 |
---|---|
[Kafka] binary 설치 및 실행 (0) | 2022.03.15 |
[Kafka] Reassign Partition 방법 (0) | 2022.03.15 |
[Kafka] OutOfOrderSequenceException (0) | 2022.03.14 |
[Kafka] kafka SASL / SCRAM 인증, kafka-acls 권한 설정 (0) | 2022.03.14 |
Comments