Kafka
[Kafka] LeaderEpochCache Warn Log
흥부가귀막혀
2022. 3. 14. 15:43
개요
- Broker 에 아래와 같이 Warn Level 로그가 남겨짐
[2020-07-30 12:40:03,237] WARN [LeaderEpochCache message_action-46] New epoch entry EpochEntry(epoch=27, startOffset=0) caused truncation of conflicting entries ListBuffer(EpochEntry(epoch=26, startOffset=0)). Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
- 해당 로그가 왜 남겨지고, 해소할 방법은 없는지 탐색.
원인 파악
- 해당 로그는 partition leader 에 대한 epoch 값이 변경될 경우 변경된 epoch 값을 cache 및 기존 cache 된 내용을 truncate 하는 과정에서 발생하는 로그다.(LeaderEpoch 에 대한 내용은 KIP-101 참고)
- 로그를 남기고 있는 v2.1.1 코드 는 아래와 같다.
/**
* Remove any entries which violate monotonicity following the insertion of an assigned epoch.
*/
private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
validateAndMaybeWarn(entryToAppend)
val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
}
epochs = retainedEpochs :+ entryToAppend
if (removedEpochs.isEmpty) {
debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
} else {
warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
s"Cache now contains ${epochs.size} entries.")
}
}
- 신규로 append 되는 epoch 의 startOffset 이 기존과 같거나 작으면 해당 warn log 를 남기도록 되어있다.
- startOffset 이 작은 경우 문제가 있는 케이스이지만 같은건 정상적인 상황에서도 가능한 케이스라 이렇게 남긴것이 의문.
- v2.3.0 코드 를 보니 아래와 같이 변경되어있다.(해당 코드는 PR 에서 수정 요청되었다.)
/**
* Remove any entries which violate monotonicity following the insertion of an assigned epoch.
*/
private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
validateAndMaybeWarn(entryToAppend)
val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
}
epochs = retainedEpochs :+ entryToAppend
if (removedEpochs.isEmpty) {
debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
} else if (removedEpochs.size > 1 || removedEpochs.head.startOffset != entryToAppend.startOffset) {
// Only log a warning if there were non-trivial removals. If the start offset of the new entry
// matches the start offfset of the removed epoch, then no data has been written and the truncation
// is expected.
warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
s"Cache now contains ${epochs.size} entries.")
}
}
- 저렇게 변경한 이유는 위에서 의문을 제기한 startOffset 이 같을 경우가 꼭 문제가 있는 상황은 아니기 때문이다. 메세지가 publish 되지 않은 상황이라면(traffic 이 적은 상황이라면) 충분히 가능한 케이스이기 때문에 2.3.1 에서 해당 상황은 로그를 안남기는거로 제외되었다.
Leader epoch 값이 왜 달랐을까?
- kafka broker 에서는 partition 에 대한 leader epoch 값을 Partition 객체에도 저장(memory)하고, 추후 복구를 대비하여 LeaderEpochFileCache 를 사용하여 file 로도 저장한다.
- 문제는 partition leader 가 변경될 경우 변경된 leader epoch 값을 memory(Partition 객체) 와 file(LeaderEpochFileCache) 에 모두 저장하는 노드는 partition leader 노드만 그렇게 저장하고, follower 노드는 memory(Partition 객체) 에만 저장한다.
- 그러다보니 follower 노드에서 fetcher thread 를 통해 message 복제를 시도할 때 memory 에 저장된 leader epoch 값으로 LeaderEpochFileCache 에 append 를 하니 위와 같은 로그가 남게되는것이다.
- 즉, 갑자기 leader epoch 값이 올라가서 발생한게 아니라 기존에 leader epoch 값이 변경이 되었는데(브로커 노드 재시작 등으로 인해) follower 노드에서 이에 대한 update 가 message 복제 시점에 발생해서 그런 것이다.
관련 코드
- partition leader 로 정의하는 Partition 의 makeLeader 함수
/**
* Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
* from the time when this broker was the leader last time) and setting the new leader and ISR.
* If the leader replica id does not change, return false to indicate the replica manager.
*/
def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
// add replicas that are new
val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
inSyncReplicas = newInSyncReplicas
newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
val leaderReplica = localReplicaOrException
val leaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")
//We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
// * Partition 객체에서 멤버 변수로 정의된 leaderEpoch 를 새로 정의한다(메모리에 저장한다)
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
zkVersion = partitionStateInfo.basePartitionState.zkVersion
// In the case of successive leader elections in a short time period, a follower may have
// entries in its log from a later epoch than any entry in the new leader's log. In order
// to ensure that these followers can truncate to the right offset, we must cache the new
// leader epoch and the start offset since it should be larger than any epoch that a follower
// would try to query.
// * LeaderEpochFileCache 에 변경된 leaderEpoch 값을 저장한다.(file 에 저장한다.)
leaderReplica.log.foreach { log =>
log.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
}
val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
val curTimeMs = time.milliseconds
// initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
(assignedReplicas - leaderReplica).foreach { replica =>
val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
}
if (isNewLeader) {
// construct the high watermark metadata for the new leader replica
leaderReplica.convertHWToLocalOffsetMetadata()
// mark local replica as the leader after converting hw
leaderReplicaIdOpt = Some(localBrokerId)
// reset log end offset for remote replicas
assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
}
// we may need to increment high watermark since ISR could be down to 1
(maybeIncrementLeaderHW(leaderReplica), isNewLeader)
}
// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()
isNewLeader
}
- partition follower 로 정의하는 Partition 의 makeFollower 함수
/**
* Make the local replica the follower by setting the new leader and ISR to empty
* If the leader replica id does not change and the new epoch is equal or one
* greater (that is, no updates have been missed), return false to indicate to the
* replica manager that state is already correct and the become-follower steps can be skipped
*/
def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
val oldLeaderEpoch = leaderEpoch
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
// add replicas that are new
newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
inSyncReplicas = Set.empty[Replica]
// * Partition 객체의 멤버 변수인 leaderEpoch 에만 변경된 epoch 값을 정의하고 LeaderEpochFileCache 에는 저장하지 않는다.
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
zkVersion = partitionStateInfo.basePartitionState.zkVersion
if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
false
} else {
leaderReplicaIdOpt = Some(newLeaderBrokerId)
true
}
}
}
로그를 통한 검증
- 기존 leader 노드를 재시작하여 partition leader 가 변경되었었다.
- partition leader 가 변경되어 leaderEpoch 값이 변경된건 leader 와 follower 노드 모두 정상 처리 되었다.
- leader 노드 로그
[2020-07-10 12:22:06,652] TRACE [Broker id=23] Cached leader info PartitionState(controllerEpoch=16, leader=23, leaderEpoch=211, isr=[24, 23], zkVersion=295, replicas=[23, 24], offlineReplicas=[]) for partition test-48 in response to UpdateMetadata request sent by controller 24 epoch 16 with correlation id 11 (state.change.logger)
- follower 노드 로그
[2020-07-10 12:22:04,687] TRACE [Broker id=24] Cached leader info PartitionState(controllerEpoch=16, leader=23, leaderEpoch=211, isr=[24, 23], zkVersion=295, replicas=[23, 24], offlineReplicas=[]) for partition test-48 in response to UpdateMetadata request sent by controller 24 epoch 16 with correlation id 17 (state.change.logger)
- 브로커 재시작 이후 test topic 48번 파티션으로 최초로 메세지가 유입되어 follower 노드쪽에 복제가 발생하고 아래와 같은 로그가 발생하였다
[2020-08-04 19:03:40,945] WARN [LeaderEpochCache test-48] New epoch entry EpochEntry(epoch=211, startOffset=40) caused truncation of conflicting entries ListBuffer(EpochEntry(epoch=210, startOffset=40)). Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
의문
- 왜 follower 노드에서는 파티션 리더가 변경될 당시에 file cache 를 하지 않았는지 의문.