거북이 developer

[Kafka] OutOfOrderSequenceException 본문

Kafka

[Kafka] OutOfOrderSequenceException

흥부가귀막혀 2022. 3. 14. 16:11

내용 정리

  • Kafka Broker 로그에 간혹가다 아래와 같은 ERROR 로그가 발생한다.
ERROR [ReplicaManager broker=23] Error processing append operation on partition test-8 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence number for new epoch: 3 (request epoch), 4 (seq. number)
 ERROR [ReplicaManager broker=22] Error processing append operation on partition test-55 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.UnknownProducerIdException: Found no record of producerId=557891 on the broker. It is possible that the last message with the producerId=557891 has been removed due to hitting the retention limit.

분석

  • message publish 요청시 호출되는 함수 순서
    • kafka.server.KafkaRequestHandler.run
    • kafka.server.KafkaApis.handle
    • kafka.server.KafkaApis.handleProduceRequest
    • kafka.server.ReplicaManager.appendRecords
  • 위 에러는 kafka.server.ReplicaManager.appendRecords 에서 ReplicaManager 의 private 함수인 appendToLocalLog 함수를 수행하면서 발생
  • appendRecords 함수 내용
  /**
   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
   * the callback function will be triggered either when timeout or the required acks are satisfied;
   * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
   */
  def appendRecords(timeout: Long,
                    requiredAcks: Short,
                    internalTopicsAllowed: Boolean,
                    isFromClient: Boolean,
                    entriesPerPartition: Map[TopicPartition, MemoryRecords],
                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                    delayedProduceLock: Option[Lock] = None,
                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
    if (isValidRequiredAcks(requiredAcks)) {
      val sTime = time.milliseconds
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = isFromClient, entriesPerPartition, requiredAcks)
      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition ->
                ProducePartitionStatus(
                  result.info.lastOffset + 1, // required offset
                  new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
      }

      recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats))

      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

      } else {
        // we can respond immediately
        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
        responseCallback(produceResponseStatus)
      }
    } else {
      // If required.acks is outside accepted range, something is wrong with the client
      // Just return an error and don't handle the request at all
      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
        topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
          LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
      }
      responseCallback(responseStatus)
    }
  }
  • appendToLocalLog 내용
  /**
   * Append the messages to the local replica logs
   */
  private def appendToLocalLog(internalTopicsAllowed: Boolean,
                               isFromClient: Boolean,
                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                               requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
    trace(s"Append [$entriesPerPartition] to local log")
    entriesPerPartition.map { case (topicPartition, records) =>
      brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
      brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

      // reject appending to internal topics if it is not allowed
      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
        (topicPartition, LogAppendResult(
          LogAppendInfo.UnknownLogAppendInfo,
          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
      } else {
        try {
          val partition = getPartitionOrException(topicPartition, expectLeader = true)
          val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
          val numAppendedMessages = info.numMessages

          // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
          brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
          brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
          brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
          brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

          trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
            s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
          (topicPartition, LogAppendResult(info))
        } catch {
          // NOTE: Failed produce requests metric is not incremented for known exceptions
          // it is supposed to indicate un-expected failures of a broker in handling a produce request
          case e@ (_: UnknownTopicOrPartitionException |
                   _: NotLeaderForPartitionException |
                   _: RecordTooLargeException |
                   _: RecordBatchTooLargeException |
                   _: CorruptRecordException |
                   _: KafkaStorageException |
                   _: InvalidTimestampException) =>
            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
          case t: Throwable =>
            val logStartOffset = getPartition(topicPartition) match {
              case Some(partition) =>
                partition.logStartOffset
              case _ =>
                -1
            }
            brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
            brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
            error(s"Error processing append operation on partition $topicPartition", t)
            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
        }
      }
    }
  }
  • appendToLocalLog 에서 record append 가 실패하면 실패한 결과를 넘겨주고 appendRecords 함수에서는 localProduceResults 라는 변수명으로 결과값을 받는다.
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        origin, entriesPerPartition, requiredAcks)
      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
  • localProduceResults 결과중 실패가 있을 경우 delayedProducePurgatory 에 등록하여 재시도 처리한다.
  • delayedProducePurgatory 에 등록하는 조건
 // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
  //
  // 1. required acks = -1
  // 2. there is data to append
  // 3. at least one partition append was successful (fewer errors than partitions)
  private def delayedProduceRequestRequired(requiredAcks: Short,
                                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                            localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
    requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
  }

 

  • OutOfOrderSequenceException 로그에서 나온 epoch 는 partition leader epoch 가 아닌 producer 관련 epoch 값이다. org.apache.kafka.common.record.RecordBatch 에 정의된 producerEpoch 값을 사용한다.(partition leader 에 대한 epoch 값은 partitionLeaderEpoch 로 따로 정의되어있다.)
  • 메세지 publish 시에 producer 에 대한 상태 체크를 진행하게된다.
  • kafka.log.producerStateManager 에서 요청시 들어온 producerId 를 가지고 broker 내부에 저장된 producer 상태 정보를 가지고 온다.
  • 조회한 상태 정보를 바탕으로 kafka.log.ProducerAppendInfo 를 생성해 maybeValidateAppend 함수에서 producerId, producerEpoch, appendFirstSeq 값을 확인하게 되는데 그 때 에러가 발생한다.
  • maybeValidateAppend 함수
