Kafka

[Kafka] LeaderEpochCache Warn Log

흥부가귀막혀 2022. 3. 14. 15:43

개요

  • Broker 에 아래와 같이 Warn Level 로그가 남겨짐
[2020-07-30 12:40:03,237] WARN [LeaderEpochCache message_action-46] New epoch entry EpochEntry(epoch=27, startOffset=0) caused truncation of conflicting entries ListBuffer(EpochEntry(epoch=26, startOffset=0)). Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
  • 해당 로그가 왜 남겨지고, 해소할 방법은 없는지 탐색.

원인 파악

  • 해당 로그는 partition leader 에 대한 epoch 값이 변경될 경우 변경된 epoch 값을 cache 및 기존 cache 된 내용을 truncate 하는 과정에서 발생하는 로그다.(LeaderEpoch 에 대한 내용은 KIP-101 참고)
  • 로그를 남기고 있는 v2.1.1 코드 는 아래와 같다.
/**
   * Remove any entries which violate monotonicity following the insertion of an assigned epoch.
   */
  private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
    validateAndMaybeWarn(entryToAppend)

    val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
      entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
    }

    epochs = retainedEpochs :+ entryToAppend

    if (removedEpochs.isEmpty) {
      debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
    } else {
      warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
        s"Cache now contains ${epochs.size} entries.")
    }
  }
  • 신규로 append 되는 epoch 의 startOffset 이 기존과 같거나 작으면 해당 warn log 를 남기도록 되어있다.
  • startOffset 이 작은 경우 문제가 있는 케이스이지만 같은건 정상적인 상황에서도 가능한 케이스라 이렇게 남긴것이 의문.
  • v2.3.0 코드 를 보니 아래와 같이 변경되어있다.(해당 코드는 PR 에서 수정 요청되었다.)
/**
   * Remove any entries which violate monotonicity following the insertion of an assigned epoch.
   */
  private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
    validateAndMaybeWarn(entryToAppend)

    val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
      entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
    }

    epochs = retainedEpochs :+ entryToAppend

    if (removedEpochs.isEmpty) {
      debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
    } else if (removedEpochs.size > 1 || removedEpochs.head.startOffset != entryToAppend.startOffset) {
      // Only log a warning if there were non-trivial removals. If the start offset of the new entry
      // matches the start offfset of the removed epoch, then no data has been written and the truncation
      // is expected.
      warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
        s"Cache now contains ${epochs.size} entries.")
    }
  }
  • 저렇게 변경한 이유는 위에서 의문을 제기한 startOffset 이 같을 경우가 꼭 문제가 있는 상황은 아니기 때문이다. 메세지가 publish 되지 않은 상황이라면(traffic 이 적은 상황이라면) 충분히 가능한 케이스이기 때문에 2.3.1 에서 해당 상황은 로그를 안남기는거로 제외되었다.

Leader epoch 값이 왜 달랐을까?

  • kafka broker 에서는 partition 에 대한 leader epoch 값을 Partition 객체에도 저장(memory)하고, 추후 복구를 대비하여 LeaderEpochFileCache 를 사용하여 file 로도 저장한다.
  • 문제는 partition leader 가 변경될 경우 변경된 leader epoch 값을 memory(Partition 객체) 와 file(LeaderEpochFileCache) 에 모두 저장하는 노드는 partition leader 노드만 그렇게 저장하고, follower 노드는 memory(Partition 객체) 에만 저장한다.
  • 그러다보니 follower 노드에서 fetcher thread 를 통해 message 복제를 시도할 때 memory 에 저장된 leader epoch 값으로 LeaderEpochFileCache 에 append 를 하니 위와 같은 로그가 남게되는것이다.
  • 즉, 갑자기 leader epoch 값이 올라가서 발생한게 아니라 기존에 leader epoch 값이 변경이 되었는데(브로커 노드 재시작 등으로 인해) follower 노드에서 이에 대한 update 가 message 복제 시점에 발생해서 그런 것이다.

관련 코드

  • partition leader 로 정의하는 Partition 의 makeLeader 함수
