Kafka

[Kafka] fetch.max.wait.ms 와 fetch.min.bytes 설정 으로 인한 Consumer 성능 이슈 정리

흥부가귀막혀 2022. 3. 14. 15:06

이슈

  • producer 가 메세지를 일정 간격으로 전송(500ms)
  • consumer 설정 중 fetch.min.bytes 를 1로 설정(데이터가 있으면 즉시 받도록 최소로 설정) 하고 fetch.max.wait.ms 를 default(500ms)로 설정했을 때, consumer 가 저장된 메세지를 읽어들이는데 지연시간이 발생.(메세지가 broker 에 도착시간은 12시 0분 0초 0ms 인데 해당 메세지를 컨슈머가 읽은 시간은 12시 0분 0초 300ms)
  • fetch.max.wait.ms 를 100ms 로 설정하였을 경우 지연시간이 줄어듬

원인

  • kafka documentation 에 정리된 설정값의 설명을 보면 fetch.min.bytes 를 1로 설정할 경우 데이터가 들어왔을 때 즉시 응답을 줄거로 예상하지만 실제 동작은 그게 아니였다.
  • consumer 에서 fetch request 를 보냈을 때 broker 에서는 응답해야할 메세지가 있으면 fetch.min.bytes 가 1 이기 때문에 즉시 응답을 준다.
  • 하지만 broker 에서 응답을 줄 메세지가 없을 경우 대기를 하게 되고, 이 대기 시간은 새로운 메세지가 들어올때 끝나는게 아니라 fetch.max.wait.ms 만큼 기다리게 된다.
  • 즉, 응답할 메세지가 없는 상태에서 fetch request 가 0.3 초에 요청되고 메세지가 0.5 초에 도착했더라도 fetch.max.wait.ms 가 500ms 로 설정되었다면 0.5초에 응답을 받는게 아니라 0.8초에 응답을 받게되어 지연이 발생하게 된다.
  • 해당 내용은 kafka broker code 를 통해 확인 가능하다.
    // respond immediately if 1) fetch request does not want to wait
    //                        2) fetch request does not require any data
    //                        3) has enough data to respond
    //                        4) some error happens while reading data
    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
      val fetchPartitionData = logReadResults.map { case (tp, result) =>
        tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
          result.lastStableOffset, result.info.abortedTransactions)
      }
      responseCallback(fetchPartitionData)
    } else {
      // construct the fetch results from the read results
      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
      fetchInfos.foreach { case (topicPartition, partitionData) =>
        logReadResultMap.get(topicPartition).foreach(logReadResult => {
          val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
        })
      }
      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
        fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)

      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }

      // try to complete the request immediately, otherwise put it into the purgatory;
      // this is because while the delayed fetch operation is being created, new requests
      // may arrive and hence make this operation completable.
      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
    }

 

주석에 설명되어있듯이 즉시 응답을 주는 조건은 다음과 같다.
  • timeout 값(fetch.max.wait.ms 설정값)을 0 이하로 했을 경우
  • fetch 요청할 토픽과 파티션 정보가 없을 경우(이런 케이스는 어떤 케이스인지 확인 필요)
  • fetch.min.bytes 이상의 데이터를 읽었을 경우
  • 에러가 발생했을 경우

그래서.. 결론은?

  • fetch.max.wait.ms 를 줄이는게 무조건 좋은건 아니다. fetch.max.wait.ms 를 줄이게 되면 그만큼 polling 루프가 빨라지게 되고 fetch 요청도 많아져 broker 에 부담이 갈 수 밖에 없다. 또한, max.partition.fetch.bytes, fetch.max.bytes 만큼 받지 못하고 짧게 끊어서 여러번 받기 때문에 네트워크 비용도 증가한다.
  • 일반적인 client 의 consumer 라면, 그리고 message 를 간헐적으로 보내는 producer 가 있을때 빠르게 반응해야하는게 아니라면 해당 설정은 default 로 두어도 무방하다.
  • 하지만 어떤 간격의 message 가 오더라도 빠르게 반응하고 전달해야하는 Consumer 라면 fetch.max.wait.ms 를 default 값보다 줄이는게 이득으로 보인다.