private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int) = {
    validationType match {
      case ValidationType.None =>

      case ValidationType.EpochOnly =>
        checkProducerEpoch(producerEpoch)

      case ValidationType.Full =>
        checkProducerEpoch(producerEpoch)
        checkSequence(producerEpoch, firstSeq)
    }
  }
  • 실제 에러를 발생하는 checkSequence 함수
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {
    if (producerEpoch != updatedEntry.producerEpoch) {
      if (appendFirstSeq != 0) {
        if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
            s"(request epoch), $appendFirstSeq (seq. number)")
        } else {
          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
            s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
        }
      }
    } else {
      val currentLastSeq = if (!updatedEntry.isEmpty)
        updatedEntry.lastSeq
      else if (producerEpoch == currentEntry.producerEpoch)
        currentEntry.lastSeq
      else
        RecordBatch.NO_SEQUENCE

      if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) {
        // We have a matching epoch, but we do not know the next sequence number. This case can happen if
        // only a transaction marker is left in the log for this producer. We treat this as an unknown
        // producer id error, so that the producer can check the log start offset for truncation and reset
        // the sequence number. Note that this check follows the fencing check, so the marker still fences
        // old producers even if it cannot determine our next expected sequence number.
        throw new UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " +
          s"for producerId=$producerId, but next expected sequence number is not known.")
      } else if (!inSequence(currentLastSeq, appendFirstSeq)) {
        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " +
          s"(incoming seq. number), $currentLastSeq (current end sequence number)")
      }
    }
  }

참고

  • 위 분석 내용 및 코드에서 나오는 producerEpoch  firstSequence 에 대한 설명은 아래와 같다.

변수설명

ProducerEpoch Introduced in 0.11.0.0 for KIP-98, this is the broker assigned producerEpoch received by the 'InitProducerId' request. Clients which want to support idempotent message delivery and transactions must set this field.
FirstSequence Introduced in 0.11.0.0 for KIP-98, this is the producer assigned sequence number which is used by the broker to deduplicate messages. Clients which want to support idempotent message delivery and transactions must set this field. The sequence number for each Record in the RecordBatch is its OffsetDelta + FirstSequence.
  • idempotent write 설정을 해서 위 변수값들이 이용되어있는거로 보인다.
  • 좀더 자세한 사항은 A Guide To The Kafka Protocol 참고
  • kafka broker 2.5.0 에 위 에러에 대한 개선 내용이 있는거로 보여짐.KIP-360 참고

 

Comments