Common Operations on the Log Object

[Figure 1: cover image for "03 Log (Part 2): Fully Understanding Common Operations on the Log Object"]

I. High Watermark Management

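The high watermark (HW) is the position immediately after the last committed message: every message with an offset below the HW is committed.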

1) Definition of the high watermark

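  • It is initialized to logStartOffset.
  • It is declared @volatile because several threads read and modify it: a write made by one thread becomes visible to other reading threads immediately.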
    /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
     * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
     * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
     * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
     */
    @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
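The deletion rule in that comment can be written as a small predicate. This is a minimal sketch, not Kafka's actual deletion code: `Seg` and `deletable` are hypothetical names, retention conditions (time, size) are ignored, and a segment's exclusive upper bound is assumed to be the next segment's base offset, or the log end offset for the active segment.

```scala
// Hypothetical, simplified segment model: only the base offset matters here.
final case class Seg(baseOffset: Long)

// Returns the segments that the high-watermark rule alone would allow deleting.
// Retention predicates (time, size, ...) are deliberately ignored in this sketch.
def deletable(segments: Vector[Seg], logEndOffset: Long, highWatermark: Long): Vector[Seg] = {
  // Exclusive upper bound of segment i: the next segment's base offset,
  // or the log end offset for the last (active) segment.
  val upperBounds = segments.drop(1).map(_.baseOffset) :+ logEndOffset
  // A segment qualifies only if every offset it holds is below the high watermark.
  segments.zip(upperBounds).collect { case (seg, upper) if upper <= highWatermark => seg }
}
```

With segments at base offsets 0, 100 and 200 and a log end offset of 250, an HW of 150 frees only the first segment, while the active segment becomes eligible only once the HW reaches 250, i.e. equals the log end offset, as the comment says.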
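The type of highWatermarkMetadata is LogOffsetMetadata. Here is the definition of that class.
It is declared with `case class`, which you can think of as a Java bean (more precisely, closer to an immutable Java record: the compiler generates the accessors, equals, hashCode, toString and copy).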
    /*
     * A log offset structure, including:
     *  1. the message offset (here, this is the high watermark value itself)
     *  2. the base message offset of the located segment (the base offset of the segment that contains this offset; used to tell whether two messages are in the same segment)
     *  3. the physical position on the located segment (the byte position inside that segment file; used to compute the size of the data between two messages)
     */
    case class LogOffsetMetadata(messageOffset: Long,
                                 segmentBaseOffset: Long = Log.UnknownOffset,
                                 relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
      // ... class body omitted in this excerpt ...
    }
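To see what fields 2 and 3 are for, here is a minimal sketch using a simplified stand-in class. `OffsetMeta` is hypothetical, and `-1` is assumed as the "unknown" sentinel for both the segment base offset and the file position. The method names mirror helpers such as `onOlderSegment` and `messageOffsetOnly` that the Log code below calls, but these bodies are illustrative rather than Kafka's.

```scala
// Hypothetical stand-in for LogOffsetMetadata; -1 is assumed as the "unknown" sentinel.
final case class OffsetMeta(messageOffset: Long,
                            segmentBaseOffset: Long = -1L,
                            relativePositionInSegment: Int = -1) {
  // True when only the message offset is known (no segment or position info).
  def messageOffsetOnly: Boolean =
    segmentBaseOffset == -1L && relativePositionInSegment == -1

  // Field 2: are both offsets in the same segment?
  def onSameSegment(that: OffsetMeta): Boolean = {
    require(!messageOffsetOnly && !that.messageOffsetOnly, "segment info unknown")
    segmentBaseOffset == that.segmentBaseOffset
  }

  // Field 2 again: does this offset sit on an older segment than `that`?
  def onOlderSegment(that: OffsetMeta): Boolean = {
    require(!messageOffsetOnly && !that.messageOffsetOnly, "segment info unknown")
    segmentBaseOffset < that.segmentBaseOffset
  }

  // Field 3: number of bytes between two offsets in the same segment.
  def positionDiff(that: OffsetMeta): Int = {
    require(onSameSegment(that), "offsets are on different segments")
    relativePositionInSegment - that.relativePositionInSegment
  }
}
```

For example, with `a = OffsetMeta(105L, 100L, 400)` and `b = OffsetMeta(110L, 100L, 900)`, both offsets live in segment 100 and `b.positionDiff(a)` is 500 bytes; a bare `OffsetMeta(42L)` reports `messageOffsetOnly`.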
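Back to the high watermark: the methods worth focusing on are the ones that set, update and read the HW value (the create, update and read operations).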

2) Getting and setting the high watermark

    private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
      if (newHighWatermark.messageOffset < 0) // the high watermark must not be negative
        throw new IllegalArgumentException("High watermark offset should be non-negative")
      lock synchronized { // monitor lock that guards modifications of the Log object
        highWatermarkMetadata = newHighWatermark
        // transaction-related handling below; it can be ignored for now
        producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
        maybeIncrementFirstUnstableOffset()
      }
      trace(s"Setting high watermark $newHighWatermark")
    }
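The pattern in this setter (validate first, write under a monitor lock, let readers go lock-free through the `@volatile` field) can be sketched in isolation. `HwHolder` is a hypothetical class, and `updateCount` merely stands in for the side effects that run under the lock (such as the producer-state callbacks in the original).

```scala
// Hypothetical minimal holder illustrating the pattern used for highWatermarkMetadata.
final class HwHolder(initial: Long) {
  private val lock = new Object
  @volatile private var hw: Long = initial
  // Stand-in for side effects that must happen atomically with the write.
  private var updateCount = 0

  // Lock-free read: @volatile guarantees the latest completed write is visible.
  def read: Long = hw

  def set(newHw: Long): Unit = {
    if (newHw < 0) // validate before taking the lock, as the original does
      throw new IllegalArgumentException("High watermark offset should be non-negative")
    lock.synchronized {
      hw = newHw
      updateCount += 1 // runs together with the write, never interleaved with another writer
    }
  }

  def updates: Int = lock.synchronized(updateCount)
}
```

A rejected negative value leaves both the stored HW and the side-effect counter untouched, because the check happens before anything is written.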

3) Updating the high watermark value

There are two methods: updateHighWatermark is used by followers (and when the HW is first initialized), and maybeIncrementHighWatermark is used by the leader.

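Open question: the stated reason for splitting this into two methods is that they serve different replicas, and it makes sense that the leader needs to take the lock. But why does the leader's version carry so many more checks? And when it later checks whether the old and new high watermarks lie on the same segment, why is the condition old.HW == new.HW? (The code comment in maybeIncrementHighWatermark hints at the second answer: the equal-offset branch covers the case where the new offset metadata sits on a newer segment, which happens whenever the log is rolled to a new segment.)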

    /**
     * Update the high watermark to a new offset. The new high watermark will be lower
     * bounded by the log start offset and upper bounded by the log end offset.
     *
     * This is intended to be called when initializing the high watermark or when updating
     * it on a follower after receiving a Fetch response from the leader.
     *
     * @param hw the suggested new value for the high watermark
     * @return the updated high watermark offset
     */
    def updateHighWatermark(hw: Long): Long = {
      val newHighWatermark = if (hw < logStartOffset)
        logStartOffset
      else if (hw > logEndOffset)
        logEndOffset
      else
        hw
      updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
      newHighWatermark
    }

    /**
     * Update the high watermark to a new value if and only if it is larger than the old value. It is
     * an error to update to a value which is larger than the log end offset.
     *
     * This method is intended to be used by the leader to update the high watermark after follower
     * fetch offsets have been updated.
     *
     * @return the old high watermark, if updated by the new value
     */
    def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
      if (newHighWatermark.messageOffset > logEndOffset)
        throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
          s"log end offset $logEndOffsetMetadata")

      lock.synchronized {
        val oldHighWatermark = fetchHighWatermarkMetadata

        // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
        // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
        if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
          (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
          updateHighWatermarkMetadata(newHighWatermark)
          Some(oldHighWatermark)
        } else {
          None
        }
      }
    }
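The different decision rules of the two methods can be compared side by side in a pure sketch. `Hw`, `clampHw` and `maybeIncrement` are hypothetical names; an HW is modelled only as an offset plus the base offset of its segment, and `maybeIncrement` merely reports whether the leader would replace the old HW, without storing anything.

```scala
// Simplified, hypothetical model: an HW is an offset plus the base offset of its segment.
final case class Hw(offset: Long, segmentBase: Long)

// Follower side (cf. updateHighWatermark): clamp into [logStartOffset, logEndOffset].
def clampHw(hw: Long, logStartOffset: Long, logEndOffset: Long): Long =
  math.max(logStartOffset, math.min(hw, logEndOffset))

// Leader side (cf. maybeIncrementHighWatermark): a value beyond the LEO is an error;
// otherwise accept a larger offset, or the same offset now located on a newer segment.
// Returns Some(old) if `proposed` should replace `old`, None otherwise.
def maybeIncrement(old: Hw, proposed: Hw, logEndOffset: Long): Option[Hw] = {
  if (proposed.offset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $proposed exceeds log end offset $logEndOffset")
  val advances = old.offset < proposed.offset
  val sameOffsetNewerSegment = old.offset == proposed.offset && old.segmentBase < proposed.segmentBase
  if (advances || sameOffsetNewerSegment) Some(old) else None
}
```

The follower silently corrects an out-of-range suggestion, whereas the leader treats a value past its own log end offset as a bug and never moves the HW backwards. The `sameOffsetNewerSegment` case models a roll: offset 100 was the end of segment 0 and is now the base of segment 100, so the offset is unchanged but the metadata should point into the newer segment.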

4) Reading the high watermark value

    /**
     * Get the offset and metadata for the current high watermark. If offset metadata is not
     * known, this will do a lookup in the index and cache the result.
     */
    private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
      checkIfMemoryMappedBufferClosed() // make sure the log has not been closed before reading
      val offsetMetadata = highWatermarkMetadata // copy to a local so a concurrent update cannot change it mid-method
      if (offsetMetadata.messageOffsetOnly) {
        lock.synchronized {
          // Only the HW offset is known: look up its full metadata (segment, file position) in the log and cache it
          val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
          updateHighWatermarkMetadata(fullOffset)
          fullOffset
        }
      } else {
        offsetMetadata
      }
    }
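The "look up once, then cache" behaviour of this getter can be sketched with hypothetical types. `Meta` uses `-1` as an assumed "position unknown" sentinel, and the `lookup` function stands in for `convertToOffsetMetadataOrThrow`; the counter exists only to show that the expensive lookup runs once.

```scala
// Hypothetical metadata: position -1 means only the offset is known.
final case class Meta(offset: Long, position: Int = -1) {
  def offsetOnly: Boolean = position == -1
}

// `lookup` stands in for the index/log lookup done by convertToOffsetMetadataOrThrow.
final class LazyHw(initial: Meta, lookup: Long => Meta) {
  private val lock = new Object
  @volatile private var meta: Meta = initial
  var lookups = 0 // counts expensive lookups, for illustration only

  def fetch: Meta = {
    val snapshot = meta // read the volatile field once into a local
    if (snapshot.offsetOnly) {
      lock.synchronized {
        val full = lookup(meta.offset) // re-read the latest offset under the lock, like the original
        lookups += 1
        meta = full // cache it so later reads skip the lookup
        full
      }
    } else snapshot
  }
}
```

The first `fetch` pays for the lookup and stores complete metadata; every later call takes the lock-free `else` branch.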

II. Log Segment Management

A log is a container of log segments.

    private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

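It uses Java's ConcurrentSkipListMap to hold the log segment objects, keyed by each segment's base offset. The map is thread-safe and keeps its keys sorted, which makes ordered lookups such as "the segment with the largest base offset not greater than a given offset" both cheap and safe under concurrent access.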
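Here is a minimal sketch of how such a map supports segment lookups. `Segment` and `Segments` are hypothetical stand-ins (not Kafka's `LogSegment`); only the `ConcurrentSkipListMap` calls (`put`, `floorEntry`, `lastEntry`) are real JDK API.

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical stand-in for LogSegment: just its base offset.
final case class Segment(baseOffset: Long)

final class Segments {
  // Sorted, thread-safe map: base offset -> segment
  private val map = new ConcurrentSkipListMap[java.lang.Long, Segment]

  def add(seg: Segment): Unit = map.put(seg.baseOffset, seg)

  // The segment holding `offset` is the one with the largest base offset <= offset.
  def segmentFor(offset: Long): Option[Segment] =
    Option(map.floorEntry(offset)).map(_.getValue)

  // The active (newest) segment is the entry with the largest key.
  def active: Option[Segment] =
    Option(map.lastEntry()).map(_.getValue)
}
```

With segments at base offsets 0, 100 and 200, `segmentFor(150L)` returns the segment starting at 100, and `active` returns the one starting at 200; an offset below every base offset yields `None` because `floorEntry` returns `null`.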