Merge合并操作
这个页面描述了RocksDB中的原子读-修改-写操作,称为”合并”操作。 它是一个界面概述,针对的客户端或RocksDB用户谁有问题:什么时候和为什么我应该使用合并;如何使用归并?
为什么
RocksDB是一个高性能的嵌入式持久性键值存储。传统上,它提供了三个简单的操作Get、Put和Delete,以支持优雅的类似查找表的界面。 https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h
通常,以某种方式更新现有值是一种常见的模式。要在rocksdb中做到这一点,客户机必须读取(获取)现有值,修改它,然后将其写(放)回数据库。 让我们看一个具体的例子。
假设我们维护一组uint64计数器。每个计数器都有一个不同的名称。我们希望支持四个高级操作:Set、Add、Get和Remove。
首先,我们定义接口并获得正确的语义。为了清晰起见,将错误处理放在一边。
class Counters {
public:
// (re)set the value of a named counter
virtual void Set(const string& key, uint64_t value);
// remove the named counter
virtual void Remove(const string& key);
// retrieve the current value of the named counter, return false if not found
virtual bool Get(const string& key, uint64_t *value);
// increase the named counter by value.
// if the counter does not exist, treat it as if the counter was initialized to zero
virtual void Add(const string& key, uint64_t value);
};
其次,我们使用现有的rocksdb支持来实现它。伪代码如下:
class RocksCounters : public Counters {
public:
static uint64_t kDefaultCount = 0;
RocksCounters(std::shared_ptr<DB> db);
// mapped to a RocksDB Put
virtual void Set(const string& key, uint64_t value) {
string serialized = Serialize(value);
db_->Put(put_option_, key, serialized));
}
// mapped to a RocksDB Delete
virtual void Remove(const string& key) {
db_->Delete(delete_option_, key);
}
// mapped to a RocksDB Get
virtual bool Get(const string& key, uint64_t *value) {
string str;
auto s = db_->Get(get_option_, key, &str);
if (s.ok()) {
*value = Deserialize(str);
return true;
} else {
return false;
}
}
// implemented as get -> modify -> set
virtual void Add(const string& key, uint64_t value) {
uint64_t base;
if (!Get(key, &base)) {
base = kDefaultValue;
}
Set(key, base + value);
}
};
注意,除了Add操作之外,所有其他三个操作都可以直接映射到rocksdb中的一个操作。 在代码方面,它并没有那么糟糕。然而,一个概念上的单一操作Add仍然映射到两个rocksdb操作。 这也有性能上的暗示——随机获取在rocksdb中相对较慢。
现在,假设我们要将计数器作为服务托管。考虑到目前服务器的核心数量,我们的服务几乎肯定是多线程的。 如果线程没有按键空间分区,那么同一个计数器的多个Add请求可能会被不同的线程接收并并发执行。 好吧,如果我们也有严格的一致性要求(缺少更新是不可接受的),我们将不得不用外部同步(某种锁)封装Add。开销加起来。
如果RocksDB直接支持Add功能呢?我们可能会得出这样的结论:
virtual void Add(const string& key, uint64_t value) {
string serialized = Serialize(value);
db->Add(add_option, key, serialized);
}
这对于计数器来说似乎是合理的。但并不是你在RocksDB中存储的所有东西都是一个计数器。假设我们需要跟踪用户去过的位置。 我们可以将位置列表(序列化)存储为用户密钥的值。向现有列表添加新位置将是一个常见的操作。 在本例中,我们可能需要一个Append操作:db->Append(user_key, serialize(new_location))。 这表明读-修改-写操作的语义实际上是由客户机值类型决定的。为了保持库的通用性,我们最好抽象出这个操作,并允许客户端指定语义。 这就引出了我们的建议: 合并。
什么
我们在RocksDB中开发了一个通用的合并操作,作为一个新的一流操作来捕获读-修改-写语义。
这个合并操作:
- 将读写的语义封装到一个简单的抽象接口中。
- 允许用户避免重复Get()调用带来的额外成本。
- 执行后端优化,以决定何时/如何组合操作数而不更改底层语义。
- 在某些情况下,可以将成本分摊到所有增量更新上,以提供渐进的效率提高。
如何使用
在下面的部分中,将解释特定于客户机的代码更改。我们还简要介绍了如何使用Merge。 假设读者已经知道如何使用经典RocksDB(或LevelDB),包括:
- DB类(包括构造、DB::Put()、DB::Get()和DB::Delete())
- Options类(以及如何在创建时指定数据库选项)
- 所有写入数据库的键/值都是简单的字节字符串
接口概述
我们定义了一个新的接口/抽象基类: MergeOperator。它公开了一些函数,告诉RocksDB如何将增量更新操作(称为”合并操作数”)与基本值(Put/Delete)组合起来。 这些函数还可以用来告诉RocksDB如何组合合并操作数以形成新的合并操作数(称为”部分”或”关联”合并)。
为了简单起见,我们将暂时忽略部分与非部分合并的概念。因此,我们提供了一个名为AssociativeMergeOperator的单独接口,它封装并隐藏了关于部分合并的所有细节。 而且,对于大多数简单的应用程序(如上面的64位计数器示例),这就足够了。
因此,读者应该假设所有的合并都是通过一个名为AssociativeMergeOperator的接口来处理的。以下是公共接口:
// The Associative Merge Operator interface.
// Client needs to provide an object implementing this interface.
// Essentially, this class specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
class AssociativeMergeOperator : public MergeOperator {
public:
virtual ~AssociativeMergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing_value:(IN) null indicates the key does not exist before this op
// value: (IN) the value to update/merge the existing_value with
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false failure / error / corruption.
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
private:
...
};
一些注意事项:
- AssociativeMergeOperator是一个名为MergeOperator的类的子类。稍后我们将看到,在某些情况下,更通用的MergeOperator类可能更强大。 另一方面,我们在这里使用的AssociativeMergeOperator是一个更简单的接口。
- existing_value可以是nullptr。这在合并操作是键的第一个操作时很有用。nullptr表示”现有”值不存在。这基本上依赖于客户机来解释没有预值的合并操作的语义。 客户可以做任何合理的事情。例如,counter::Add假设一个零值(如果不存在)。
- 如果key空间是分区的,不同的子空间指的是具有不同合并操作语义的不同类型的数据,我们将key传递给客户端,使客户端可以基于密钥对合并操作符进行多路复用。 例如,客户端可能选择将用户帐户的当前余额(数字)存储在键”BAL:”下,而将帐户活动的历史记录(列表)存储在键”HIS:uid”下,存储在同一个数据库中。 (这是否是一种好的做法是有争议的)。对于电流平衡,数值加法是一个完美的合并运算符;但是,对于活动历史记录,我们需要添加一个列表。 因此,通过将键传递回Merge回调函数,我们允许客户机区分这两种类型。
例子:
void Merge(...) {
if (key start with "BAL:") {
NumericAddition(...)
} else if (key start with "HIS:") {
ListAppend(...);
}
}
对客户端可见接口的其他更改
要在应用程序中使用Merge,客户机必须首先定义一个类,该类继承自AssociativeMergeOperator接口(或合并操作符接口,我们将在后面看到)。 这个对象类应该实现接口的函数,当需要应用合并时,RocksDB将(最终)在适当的时候调用接口。 这样,合并语义就完全由客户机指定。
定义了这个类之后,用户应该有一种方法来指定RocksDB来使用这个merge操作符进行合并。 我们为DB类和Options类引入了额外的字段/方法:
// In addition to Get(), Put(), and Delete(), the DB class now also has an additional method: Merge().
class DB {
...
// Merge the database entry for "key" with "value". Returns OK on success,
// and a non-OK status on error. The semantics of this operation is
// determined by the user provided merge_operator when opening DB.
// Returns Status::NotSupported if DB does not have a merge_operator.
virtual Status Merge(
const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
...
};
Struct Options {
...
// REQUIRES: The client must provide a merge operator if Merge operation
// needs to be accessed. Calling Merge on a DB without a merge operator
// would result in Status::NotSupported. The client must ensure that the
// merge operator supplied here has the same name and *exactly* the same
// semantics as the merge operator provided to previous open calls on
// the same DB. The only exception is reserved for upgrade, where a DB
// previously without a merge operator is introduced to Merge operation
// for the first time. It's necessary to specify a merge operator when
// opening the DB in this case.
// Default: nullptr
const std::shared_ptr<MergeOperator> merge_operator;
...
};
注意: Options::merge_operator字段被定义为一个指向MergeOperator的共享指针。 如上所述,AssociativeMergeOperator继承自MergeOperator,因此可以在这里指定AssociativeMergeOperator。 这就是下面示例中使用的方法。
客户端代码的改变:
给定上述接口更改,客户端可以实现直接使用内置合并操作的计数器版本。
Counters v2:
// A 'model' merge operator with uint64 addition semantics
class UInt64AddOperator : public AssociativeMergeOperator {
public:
virtual bool Merge(
const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
// assuming 0 if no existing value
uint64_t existing = 0;
if (existing_value) {
if (!Deserialize(*existing_value, &existing)) {
// if existing_value is corrupted, treat it as 0
Log(logger, "existing value corruption");
existing = 0;
}
}
uint64_t oper;
if (!Deserialize(value, &oper)) {
// if operand is corrupted, treat it as 0
Log(logger, "operand value corruption");
oper = 0;
}
auto new = existing + oper;
*new_value = Serialize(new);
return true; // always return true for this, since we treat all errors as "zero".
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
};
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public RocksCounters {
public:
MergeBasedCounters(std::shared_ptr<DB> db);
// mapped to a leveldb Merge operation
virtual void Add(const string& key, uint64_t value) override {
string serialized = Serialize(value);
db_->Merge(merge_option_, key, serialized);
}
};
// How to use it
DB* dbp;
Options options;
options.merge_operator.reset(new UInt64AddOperator);
DB::Open(options, "/tmp/db", &dbp);
std::shared_ptr<DB> db(dbp);
MergeBasedCounters counters(db);
counters.Add("a", 1);
...
uint64_t v;
counters.Get("a", &v);
用户界面的变化相对较小。而RocksDB后端负责其余的工作。
结合性与非结合性
到目前为止,我们已经使用了维护计数器数据库的相对简单的示例。事实证明,前面提到的AssociativeMergeOperator接口通常非常适合处理许多这样的用例。 例如,如果您希望使用”append”操作维护一组字符串,那么到目前为止我们看到的内容也可以很容易地进行调整来处理。
那么,为什么这些案例被认为是”simple”的呢?隐式地,我们假设了一些关于数据的东西:结合律。这意味着我们假设:
- 将Put()放入RocksDB数据库的值具有与使用merge()调用的merge操作数相同的格式;
- 可以使用相同的用户指定的合并操作符将多个合并操作数合并到单个合并操作数中。
例如,看看计数器的情况。RocksDB数据库内部将每个值存储为一个序列化的8字节整数。因此,当客户机调用counter::Set(对应于DB::Put())时,参数就是这种格式。类似地,当客户机调用计数器::Add(对应于DB::Merge())时, Merge操作数也是一个序列化的8字节整数。这意味着,在客户机的UInt64AddOperator中,existing_value可能对应于原始Put(),也可能对应于合并操作数;这真的不重要! 在所有情况下,只要给定existing_value和value, UInt64AddOperator的行为都是一样的: 它将它们相加并计算new_value。 反过来,这个new_value稍后可能会在随后的merge调用中被输入到merge操作符中。
相比之下,事实证明RocksDB merge可以以比这更强大的方式使用。 例如,假设我们希望数据库存储一组json字符串(如PHP数组或对象)。然后,在数据库中,我们希望将它们存储和检索为完全格式化的json字符串,但是我们可能希望”update”操作与更新json对象的属性相对应。 所以我们可以这样写代码:
...
// Put/store the json string into to the database
db_->Put(put_option_, "json_obj_key",
"{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");
...
// Use a pre-defined "merge operator" to incrementally update the value of the json string
db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");
在上面的伪代码中,我们看到数据将以json字符串的形式存储在RocksDB中(对应于原始的Put()),但是当客户机想要更新这个值时,会传递一个”类似javascript”的赋值语句字符串作为合并操作数。 数据库将按原样存储所有这些字符串,并期望用户的merge操作符能够处理这些字符串。
现在,AssociativeMergeOperator模型不能处理这个问题,因为它假设了如上所述的关联约束。 也就是说,在本例中,我们必须区分基本值(json字符串)和合并操作数(赋值语句);我们也没有一种(直观的)方法把合并操作数合并成一个合并操作数。 所以这个用例不适合我们的”关联”合并模型。这就是通用合并操作符接口变得有用的地方。
通用合并操作符接口
MergeOperator接口旨在支持通用性,并利用RocksDB的一些关键操作方式,以便为”增量更新”提供有效的解决方案。 如上面的json示例所述,基本值类型(Put()进入数据库)的格式可能与用于更新它们的合并操作数的格式完全不同。 此外,我们将看到,有时利用以下事实是有益的:一些合并操作数可以组合成单个合并操作数,而另一些则不能。这完全取决于客户机的特定语义。 MergeOperator接口提供了一种相对简单的方法来作为客户机提供这些语义。
// The Merge Operator
//
// Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, edit data structure, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
class MergeOperator {
public:
virtual ~MergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false failure / error / corruption.
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const = 0;
struct MergeOperationInput { ... };
struct MergeOperationOutput { ... };
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const;
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types.
// Save the result in *new_value and return true. If it is impossible
// or infeasible to combine the two operations, return false instead.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
// Determines whether the MergeOperator can be called with just a single
// merge operand.
// Override and return true for allowing a single operand. FullMergeV2 and
// PartialMerge/PartialMergeMulti should be implemented accordingly to handle
// a single operand.
virtual bool AllowSingleOperand() const { return false; }
};
一些注意事项:
- MergeOperator有两个方法,FullMerge()和PartialMerge()。当Put/Delete是*existing_value(或nullptr)时,使用第一个方法。后一种方法用于合并两个合并操作数(如果可能的话)。
- AssociativeMergeOperator只是继承自MergeOperator,并提供这些方法的私有默认实现,同时为了简单起见公开包装器函数。
- 在MergeOperator中,”FullMerge()”函数接受*existing_value和合并操作数的序列(std::deque),而不是单个操作数。我们解释如下。
这些方法是如何工作的?
在较高的级别上,应该注意,对DB::Put()或DB::Merge()的任何调用并不一定强制计算值或立即进行合并。 RocksDB或多或少会延迟地决定何时实际应用这些操作(例如:下一次用户调用Get(),或者系统何时决定执行称为”压缩”的清理过程)。 这意味着,当实际调用合并操作符时,它可能有几个需要应用的“堆叠”操作数。因此,会给merge操作符::FullMerge()函数一个existing_value和一个已经堆叠的操作数列表。 然后,合并操作符应该一个一个地应用操作数(或者以客户端决定的任何优化方式,以便计算最终的new_value,就像一个一个地应用操作数一样)。
部分合并与叠加
有时候,在系统遇到合并操作数时就开始合并它们,而不是将它们堆积起来,这可能是有用的。 在本例中提供了MergeOperator::PartialMerge()函数。如果客户端指定的操作符能够逻辑地将两个合并操作数”组合”成一个操作数,那么应该在这个方法中提供这样做的语义,然后返回true。 如果逻辑上不可能,那么它应该保持*new_value不变并返回false。
从概念上讲,当库决定开始堆积和应用过程时,它首先尝试对遇到的每对操作数应用客户机指定的PartialMerge()。 每当返回false时,它将转而使用堆栈,直到找到Put/Delete基值,在这种情况下,它将调用FullMerge()函数,并将操作数作为列表参数传递。 一般来说,最后的FullMerge()调用应该返回true。它只应该在出现某种形式的损坏或坏数据时返回false。
如何使用AssociativeMergeOperator
如上所述,AssociativeMergeOperator继承自MergeOperator,允许客户端指定单个merge函数。 它覆盖PartialMerge()和FullMerge()来使用这个AssociativeMergeOperator::Merge()。 然后,它用于组合操作数,也用于遇到基值时。这就是为什么它只在上面描述的“联想性”假设下工作(它也解释了名字)。
何时允许单个合并操作数
通常,只有在至少有两个要操作的合并操作数时才调用合并操作符。如果需要调用合并操作符(即使只有一个操作数), Override AllowSingleOperand()将返回true。一个用于此目的的示例用例是,如果您使用merge操作符基于TTL更改值,以便在稍后的压缩过程中删除它(可能使用压缩过滤器)。
JSON 案例
使用通用的MergeOperator接口,我们现在能够实现json示例。
// A 'model' pseudo-code merge operator with json update semantics
// We pretend we have some in-memory data-structure (called JsonDataStructure) for
// parsing and serializing json strings.
class JsonMergeOperator : public MergeOperator { // not associative
public:
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const override {
JsonDataStructure obj;
if (existing_value) {
obj.ParseFrom(existing_value->ToString());
}
if (obj.IsInvalid()) {
Log(logger, "Invalid json string after parsing: %s", existing_value->ToString().c_str());
return false;
}
for (const auto& value : operand_list) {
auto split_vector = Split(value, " = "); // "xyz[0] = 5" might return ["xyz[0]", 5] as an std::vector, etc.
obj.SelectFromHierarchy(split_vector[0]) = split_vector[1];
if (obj.IsInvalid()) {
Log(logger, "Invalid json after parsing operand: %s", value.c_str());
return false;
}
}
obj.SerializeTo(new_value);
return true;
}
// Partial-merge two operands if and only if the two operands
// both update the same value. If so, take the "later" operand.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const override {
auto split_vector1 = Split(left_operand, " = "); // "xyz[0] = 5" might return ["xyz[0]", 5] as an std::vector, etc.
auto split_vector2 = Split(right_operand, " = ");
// If the two operations update the same value, just take the later one.
if (split_vector1[0] == split_vector2[0]) {
new_value->assign(right_operand.data(), right_operand.size());
return true;
} else {
return false;
}
}
virtual const char* Name() const override {
return "JsonMergeOperator";
}
};
...
// How to use it
DB* dbp;
Options options;
options.merge_operator.reset(new JsonMergeOperator);
DB::Open(options, "/tmp/db", &dbp);
std::shared_ptr<DB> db_(dbp);
...
// Put/store the json string into to the database
db_->Put(put_option_, "json_obj_key",
"{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");
...
// Use the "merge operator" to incrementally update the value of the json string
db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");
错误处理
如果MergeOperator::PartialMerge()返回false,这是给RocksDB的一个信号,表明合并应该被延迟(堆叠),直到找到一个Put/Delete值来使用FullMerge()。 但是,如果FullMerge()返回false,那么这将被视为“损坏”或错误。这意味着RocksDB通常会用Status::Corruption消息或类似的消息回复客户端。 因此,只有在客户机逻辑本身绝对没有可靠的方法来处理错误时,MergeOperator::FullMerge()方法才应该返回false。(参见JsonMergeOperator示例)
对于AssociativeMergeOperator, Merge()方法在错误处理方面遵循与MergeOperator::FullMerge()相同的”错误”规则。只有在没有处理值的逻辑方法时才返回false。 在上面的计数器示例中,Merge()总是返回true,因为我们可以将任何坏值解释为0。
审查及最佳实践
总之,我们已经描述了Merge操作符,以及如何使用它。根据用例,这里有一些关于何时/如何使用合并操作符和关联vemergeoperator的技巧。
何时使用合并
如果下列情况属实:
- 您有需要增量更新的数据。
- 在知道新值之前,通常需要读取数据。
然后使用wiki中指定的两个合并操作符之一。
关联数据
如果下列情况属实:
- 合并操作数的格式与Put值相同,并且
- 可以将多个操作数组合成一个操作数(只要它们的顺序相同)
然后使用AssociativeMergeOperator
通用合并
如果两个结合性约束都不成立,那么使用MergeOperator。 如果某些时候可以将多个操作数合并为一个操作数(但不总是):
- 使用MergeOperator
- 在操作数可以组合的情况下,让PartialMerge()函数返回true。
提示
多路复用: 虽然RocksDB DB对象在构建时只能传递一个合并操作符,但是用户定义的合并操作符类可以根据传递给它的数据表现出不同的行为。 键和值本身将传递给合并操作符; 因此,可以在操作数本身中编码不同的”操作”,并让合并操作符相应地执行不同的函数。
我的用例是关联的吗?: 如果不确定”关联性”约束是否适用于用例,则始终可以使用通用合并操作符。 AssociativeMergeOperator是合并操作符的一个直接子类,因此可以用AssociativeMergeOperator解决的任何用例都可以用更通用的合并操作符解决。 AssociativeMergeOperator主要是为了方便而提供的。
有用的链接
合并+压缩实现细节: (https://github.com/facebook/rocksdb/wiki/Merge-Operator-Implementation) 对于想知道合并操作符如何影响代码的RocksDB工程师。