Common Operations on the Log Object
I. High Watermark Management
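The high watermark (HW) is the position just after the last committed message. Messages with offsets below the HW are committed.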
1) Definition of the high watermark
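/* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
 * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
 * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
 * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
 */
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
- It uses logStartOffset as its initial value.
- It is marked @volatile because multiple threads read and modify it, and every write must be visible to other threads immediately.
Its type is LogOffsetMetadata. Here is the definition of that class:
/*
 * A log offset structure, including:
 * 1. the message offset  # the offset value itself; for highWatermarkMetadata, this is the high watermark
 * 2. the base message offset of the located segment  # base offset of the segment holding this offset; used to tell whether two messages are in the same segment
 * 3. the physical position on the located segment  # physical position inside that segment; used to compute the byte size between two messages
 */
case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
  // ... (class body omitted here; it includes helpers such as messageOffsetOnly and onOlderSegment, used below)
}
The `case` keyword defines a case class. You can think of it roughly as a Java bean: a data holder whose equals, hashCode, toString and copy methods are generated automatically.
Back to the high watermark. The methods worth close attention are the ones that set, update and read its value.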
2) Getting and setting the high watermark
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
  if (newHighWatermark.messageOffset < 0) // the high watermark must not be negative
    throw new IllegalArgumentException("High watermark offset should be non-negative")
  lock synchronized { // monitor lock that guards modifications to the Log object
    highWatermarkMetadata = newHighWatermark
    // Transaction-related handling below; it can be ignored for now
    producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
    maybeIncrementFirstUnstableOffset()
  }
  trace(s"Setting high watermark $newHighWatermark")
}
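The setter combines two ideas. Readers access a @volatile field without taking a lock. Writers validate the input first and then assign the field while holding the monitor lock. The following is a minimal, self-contained sketch of that pattern. `OffsetMeta` and `HwHolder` are hypothetical names, the simplified metadata keeps only the message offset, and the transaction-related calls from the original are left out.

```scala
// Hypothetical, simplified stand-in for LogOffsetMetadata: only the message offset is kept.
final case class OffsetMeta(messageOffset: Long)

// Hypothetical holder showing the "volatile read, locked write" pattern.
final class HwHolder(logStartOffset: Long) {
  // Plain object used as the monitor lock (the original uses the Log's own `lock`).
  private val lock = new Object

  // @volatile: a write by one thread is visible to readers that do not take the lock.
  @volatile private var highWatermarkMetadata: OffsetMeta = OffsetMeta(logStartOffset)

  // Lock-free read of the current high watermark offset.
  def highWatermark: Long = highWatermarkMetadata.messageOffset

  // Validate outside the lock, then assign while holding it.
  def updateHighWatermarkMetadata(newHighWatermark: OffsetMeta): Unit = {
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")
    lock.synchronized {
      highWatermarkMetadata = newHighWatermark
    }
  }
}
```

A rejected (negative) value throws before the lock is taken, so the stored high watermark stays unchanged.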
3) Updating the high watermark
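There are two methods. updateHighWatermark is used by followers, and maybeIncrementHighWatermark is used by the leader.
Open question: the stated reason for having two methods is that they serve different replicas, and it makes sense that the leader side needs a lock. But why does the leader version have so many more checks? Also, when it checks whether the old and new high watermarks lie on the same segment, why is the condition old.HW == new.HW?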
/**
 * Update the high watermark to a new offset. The new high watermark will be lower
 * bounded by the log start offset and upper bounded by the log end offset.
 *
 * This is intended to be called when initializing the high watermark or when updating
 * it on a follower after receiving a Fetch response from the leader.
 *
 * @param hw the suggested new value for the high watermark
 * @return the updated high watermark offset
 */
def updateHighWatermark(hw: Long): Long = {
  val newHighWatermark = if (hw < logStartOffset)
    logStartOffset
  else if (hw > logEndOffset)
    logEndOffset
  else
    hw
  updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
  newHighWatermark
}
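The follower path amounts to clamping the suggested value into the range [logStartOffset, logEndOffset]. The sketch below isolates that arithmetic as a standalone function. `HwClamp.clampHighWatermark` is a hypothetical name. The real method also stores the result through updateHighWatermarkMetadata.

```scala
object HwClamp {
  // Hypothetical helper mirroring the clamping in updateHighWatermark:
  // values below logStartOffset are raised to it, values above logEndOffset are lowered to it.
  def clampHighWatermark(hw: Long, logStartOffset: Long, logEndOffset: Long): Long =
    if (hw < logStartOffset) logStartOffset
    else if (hw > logEndOffset) logEndOffset
    else hw
}
```

For example, with a log spanning offsets 5 to 20, a suggested HW of 3 becomes 5, 25 becomes 20, and 12 is kept as is. The follower does not throw on an out-of-range value; it simply corrects it.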
/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is
 * an error to update to a value which is larger than the log end offset.
 *
 * This method is intended to be used by the leader to update the high watermark after follower
 * fetch offsets have been updated.
 *
 * @return the old high watermark, if updated by the new value
 */
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset $logEndOffsetMetadata")
  lock.synchronized {
    val oldHighWatermark = fetchHighWatermarkMetadata
    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(oldHighWatermark)
    } else {
      None
    }
  }
}
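The code comment in maybeIncrementHighWatermark suggests a possible answer to the question above. The `==` branch does not test whether the two values are on the same segment. It covers the case where the offset is unchanged but the new metadata lies on a newer segment, which, according to the comment, happens when the log rolls to a new segment. The extra checks follow from the leader's role: the leader derives the HW from follower fetch offsets, so it must never exceed the log end offset or move backward.

The sketch below reproduces that decision rule. `SegMeta` and `HwDecision` are hypothetical simplifications. This `onOlderSegment` only compares base offsets and does not handle metadata that lacks segment information.

```scala
// Hypothetical simplified metadata: an offset plus the base offset of its segment.
final case class SegMeta(messageOffset: Long, segmentBaseOffset: Long) {
  // True if this position lies on an older segment than `that`.
  def onOlderSegment(that: SegMeta): Boolean = this.segmentBaseOffset < that.segmentBaseOffset
}

object HwDecision {
  // Mirrors the leader-side rule: reject values past the log end offset, then accept the
  // new value if the offset grows, or if it is equal but sits on a newer (rolled) segment.
  def shouldUpdate(oldHw: SegMeta, newHw: SegMeta, logEndOffset: Long): Boolean = {
    if (newHw.messageOffset > logEndOffset)
      throw new IllegalArgumentException(s"High watermark $newHw exceeds log end offset $logEndOffset")
    oldHw.messageOffset < newHw.messageOffset ||
      (oldHw.messageOffset == newHw.messageOffset && oldHw.onOlderSegment(newHw))
  }
}
```

Take an old HW at offset 100 whose metadata points into the segment starting at 0. If a new segment is rolled at offset 100, the candidate `SegMeta(100, 100)` has the same offset but a newer segment, so it is accepted. Proposing `SegMeta(100, 100)` again is rejected.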
4) Reading the high watermark
/**
 * Get the offset and metadata for the current high watermark. If offset metadata is not
 * known, this will do a lookup in the index and cache the result.
 */
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
  checkIfMemoryMappedBufferClosed() // make sure the log has not been closed before reading
  val offsetMetadata = highWatermarkMetadata // copy to a local so a concurrent update cannot change it mid-method
  if (offsetMetadata.messageOffsetOnly) {
    lock.synchronized {
      // The HW metadata is incomplete (offset only), so look up the full metadata from the log and cache it
      val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
      updateHighWatermarkMetadata(fullOffset)
      fullOffset
    }
  } else {
    offsetMetadata
  }
}
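The read path resolves the full metadata lazily. The first read after an offset-only update performs a lookup and caches the result, and later reads return the cached value. The sketch below illustrates this. `HwMeta` and `LazyHw` are hypothetical names. The `lookup` function is a hypothetical stand-in for convertToOffsetMetadataOrThrow. Using -1 as the "unknown" marker is an assumption of this sketch.

```scala
// Hypothetical simplified metadata; -1 marks unknown segment info (an assumption here).
final case class HwMeta(messageOffset: Long, segmentBaseOffset: Long = -1L, relativePosition: Int = -1) {
  def messageOffsetOnly: Boolean = segmentBaseOffset == -1L && relativePosition == -1
}

// `lookup` stands in for convertToOffsetMetadataOrThrow: it resolves a bare offset
// to full metadata (segment base offset and position within the segment).
final class LazyHw(initial: HwMeta, lookup: Long => HwMeta) {
  private val lock = new Object
  @volatile private var highWatermarkMetadata: HwMeta = initial

  def fetchHighWatermarkMetadata: HwMeta = {
    val offsetMetadata = highWatermarkMetadata // read the volatile field once
    if (offsetMetadata.messageOffsetOnly) {
      lock.synchronized {
        // Resolve the full metadata and cache it so later reads skip the lookup.
        val full = lookup(highWatermarkMetadata.messageOffset)
        highWatermarkMetadata = full
        full
      }
    } else offsetMetadata
  }
}
```

This design keeps updates cheap because a follower can store just an offset, and it pays the lookup cost once, on the first read that needs the segment position.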
II. Log Segment Management
A log is a container for log segments.
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
It uses Java's ConcurrentSkipListMap to store the log segment objects, keyed by each segment's base offset.
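In Kafka this map is keyed by each segment's base offset, and the map keeps its keys sorted. Finding the segment that holds a given offset is therefore a floor lookup: take the segment with the largest base offset not greater than the target. The newest (active) segment is the last entry. The sketch below shows both lookups on the same ConcurrentSkipListMap type. `Seg` is a hypothetical stand-in for LogSegment, and `SegmentIndex` is a hypothetical wrapper.

```scala
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}

// Hypothetical stand-in for LogSegment: only its base offset matters here.
final case class Seg(baseOffset: Long)

final class SegmentIndex {
  // Sorted, thread-safe map from base offset to segment, as in the Log class.
  private val segments: ConcurrentNavigableMap[java.lang.Long, Seg] =
    new ConcurrentSkipListMap[java.lang.Long, Seg]

  def addSegment(seg: Seg): Unit = segments.put(seg.baseOffset, seg)

  // The segment containing `offset` has the greatest base offset <= offset (null -> None).
  def segmentFor(offset: Long): Option[Seg] =
    Option(segments.floorEntry(offset)).map(_.getValue)

  // The active (newest) segment has the largest base offset.
  def activeSegment: Option[Seg] = Option(segments.lastEntry()).map(_.getValue)
}
```

With segments starting at 0, 100 and 250, offset 150 maps to the segment starting at 100. An offset below the first base offset maps to no segment. A skip list provides these ordered lookups without a global lock, which suits a map that is read concurrently while new segments are appended.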