/**
   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
   * from the time when this broker was the leader last time) and setting the new leader and ISR.
   * If the leader replica id does not change, return false to indicate the replica manager.
   */
  def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
      val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
      // add replicas that are new
      val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
      // remove assigned replicas that have been removed by the controller
      (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
      inSyncReplicas = newInSyncReplicas
      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))

      val leaderReplica = localReplicaOrException
      val leaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
        s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")

      //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
      // * Partition 객체에서 멤버 변수로 정의된 leaderEpoch 를 새로 정의한다(메모리에 저장한다)
      leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
      zkVersion = partitionStateInfo.basePartitionState.zkVersion

      // In the case of successive leader elections in a short time period, a follower may have
      // entries in its log from a later epoch than any entry in the new leader's log. In order
      // to ensure that these followers can truncate to the right offset, we must cache the new
      // leader epoch and the start offset since it should be larger than any epoch that a follower
      // would try to query.
      // * LeaderEpochFileCache 에 변경된 leaderEpoch 값을 저장한다.(file 에 저장한다.)
      leaderReplica.log.foreach { log =>
        log.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
      }

      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
      val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
      val curTimeMs = time.milliseconds
      // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
      (assignedReplicas - leaderReplica).foreach { replica =>
        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
      }

      if (isNewLeader) {
        // construct the high watermark metadata for the new leader replica
        leaderReplica.convertHWToLocalOffsetMetadata()
        // mark local replica as the leader after converting hw
        leaderReplicaIdOpt = Some(localBrokerId)
        // reset log end offset for remote replicas
        assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
      }
      // we may need to increment high watermark since ISR could be down to 1
      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
    }
    // some delayed operations may be unblocked after HW changed
    if (leaderHWIncremented)
      tryCompleteDelayedRequests()
    isNewLeader
  }
  • partition follower 로 정의하는 Partition 의 makeFollower 함수
/**
   *  Make the local replica the follower by setting the new leader and ISR to empty
   *  If the leader replica id does not change and the new epoch is equal or one
   *  greater (that is, no updates have been missed), return false to indicate to the
    * replica manager that state is already correct and the become-follower steps can be skipped
   */
  def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
    inWriteLock(leaderIsrUpdateLock) {
      val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
      val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
      val oldLeaderEpoch = leaderEpoch
      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
      // add replicas that are new
      newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
      // remove assigned replicas that have been removed by the controller
      (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
      inSyncReplicas = Set.empty[Replica]
      // * Partition 객체의 멤버 변수인 leaderEpoch 에만 변경된 epoch 값을 정의하고 LeaderEpochFileCache 에는 저장하지 않는다.
      leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
      leaderEpochStartOffsetOpt = None
      zkVersion = partitionStateInfo.basePartitionState.zkVersion

      if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
        false
      } else {
        leaderReplicaIdOpt = Some(newLeaderBrokerId)
        true
      }
    }
  }

로그를 통한 검증

  • 기존 leader 노드를 재시작하여 partition leader 가 변경되었었다.
  • partition leader 가 변경되어 leaderEpoch 값이 변경된건 leader 와 follower 노드 모두 정상 처리 되었다.
  • leader 노드 로그
[2020-07-10 12:22:06,652] TRACE [Broker id=23] Cached leader info PartitionState(controllerEpoch=16, leader=23, leaderEpoch=211, isr=[24, 23], zkVersion=295, replicas=[23, 24], offlineReplicas=[]) for partition test-48 in response to UpdateMetadata request sent by controller 24 epoch 16 with correlation id 11 (state.change.logger)
  • follower 노드 로그
[2020-07-10 12:22:04,687] TRACE [Broker id=24] Cached leader info PartitionState(controllerEpoch=16, leader=23, leaderEpoch=211, isr=[24, 23], zkVersion=295, replicas=[23, 24], offlineReplicas=[]) for partition test-48 in response to UpdateMetadata request sent by controller 24 epoch 16 with correlation id 17 (state.change.logger)
  • 브로커 재시작 이후 test topic 48번 파티션으로 최초로 메세지가 유입되어 follower 노드쪽에 복제가 발생하고 아래와 같은 로그가 발생하였다
[2020-08-04 19:03:40,945] WARN [LeaderEpochCache test-48] New epoch entry EpochEntry(epoch=211, startOffset=40) caused truncation of conflicting entries ListBuffer(EpochEntry(epoch=210, startOffset=40)). Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)

의문

  • 왜 follower 노드에서는 파티션 리더가 변경될 당시에 file cache 를 하지 않았는지 의문.