Common Operations on the Log Object

I. High watermark management
The high watermark (HW) is the offset immediately after the last committed message.
1) Definition of the high watermark
- Its initial value is logStartOffset.
- It is declared @volatile, because multiple threads read and modify it.

    /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
     * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
     * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
     * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
     */
    @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

Its type is LogOffsetMetadata. Here is the definition of that class. The case keyword declares a case class, which can be thought of as roughly a Java bean.

    /*
     * A log offset structure, including:
     *  1. the message offset                            # the offset value itself; here, the high watermark
     *  2. the base message offset of the located segment # base offset of the segment holding this offset, used to tell whether two messages are in the same segment
     *  3. the physical position on the located segment   # physical position on disk within that segment, used to compute the size of the data between two messages
     */
    case class LogOffsetMetadata(messageOffset: Long,
                                 segmentBaseOffset: Long = Log.UnknownOffset,
                                 relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

Back to the high watermark: the methods to focus on are the ones that get and set it, update it, and read it (that is, create, update, and query).
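To make the three fields of LogOffsetMetadata concrete, here is a minimal sketch of a simplified stand-in. OffsetMeta, its sentinel values, and the helper names are assumptions made for illustration and are not copied from Kafka. It shows how the segment base offset answers "same segment?" and how the relative position gives the number of bytes between two offsets.

```scala
// Simplified, hypothetical stand-in for LogOffsetMetadata (illustration only).
final case class OffsetMeta(messageOffset: Long,
                            segmentBaseOffset: Long = OffsetMeta.UnknownOffset,
                            relativePositionInSegment: Int = OffsetMeta.UnknownFilePosition) {

  // True when only the message offset is known (segment and position unresolved).
  def messageOffsetOnly: Boolean =
    segmentBaseOffset == OffsetMeta.UnknownOffset &&
      relativePositionInSegment == OffsetMeta.UnknownFilePosition

  // Two offsets are on the same segment when their segment base offsets match.
  def onSameSegment(that: OffsetMeta): Boolean = {
    require(!messageOffsetOnly && !that.messageOffsetOnly, "segment information is missing")
    segmentBaseOffset == that.segmentBaseOffset
  }

  // Bytes between two offsets; only meaningful within a single segment.
  def positionDiff(that: OffsetMeta): Int = {
    require(onSameSegment(that), "offsets are on different segments")
    relativePositionInSegment - that.relativePositionInSegment
  }
}

object OffsetMeta {
  // Assumed sentinel values for "not known yet".
  val UnknownOffset: Long = -1L
  val UnknownFilePosition: Int = -1
}
```

For example, OffsetMeta(120L, 0L, 6144).positionDiff(OffsetMeta(100L, 0L, 4096)) measures the bytes between offsets 100 and 120 inside the segment whose base offset is 0.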
2) Getting and setting the high watermark

    private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
      if (newHighWatermark.messageOffset < 0) // the high watermark cannot be negative
        throw new IllegalArgumentException("High watermark offset should be non-negative")

      lock synchronized { // monitor lock that guards modifications to the Log object
        highWatermarkMetadata = newHighWatermark
        // transaction-related handling below; it can be ignored for now
        producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
        maybeIncrementFirstUnstableOffset()
      }
      trace(s"Setting high watermark $newHighWatermark")
    }
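The setter combines two mechanisms: a @volatile field, so readers can see the latest value without locking, and a monitor lock around the write, so that the value and any follow-up bookkeeping change together. The sketch below isolates that pattern. WatermarkHolder and its members are hypothetical names, and the transaction-related calls are deliberately left out.

```scala
// Hypothetical holder; illustrates the volatile-read / locked-write pattern only.
final class WatermarkHolder(initial: Long) {
  private val lock = new Object
  @volatile private var current: Long = initial

  // Readers take no lock: @volatile makes the latest published value visible.
  def get: Long = current

  // Writers validate first, then publish under the monitor, so any bookkeeping
  // added inside the synchronized block stays consistent with the new value.
  def set(newValue: Long): Unit = {
    if (newValue < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")
    lock.synchronized {
      current = newValue
    }
  }
}
```

Validation happens before the lock is taken, as in the method above, so an invalid argument is rejected without contending for the monitor.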
3) Updating the high watermark
There are two methods: updateHighWatermark is for followers, and maybeIncrementHighWatermark is for the leader.
Question: the stated reason for having two methods is that they serve different replicas, and it makes sense that the leader needs locking. But why does the leader version also carry so many extra conditions? And when it later checks whether the old and new high watermarks are on the same segment, why is the condition old.HW == new.HW?

    /**
     * Update the high watermark to a new offset. The new high watermark will be lower
     * bounded by the log start offset and upper bounded by the log end offset.
     *
     * This is intended to be called when initializing the high watermark or when updating
     * it on a follower after receiving a Fetch response from the leader.
     *
     * @param hw the suggested new value for the high watermark
     * @return the updated high watermark offset
     */
    def updateHighWatermark(hw: Long): Long = {
      val newHighWatermark = if (hw < logStartOffset)
        logStartOffset
      else if (hw > logEndOffset)
        logEndOffset
      else
        hw
      updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
      newHighWatermark
    }

    /**
     * Update the high watermark to a new value if and only if it is larger than the old value. It is
     * an error to update to a value which is larger than the log end offset.
     *
     * This method is intended to be used by the leader to update the high watermark after follower
     * fetch offsets have been updated.
     *
     * @return the old high watermark, if updated by the new value
     */
    def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
      if (newHighWatermark.messageOffset > logEndOffset)
        throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
          s"log end offset $logEndOffsetMetadata")

      lock.synchronized {
        val oldHighWatermark = fetchHighWatermarkMetadata

        // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
        // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
        if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
          (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
          updateHighWatermarkMetadata(newHighWatermark)
          Some(oldHighWatermark)
        } else {
          None
        }
      }
    }
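The two update rules can be reduced to small pure functions. The sketch below is an illustration under assumptions: HwRules, Meta, clamp, and shouldAdvance are hypothetical names, and Meta keeps only the two fields the rules need. It also suggests one reading of the old.HW == new.HW question, based on the source comment: after a log roll the same offset can be described by metadata pointing at a newer segment, so the leader accepts the update even though the offset itself has not grown.

```scala
object HwRules {
  // Illustrative pair: a message offset and the base offset of the segment holding it.
  final case class Meta(messageOffset: Long, segmentBaseOffset: Long)

  // Follower-style update: bound the suggested value by the log start and end offsets.
  def clamp(hw: Long, logStartOffset: Long, logEndOffset: Long): Long =
    if (hw < logStartOffset) logStartOffset
    else if (hw > logEndOffset) logEndOffset
    else hw

  // Leader-style update: accept a strictly larger offset, or the same offset
  // re-located onto a newer segment (the situation right after a log roll).
  def shouldAdvance(old: Meta, proposed: Meta): Boolean =
    old.messageOffset < proposed.messageOffset ||
      (old.messageOffset == proposed.messageOffset &&
        old.segmentBaseOffset < proposed.segmentBaseOffset)
}
```

With these definitions, shouldAdvance(Meta(100, 0), Meta(100, 100)) accepts the update because the offset is unchanged but now sits on a newer segment, while a proposal with a smaller offset is rejected, which keeps the high watermark monotonic.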
4) Reading the high watermark

    /**
     * Get the offset and metadata for the current high watermark. If offset metadata is not
     * known, this will do a lookup in the index and cache the result.
     */
    private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
      checkIfMemoryMappedBufferClosed() // make sure the log has not been closed before reading

      val offsetMetadata = highWatermarkMetadata // copy into a local variable to guard against concurrent modification
      if (offsetMetadata.messageOffsetOnly) {
        lock.synchronized {
          // if the HW metadata is incomplete, look it up again from the log files
          val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
          updateHighWatermarkMetadata(fullOffset)
          fullOffset
        }
      } else {
        offsetMetadata
      }
    }
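The read path is a "resolve lazily, then cache" pattern: take one snapshot of the volatile field, and only if it is incomplete, take the lock, look up the full metadata, and store it. A minimal sketch follows. LazyResolve, Meta, Cache, and the lookup function are all hypothetical, with lookup standing in for the index lookup performed by the real log.

```scala
object LazyResolve {
  // Illustrative metadata: the segment base offset is absent until resolved.
  final case class Meta(messageOffset: Long, segmentBaseOffset: Option[Long] = None) {
    def messageOffsetOnly: Boolean = segmentBaseOffset.isEmpty
  }

  // `lookup` is an assumed stand-in for resolving an offset against the index.
  final class Cache(initial: Meta, lookup: Long => Meta) {
    private val lock = new Object
    @volatile private var cached: Meta = initial

    def fetch: Meta = {
      val snapshot = cached // read the volatile field once so the checks below agree
      if (snapshot.messageOffsetOnly) {
        lock.synchronized {
          // Resolve from the current value, then cache the complete metadata.
          val full = lookup(cached.messageOffset)
          cached = full
          full
        }
      } else snapshot
    }
  }
}
```

Once a complete value has been cached, later calls to fetch return it from the volatile field without taking the lock.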
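II. Log segment management
A Log is a container of log segments.

    private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

The log segment objects are stored in Java's ConcurrentSkipListMap, a thread-safe sorted map, keyed by each segment's base offset.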
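A sorted map keyed by base offset makes "which segment holds offset X?" a floor lookup: the entry with the greatest key less than or equal to X. The sketch below uses the real ConcurrentSkipListMap API (put, floorEntry, lastEntry). SegmentLookup, Segment, and Segments are hypothetical wrappers for illustration, not Kafka classes.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookup {
  // Illustrative segment: only its base offset matters here.
  final case class Segment(baseOffset: Long)

  final class Segments {
    private val map = new ConcurrentSkipListMap[java.lang.Long, Segment]

    // Register a segment under its base offset.
    def add(s: Segment): Unit = map.put(s.baseOffset, s)

    // Segment whose base offset is the greatest one <= offset, if any.
    def containing(offset: Long): Option[Segment] =
      Option(map.floorEntry(offset)).map(_.getValue)

    // Segment with the largest base offset (the most recent one), if any.
    def active: Option[Segment] =
      Option(map.lastEntry()).map(_.getValue)
  }
}
```

Both floorEntry and lastEntry return null when there is no matching entry, so the sketch wraps them in Option rather than exposing null to callers.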
