- 66. Overview
- 67. Catalog Tables
- 68. Client
- 69. Client Request Filters
- 70. Master
- 71. RegionServer
66. Overview
66.1. NoSQL?
HBase is a type of “NoSQL” database. “NoSQL” is a general term meaning that the database isn’t an RDBMS which supports SQL as its primary access language, but there are many types of NoSQL databases: BerkeleyDB is an example of a local NoSQL database, whereas HBase is very much a distributed database. Technically speaking, HBase is really more a “Data Store” than “Data Base” because it lacks many of the features you find in an RDBMS, such as typed columns, secondary indexes, triggers, and advanced query languages, etc.
However, HBase has many features which supports both linear and modular scaling. HBase clusters expand by adding RegionServers that are hosted on commodity class servers. If a cluster expands from 10 to 20 RegionServers, for example, it doubles both in terms of storage and as well as processing capacity. An RDBMS can scale well, but only up to a point - specifically, the size of a single database server - and for the best performance requires specialized hardware and storage devices. HBase features of note are:
Strongly consistent reads/writes: HBase is not an “eventually consistent” DataStore. This makes it very suitable for tasks such as high-speed counter aggregation.
Automatic sharding: HBase tables are distributed on the cluster via regions, and regions are automatically split and re-distributed as your data grows.
Automatic RegionServer failover
Hadoop/HDFS Integration: HBase supports HDFS out of the box as its distributed file system.
MapReduce: HBase supports massively parallelized processing via MapReduce for using HBase as both source and sink.
Java Client API: HBase supports an easy to use Java API for programmatic access.
Thrift/REST API: HBase also supports Thrift and REST for non-Java front-ends.
Block Cache and Bloom Filters: HBase supports a Block Cache and Bloom Filters for high volume query optimization.
Operational Management: HBase provides build-in web-pages for operational insight as well as JMX metrics.
66.2. When Should I Use HBase?
HBase isn’t suitable for every problem.
First, make sure you have enough data. If you have hundreds of millions or billions of rows, then HBase is a good candidate. If you only have a few thousand/million rows, then using a traditional RDBMS might be a better choice due to the fact that all of your data might wind up on a single node (or two) and the rest of the cluster may be sitting idle.
Second, make sure you can live without all the extra features that an RDBMS provides (e.g., typed columns, secondary indexes, transactions, advanced query languages, etc.) An application built against an RDBMS cannot be “ported” to HBase by simply changing a JDBC driver, for example. Consider moving from an RDBMS to HBase as a complete redesign as opposed to a port.
Third, make sure you have enough hardware. Even HDFS doesn’t do well with anything less than 5 DataNodes (due to things such as HDFS block replication which has a default of 3), plus a NameNode.
HBase can run quite well stand-alone on a laptop - but this should be considered a development configuration only.
66.3. What Is The Difference Between HBase and Hadoop/HDFS?
HDFS is a distributed file system that is well suited for the storage of large files. Its documentation states that it is not, however, a general purpose file system, and does not provide fast individual record lookups in files. HBase, on the other hand, is built on top of HDFS and provides fast record lookups (and updates) for large tables. This can sometimes be a point of conceptual confusion. HBase internally puts your data in indexed “StoreFiles” that exist on HDFS for high-speed lookups. See the Data Model and the rest of this chapter for more information on how HBase achieves its goals.
67. Catalog Tables
The catalog table hbase:meta
exists as an HBase table and is filtered out of the HBase shell’s list
command, but is in fact a table just like any other.
67.1. hbase:meta
The hbase:meta
table (previously called .META.
) keeps a list of all regions in the system, and the location of hbase:meta
is stored in ZooKeeper.
The hbase:meta
table structure is as follows:
Key
- Region key of the format (
[table],[region start key],[region id]
)
Values
info:regioninfo
(serialized HRegionInfo instance for this region)info:server
(server:port of the RegionServer containing this region)info:serverstartcode
(start-time of the RegionServer process containing this region)
When a table is in the process of splitting, two other columns will be created, called info:splitA
and info:splitB
. These columns represent the two daughter regions. The values for these columns are also serialized HRegionInfo instances. After the region has been split, eventually this row will be deleted.
Note on HRegionInf
The empty key is used to denote table start and table end. A region with an empty start key is the first region in a table. If a region has both an empty start and an empty end key, it is the only region in the table
In the (hopefully unlikely) event that programmatic processing of catalog metadata is required, see the RegionInfo.parseFrom utility.
67.2. Startup Sequencing
First, the location of hbase:meta
is looked up in ZooKeeper. Next, hbase:meta
is updated with server and startcode values.
For information on region-RegionServer assignment, see Region-RegionServer Assignment.
68. Client
The HBase client finds the RegionServers that are serving the particular row range of interest. It does this by querying the hbase:meta
table. See hbase:meta for details. After locating the required region(s), the client contacts the RegionServer serving that region, rather than going through the master, and issues the read or write request. This information is cached in the client so that subsequent requests need not go through the lookup process. Should a region be reassigned either by the master load balancer or because a RegionServer has died, the client will requery the catalog tables to determine the new location of the user region.
See Runtime Impact for more information about the impact of the Master on HBase Client communication.
Administrative functions are done via an instance of Admin
68.1. Cluster Connections
The API changed in HBase 1.0. For connection configuration information, see Client configuration and dependencies connecting to an HBase cluster.
68.1.1. API as of HBase 1.0.0
It’s been cleaned up and users are returned Interfaces to work against rather than particular types. In HBase 1.0, obtain a Connection
object from ConnectionFactory
and thereafter, get from it instances of Table
, Admin
, and RegionLocator
on an as-need basis. When done, close the obtained instances. Finally, be sure to cleanup your Connection
instance before exiting. Connections
are heavyweight objects but thread-safe so you can create one for your application and keep the instance around. Table
, Admin
and RegionLocator
instances are lightweight. Create as you go and then let go as soon as you are done by closing them. See the Client Package Javadoc Description for example usage of the new HBase 1.0 API.
68.1.2. API before HBase 1.0.0
Instances of HTable
are the way to interact with an HBase cluster earlier than 1.0.0. Table instances are not thread-safe. Only one thread can use an instance of Table at any given time. When creating Table instances, it is advisable to use the same HBaseConfiguration instance. This will ensure sharing of ZooKeeper and socket instances to the RegionServers which is usually what you want. For example, this is preferred:
HBaseConfiguration conf = HBaseConfiguration.create();
HTable table1 = new HTable(conf, "myTable");
HTable table2 = new HTable(conf, "myTable");
as opposed to this:
HBaseConfiguration conf1 = HBaseConfiguration.create();
HTable table1 = new HTable(conf1, "myTable");
HBaseConfiguration conf2 = HBaseConfiguration.create();
HTable table2 = new HTable(conf2, "myTable");
For more information about how connections are handled in the HBase client, see ConnectionFactory.
Connection Pooling
For applications which require high-end multithreaded access (e.g., web-servers or application servers that may serve many application threads in a single JVM), you can pre-create a Connection
, as shown in the following example:
Example 22. Pre-Creating a Connection
// Create a connection to the cluster.
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tablename))) {
// use table as needed, the table returned is lightweight
}
HTablePool
is DeprecatePrevious versions of this guide discussed
HTablePool
, which was deprecated in HBase 0.94, 0.95, and 0.96, and removed in 0.98.1, by HBASE-6580, orHConnection
, which is deprecated in HBase 1.0 byConnection
. Please use Connection instead.
68.2. WriteBuffer and Batch Methods
In HBase 1.0 and later, HTable is deprecated in favor of Table. Table
does not use autoflush. To do buffered writes, use the BufferedMutator class.
In HBase 2.0 and later, HTable does not use BufferedMutator to execute the Put
operation. Refer to HBASE-18500 for more information.
For additional information on write durability, review the ACID semantics page.
For fine-grained control of batching of Put
s or Delete
s, see the batch methods on Table.
68.3. Asynchronous Client
It is a new API introduced in HBase 2.0 which aims to provide the ability to access HBase asynchronously.
You can obtain an AsyncConnection
from ConnectionFactory
, and then get a asynchronous table instance from it to access HBase. When done, close the AsyncConnection
instance(usually when your program exits).
For the asynchronous table, most methods have the same meaning with the old Table
interface, expect that the return value is wrapped with a CompletableFuture usually. We do not have any buffer here so there is no close method for asynchronous table, you do not need to close it. And it is thread safe.
There are several differences for scan:
There is still a
getScanner
method which returns aResultScanner
. You can use it in the old way and it works like the oldClientAsyncPrefetchScanner
.There is a
scanAll
method which will return all the results at once. It aims to provide a simpler way for small scans which you want to get the whole results at once usually.The Observer Pattern. There is a scan method which accepts a
ScanResultConsumer
as a parameter. It will pass the results to the consumer.
Notice that AsyncTable
interface is templatized. The template parameter specifies the type of ScanResultConsumerBase
used by scans, which means the observer style scan APIs are different. The two types of scan consumers are - ScanResultConsumer
and AdvancedScanResultConsumer
.
ScanResultConsumer
needs a separate thread pool which is used to execute the callbacks registered to the returned CompletableFuture. Because the use of separate thread pool frees up RPC threads, callbacks are free to do anything. Use this if the callbacks are not quick, or when in doubt.
AdvancedScanResultConsumer
executes callbacks inside the framework thread. It is not allowed to do time consuming work in the callbacks else it will likely block the framework threads and cause very bad performance impact. As its name, it is designed for advanced users who want to write high performance code. See org.apache.hadoop.hbase.client.example.HttpProxyExample
for how to write fully asynchronous code with it.
68.4. Asynchronous Admin
You can obtain an AsyncConnection
from ConnectionFactory
, and then get a AsyncAdmin
instance from it to access HBase. Notice that there are two getAdmin
methods to get a AsyncAdmin
instance. One method has one extra thread pool parameter which is used to execute callbacks. It is designed for normal users. Another method doesn’t need a thread pool and all the callbacks are executed inside the framework thread so it is not allowed to do time consuming works in the callbacks. It is designed for advanced users.
The default getAdmin
methods will return a AsyncAdmin
instance which use default configs. If you want to customize some configs, you can use getAdminBuilder
methods to get a AsyncAdminBuilder
for creating AsyncAdmin
instance. Users are free to only set the configs they care about to create a new AsyncAdmin
instance.
For the AsyncAdmin
interface, most methods have the same meaning with the old Admin
interface, expect that the return value is wrapped with a CompletableFuture usually.
For most admin operations, when the returned CompletableFuture is done, it means the admin operation has also been done. But for compact operation, it only means the compact request was sent to HBase and may need some time to finish the compact operation. For rollWALWriter
method, it only means the rollWALWriter request was sent to the region server and may need some time to finish the rollWALWriter
operation.
For region name, we only accept byte[]
as the parameter type and it may be a full region name or a encoded region name. For server name, we only accept ServerName
as the parameter type. For table name, we only accept TableName
as the parameter type. For list*
operations, we only accept Pattern
as the parameter type if you want to do regex matching.
68.5. External Clients
Information on non-Java clients and custom protocols is covered in Apache HBase External APIs
69. Client Request Filters
Get and Scan instances can be optionally configured with filters which are applied on the RegionServer.
Filters can be confusing because there are many different types, and it is best to approach them by understanding the groups of Filter functionality.
69.1. Structural
Structural Filters contain other Filters.
69.1.1. FilterList
FilterList represents a list of Filters with a relationship of FilterList.Operator.MUST_PASS_ALL
or FilterList.Operator.MUST_PASS_ONE
between the Filters. The following example shows an ‘or’ between two Filters (checking for either ‘my value’ or ‘my other value’ on the same attribute).
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
cf,
column,
CompareOperator.EQUAL,
Bytes.toBytes("my value")
);
list.add(filter1);
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(
cf,
column,
CompareOperator.EQUAL,
Bytes.toBytes("my other value")
);
list.add(filter2);
scan.setFilter(list);
69.2. Column Value
69.2.1. SingleColumnValueFilter
A SingleColumnValueFilter (see: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.html) can be used to test column values for equivalence (CompareOperaor.EQUAL
), inequality (CompareOperaor.NOT_EQUAL
), or ranges (e.g., CompareOperaor.GREATER
). The following is an example of testing equivalence of a column to a String value “my value”…
SingleColumnValueFilter filter = new SingleColumnValueFilter(
cf,
column,
CompareOperaor.EQUAL,
Bytes.toBytes("my value")
);
scan.setFilter(filter);
69.2.2. ColumnValueFilter
Introduced in HBase-2.0.0 version as a complementation of SingleColumnValueFilter, ColumnValueFilter gets matched cell only, while SingleColumnValueFilter gets the entire row (has other columns and values) to which the matched cell belongs. Parameters of constructor of ColumnValueFilter are the same as SingleColumnValueFilter.
ColumnValueFilter filter = new ColumnValueFilter(
cf,
column,
CompareOperaor.EQUAL,
Bytes.toBytes("my value")
);
scan.setFilter(filter);
Note. For simple query like “equals to a family:qualifier:value”, we highly recommend to use the following way instead of using SingleColumnValueFilter or ColumnValueFilter:
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"));
ValueFilter vf = new ValueFilter(CompareOperator.EQUAL,
new BinaryComparator(Bytes.toBytes("value")));
scan.setFilter(vf);
...
This scan will restrict to the specified column ‘family:qualifier’, avoiding scan unrelated families and columns, which has better performance, and ValueFilter
is the condition used to do the value filtering.
But if query is much more complicated beyond this book, then please make your good choice case by case.
69.3. Column Value Comparators
There are several Comparator classes in the Filter package that deserve special mention. These Comparators are used in concert with other Filters, such as SingleColumnValueFilter.
69.3.1. RegexStringComparator
RegexStringComparator supports regular expressions for value comparisons.
RegexStringComparator comp = new RegexStringComparator("my."); // any value that starts with 'my'
SingleColumnValueFilter filter = new SingleColumnValueFilter(
cf,
column,
CompareOperaor.EQUAL,
comp
);
scan.setFilter(filter);
See the Oracle JavaDoc for supported RegEx patterns in Java.
69.3.2. SubstringComparator
SubstringComparator can be used to determine if a given substring exists in a value. The comparison is case-insensitive.
SubstringComparator comp = new SubstringComparator("y val"); // looking for 'my value'
SingleColumnValueFilter filter = new SingleColumnValueFilter(
cf,
column,
CompareOperaor.EQUAL,
comp
);
scan.setFilter(filter);
69.3.3. BinaryPrefixComparator
69.3.4. BinaryComparator
See BinaryComparator.
69.4. KeyValue Metadata
As HBase stores data internally as KeyValue pairs, KeyValue Metadata Filters evaluate the existence of keys (i.e., ColumnFamily:Column qualifiers) for a row, as opposed to values the previous section.
69.4.1. FamilyFilter
FamilyFilter can be used to filter on the ColumnFamily. It is generally a better idea to select ColumnFamilies in the Scan than to do it with a Filter.
69.4.2. QualifierFilter
QualifierFilter can be used to filter based on Column (aka Qualifier) name.
69.4.3. ColumnPrefixFilter
ColumnPrefixFilter can be used to filter based on the lead portion of Column (aka Qualifier) names.
A ColumnPrefixFilter seeks ahead to the first column matching the prefix in each row and for each involved column family. It can be used to efficiently get a subset of the columns in very wide rows.
Note: The same column qualifier can be used in different column families. This filter returns all matching columns.
Example: Find all columns in a row and family that start with “abc”
Table t = ...;
byte[] row = ...;
byte[] family = ...;
byte[] prefix = Bytes.toBytes("abc");
Scan scan = new Scan(row, row); // (optional) limit to one row
scan.addFamily(family); // (optional) limit to one family
Filter f = new ColumnPrefixFilter(prefix);
scan.setFilter(f);
scan.setBatch(10); // set this if there could be many columns returned
ResultScanner rs = t.getScanner(scan);
for (Result r = rs.next(); r != null; r = rs.next()) {
for (KeyValue kv : r.raw()) {
// each kv represents a column
}
}
rs.close();
69.4.4. MultipleColumnPrefixFilter
MultipleColumnPrefixFilter behaves like ColumnPrefixFilter but allows specifying multiple prefixes.
Like ColumnPrefixFilter, MultipleColumnPrefixFilter efficiently seeks ahead to the first column matching the lowest prefix and also seeks past ranges of columns between prefixes. It can be used to efficiently get discontinuous sets of columns from very wide rows.
Example: Find all columns in a row and family that start with “abc” or “xyz”
Table t = ...;
byte[] row = ...;
byte[] family = ...;
byte[][] prefixes = new byte[][] {Bytes.toBytes("abc"), Bytes.toBytes("xyz")};
Scan scan = new Scan(row, row); // (optional) limit to one row
scan.addFamily(family); // (optional) limit to one family
Filter f = new MultipleColumnPrefixFilter(prefixes);
scan.setFilter(f);
scan.setBatch(10); // set this if there could be many columns returned
ResultScanner rs = t.getScanner(scan);
for (Result r = rs.next(); r != null; r = rs.next()) {
for (KeyValue kv : r.raw()) {
// each kv represents a column
}
}
rs.close();
69.4.5. ColumnRangeFilter
A ColumnRangeFilter allows efficient intra row scanning.
A ColumnRangeFilter can seek ahead to the first matching column for each involved column family. It can be used to efficiently get a ‘slice’ of the columns of a very wide row. i.e. you have a million columns in a row but you only want to look at columns bbbb-bbdd.
Note: The same column qualifier can be used in different column families. This filter returns all matching columns.
Example: Find all columns in a row and family between “bbbb” (inclusive) and “bbdd” (inclusive)
Table t = ...;
byte[] row = ...;
byte[] family = ...;
byte[] startColumn = Bytes.toBytes("bbbb");
byte[] endColumn = Bytes.toBytes("bbdd");
Scan scan = new Scan(row, row); // (optional) limit to one row
scan.addFamily(family); // (optional) limit to one family
Filter f = new ColumnRangeFilter(startColumn, true, endColumn, true);
scan.setFilter(f);
scan.setBatch(10); // set this if there could be many columns returned
ResultScanner rs = t.getScanner(scan);
for (Result r = rs.next(); r != null; r = rs.next()) {
for (KeyValue kv : r.raw()) {
// each kv represents a column
}
}
rs.close();
Note: Introduced in HBase 0.92
69.5. RowKey
69.5.1. RowFilter
It is generally a better idea to use the startRow/stopRow methods on Scan for row selection, however RowFilter can also be used.
69.6. Utility
69.6.1. FirstKeyOnlyFilter
This is primarily used for rowcount jobs. See FirstKeyOnlyFilter.
70. Master
HMaster
is the implementation of the Master Server. The Master server is responsible for monitoring all RegionServer instances in the cluster, and is the interface for all metadata changes. In a distributed cluster, the Master typically runs on the NameNode. J Mohamed Zahoor goes into some more detail on the Master Architecture in this blog posting, HBase HMaster Architecture .
70.1. Startup Behavior
If run in a multi-Master environment, all Masters compete to run the cluster. If the active Master loses its lease in ZooKeeper (or the Master shuts down), then the remaining Masters jostle to take over the Master role.
70.2. Runtime Impact
A common dist-list question involves what happens to an HBase cluster when the Master goes down. Because the HBase client talks directly to the RegionServers, the cluster can still function in a “steady state”. Additionally, per Catalog Tables, hbase:meta
exists as an HBase table and is not resident in the Master. However, the Master controls critical functions such as RegionServer failover and completing region splits. So while the cluster can still run for a short time without the Master, the Master should be restarted as soon as possible.
70.3. Interface
The methods exposed by HMasterInterface
are primarily metadata-oriented methods:
Table (createTable, modifyTable, removeTable, enable, disable)
ColumnFamily (addColumn, modifyColumn, removeColumn)
Region (move, assign, unassign) For example, when the
Admin
methoddisableTable
is invoked, it is serviced by the Master server.
70.4. Processes
The Master runs several background threads:
70.4.1. LoadBalancer
Periodically, and when there are no regions in transition, a load balancer will run and move regions around to balance the cluster’s load. See Balancer for configuring this property.
See Region-RegionServer Assignment for more information on region assignment.
70.4.2. CatalogJanitor
Periodically checks and cleans up the hbase:meta
table. See hbase:meta for more information on the meta table.
70.5. MasterProcWAL
HMaster records administrative operations and their running states, such as the handling of a crashed server, table creation, and other DDLs, into its own WAL file. The WALs are stored under the MasterProcWALs directory. The Master WALs are not like RegionServer WALs. Keeping up the Master WAL allows us run a state machine that is resilient across Master failures. For example, if a HMaster was in the middle of creating a table encounters an issue and fails, the next active HMaster can take up where the previous left off and carry the operation to completion. Since hbase-2.0.0, a new AssignmentManager (A.K.A AMv2) was introduced and the HMaster handles region assignment operations, server crash processing, balancing, etc., all via AMv2 persisting all state and transitions into MasterProcWALs rather than up into ZooKeeper, as we do in hbase-1.x.
See AMv2 Description for Devs (and Procedure Framework (Pv2): HBASE-12439 for its basis) if you would like to learn more about the new AssignmentManager.
70.5.1. Configurations for MasterProcWAL
Here are the list of configurations that effect MasterProcWAL operation. You should not have to change your defaults.
hbase.procedure.store.wal.periodic.roll.msec
Description
Frequency of generating a new WAL
Default
1h (3600000 in msec)
hbase.procedure.store.wal.roll.threshold
Description
Threshold in size before the WAL rolls. Every time the WAL reaches this size or the above period, 1 hour, passes since last log roll, the HMaster will generate a new WAL.
Default
32MB (33554432 in byte)
hbase.procedure.store.wal.warn.threshold
Description
If the number of WALs goes beyond this threshold, the following message should appear in the HMaster log with WARN level when rolling.
procedure WALs count=xx above the warning threshold 64\. check running procedures to see if something is stuck.
Default
64
hbase.procedure.store.wal.max.retries.before.roll
Description
Max number of retry when syncing slots (records) to its underlying storage, such as HDFS. Every attempt, the following message should appear in the HMaster log.
unable to sync slots, retry=xx
Default
3
hbase.procedure.store.wal.sync.failure.roll.max
Description
After the above 3 retrials, the log is rolled and the retry count is reset to 0, thereon a new set of retrial starts. This configuration controls the max number of attempts of log rolling upon sync failure. That is, HMaster is allowed to fail to sync 9 times in total. Once it exceeds, the following log should appear in the HMaster log.
Sync slots after log roll failed, abort.
Default
3
71. RegionServer
HRegionServer
is the RegionServer implementation. It is responsible for serving and managing regions. In a distributed cluster, a RegionServer runs on a DataNode.
71.1. Interface
The methods exposed by HRegionRegionInterface
contain both data-oriented and region-maintenance methods:
Data (get, put, delete, next, etc.)
Region (splitRegion, compactRegion, etc.) For example, when the
Admin
methodmajorCompact
is invoked on a table, the client is actually iterating through all regions for the specified table and requesting a major compaction directly to each region.
71.2. Processes
The RegionServer runs a variety of background threads:
71.2.1. CompactSplitThread
Checks for splits and handle minor compactions.
71.2.2. MajorCompactionChecker
Checks for major compactions.
71.2.3. MemStoreFlusher
Periodically flushes in-memory writes in the MemStore to StoreFiles.
71.2.4. LogRoller
Periodically checks the RegionServer’s WAL.
71.3. Coprocessors
Coprocessors were added in 0.92. There is a thorough Blog Overview of CoProcessors posted. Documentation will eventually move to this reference guide, but the blog is the most current information available at this time.
71.4. Block Cache
HBase provides two different BlockCache implementations to cache data read from HDFS: the default on-heap LruBlockCache
and the BucketCache
, which is (usually) off-heap. This section discusses benefits and drawbacks of each implementation, how to choose the appropriate option, and configuration options for each.
Block Cache Reporting: U
See the RegionServer UI for detail on caching deploy. See configurations, sizings, current usage, time-in-the-cache, and even detail on block counts and types.
71.4.1. Cache Choices
LruBlockCache
is the original implementation, and is entirely within the Java heap. BucketCache
is optional and mainly intended for keeping block cache data off-heap, although BucketCache
can also be a file-backed cache.
When you enable BucketCache, you are enabling a two tier caching system. We used to describe the tiers as “L1” and “L2” but have deprecated this terminology as of hbase-2.0.0. The “L1” cache referred to an instance of LruBlockCache and “L2” to an off-heap BucketCache. Instead, when BucketCache is enabled, all DATA blocks are kept in the BucketCache tier and meta blocks — INDEX and BLOOM blocks — are on-heap in the LruBlockCache
. Management of these two tiers and the policy that dictates how blocks move between them is done by CombinedBlockCache
.
71.4.2. General Cache Configurations
Apart from the cache implementation itself, you can set some general configuration options to control how the cache performs. See CacheConfig. After setting any of these options, restart or rolling restart your cluster for the configuration to take effect. Check logs for errors or unexpected behavior.
See also Prefetch Option for Blockcache, which discusses a new option introduced in HBASE-9857.
71.4.3. LruBlockCache Design
The LruBlockCache is an LRU cache that contains three levels of block priority to allow for scan-resistance and in-memory ColumnFamilies:
Single access priority: The first time a block is loaded from HDFS it normally has this priority and it will be part of the first group to be considered during evictions. The advantage is that scanned blocks are more likely to get evicted than blocks that are getting more usage.
Multi access priority: If a block in the previous priority group is accessed again, it upgrades to this priority. It is thus part of the second group considered during evictions.
In-memory access priority: If the block’s family was configured to be “in-memory”, it will be part of this priority disregarding the number of times it was accessed. Catalog tables are configured like this. This group is the last one considered during evictions.
To mark a column family as in-memory, call
HColumnDescriptor.setInMemory(true);
if creating a table from java, or set IN_MEMORY ⇒ true
when creating or altering a table in the shell: e.g.
hbase(main):003:0> create 't', {NAME => 'f', IN_MEMORY => 'true'}
For more information, see the LruBlockCache source
71.4.4. LruBlockCache Usage
Block caching is enabled by default for all the user tables which means that any read operation will load the LRU cache. This might be good for a large number of use cases, but further tunings are usually required in order to achieve better performance. An important concept is the working set size, or WSS, which is: “the amount of memory needed to compute the answer to a problem”. For a website, this would be the data that’s needed to answer the queries over a short amount of time.
The way to calculate how much memory is available in HBase for caching is:
number of region servers * heap size * hfile.block.cache.size * 0.99
The default value for the block cache is 0.4 which represents 40% of the available heap. The last value (99%) is the default acceptable loading factor in the LRU cache after which eviction is started. The reason it is included in this equation is that it would be unrealistic to say that it is possible to use 100% of the available memory since this would make the process blocking from the point where it loads new blocks. Here are some examples:
One region server with the heap size set to 1 GB and the default block cache size will have 405 MB of block cache available.
20 region servers with the heap size set to 8 GB and a default block cache size will have 63.3 of block cache.
100 region servers with the heap size set to 24 GB and a block cache size of 0.5 will have about 1.16 TB of block cache.
Your data is not the only resident of the block cache. Here are others that you may have to take into account:
Catalog Tables
The hbase:meta
table is forced into the block cache and have the in-memory priority which means that they are harder to evict.
The hbase:meta tables can occupy a few MBs depending on the number of regions.
HFiles Indexes
An HFile is the file format that HBase uses to store data in HDFS. It contains a multi-layered index which allows HBase to seek to the data without having to read the whole file. The size of those indexes is a factor of the block size (64KB by default), the size of your keys and the amount of data you are storing. For big data sets it’s not unusual to see numbers around 1GB per region server, although not all of it will be in cache because the LRU will evict indexes that aren’t used.
Keys
The values that are stored are only half the picture, since each value is stored along with its keys (row key, family qualifier, and timestamp). See Try to minimize row and column sizes.
Bloom Filters
Just like the HFile indexes, those data structures (when enabled) are stored in the LRU.
Currently the recommended way to measure HFile indexes and bloom filters sizes is to look at the region server web UI and checkout the relevant metrics. For keys, sampling can be done by using the HFile command line tool and look for the average key size metric. Since HBase 0.98.3, you can view details on BlockCache stats and metrics in a special Block Cache section in the UI.
It’s generally bad to use block caching when the WSS doesn’t fit in memory. This is the case when you have for example 40GB available across all your region servers’ block caches but you need to process 1TB of data. One of the reasons is that the churn generated by the evictions will trigger more garbage collections unnecessarily. Here are two use cases:
Fully random reading pattern: This is a case where you almost never access the same row twice within a short amount of time such that the chance of hitting a cached block is close to 0. Setting block caching on such a table is a waste of memory and CPU cycles, more so that it will generate more garbage to pick up by the JVM. For more information on monitoring GC, see JVM Garbage Collection Logs.
Mapping a table: In a typical MapReduce job that takes a table in input, every row will be read only once so there’s no need to put them into the block cache. The Scan object has the option of turning this off via the setCaching method (set it to false). You can still keep block caching turned on on this table if you need fast random read access. An example would be counting the number of rows in a table that serves live traffic, caching every block of that table would create massive churn and would surely evict data that’s currently in use.
Caching META blocks only (DATA blocks in fscache)
An interesting setup is one where we cache META blocks only and we read DATA blocks in on each access. If the DATA blocks fit inside fscache, this alternative may make sense when access is completely random across a very large dataset. To enable this setup, alter your table and for each column family set BLOCKCACHE ⇒ 'false'
. You are ‘disabling’ the BlockCache for this column family only. You can never disable the caching of META blocks. Since HBASE-4683 Always cache index and bloom blocks, we will cache META blocks even if the BlockCache is disabled.
71.4.5. Off-heap Block Cache
How to Enable BucketCache
The usual deploy of BucketCache is via a managing class that sets up two caching tiers: an on-heap cache implemented by LruBlockCache and a second cache implemented with BucketCache. The managing class is CombinedBlockCache by default. The previous link describes the caching ‘policy’ implemented by CombinedBlockCache. In short, it works by keeping meta blocks — INDEX and BLOOM in the on-heap LruBlockCache tier — and DATA blocks are kept in the BucketCache tier.
Pre-hbase-2.0.0 versions
Fetching will always be slower when fetching from BucketCache in pre-hbase-2.0.0, as compared to the native on-heap LruBlockCache. However, latencies tend to be less erratic across time, because there is less garbage collection when you use BucketCache since it is managing BlockCache allocations, not the GC. If the BucketCache is deployed in off-heap mode, this memory is not managed by the GC at all. This is why you’d use BucketCache in pre-2.0.0, so your latencies are less erratic, to mitigate GCs and heap fragmentation, and so you can safely use more memory. See Nick Dimiduk’s BlockCache 101 for comparisons running on-heap vs off-heap tests. Also see Comparing BlockCache Deploys which finds that if your dataset fits inside your LruBlockCache deploy, use it otherwise if you are experiencing cache churn (or you want your cache to exist beyond the vagaries of java GC), use BucketCache.
In pre-2.0.0, one can configure the BucketCache so it receives the victim
of an LruBlockCache eviction. All Data and index blocks are cached in L1 first. When eviction happens from L1, the blocks (or victims
) will get moved to L2. Set cacheDataInL1
via (HColumnDescriptor.setCacheDataInL1(true)
or in the shell, creating or amending column families setting CACHE_DATA_IN_L1
to true: e.g.
hbase(main):003:0> create 't', {NAME => 't', CONFIGURATION => {CACHE_DATA_IN_L1 => 'true'}}
hbase-2.0.0+ versions
HBASE-11425 changed the HBase read path so it could hold the read-data off-heap avoiding copying of cached data on to the java heap. See Offheap read-path. In hbase-2.0.0, off-heap latencies approach those of on-heap cache latencies with the added benefit of NOT provoking GC.
From HBase 2.0.0 onwards, the notions of L1 and L2 have been deprecated. When BucketCache is turned on, the DATA blocks will always go to BucketCache and INDEX/BLOOM blocks go to on heap LRUBlockCache. cacheDataInL1
support hase been removed.
The BucketCache Block Cache can be deployed off-heap, file or mmaped file mode.
You set which via the hbase.bucketcache.ioengine
setting. Setting it to offheap
will have BucketCache make its allocations off-heap, and an ioengine setting of file:PATH_TO_FILE
will direct BucketCache to use file caching (Useful in particular if you have some fast I/O attached to the box such as SSDs). From 2.0.0, it is possible to have more than one file backing the BucketCache. This is very useful specially when the Cache size requirement is high. For multiple backing files, configure ioengine as files:PATH_TO_FILE1,PATH_TO_FILE2,PATH_TO_FILE3
. BucketCache can be configured to use an mmapped file also. Configure ioengine as mmap:PATH_TO_FILE
for this.
It is possible to deploy a tiered setup where we bypass the CombinedBlockCache policy and have BucketCache working as a strict L2 cache to the L1 LruBlockCache. For such a setup, set hbase.bucketcache.combinedcache.enabled
to false
. In this mode, on eviction from L1, blocks go to L2. When a block is cached, it is cached first in L1. When we go to look for a cached block, we look first in L1 and if none found, then search L2. Let us call this deploy format, Raw L1+L2. NOTE: This L1+L2 mode is removed from 2.0.0. When BucketCache is used, it will be strictly the DATA cache and the LruBlockCache will cache INDEX/META blocks.
Other BucketCache configs include: specifying a location to persist cache to across restarts, how many threads to use writing the cache, etc. See the CacheConfig.html class for configuration options and descriptions.
To check it enabled, look for the log line describing cache setup; it will detail how BucketCache has been deployed. Also see the UI. It will detail the cache tiering and their configuration.
BucketCache Example Configuration
This sample provides a configuration for a 4 GB off-heap BucketCache with a 1 GB on-heap cache.
Configuration is performed on the RegionServer.
Setting hbase.bucketcache.ioengine
and hbase.bucketcache.size
> 0 enables CombinedBlockCache
. Let us presume that the RegionServer has been set to run with a 5G heap: i.e. HBASE_HEAPSIZE=5g
.
- First, edit the RegionServer’s hbase-env.sh and set
HBASE_OFFHEAPSIZE
to a value greater than the off-heap size wanted, in this case, 4 GB (expressed as 4G). Let’s set it to 5G. That’ll be 4G for our off-heap cache and 1G for any other uses of off-heap memory (there are other users of off-heap memory other than BlockCache; e.g. DFSClient in RegionServer can make use of off-heap memory). See Direct Memory Usage In HBase.HBASE_OFFHEAPSIZE=5G
- Next, add the following configuration to the RegionServer’s hbase-site.xml.
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
<property>
<name>hfile.block.cache.size</name>
<value>0.2</value>
</property>
<property>
<name>hbase.bucketcache.size</name>
<value>4196</value>
</property>
- Restart or rolling restart your cluster, and check the logs for any issues.
In the above, we set the BucketCache to be 4G. We configured the on-heap LruBlockCache have 20% (0.2) of the RegionServer’s heap size (0.2 * 5G = 1G). In other words, you configure the L1 LruBlockCache as you would normally (as if there were no L2 cache present).
HBASE-10641 introduced the ability to configure multiple sizes for the buckets of the BucketCache, in HBase 0.98 and newer. To configurable multiple bucket sizes, configure the new property hbase.bucketcache.bucket.sizes
to a comma-separated list of block sizes, ordered from smallest to largest, with no spaces. The goal is to optimize the bucket sizes based on your data access patterns. The following example configures buckets of size 4096 and 8192.
<property>
<name>hbase.bucketcache.bucket.sizes</name>
<value>4096,8192</value>
</property>
Direct Memory Usage In HBase
The default maximum direct memory varies by JVM. Traditionally it is 64M or some relation to allocated heap size (-Xmx) or no limit at all (JDK7 apparently). HBase servers use direct memory, in particular short-circuit reading (See Leveraging local data), the hosted DFSClient will allocate direct memory buffers. How much the DFSClient uses is not easy to quantify; it is the number of open HFiles *
hbase.dfs.client.read.shortcircuit.buffer.size
wherehbase.dfs.client.read.shortcircuit.buffer.size
is set to 128k in HBase — see hbase-default.xml default configurations. If you do off-heap block caching, you’ll be making use of direct memory. The RPCServer uses a ByteBuffer pool. From 2.0.0, these buffers are off-heap ByteBuffers. Starting your JVM, make sure the-XX:MaxDirectMemorySize
setting in conf/hbase-env.sh considers off-heap BlockCache (hbase.bucketcache.size
), DFSClient usage, RPC side ByteBufferPool max size. This has to be bit higher than sum of off heap BlockCache size and max ByteBufferPool size. Allocating an extra of 1-2 GB for the max direct memory size has worked in tests. Direct memory, which is part of the Java process heap, is separate from the object heap allocated by -Xmx. The value allocated byMaxDirectMemorySize
must not exceed physical RAM, and is likely to be less than the total available RAM due to other memory requirements and system constraints.You can see how much memory — on-heap and off-heap/direct — a RegionServer is configured to use and how much it is using at any one time by looking at the Server Metrics: Memory tab in the UI. It can also be gotten via JMX. In particular the direct memory currently used by the server can be found on the
java.nio.type=BufferPool,name=direct
bean. Terracotta has a good write up on using off-heap memory in Java. It is for their product BigMemory but a lot of the issues noted apply in general to any attempt at going off-heap. Check it out.hbase.bucketcache.percentage.in.combinedcache
This is a pre-HBase 1.0 configuration removed because it was confusing. It was a float that you would set to some value between 0.0 and 1.0. Its default was 0.9. If the deploy was using CombinedBlockCache, then the LruBlockCache L1 size was calculated to be
(1 - hbase.bucketcache.percentage.in.combinedcache) * size-of-bucketcache
and the BucketCache size washbase.bucketcache.percentage.in.combinedcache * size-of-bucket-cache
. where size-of-bucket-cache itself is EITHER the value of the configurationhbase.bucketcache.size
IF it was specified as Megabytes ORhbase.bucketcache.size
*-XX:MaxDirectMemorySize
ifhbase.bucketcache.size
is between 0 and 1.0.In 1.0, it should be more straight-forward. Onheap LruBlockCache size is set as a fraction of java heap using
hfile.block.cache.size setting
(not the best name) and BucketCache is set as above in absolute Megabytes.
71.4.6. Compressed BlockCache
HBASE-11331 introduced lazy BlockCache decompression, more simply referred to as compressed BlockCache. When compressed BlockCache is enabled data and encoded data blocks are cached in the BlockCache in their on-disk format, rather than being decompressed and decrypted before caching.
For a RegionServer hosting more data than can fit into cache, enabling this feature with SNAPPY compression has been shown to result in 50% increase in throughput and 30% improvement in mean latency while, increasing garbage collection by 80% and increasing overall CPU load by 2%. See HBASE-11331 for more details about how performance was measured and achieved. For a RegionServer hosting data that can comfortably fit into cache, or if your workload is sensitive to extra CPU or garbage-collection load, you may receive less benefit.
The compressed BlockCache is disabled by default. To enable it, set hbase.block.data.cachecompressed
to true
in hbase-site.xml on all RegionServers.
71.5. RegionServer Offheap Read/Write Path
71.5.1. Offheap read-path
In hbase-2.0.0, HBASE-11425 changed the HBase read path so it could hold the read-data off-heap avoiding copying of cached data on to the java heap. This reduces GC pauses given there is less garbage made and so less to clear. The off-heap read path has a performance that is similar/better to that of the on-heap LRU cache. This feature is available since HBase 2.0.0. If the BucketCache is in file
mode, fetching will always be slower compared to the native on-heap LruBlockCache. Refer to below blogs for more details and test results on off heaped read path Offheaping the Read Path in Apache HBase: Part 1 of 2 and Offheap Read-Path in Production - The Alibaba story
For an end-to-end off-heaped read-path, first of all there should be an off-heap backed Off-heap Block Cache(BC). Configure ‘hbase.bucketcache.ioengine’ to off-heap in hbase-site.xml. Also specify the total capacity of the BC using hbase.bucketcache.size
config. Please remember to adjust value of ‘HBASEOFFHEAPSIZE’ in _hbase-env.sh. This is how we specify the max possible off-heap memory allocation for the RegionServer java process. This should be bigger than the off-heap BC size. Please keep in mind that there is no default for hbase.bucketcache.ioengine
which means the BC is turned OFF by default (See Direct Memory Usage In HBase).
Next thing to tune is the ByteBuffer pool on the RPC server side. The buffers from this pool will be used to accumulate the cell bytes and create a result cell block to send back to the client side. hbase.ipc.server.reservoir.enabled
can be used to turn this pool ON or OFF. By default this pool is ON and available. HBase will create off heap ByteBuffers and pool them. Please make sure not to turn this OFF if you want end-to-end off-heaping in read path. If this pool is turned off, the server will create temp buffers on heap to accumulate the cell bytes and make a result cell block. This can impact the GC on a highly read loaded server. The user can tune this pool with respect to how many buffers are in the pool and what should be the size of each ByteBuffer. Use the config hbase.ipc.server.reservoir.initial.buffer.size
to tune each of the buffer sizes. Default is 64 KB.
When the read pattern is a random row read load and each of the rows are smaller in size compared to this 64 KB, try reducing this. When the result size is larger than one ByteBuffer size, the server will try to grab more than one buffer and make a result cell block out of these. When the pool is running out of buffers, the server will end up creating temporary on-heap buffers.
The maximum number of ByteBuffers in the pool can be tuned using the config ‘hbase.ipc.server.reservoir.initial.max’. Its value defaults to 64 * region server handlers configured (See the config ‘hbase.regionserver.handler.count’). The math is such that by default we consider 2 MB as the result cell block size per read result and each handler will be handling a read. For 2 MB size, we need 32 buffers each of size 64 KB (See default buffer size in pool). So per handler 32 ByteBuffers(BB). We allocate twice this size as the max BBs count such that one handler can be creating the response and handing it to the RPC Responder thread and then handling a new request creating a new response cell block (using pooled buffers). Even if the responder could not send back the first TCP reply immediately, our count should allow that we should still have enough buffers in our pool without having to make temporary buffers on the heap. Again for smaller sized random row reads, tune this max count. There are lazily created buffers and the count is the max count to be pooled.
If you still see GC issues even after making end-to-end read path off-heap, look for issues in the appropriate buffer pool. Check the below RegionServer log with INFO level:
Pool already reached its max capacity : XXX and no free buffers now. Consider increasing the value for 'hbase.ipc.server.reservoir.initial.max' ?
The setting for HBASE_OFFHEAPSIZE in hbase-env.sh should consider this off heap buffer pool at the RPC side also. We need to config this max off heap size for the RegionServer as a bit higher than the sum of this max pool size and the off heap cache size. The TCP layer will also need to create direct bytebuffers for TCP communication. Also the DFS client will need some off-heap to do its workings especially if short-circuit reads are configured. Allocating an extra of 1 - 2 GB for the max direct memory size has worked in tests.
If you are using co processors and refer the Cells in the read results, DO NOT store reference to these Cells out of the scope of the CP hook methods. Some times the CPs need store info about the cell (Like its row key) for considering in the next CP hook call etc. For such cases, pls clone the required fields of the entire Cell as per the use cases. [ See CellUtil#cloneXXX(Cell) APIs ]
71.5.2. Offheap write-path
TODO
71.6. RegionServer Splitting Implementation
As write requests are handled by the region server, they accumulate in an in-memory storage system called the memstore. Once the memstore fills, its content are written to disk as additional store files. This event is called a memstore flush. As store files accumulate, the RegionServer will compact them into fewer, larger files. After each flush or compaction finishes, the amount of data stored in the region has changed. The RegionServer consults the region split policy to determine if the region has grown too large or should be split for another policy-specific reason. A region split request is enqueued if the policy recommends it.
Logically, the process of splitting a region is simple. We find a suitable point in the keyspace of the region where we should divide the region in half, then split the region’s data into two new regions at that point. The details of the process however are not simple. When a split happens, the newly created daughter regions do not rewrite all the data into new files immediately. Instead, they create small files similar to symbolic link files, named Reference files, which point to either the top or bottom part of the parent store file according to the split point. The reference file is used just like a regular data file, but only half of the records are considered. The region can only be split if there are no more references to the immutable data files of the parent region. Those reference files are cleaned gradually by compactions, so that the region will stop referring to its parents files, and can be split further.
Although splitting the region is a local decision made by the RegionServer, the split process itself must coordinate with many actors. The RegionServer notifies the Master before and after the split, updates the .META.
table so that clients can discover the new daughter regions, and rearranges the directory structure and data files in HDFS. Splitting is a multi-task process. To enable rollback in case of an error, the RegionServer keeps an in-memory journal about the execution state. The steps taken by the RegionServer to execute the split are illustrated in RegionServer Split Process. Each step is labeled with its step number. Actions from RegionServers or Master are shown in red, while actions from the clients are show in green.
Figure 1. RegionServer Split Process
The RegionServer decides locally to split the region, and prepares the split. THE SPLIT TRANSACTION IS STARTED. As a first step, the RegionServer acquires a shared read lock on the table to prevent schema modifications during the splitting process. Then it creates a znode in zookeeper under
/hbase/region-in-transition/region-name
, and sets the znode’s state toSPLITTING
.The Master learns about this znode, since it has a watcher for the parent
region-in-transition
znode.The RegionServer creates a sub-directory named
.splits
under the parent’sregion
directory in HDFS.The RegionServer closes the parent region and marks the region as offline in its local data structures. THE SPLITTING REGION IS NOW OFFLINE. At this point, client requests coming to the parent region will throw
NotServingRegionException
. The client will retry with some backoff. The closing region is flushed.The RegionServer creates region directories under the
.splits
directory, for daughter regions A and B, and creates necessary data structures. Then it splits the store files, in the sense that it creates two Reference files per store file in the parent region. Those reference files will point to the parent region’s files.The RegionServer creates the actual region directory in HDFS, and moves the reference files for each daughter.
The RegionServer sends a
Put
request to the.META.
table, to set the parent as offline in the.META.
table and add information about daughter regions. At this point, there won’t be individual entries in.META.
for the daughters. Clients will see that the parent region is split if they scan.META.
, but won’t know about the daughters until they appear in.META.
. Also, if thisPut
to.META
. succeeds, the parent will be effectively split. If the RegionServer fails before this RPC succeeds, Master and the next Region Server opening the region will clean dirty state about the region split. After the.META.
update, though, the region split will be rolled-forward by Master.The RegionServer opens daughters A and B in parallel.
The RegionServer adds the daughters A and B to
.META.
, together with information that it hosts the regions. THE SPLIT REGIONS (DAUGHTERS WITH REFERENCES TO PARENT) ARE NOW ONLINE. After this point, clients can discover the new regions and issue requests to them. Clients cache the.META.
entries locally, but when they make requests to the RegionServer or.META.
, their caches will be invalidated, and they will learn about the new regions from.META.
.The RegionServer updates znode
/hbase/region-in-transition/region-name
in ZooKeeper to stateSPLIT
, so that the master can learn about it. The balancer can freely re-assign the daughter regions to other region servers if necessary. THE SPLIT TRANSACTION IS NOW FINISHED.After the split,
.META.
and HDFS will still contain references to the parent region. Those references will be removed when compactions in daughter regions rewrite the data files. Garbage collection tasks in the master periodically check whether the daughter regions still refer to the parent region’s files. If not, the parent region will be removed.
71.7. Write Ahead Log (WAL)
71.7.1. Purpose
The Write Ahead Log (WAL) records all changes to data in HBase, to file-based storage. Under normal operations, the WAL is not needed because data changes move from the MemStore to StoreFiles. However, if a RegionServer crashes or becomes unavailable before the MemStore is flushed, the WAL ensures that the changes to the data can be replayed. If writing to the WAL fails, the entire operation to modify the data fails.
HBase uses an implementation of the WAL interface. Usually, there is only one instance of a WAL per RegionServer. An exception is the RegionServer that is carrying hbase:meta; the meta table gets its own dedicated WAL. The RegionServer records Puts and Deletes to its WAL, before recording them these Mutations MemStore for the affected Store.
The HLo
Prior to 2.0, the interface for WALs in HBase was named
HLog
. In 0.94, HLog was the name of the implementation of the WAL. You will likely find references to the HLog in documentation tailored to these older versions.
The WAL resides in HDFS in the /hbase/WALs/ directory, with subdirectories per region.
For more general information about the concept of write ahead logs, see the Wikipedia Write-Ahead Log article.
71.7.2. WAL Providers
In HBase, there are a number of WAL imlementations (or ‘Providers’). Each is known by a short name label (that unfortunately is not always descriptive). You set the provider in hbase-site.xml passing the WAL provder short-name as the value on the hbase.wal.provider property (Set the provider for hbase:meta using the hbase.wal.meta_provider property, otherwise it uses the same provider configured by hbase.wal.provider).
asyncfs: The default. New since hbase-2.0.0 (HBASE-15536, HBASE-14790). This AsyncFSWAL provider, as it identifies itself in RegionServer logs, is built on a new non-blocking dfsclient implementation. It is currently resident in the hbase codebase but intent is to move it back up into HDFS itself. WALs edits are written concurrently (“fan-out”) style to each of the WAL-block replicas on each DataNode rather than in a chained pipeline as the default client does. Latencies should be better. See Apache HBase Improements and Practices at Xiaomi at slide 14 onward for more detail on implementation.
filesystem: This was the default in hbase-1.x releases. It is built on the blocking DFSClient and writes to replicas in classic DFSCLient pipeline mode. In logs it identifies as FSHLog or FSHLogProvider.
multiwal: This provider is made of multiple instances of asyncfs or filesystem. See the next section for more on multiwal.
Look for the lines like the below in the RegionServer log to see which provider is in place (The below shows the default AsyncFSWALProvider):
2018-04-02 13:22:37,983 INFO [regionserver/ve0528:16020] wal.WALFactory: Instantiating WALProvider of type class org.apache.hadoop.hbase.wal.AsyncFSWALProvider
As the AsyncFSWAL hacks into the internal of DFSClient implementation, it will be easily broken by upgrading the hadoop dependencies, even for a simple patch release. So if you do not specify the wal provider explicitly, we will first try to use the asyncfs, if failed, we will fall back to use filesystem. And notice that this may not always work, so if you still have problem starting HBase due to the problem of starting AsyncFSWAL, please specify filesystem explicitly in the config file.
EC support has been added to hadoop-3.x, and it is incompatible with WAL as the EC output stream does not support hflush/hsync. In order to create a non-EC file in an EC directory, we need to use the new builder-based create API for FileSystem, but it is only introduced in hadoop-2.9+ and for HBase we still need to support hadoop-2.7.x. So please do not enable EC for the WAL directory until we find a way to deal with it.
71.7.3. MultiWAL
With a single WAL per RegionServer, the RegionServer must write to the WAL serially, because HDFS files must be sequential. This causes the WAL to be a performance bottleneck.
HBase 1.0 introduces support MultiWal in HBASE-5699. MultiWAL allows a RegionServer to write multiple WAL streams in parallel, by using multiple pipelines in the underlying HDFS instance, which increases total throughput during writes. This parallelization is done by partitioning incoming edits by their Region. Thus, the current implementation will not help with increasing the throughput to a single Region.
RegionServers using the original WAL implementation and those using the MultiWAL implementation can each handle recovery of either set of WALs, so a zero-downtime configuration update is possible through a rolling restart.
Configure MultiWAL
To configure MultiWAL for a RegionServer, set the value of the property hbase.wal.provider
to multiwal
by pasting in the following XML:
<property>
<name>hbase.wal.provider</name>
<value>multiwal</value>
</property>
Restart the RegionServer for the changes to take effect.
To disable MultiWAL for a RegionServer, unset the property and restart the RegionServer.
71.7.4. WAL Flushing
TODO (describe).
71.7.5. WAL Splitting
A RegionServer serves many regions. All of the regions in a region server share the same active WAL file. Each edit in the WAL file includes information about which region it belongs to. When a region is opened, the edits in the WAL file which belong to that region need to be replayed. Therefore, edits in the WAL file must be grouped by region so that particular sets can be replayed to regenerate the data in a particular region. The process of grouping the WAL edits by region is called log splitting. It is a critical process for recovering data if a region server fails.
Log splitting is done by the HMaster during cluster start-up or by the ServerShutdownHandler as a region server shuts down. So that consistency is guaranteed, affected regions are unavailable until data is restored. All WAL edits need to be recovered and replayed before a given region can become available again. As a result, regions affected by log splitting are unavailable until the process completes.
Procedure: Log Splitting, Step by Step
- The /hbase/WALs/,, directory is renamed.
Renaming the directory is important because a RegionServer may still be up and accepting requests even if the HMaster thinks it is down. If the RegionServer does not respond immediately and does not heartbeat its ZooKeeper session, the HMaster may interpret this as a RegionServer failure. Renaming the logs directory ensures that existing, valid WAL files which are still in use by an active but busy RegionServer are not written to by accident.
The new directory is named according to the following pattern:/hbase/WALs/<host>,<port>,<startcode>-splitting
An example of such a renamed directory might look like the following:
/hbase/WALs/srv.example.com,60020,1254173957298-splitting
- Each log file is split, one at a time.
The log splitter reads the log file one edit entry at a time and puts each edit entry into the buffer corresponding to the edit’s region. At the same time, the splitter starts several writer threads. Writer threads pick up a corresponding buffer and write the edit entries in the buffer to a temporary recovered edit file. The temporary edit file is stored to disk with the following naming pattern:/hbase/<table_name>/<region_id>/recovered.edits/.temp
This file is used to store all the edits in the WAL log for this region. After log splitting completes, the .temp file is renamed to the sequence ID of the first log written to the file.
To determine whether all edits have been written, the sequence ID is compared to the sequence of the last edit that was written to the HFile. If the sequence of the last edit is greater than or equal to the sequence ID included in the file name, it is clear that all writes from the edit file have been completed.
- After log splitting is complete, each affected region is assigned to a RegionServer.
When the region is opened, the recovered.edits folder is checked for recovered edits files. If any such files are present, they are replayed by reading the edits and saving them to the MemStore. After all edit files are replayed, the contents of the MemStore are written to disk (HFile) and the edit files are deleted.
Handling of Errors During Log Splitting
If you set the hbase.hlog.split.skip.errors
option to true
, errors are treated as follows:
Any error encountered during splitting will be logged.
The problematic WAL log will be moved into the .corrupt directory under the hbase
rootdir
,Processing of the WAL will continue
If the hbase.hlog.split.skip.errors
option is set to false
, the default, the exception will be propagated and the split will be logged as failed. See HBASE-2958 When hbase.hlog.split.skip.errors is set to false, we fail the split but that’s it. We need to do more than just fail split if this flag is set.
How EOFExceptions are treated when splitting a crashed RegionServer’s WALs
If an EOFException occurs while splitting logs, the split proceeds even when hbase.hlog.split.skip.errors
is set to false
. An EOFException while reading the last log in the set of files to split is likely, because the RegionServer was likely in the process of writing a record at the time of a crash. For background, see HBASE-2643 Figure how to deal with eof splitting logs
Performance Improvements during Log Splitting
WAL log splitting and recovery can be resource intensive and take a long time, depending on the number of RegionServers involved in the crash and the size of the regions. Enabling or Disabling Distributed Log Splitting was developed to improve performance during log splitting.
Enabling or Disabling Distributed Log Splitting
Distributed log processing is enabled by default since HBase 0.92. The setting is controlled by the hbase.master.distributed.log.splitting
property, which can be set to true
or false
, but defaults to true
.
Distributed Log Splitting, Step by Step
After configuring distributed log splitting, the HMaster controls the process. The HMaster enrolls each RegionServer in the log splitting process, and the actual work of splitting the logs is done by the RegionServers. The general process for log splitting, as described in Distributed Log Splitting, Step by Step still applies here.
If distributed log processing is enabled, the HMaster creates a split log manager instance when the cluster is started.
The split log manager manages all log files which need to be scanned and split.
The split log manager places all the logs into the ZooKeeper splitWAL node (/hbase/splitWAL) as tasks.
You can view the contents of the splitWAL by issuing the following
zkCli
command. Example output is shown.ls /hbase/splitWAL
[hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2FWALs%2Fhost8.sample.com%2C57020%2C1340474893275-splitting%2Fhost8.sample.com%253A57020.1340474893900,
hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2FWALs%2Fhost3.sample.com%2C57020%2C1340474893299-splitting%2Fhost3.sample.com%253A57020.1340474893931,
hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2FWALs%2Fhost4.sample.com%2C57020%2C1340474893287-splitting%2Fhost4.sample.com%253A57020.1340474893946]
The output contains some non-ASCII characters. When decoded, it looks much more simple:
[hdfs://host2.sample.com:56020/hbase/WALs
/host8.sample.com,57020,1340474893275-splitting
/host8.sample.com%3A57020.1340474893900,
hdfs://host2.sample.com:56020/hbase/WALs
/host3.sample.com,57020,1340474893299-splitting
/host3.sample.com%3A57020.1340474893931,
hdfs://host2.sample.com:56020/hbase/WALs
/host4.sample.com,57020,1340474893287-splitting
/host4.sample.com%3A57020.1340474893946]
The listing represents WAL file names to be scanned and split, which is a list of log splitting tasks.
The split log manager monitors the log-splitting tasks and workers.
The split log manager is responsible for the following ongoing tasks:Once the split log manager publishes all the tasks to the splitWAL znode, it monitors these task nodes and waits for them to be processed.
Checks to see if there are any dead split log workers queued up. If it finds tasks claimed by unresponsive workers, it will resubmit those tasks. If the resubmit fails due to some ZooKeeper exception, the dead worker is queued up again for retry.
Checks to see if there are any unassigned tasks. If it finds any, it create an ephemeral rescan node so that each split log worker is notified to re-scan unassigned tasks via the
nodeChildrenChanged
ZooKeeper event.Checks for tasks which are assigned but expired. If any are found, they are moved back to
TASK_UNASSIGNED
state again so that they can be retried. It is possible that these tasks are assigned to slow workers, or they may already be finished. This is not a problem, because log splitting tasks have the property of idempotence. In other words, the same log splitting task can be processed many times without causing any problem.The split log manager watches the HBase split log znodes constantly. If any split log task node data is changed, the split log manager retrieves the node data. The node data contains the current state of the task. You can use the
zkCli
get
command to retrieve the current state of a task. In the example output below, the first line of the output shows that the task is currently unassigned. ``` get /hbase/splitWAL/hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2FWALs%2Fhost6.sample.com%2C57020%2C1340474893287-splitting%2Fhost6.sample.com%253A57020.1340474893945
unassigned host2.sample.com:57000 cZxid = 0×7115 ctime = Sat Jun 23 11:13:40 PDT 2012 …
<br />Based on the state of the task whose data is changed, the split log manager does one of the following:
-
Resubmit the task if it is unassigned
-
Heartbeat the task if it is assigned
-
Resubmit or fail the task if it is resigned (see [Reasons a Task Will Fail](docs_en_#distributed.log.replay.failure.reasons))
-
Resubmit or fail the task if it is completed with errors (see [Reasons a Task Will Fail](docs_en_#distributed.log.replay.failure.reasons))
-
Resubmit or fail the task if it could not complete due to errors (see [Reasons a Task Will Fail](docs_en_#distributed.log.replay.failure.reasons))
-
Delete the task if it is successfully completed or failed
> Reasons a Task Will Fail
> -
The task has been deleted.
> -
The node no longer exists.
> -
The log status manager failed to move the state of the task to `TASK_UNASSIGNED`.
> -
The number of resubmits is over the resubmit threshold.
3.
Each RegionServer’s split log worker performs the log-splitting tasks.
<br />Each RegionServer runs a daemon thread called the _split log worker_, which does the work to split the logs. The daemon thread starts when the RegionServer starts, and registers itself to watch HBase znodes. If any splitWAL znode children change, it notifies a sleeping worker thread to wake up and grab more tasks. If a worker’s current task’s node data is changed, the worker checks to see if the task has been taken by another worker. If so, the worker thread stops work on the current task.
<br />The worker monitors the splitWAL znode constantly. When a new task appears, the split log worker retrieves the task paths and checks each one until it finds an unclaimed task, which it attempts to claim. If the claim was successful, it attempts to perform the task and updates the task’s `state` property based on the splitting outcome. At this point, the split log worker scans for another unclaimed task.
<br />How the Split Log Worker Approaches a Task
-
It queries the task state and only takes action if the task is in `TASK_UNASSIGNED`state.
-
If the task is in `TASK_UNASSIGNED` state, the worker attempts to set the state to `TASK_OWNED` by itself. If it fails to set the state, another worker will try to grab it. The split log manager will also ask all workers to rescan later if the task remains unassigned.
-
If the worker succeeds in taking ownership of the task, it tries to get the task state again to make sure it really gets it asynchronously. In the meantime, it starts a split task executor to do the actual work:
-
Get the HBase root folder, create a temp folder under the root, and split the log file to the temp folder.
-
If the split was successful, the task executor sets the task to state `TASK_DONE`.
-
If the worker catches an unexpected IOException, the task is set to state `TASK_ERR`.
-
If the worker is shutting down, set the task to state `TASK_RESIGNED`.
-
If the task is taken by another worker, just log it.
4.
The split log manager monitors for uncompleted tasks.
<br />The split log manager returns when all tasks are completed successfully. If all tasks are completed with some failures, the split log manager throws an exception so that the log splitting can be retried. Due to an asynchronous implementation, in very rare cases, the split log manager loses track of some completed tasks. For that reason, it periodically checks for remaining uncompleted task in its task map or ZooKeeper. If none are found, it throws an exception so that the log splitting can be retried right away instead of hanging there waiting for something that won’t happen.
<a name="b38b39c5"></a>
#### 71.7.6. WAL Compression
The content of the WAL can be compressed using LRU Dictionary compression. This can be used to speed up WAL replication to different datanodes. The dictionary can store up to 2 elements; eviction starts after this number is exceeded.
To enable WAL compression, set the `hbase.regionserver.wal.enablecompression` property to `true`. The default value for this property is `false`. By default, WAL tag compression is turned on when WAL compression is enabled. You can turn off WAL tag compression by setting the `hbase.regionserver.wal.tags.enablecompression` property to 'false'.
A possible downside to WAL compression is that we lose more data from the last block in the WAL if it ill-terminated mid-write. If entries in this last block were added with new dictionary entries but we failed persist the amended dictionary because of an abrupt termination, a read of this last block may not be able to resolve last-written entries.
<a name="5d8d2087"></a>
#### 71.7.7. Durability
It is possible to set _durability_ on each Mutation or on a Table basis. Options include:
-
_SKIP_WAL_: Do not write Mutations to the WAL (See the next section, [Disabling the WAL](docs_en_#wal.disable)).
-
_ASYNC_WAL_: Write the WAL asynchronously; do not hold-up clients waiting on the sync of their write to the filesystem but return immediately. The edit becomes visible. Meanwhile, in the background, the Mutation will be flushed to the WAL at some time later. This option currently may lose data. See HBASE-16689.
-
_SYNC_WAL_: The **default**. Each edit is sync’d to HDFS before we return success to the client.
-
_FSYNC_WAL_: Each edit is fsync’d to HDFS and the filesystem before we return success to the client.
Do not confuse the _ASYNC_WAL_ option on a Mutation or Table with the _AsyncFSWAL_ writer; they are distinct options unfortunately closely named
<a name="fdf61ad6"></a>
#### 71.7.8. Disabling the WAL
It is possible to disable the WAL, to improve performance in certain specific situations. However, disabling the WAL puts your data at risk. The only situation where this is recommended is during a bulk load. This is because, in the event of a problem, the bulk load can be re-run with no risk of data loss.
The WAL is disabled by calling the HBase client field `Mutation.writeToWAL(false)`. Use the `Mutation.setDurability(Durability.SKIP_WAL)` and Mutation.getDurability() methods to set and get the field’s value. There is no way to disable the WAL for only a specific table.
> If you disable the WAL for anything other than bulk loads, your data is at risk.
<a name="f40c00d9"></a>
## 72. Regions
Regions are the basic element of availability and distribution for tables, and are comprised of a Store per Column Family. The hierarchy of objects is as follows:
Table (HBase table) Region (Regions for the table) Store (Store per ColumnFamily for each Region for the table) MemStore (MemStore for each Store for each Region for the table) StoreFile (StoreFiles for each Store for each Region for the table) Block (Blocks within a StoreFile within a Store for each Region for the table)
For a description of what HBase files look like when written to HDFS, see [Browsing HDFS for HBase Objects](docs_en_#trouble.namenode.hbase.objects).
<a name="8614c52e"></a>
### 72.1. Considerations for Number of Regions
In general, HBase is designed to run with a small (20-200) number of relatively large (5-20Gb) regions per server. The considerations for this are as follows:
<a name="a21e0cb8"></a>
#### 72.1.1. Why should I keep my Region count low?
Typically you want to keep your region count low on HBase for numerous reasons. Usually right around 100 regions per RegionServer has yielded the best results. Here are some of the reasons below for keeping region count low:
1.
MSLAB (MemStore-local allocation buffer) requires 2MB per MemStore (that’s 2MB per family per region). 1000 regions that have 2 families each is 3.9GB of heap used, and it’s not even storing data yet. NB: the 2MB value is configurable.
2.
If you fill all the regions at somewhat the same rate, the global memory usage makes it that it forces tiny flushes when you have too many regions which in turn generates compactions. Rewriting the same data tens of times is the last thing you want. An example is filling 1000 regions (with one family) equally and let’s consider a lower bound for global MemStore usage of 5GB (the region server would have a big heap). Once it reaches 5GB it will force flush the biggest region, at that point they should almost all have about 5MB of data so it would flush that amount. 5MB inserted later, it would flush another region that will now have a bit over 5MB of data, and so on. This is currently the main limiting factor for the number of regions; see [Number of regions per RS - upper bound](docs_en_#ops.capacity.regions.count) for detailed formula.
3.
The master as is is allergic to tons of regions, and will take a lot of time assigning them and moving them around in batches. The reason is that it’s heavy on ZK usage, and it’s not very async at the moment (could really be improved — and has been improved a bunch in 0.96 HBase).
4.
In older versions of HBase (pre-HFile v2, 0.90 and previous), tons of regions on a few RS can cause the store file index to rise, increasing heap usage and potentially creating memory pressure or OOME on the RSs
Another issue is the effect of the number of regions on MapReduce jobs; it is typical to have one mapper per HBase region. Thus, hosting only 5 regions per RS may not be enough to get sufficient number of tasks for a MapReduce job, while 1000 regions will generate far too many tasks.
See [Determining region count and size](docs_en_#ops.capacity.regions) for configuration guidelines.
<a name="12f518b9"></a>
### 72.2. Region-RegionServer Assignment
This section describes how Regions are assigned to RegionServers.
<a name="26d5d586"></a>
#### 72.2.1. Startup
When HBase starts regions are assigned as follows (short version):
1.
The Master invokes the `AssignmentManager` upon startup.
2.
The `AssignmentManager` looks at the existing region assignments in `hbase:meta`.
3.
If the region assignment is still valid (i.e., if the RegionServer is still online) then the assignment is kept.
4.
If the assignment is invalid, then the `LoadBalancerFactory` is invoked to assign the region. The load balancer (`StochasticLoadBalancer` by default in HBase 1.0) assign the region to a RegionServer.
5.
`hbase:meta` is updated with the RegionServer assignment (if needed) and the RegionServer start codes (start time of the RegionServer process) upon region opening by the RegionServer.
<a name="a5c57c18"></a>
#### 72.2.2. Failover
When a RegionServer fails:
1.
The regions immediately become unavailable because the RegionServer is down.
2.
The Master will detect that the RegionServer has failed.
3.
The region assignments will be considered invalid and will be re-assigned just like the startup sequence.
4.
In-flight queries are re-tried, and not lost.
5.
Operations are switched to a new RegionServer within the following amount of time:
ZooKeeper session timeout + split time + assignment/replay time
<a name="eec405f5"></a>
#### 72.2.3. Region Load Balancing
Regions can be periodically moved by the [LoadBalancer](docs_en_#master.processes.loadbalancer).
<a name="a3248bc4"></a>
#### 72.2.4. Region State Transition
HBase maintains a state for each region and persists the state in `hbase:meta`. The state of the `hbase:meta` region itself is persisted in ZooKeeper. You can see the states of regions in transition in the Master web UI. Following is the list of possible region states.
Possible Region States
-
`OFFLINE`: the region is offline and not opening
-
`OPENING`: the region is in the process of being opened
-
`OPEN`: the region is open and the RegionServer has notified the master
-
`FAILED_OPEN`: the RegionServer failed to open the region
-
`CLOSING`: the region is in the process of being closed
-
`CLOSED`: the RegionServer has closed the region and notified the master
-
`FAILED_CLOSE`: the RegionServer failed to close the region
-
`SPLITTING`: the RegionServer notified the master that the region is splitting
-
`SPLIT`: the RegionServer notified the master that the region has finished splitting
-
`SPLITTING_NEW`: this region is being created by a split which is in progress
-
`MERGING`: the RegionServer notified the master that this region is being merged with another region
-
`MERGED`: the RegionServer notified the master that this region has been merged
-
`MERGING_NEW`: this region is being created by a merge of two regions
Figure 2. Region State TransitionsGraph Legend
-
Brown: Offline state, a special state that can be transient (after closed before opening), terminal (regions of disabled tables), or initial (regions of newly created tables)
-
Palegreen: Online state that regions can serve requests
-
Lightblue: Transient states
-
Red: Failure states that need OPS attention
-
Gold: Terminal states of regions split/merged
-
Grey: Initial states of regions created through split/merge
Transition State Descriptions
1.
The master moves a region from `OFFLINE` to `OPENING` state and tries to assign the region to a RegionServer. The RegionServer may or may not have received the open region request. The master retries sending the open region request to the RegionServer until the RPC goes through or the master runs out of retries. After the RegionServer receives the open region request, the RegionServer begins opening the region.
2.
If the master is running out of retries, the master prevents the RegionServer from opening the region by moving the region to `CLOSING` state and trying to close it, even if the RegionServer is starting to open the region.
3.
After the RegionServer opens the region, it continues to try to notify the master until the master moves the region to `OPEN` state and notifies the RegionServer. The region is now open.
4.
If the RegionServer cannot open the region, it notifies the master. The master moves the region to `CLOSED` state and tries to open the region on a different RegionServer.
5.
If the master cannot open the region on any of a certain number of regions, it moves the region to `FAILED_OPEN` state, and takes no further action until an operator intervenes from the HBase shell, or the server is dead.
6.
The master moves a region from `OPEN` to `CLOSING` state. The RegionServer holding the region may or may not have received the close region request. The master retries sending the close request to the server until the RPC goes through or the master runs out of retries.
7.
If the RegionServer is not online, or throws `NotServingRegionException`, the master moves the region to `OFFLINE` state and re-assigns it to a different RegionServer.
8.
If the RegionServer is online, but not reachable after the master runs out of retries, the master moves the region to `FAILED_CLOSE` state and takes no further action until an operator intervenes from the HBase shell, or the server is dead.
9.
If the RegionServer gets the close region request, it closes the region and notifies the master. The master moves the region to `CLOSED` state and re-assigns it to a different RegionServer.
10.
Before assigning a region, the master moves the region to `OFFLINE` state automatically if it is in `CLOSED` state.
11.
When a RegionServer is about to split a region, it notifies the master. The master moves the region to be split from `OPEN` to `SPLITTING` state and add the two new regions to be created to the RegionServer. These two regions are in `SPLITTING_NEW` state initially.
12.
After notifying the master, the RegionServer starts to split the region. Once past the point of no return, the RegionServer notifies the master again so the master can update the `hbase:meta` table. However, the master does not update the region states until it is notified by the server that the split is done. If the split is successful, the splitting region is moved from `SPLITTING` to `SPLIT` state and the two new regions are moved from `SPLITTING_NEW` to `OPEN` state.
13.
If the split fails, the splitting region is moved from `SPLITTING` back to `OPEN` state, and the two new regions which were created are moved from `SPLITTING_NEW` to `OFFLINE` state.
14.
When a RegionServer is about to merge two regions, it notifies the master first. The master moves the two regions to be merged from `OPEN` to `MERGING` state, and adds the new region which will hold the contents of the merged regions region to the RegionServer. The new region is in `MERGING_NEW` state initially.
15.
After notifying the master, the RegionServer starts to merge the two regions. Once past the point of no return, the RegionServer notifies the master again so the master can update the META. However, the master does not update the region states until it is notified by the RegionServer that the merge has completed. If the merge is successful, the two merging regions are moved from `MERGING` to `MERGED` state and the new region is moved from `MERGING_NEW` to `OPEN` state.
16.
If the merge fails, the two merging regions are moved from `MERGING` back to `OPEN` state, and the new region which was created to hold the contents of the merged regions is moved from `MERGING_NEW` to `OFFLINE` state.
17.
For regions in `FAILED_OPEN` or `FAILED_CLOSE` states, the master tries to close them again when they are reassigned by an operator via HBase Shell.
<a name="35ca970a"></a>
### 72.3. Region-RegionServer Locality
Over time, Region-RegionServer locality is achieved via HDFS block replication. The HDFS client does the following by default when choosing locations to write replicas:
1.
First replica is written to local node
2.
Second replica is written to a random node on another rack
3.
Third replica is written on the same rack as the second, but on a different node chosen randomly
4.
Subsequent replicas are written on random nodes on the cluster. See _Replica Placement: The First Baby Steps_ on this page: [HDFS Architecture](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html)
Thus, HBase eventually achieves locality for a region after a flush or a compaction. In a RegionServer failover situation a RegionServer may be assigned regions with non-local StoreFiles (because none of the replicas are local), however as new data is written in the region, or the table is compacted and StoreFiles are re-written, they will become "local" to the RegionServer.
For more information, see _Replica Placement: The First Baby Steps_ on this page: [HDFS Architecture](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) and also Lars George’s blog on [HBase and HDFS locality](http://www.larsgeorge.com/2010/05/hbase-file-locality-in-hdfs.html).
<a name="7b4616e4"></a>
### 72.4. Region Splits
Regions split when they reach a configured threshold. Below we treat the topic in short. For a longer exposition, see [Apache HBase Region Splitting and Merging](http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/) by our Enis Soztutar.
Splits run unaided on the RegionServer; i.e. the Master does not participate. The RegionServer splits a region, offlines the split region and then adds the daughter regions to `hbase:meta`, opens daughters on the parent’s hosting RegionServer and then reports the split to the Master. See [Managed Splitting](docs_en_#disable.splitting) for how to manually manage splits (and for why you might do this).
<a name="59252ab6"></a>
#### 72.4.1. Custom Split Policies
You can override the default split policy using a custom [RegionSplitPolicy](https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.html)(HBase 0.94+). Typically a custom split policy should extend HBase’s default split policy: [IncreasingToUpperBoundRegionSplitPolicy](https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.html).
The policy can set globally through the HBase configuration or on a per-table basis.
Configuring the Split Policy Globally in _hbase-site.xml_
Configuring a Split Policy On a Table Using the Java API
HTableDescriptor tableDesc = new HTableDescriptor(“test”);
tableDesc.setValue(HTableDescriptor.SPLITPOLICY, ConstantSizeRegionSplitPolicy.class.getName());
tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(“cf1”)));
admin.createTable(tableDesc);
——
Configuring the Split Policy On a Table Using HBase Shell
hbase> create ‘test’, {METADATA => {‘SPLIT_POLICY’ => ‘org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy’}},{NAME => ‘cf1’}
The policy can be set globally through the HBaseConfiguration used or on a per table basis:
HTableDescriptor myHtd = …;
myHtd.setValue(HTableDescriptor.SPLIT_POLICY, MyCustomSplitPolicy.class.getName());
``
> The
DisabledRegionSplitPolicy` policy blocks manual region splitting.
### 72.5. Manual Region Splitting
It is possible to manually split your table, either at table creation (pre-splitting), or at a later time as an administrative action. You might choose to split your region for one or more of the following reasons. There may be other valid reasons, but the need to manually split your table might also point to problems with your schema design.
Reasons to Manually Split Your Table
-
Your data is sorted by timeseries or another similar algorithm that sorts new data at the end of the table. This means that the Region Server holding the last region is always under load, and the other Region Servers are idle, or mostly idle. See also [Monotonically Increasing Row Keys/Timeseries Data](docs_en#timeseries).
-
You have developed an unexpected hotspot in one region of your table. For instance, an application which tracks web searches might be inundated by a lot of searches for a celebrity in the event of news about that celebrity. See perf.one.region for more discussion about this particular scenario.
-
After a big increase in the number of RegionServers in your cluster, to get the load spread out quickly.
-
Before a bulk-load which is likely to cause unusual and uneven load across regions.
See Managed Splitting for a discussion about the dangers and possible benefits of managing splitting completely manually.
> The DisabledRegionSplitPolicy
policy blocks manual region splitting.
#### 72.5.1. Determining Split Points
The goal of splitting your table manually is to improve the chances of balancing the load across the cluster in situations where good rowkey design alone won’t get you there. Keeping that in mind, the way you split your regions is very dependent upon the characteristics of your data. It may be that you already know the best way to split your table. If not, the way you split your table depends on what your keys are like.
Alphanumeric Rowkeys
If your rowkeys start with a letter or number, you can split your table at letter or number boundaries. For instance, the following command creates a table with regions that split at each vowel, so the first region has A-D, the second region has E-H, the third region has I-N, the fourth region has O-V, and the fifth region has U-Z.
Using a Custom Algorithm
The RegionSplitter tool is provided with HBase, and uses a SplitAlgorithm to determine split points for you. As parameters, you give it the algorithm, desired number of regions, and column families. It includes three split algorithms. The first is the [HexStringSplit](https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/util/RegionSplitter.HexStringSplit.html)
algorithm, which assumes the row keys are hexadecimal strings. The second is the [DecimalStringSplit](https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/util/RegionSplitter.DecimalStringSplit.html)
algorithm, which assumes the row keys are decimal strings in the range 00000000 to 99999999. The third, [UniformSplit](https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/util/RegionSplitter.UniformSplit.html)
, assumes the row keys are random byte arrays. You will probably need to develop your own [SplitAlgorithm](https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/util/RegionSplitter.SplitAlgorithm.html)
, using the provided ones as models.
### 72.6. Online Region Merges
Both Master and RegionServer participate in the event of online region merges. Client sends merge RPC to the master, then the master moves the regions together to the RegionServer where the more heavily loaded region resided. Finally the master sends the merge request to this RegionServer which then runs the merge. Similar to process of region splitting, region merges run as a local transaction on the RegionServer. It offlines the regions and then merges two regions on the file system, atomically delete merging regions from hbase:meta
and adds the merged region to hbase:meta
, opens the merged region on the RegionServer and reports the merge to the Master.
An example of region merges in the HBase shell
$ hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME'
$ hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME', true
It’s an asynchronous operation and call returns immediately without waiting merge completed. Passing true
as the optional third parameter will force a merge. Normally only adjacent regions can be merged. The force
parameter overrides this behaviour and is for expert use only.
### 72.7. Store
A Store hosts a MemStore and 0 or more StoreFiles (HFiles). A Store corresponds to a column family for a table for a given region.
#### 72.7.1. MemStore
The MemStore holds in-memory modifications to the Store. Modifications are Cells/KeyValues. When a flush is requested, the current MemStore is moved to a snapshot and is cleared. HBase continues to serve edits from the new MemStore and backing snapshot until the flusher reports that the flush succeeded. At this point, the snapshot is discarded. Note that when the flush happens, MemStores that belong to the same region will all be flushed.
#### 72.7.2. MemStore Flush
A MemStore flush can be triggered under any of the conditions listed below. The minimum flush unit is per region, not at individual MemStore level.
1.
When a MemStore reaches the size specified by hbase.hregion.memstore.flush.size
, all MemStores that belong to its region will be flushed out to disk.
2.
When the overall MemStore usage reaches the value specified by hbase.regionserver.global.memstore.upperLimit
, MemStores from various regions will be flushed out to disk to reduce overall MemStore usage in a RegionServer.
The flush order is based on the descending order of a region’s MemStore usage.
Regions will have their MemStores flushed until the overall MemStore usage drops to or slightly below
hbase.regionserver.global.memstore.lowerLimit
.
3.
When the number of WAL log entries in a given region server’s WAL reaches the value specified in hbase.regionserver.max.logs
, MemStores from various regions will be flushed out to disk to reduce the number of logs in the WAL.
The flush order is based on time.
Regions with the oldest MemStores are flushed first until WAL count drops below
hbase.regionserver.max.logs
.
#### 72.7.3. Scans
-
When a client issues a scan against a table, HBase generates RegionScanner
objects, one per region, to serve the scan request.
-
The RegionScanner
object contains a list of StoreScanner
objects, one per column family.
-
Each StoreScanner
object further contains a list of StoreFileScanner
objects, corresponding to each StoreFile and HFile of the corresponding column family, and a list of KeyValueScanner
objects for the MemStore.
-
The two lists are merged into one, which is sorted in ascending order with the scan object for the MemStore at the end of the list.
-
When a StoreFileScanner
object is constructed, it is associated with a MultiVersionConcurrencyControl
read point, which is the current memstoreTS
, filtering out any new updates beyond the read point.
#### 72.7.4. StoreFile (HFile)
StoreFiles are where your data lives.
##### HFile Format
The HFile file format is based on the SSTable file described in the [BigTable 2006] paper and on Hadoop’s TFile (The unit test suite and the compression harness were taken directly from TFile). Schubert Zhang’s blog post on HFile: A Block-Indexed File Format to Store Sorted Key-Value Pairs makes for a thorough introduction to HBase’s HFile. Matteo Bertozzi has also put up a helpful description, HBase I/O: HFile.
For more information, see the HFile source code. Also see HBase file format with inline blocks (version 2) for information about the HFile v2 format that was included in 0.92.
##### HFile Tool
To view a textualized version of HFile content, you can use the hbase hfile
tool. Type the following to see usage:
$ ${HBASE_HOME}/bin/hbase hfile
For example, to view the content of the file hdfs://10.81.47.41:8020/hbase/default/TEST/1418428042/DSMP/4759508618286845475, type the following:
$ ${HBASE_HOME}/bin/hbase hfile -v -f hdfs://10.81.47.41:8020/hbase/default/TEST/1418428042/DSMP/4759508618286845475
If you leave off the option -v to see just a summary on the HFile. See usage for other things to do with the hfile
tool.
> In the output of this tool, you might see ‘seqid=0’ for certain keys in places such as ‘Mid-key’/‘firstKey’/‘lastKey’. These are ‘KeyOnlyKeyValue’ type instances - meaning their seqid is irrelevant & we just need the keys of these Key-Value instances.
##### StoreFile Directory Structure on HDFS
For more information of what StoreFiles look like on HDFS with respect to the directory structure, see Browsing HDFS for HBase Objects.
#### 72.7.5. Blocks
StoreFiles are composed of blocks. The blocksize is configured on a per-ColumnFamily basis.
Compression happens at the block level within StoreFiles. For more information on compression, see Compression and Data Block Encoding In HBase.
For more information on blocks, see the HFileBlock source code.
#### 72.7.6. KeyValue
The KeyValue class is the heart of data storage in HBase. KeyValue wraps a byte array and takes offsets and lengths into the passed array which specify where to start interpreting the content as KeyValue.
The KeyValue format inside a byte array is:
-
keylength
-
valuelength
-
key
-
value
The Key is further decomposed as:
-
rowlength
-
row (i.e., the rowkey)
-
columnfamilylength
-
columnfamily
-
columnqualifier
-
timestamp
-
keytype (e.g., Put, Delete, DeleteColumn, DeleteFamily)
KeyValue instances are not split across blocks. For example, if there is an 8 MB KeyValue, even if the block-size is 64kb this KeyValue will be read in as a coherent block. For more information, see the KeyValue source code.
##### Example
To emphasize the points above, examine what happens with two Puts for two different columns for the same row:
-
Put #1: rowkey=row1, cf:attr1=value1
-
Put #2: rowkey=row1, cf:attr2=value2
Even though these are for the same row, a KeyValue is created for each column:
Key portion for Put #1:
-
rowlength -----------→ 4
-
row -----------------→ row1
-
columnfamilylength --→ 2
-
columnfamily --------→ cf
-
columnqualifier -----→ attr1
-
timestamp -----------→ server time of Put
-
keytype -------------→ Put
Key portion for Put #2:
-
rowlength -----------→ 4
-
row -----------------→ row1
-
columnfamilylength --→ 2
-
columnfamily --------→ cf
-
columnqualifier -----→ attr2
-
timestamp -----------→ server time of Put
-
keytype -------------→ Put
It is critical to understand that the rowkey, ColumnFamily, and column (aka columnqualifier) are embedded within the KeyValue instance. The longer these identifiers are, the bigger the KeyValue is.
#### 72.7.7. Compaction
Ambiguous Terminology
-
A StoreFile is a facade of HFile. In terms of compaction, use of StoreFile seems to have prevailed in the past.
-
A Store is the same thing as a ColumnFamily. StoreFiles are related to a Store, or ColumnFamily.
-
If you want to read more about StoreFiles versus HFiles and Stores versus ColumnFamilies, see HBASE-11316.
When the MemStore reaches a given size (hbase.hregion.memstore.flush.size
), it flushes its contents to a StoreFile. The number of StoreFiles in a Store increases over time. Compaction is an operation which reduces the number of StoreFiles in a Store, by merging them together, in order to increase performance on read operations. Compactions can be resource-intensive to perform, and can either help or hinder performance depending on many factors.
Compactions fall into two categories: minor and major. Minor and major compactions differ in the following ways.
Minor compactions usually select a small number of small, adjacent StoreFiles and rewrite them as a single StoreFile. Minor compactions do not drop (filter out) deletes or expired versions, because of potential side effects. See Compaction and Deletions and Compaction and Versions for information on how deletes and versions are handled in relation to compactions. The end result of a minor compaction is fewer, larger StoreFiles for a given Store.
The end result of a major compaction is a single StoreFile per Store. Major compactions also process delete markers and max versions. See Compaction and Deletions and Compaction and Versions for information on how deletes and versions are handled in relation to compactions.
Compaction and Deletions
When an explicit deletion occurs in HBase, the data is not actually deleted. Instead, a tombstone marker is written. The tombstone marker prevents the data from being returned with queries. During a major compaction, the data is actually deleted, and the tombstone marker is removed from the StoreFile. If the deletion happens because of an expired TTL, no tombstone is created. Instead, the expired data is filtered out and is not written back to the compacted StoreFile.
Compaction and Versions
When you create a Column Family, you can specify the maximum number of versions to keep, by specifying HColumnDescriptor.setMaxVersions(int versions)
. The default value is 3
. If more versions than the specified maximum exist, the excess versions are filtered out and not written back to the compacted StoreFile.
> Major Compactions Can Impact Query Result
> In some situations, older versions can be inadvertently resurrected if a newer version is explicitly deleted. See Major compactions change query results for a more in-depth explanation. This situation is only possible before the compaction finishes.
In theory, major compactions improve performance. However, on a highly loaded system, major compactions can require an inappropriate number of resources and adversely affect performance. In a default configuration, major compactions are scheduled automatically to run once in a 7-day period. This is sometimes inappropriate for systems in production. You can manage major compactions manually. See Managed Compactions.
Compactions do not perform region merges. See Merge for more information on region merging.
Compaction Switch
We can switch on and off the compactions at region servers. Switching off compactions will also interrupt any currently ongoing compactions. It can be done dynamically using the “compactionswitch” command from hbase shell. If done from the command line, this setting will be lost on restart of the server. To persist the changes across region servers modify the configuration hbase.regionserver .compaction.enabled in hbase-site.xml and restart HBase.
##### Compaction Policy - HBase 0.96.x and newer
Compacting large StoreFiles, or too many StoreFiles at once, can cause more IO load than your cluster is able to handle without causing performance problems. The method by which HBase selects which StoreFiles to include in a compaction (and whether the compaction is a minor or major compaction) is called the _compaction policy.
Prior to HBase 0.96.x, there was only one compaction policy. That original compaction policy is still available as RatioBasedCompactionPolicy
. The new compaction default policy, called ExploringCompactionPolicy
, was subsequently backported to HBase 0.94 and HBase 0.95, and is the default in HBase 0.96 and newer. It was implemented in HBASE-7842. In short, ExploringCompactionPolicy
attempts to select the best possible set of StoreFiles to compact with the least amount of work, while the RatioBasedCompactionPolicy
selects the first set that meets the criteria.
Regardless of the compaction policy used, file selection is controlled by several configurable parameters and happens in a multi-step approach. These parameters will be explained in context, and then will be given in a table which shows their descriptions, defaults, and implications of changing them.
###### Being Stuck
When the MemStore gets too large, it needs to flush its contents to a StoreFile. However, Stores are configured with a bound on the number StoreFiles, hbase.hstore.blockingStoreFiles
, and if in excess, the MemStore flush must wait until the StoreFile count is reduced by one or more compactions. If the MemStore is too large and the number of StoreFiles is also too high, the algorithm is said to be “stuck”. By default we’ll wait on compactions up to hbase.hstore.blockingWaitTime
milliseconds. If this period expires, we’ll flush anyways even though we are in excess of the hbase.hstore.blockingStoreFiles
count.
Upping the hbase.hstore.blockingStoreFiles
count will allow flushes to happen but a Store with many StoreFiles in will likely have higher read latencies. Try to figure why Compactions are not keeping up. Is it a write spurt that is bringing about this situation or is a regular occurance and the cluster is under-provisioned for the volume of writes?
###### The ExploringCompactionPolicy Algorithm
The ExploringCompactionPolicy algorithm considers each possible set of adjacent StoreFiles before choosing the set where compaction will have the most benefit.
One situation where the ExploringCompactionPolicy works especially well is when you are bulk-loading data and the bulk loads create larger StoreFiles than the StoreFiles which are holding data older than the bulk-loaded data. This can “trick” HBase into choosing to perform a major compaction each time a compaction is needed, and cause a lot of extra overhead. With the ExploringCompactionPolicy, major compactions happen much less frequently because minor compactions are more efficient.
In general, ExploringCompactionPolicy is the right choice for most situations, and thus is the default compaction policy. You can also use ExploringCompactionPolicy along with Experimental: Stripe Compactions.
The logic of this policy can be examined in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java. The following is a walk-through of the logic of the ExploringCompactionPolicy.
1.
Make a list of all existing StoreFiles in the Store. The rest of the algorithm filters this list to come up with the subset of HFiles which will be chosen for compaction.
2.
If this was a user-requested compaction, attempt to perform the requested compaction type, regardless of what would normally be chosen. Note that even if the user requests a major compaction, it may not be possible to perform a major compaction. This may be because not all StoreFiles in the Column Family are available to compact or because there are too many Stores in the Column Family.
3.
Some StoreFiles are automatically excluded from consideration. These include:
-
StoreFiles that are larger than hbase.hstore.compaction.max.size
-
StoreFiles that were created by a bulk-load operation which explicitly excluded compaction. You may decide to exclude StoreFiles resulting from bulk loads, from compaction. To do this, specify the hbase.mapreduce.hfileoutputformat.compaction.exclude
parameter during the bulk load operation.
4.
Iterate through the list from step 1, and make a list of all potential sets of StoreFiles to compact together. A potential set is a grouping of hbase.hstore.compaction.min
contiguous StoreFiles in the list. For each set, perform some sanity-checking and figure out whether this is the best compaction that could be done:
-
If the number of StoreFiles in this set (not the size of the StoreFiles) is fewer than hbase.hstore.compaction.min
or more than hbase.hstore.compaction.max
, take it out of consideration.
-
Compare the size of this set of StoreFiles with the size of the smallest possible compaction that has been found in the list so far. If the size of this set of StoreFiles represents the smallest compaction that could be done, store it to be used as a fall-back if the algorithm is “stuck” and no StoreFiles would otherwise be chosen. See Being Stuck.
-
Do size-based sanity checks against each StoreFile in this set of StoreFiles.
-
If the size of this StoreFile is larger than hbase.hstore.compaction.max.size
, take it out of consideration.
-
If the size is greater than or equal to hbase.hstore.compaction.min.size
, sanity-check it against the file-based ratio to see whether it is too large to be considered.
The sanity-checking is successful if: - There is only one StoreFile in this set, or - For each StoreFile, its size multiplied by
hbase.hstore.compaction.ratio
(or hbase.hstore.compaction.ratio.offpeak
if off-peak hours are configured and it is during off-peak hours) is less than the sum of the sizes of the other HFiles in the set.
5.
If this set of StoreFiles is still in consideration, compare it to the previously-selected best compaction. If it is better, replace the previously-selected best compaction with this one.
6.
When the entire list of potential compactions has been processed, perform the best compaction that was found. If no StoreFiles were selected for compaction, but there are multiple StoreFiles, assume the algorithm is stuck (see Being Stuck) and if so, perform the smallest compaction that was found in step 3.
###### RatioBasedCompactionPolicy Algorithm
The RatioBasedCompactionPolicy was the only compaction policy prior to HBase 0.96, though ExploringCompactionPolicy has now been backported to HBase 0.94 and 0.95. To use the RatioBasedCompactionPolicy rather than the ExploringCompactionPolicy, set hbase.hstore.defaultengine.compactionpolicy.class
to RatioBasedCompactionPolicy
in the hbase-site.xml file. To switch back to the ExploringCompactionPolicy, remove the setting from the hbase-site.xml.
The following section walks you through the algorithm used to select StoreFiles for compaction in the RatioBasedCompactionPolicy.
1.
The first phase is to create a list of all candidates for compaction. A list is created of all StoreFiles not already in the compaction queue, and all StoreFiles newer than the newest file that is currently being compacted. This list of StoreFiles is ordered by the sequence ID. The sequence ID is generated when a Put is appended to the write-ahead log (WAL), and is stored in the metadata of the HFile.
2.
Check to see if the algorithm is stuck (see Being Stuck, and if so, a major compaction is forced. This is a key area where The ExploringCompactionPolicy Algorithm is often a better choice than the RatioBasedCompactionPolicy.
3.
If the compaction was user-requested, try to perform the type of compaction that was requested. Note that a major compaction may not be possible if all HFiles are not available for compaction or if too many StoreFiles exist (more than hbase.hstore.compaction.max
).
4.
Some StoreFiles are automatically excluded from consideration. These include:
-
StoreFiles that are larger than hbase.hstore.compaction.max.size
-
StoreFiles that were created by a bulk-load operation which explicitly excluded compaction. You may decide to exclude StoreFiles resulting from bulk loads, from compaction. To do this, specify the hbase.mapreduce.hfileoutputformat.compaction.exclude
parameter during the bulk load operation.
5.
The maximum number of StoreFiles allowed in a major compaction is controlled by the hbase.hstore.compaction.max
parameter. If the list contains more than this number of StoreFiles, a minor compaction is performed even if a major compaction would otherwise have been done. However, a user-requested major compaction still occurs even if there are more than hbase.hstore.compaction.max
StoreFiles to compact.
6.
If the list contains fewer than hbase.hstore.compaction.min
StoreFiles to compact, a minor compaction is aborted. Note that a major compaction can be performed on a single HFile. Its function is to remove deletes and expired versions, and reset locality on the StoreFile.
7.
The value of the hbase.hstore.compaction.ratio
parameter is multiplied by the sum of StoreFiles smaller than a given file, to determine whether that StoreFile is selected for compaction during a minor compaction. For instance, if hbase.hstore.compaction.ratio is 1.2, FileX is 5MB, FileY is 2MB, and FileZ is 3MB:
5 <= 1.2 x (2 + 3) or 5 <= 6
In this scenario, FileX is eligible for minor compaction. If FileX were 7MB, it would not be eligible for minor compaction. This ratio favors smaller StoreFile. You can configure a different ratio for use in off-peak hours, using the parameter
hbase.hstore.compaction.ratio.offpeak
, if you also configure hbase.offpeak.start.hour
and hbase.offpeak.end.hour
.
8.
If the last major compaction was too long ago and there is more than one StoreFile to be compacted, a major compaction is run, even if it would otherwise have been minor. By default, the maximum time between major compactions is 7 days, plus or minus a 4.8 hour period, and determined randomly within those parameters. Prior to HBase 0.96, the major compaction period was 24 hours. See hbase.hregion.majorcompaction
in the table below to tune or disable time-based major compactions.
###### Parameters Used by Compaction Algorithm
This table contains the main configuration parameters for compaction. This list is not exhaustive. To tune these parameters from the defaults, edit the hbase-default.xml file. For a full list of all configuration parameters available, see config.files
hbase.hstore.compaction.min
The minimum number of StoreFiles which must be eligible for compaction before compaction can run. The goal of tuning hbase.hstore.compaction.min
is to avoid ending up with too many tiny StoreFiles to compact. Setting this value to 2 would cause a minor compaction each time you have two StoreFiles in a Store, and this is probably not appropriate. If you set this value too high, all the other values will need to be adjusted accordingly. For most cases, the default value is appropriate. In previous versions of HBase, the parameter hbase.hstore.compaction.min
was called hbase.hstore.compactionThreshold
.
Default: 3
hbase.hstore.compaction.max
The maximum number of StoreFiles which will be selected for a single minor compaction, regardless of the number of eligible StoreFiles. Effectively, the value of hbase.hstore.compaction.max
controls the length of time it takes a single compaction to complete. Setting it larger means that more StoreFiles are included in a compaction. For most cases, the default value is appropriate.
Default: 10
hbase.hstore.compaction.min.size
A StoreFile smaller than this size will always be eligible for minor compaction. StoreFiles this size or larger are evaluated by hbase.hstore.compaction.ratio
to determine if they are eligible. Because this limit represents the “automatic include” limit for all StoreFiles smaller than this value, this value may need to be reduced in write-heavy environments where many files in the 1-2 MB range are being flushed, because every StoreFile will be targeted for compaction and the resulting StoreFiles may still be under the minimum size and require further compaction. If this parameter is lowered, the ratio check is triggered more quickly. This addressed some issues seen in earlier versions of HBase but changing this parameter is no longer necessary in most situations.
Default:128 MB
hbase.hstore.compaction.max.size
A StoreFile larger than this size will be excluded from compaction. The effect of raising hbase.hstore.compaction.max.size
is fewer, larger StoreFiles that do not get compacted often. If you feel that compaction is happening too often without much benefit, you can try raising this value.
Default: Long.MAX_VALUE
hbase.hstore.compaction.ratio
For minor compaction, this ratio is used to determine whether a given StoreFile which is larger than hbase.hstore.compaction.min.size
is eligible for compaction. Its effect is to limit compaction of large StoreFile. The value of hbase.hstore.compaction.ratio
is expressed as a floating-point decimal.
-
A large ratio, such as 10, will produce a single giant StoreFile. Conversely, a value of .25, will produce behavior similar to the BigTable compaction algorithm, producing four StoreFiles.
-
A moderate value of between 1.0 and 1.4 is recommended. When tuning this value, you are balancing write costs with read costs. Raising the value (to something like 1.4) will have more write costs, because you will compact larger StoreFiles. However, during reads, HBase will need to seek through fewer StoreFiles to accomplish the read. Consider this approach if you cannot take advantage of Bloom Filters.
-
Alternatively, you can lower this value to something like 1.0 to reduce the background cost of writes, and use to limit the number of StoreFiles touched during reads. For most cases, the default value is appropriate.
Default:
1.2F
hbase.hstore.compaction.ratio.offpeak
The compaction ratio used during off-peak compactions, if off-peak hours are also configured (see below). Expressed as a floating-point decimal. This allows for more aggressive (or less aggressive, if you set it lower than hbase.hstore.compaction.ratio
) compaction during a set time period. Ignored if off-peak is disabled (default). This works the same as hbase.hstore.compaction.ratio
.
Default: 5.0F
hbase.offpeak.start.hour
The start of off-peak hours, expressed as an integer between 0 and 23, inclusive. Set to -1 to disable off-peak.
Default: -1
(disabled)
hbase.offpeak.end.hour
The end of off-peak hours, expressed as an integer between 0 and 23, inclusive. Set to -1 to disable off-peak.
Default: -1
(disabled)
hbase.regionserver.thread.compaction.throttle
There are two different thread pools for compactions, one for large compactions and the other for small compactions. This helps to keep compaction of lean tables (such as hbase:meta
) fast. If a compaction is larger than this threshold, it goes into the large compaction pool. In most cases, the default value is appropriate.
Default: 2 x hbase.hstore.compaction.max x hbase.hregion.memstore.flush.size
(which defaults to 128
)
hbase.hregion.majorcompaction
Time between major compactions, expressed in milliseconds. Set to 0 to disable time-based automatic major compactions. User-requested and size-based major compactions will still run. This value is multiplied by hbase.hregion.majorcompaction.jitter
to cause compaction to start at a somewhat-random time during a given window of time.
Default: 7 days (604800000
milliseconds)
hbase.hregion.majorcompaction.jitter
A multiplier applied to hbase.hregion.majorcompaction to cause compaction to occur a given amount of time either side of hbase.hregion.majorcompaction
. The smaller the number, the closer the compactions will happen to the hbase.hregion.majorcompaction
interval. Expressed as a floating-point decimal.
Default: .50F
##### Compaction File Selection
> Legacy Informatio
> This section has been preserved for historical reasons and refers to the way compaction worked prior to HBase 0.96.x. You can still use this behavior if you enable RatioBasedCompactionPolicy Algorithm. For information on the way that compactions work in HBase 0.96.x and later, see Compaction.
To understand the core algorithm for StoreFile selection, there is some ASCII-art in the Store source code that will serve as useful reference.
It has been copied below:
/* normal skew:
*
* older ----> newer
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
Important knobs:
-
hbase.hstore.compaction.ratio
Ratio used in compaction file selection algorithm (default 1.2f).
-
hbase.hstore.compaction.min
(in HBase v 0.90 this is called hbase.hstore.compactionThreshold
) (files) Minimum number of StoreFiles per Store to be selected for a compaction to occur (default 2).
-
hbase.hstore.compaction.max
(files) Maximum number of StoreFiles to compact per minor compaction (default 10).
-
hbase.hstore.compaction.min.size
(bytes) Any StoreFile smaller than this setting with automatically be a candidate for compaction. Defaults to hbase.hregion.memstore.flush.size
(128 mb).
-
hbase.hstore.compaction.max.size
(.92) (bytes) Any StoreFile larger than this setting with automatically be excluded from compaction (default Long.MAXVALUE).
The minor compaction StoreFile selection logic is size based, and selects a file for compaction when the file ⇐ sum(smaller_files) * hbase.hstore.compaction.ratio
.
###### Minor Compaction File Selection - Example #1 (Basic Example)
This example mirrors an example from the unit test TestCompactSelection
.
-
hbase.hstore.compaction.ratio
= 1.0f
-
hbase.hstore.compaction.min
= 3 (files)
-
hbase.hstore.compaction.max
= 5 (files)
-
hbase.hstore.compaction.min.size
= 10 (bytes)
-
hbase.hstore.compaction.max.size
= 1000 (bytes)
The following StoreFiles exist: 100, 50, 23, 12, and 12 bytes apiece (oldest to newest). With the above parameters, the files that would be selected for minor compaction are 23, 12, and 12.
Why?
-
100 → No, because sum(50, 23, 12, 12) 1.0 = 97.
-
50 → No, because sum(23, 12, 12) 1.0 = 47.
-
23 → Yes, because sum(12, 12) 1.0 = 24.
-
12 → Yes, because the previous file has been included, and because this does not exceed the max-file limit of 5
-
12 → Yes, because the previous file had been included, and because this does not exceed the max-file limit of 5.
###### Minor Compaction File Selection - Example #2 (Not Enough Files ToCompact)
This example mirrors an example from the unit test TestCompactSelection
.
-
hbase.hstore.compaction.ratio
= 1.0f
-
hbase.hstore.compaction.min
= 3 (files)
-
hbase.hstore.compaction.max
= 5 (files)
-
hbase.hstore.compaction.min.size
= 10 (bytes)
-
hbase.hstore.compaction.max.size
= 1000 (bytes)
The following StoreFiles exist: 100, 25, 12, and 12 bytes apiece (oldest to newest). With the above parameters, no compaction will be started.
Why?
-
100 → No, because sum(25, 12, 12) 1.0 = 47
-
25 → No, because sum(12, 12) 1.0 = 24
-
12 → No. Candidate because sum(12) 1.0 = 12, there are only 2 files to compact and that is less than the threshold of 3
-
12 → No. Candidate because the previous StoreFile was, but there are not enough files to compact
###### Minor Compaction File Selection - Example #3 (Limiting Files To Compact)
This example mirrors an example from the unit test TestCompactSelection
.
-
hbase.hstore.compaction.ratio
= 1.0f
-
hbase.hstore.compaction.min
= 3 (files)
-
hbase.hstore.compaction.max
= 5 (files)
-
hbase.hstore.compaction.min.size
= 10 (bytes)
-
hbase.hstore.compaction.max.size
= 1000 (bytes)
The following StoreFiles exist: 7, 6, 5, 4, 3, 2, and 1 bytes apiece (oldest to newest). With the above parameters, the files that would be selected for minor compaction are 7, 6, 5, 4, 3.
Why?
-
7 → Yes, because sum(6, 5, 4, 3, 2, 1) 1.0 = 21. Also, 7 is less than the min-size
-
6 → Yes, because sum(5, 4, 3, 2, 1) 1.0 = 15. Also, 6 is less than the min-size.
-
5 → Yes, because sum(4, 3, 2, 1) 1.0 = 10. Also, 5 is less than the min-size.
-
4 → Yes, because sum(3, 2, 1) 1.0 = 6. Also, 4 is less than the min-size.
-
3 → Yes, because sum(2, 1) * 1.0 = 3. Also, 3 is less than the min-size.
-
2 → No. Candidate because previous file was selected and 2 is less than the min-size, but the max-number of files to compact has been reached.
-
1 → No. Candidate because previous file was selected and 1 is less than the min-size, but max-number of files to compact has been reached.
> Impact of Key Configuration OptionsThis information is now included in the configuration parameter table in [Parameters Used by Compaction Algorithm](docs_en#compaction.parameters).
##### Date Tiered Compaction
Date tiered compaction is a date-aware store file compaction strategy that is beneficial for time-range scans for time-series data.
##### When To Use Date Tiered Compactions
Consider using Date Tiered Compaction for reads for limited time ranges, especially scans of recent data
Don’t use it for
-
random gets without a limited time range
-
frequent deletes and updates
-
Frequent out of order data writes creating long tails, especially writes with future timestamps
-
frequent bulk loads with heavily overlapping time ranges
Performance Improvements
Performance testing has shown that the performance of time-range scans improve greatly for limited time ranges, especially scans of recent data.
###### Enabling Date Tiered Compaction
You can enable Date Tiered compaction for a table or a column family, by setting its hbase.hstore.engine.class
to org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine
.
You also need to set hbase.hstore.blockingStoreFiles
to a high number, such as 60, if using all default settings, rather than the default value of 12). Use 1.5~2 x projected file count if changing the parameters, Projected file count = windows per tier x tier count + incoming window min + files older than max age
You also need to set hbase.hstore.compaction.max
to the same value as hbase.hstore.blockingStoreFiles
to unblock major compaction.
Procedure: Enable Date Tiered Compaction
1.
Run one of following commands in the HBase shell. Replace the table name orders_table
with the name of your table.
alter 'orders_table', CONFIGURATION => {'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine', 'hbase.hstore.blockingStoreFiles' => '60', 'hbase.hstore.compaction.min'=>'2', 'hbase.hstore.compaction.max'=>'60'}
alter 'orders_table', {NAME => 'blobs_cf', CONFIGURATION => {'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine', 'hbase.hstore.blockingStoreFiles' => '60', 'hbase.hstore.compaction.min'=>'2', 'hbase.hstore.compaction.max'=>'60'}}
create 'orders_table', 'blobs_cf', CONFIGURATION => {'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine', 'hbase.hstore.blockingStoreFiles' => '60', 'hbase.hstore.compaction.min'=>'2', 'hbase.hstore.compaction.max'=>'60'}
2.
Configure other options if needed. See Configuring Date Tiered Compaction for more information.
Procedure: Disable Date Tiered Compaction
1.
Set the hbase.hstore.engine.class
option to either nil or org.apache.hadoop.hbase.regionserver.DefaultStoreEngine
. Either option has the same effect. Make sure you set the other options you changed to the original settings too.
alter 'orders_table', CONFIGURATION => {'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.DefaultStoreEngine', 'hbase.hstore.blockingStoreFiles' => '12', 'hbase.hstore.compaction.min'=>'6', 'hbase.hstore.compaction.max'=>'12'}}
When you change the store engine either way, a major compaction will likely be performed on most regions. This is not necessary on new tables.
###### Configuring Date Tiered Compaction
Each of the settings for date tiered compaction should be configured at the table or column family level. If you use HBase shell, the general command pattern is as follows:
alter 'orders_table', CONFIGURATION => {'key' => 'value', ..., 'key' => 'value'}}
Tier Parameters
You can configure your date tiers by changing the settings for the following parameters:
| Setting | Notes |
| —- | —- |
| hbase.hstore.compaction.date.tiered.max.storefile.age.millis
| |
Files with max-timestamp smaller than this will no longer be compacted.Default at Long.MAXVALUE.
||
hbase.hstore.compaction.date.tiered.base.window.millis
|
Base window size in milliseconds. Default at 6 hours.
||
hbase.hstore.compaction.date.tiered.windows.per.tier
|
Number of windows per tier. Default at 4.
||
hbase.hstore.compaction.date.tiered.incoming.window.min
|
Minimal number of files to compact in the incoming window. Set it to expected number of files in the window to avoid wasteful compaction. Default at 6.
||
hbase.hstore.compaction.date.tiered.window.policy.class
|
The policy to select store files within the same time window. It doesn’t apply to the incoming window. Default at exploring compaction. This is to avoid wasteful compaction.
|
Compaction Throttler
With tiered compaction all servers in the cluster will promote windows to higher tier at the same time, so using a compaction throttle is recommended: Set hbase.regionserver.throughput.controller
to org.apache.hadoop.hbase.regionserver.compactions.PressureAwareCompactionThroughputController
.
> For more information about date tiered compaction, please refer to the design specification at https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8
##### Experimental: Stripe Compactions
Stripe compactions is an experimental feature added in HBase 0.98 which aims to improve compactions for large regions or non-uniformly distributed row keys. In order to achieve smaller and/or more granular compactions, the StoreFiles within a region are maintained separately for several row-key sub-ranges, or “stripes”, of the region. The stripes are transparent to the rest of HBase, so other operations on the HFiles or data work without modification.
Stripe compactions change the HFile layout, creating sub-regions within regions. These sub-regions are easier to compact, and should result in fewer major compactions. This approach alleviates some of the challenges of larger regions.
Stripe compaction is fully compatible with [Compaction](docs_en#compaction) and works in conjunction with either the ExploringCompactionPolicy or RatioBasedCompactionPolicy. It can be enabled for existing tables, and the table will continue to operate normally if it is disabled later.
##### When To Use Stripe Compactions
Consider using stripe compaction if you have either of the following:
-
Large regions. You can get the positive effects of smaller regions without additional overhead for MemStore and region management overhead.
-
Non-uniform keys, such as time dimension in a key. Only the stripes receiving the new keys will need to compact. Old data will not compact as often, if at all
Performance Improvements
Performance testing has shown that the performance of reads improves somewhat, and variability of performance of reads and writes is greatly reduced. An overall long-term performance improvement is seen on large non-uniform-row key regions, such as a hash-prefixed timestamp key. These performance gains are the most dramatic on a table which is already large. It is possible that the performance improvement might extend to region splits.
###### Enabling Stripe Compaction
You can enable stripe compaction for a table or a column family, by setting its hbase.hstore.engine.class
to org.apache.hadoop.hbase.regionserver.StripeStoreEngine
. You also need to set the hbase.hstore.blockingStoreFiles
to a high number, such as 100 (rather than the default value of 10).
Procedure: Enable Stripe Compaction
1.
Run one of following commands in the HBase shell. Replace the table name orders_table
with the name of your table.
alter 'orders_table', CONFIGURATION => {'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.StripeStoreEngine', 'hbase.hstore.blockingStoreFiles' => '100'}
alter 'orders_table', {NAME => 'blobs_cf', CONFIGURATION => {'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.StripeStoreEngine', 'hbase.hstore.blockingStoreFiles' => '100'}}
create 'orders_table', 'blobs_cf', CONFIGURATION => {'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.StripeStoreEngine', 'hbase.hstore.blockingStoreFiles' => '100'}
2.
Configure other options if needed. See Configuring Stripe Compaction for more information.
3.
Enable the table.
Procedure: Disable Stripe Compaction
1.
Set the hbase.hstore.engine.class
option to either nil or org.apache.hadoop.hbase.regionserver.DefaultStoreEngine
. Either option has the same effect.
alter 'orders_table', CONFIGURATION => {'hbase.hstore.engine.class' => 'rg.apache.hadoop.hbase.regionserver.DefaultStoreEngine'}
2.
Enable the table.
When you enable a large table after changing the store engine either way, a major compaction will likely be performed on most regions. This is not necessary on new tables.
###### Configuring Stripe Compaction
Each of the settings for stripe compaction should be configured at the table or column family level. If you use HBase shell, the general command pattern is as follows:
alter 'orders_table', CONFIGURATION => {'key' => 'value', ..., 'key' => 'value'}}
Region and stripe sizing
You can configure your stripe sizing based upon your region sizing. By default, your new regions will start with one stripe. On the next compaction after the stripe has grown too large (16 x MemStore flushes size), it is split into two stripes. Stripe splitting continues as the region grows, until the region is large enough to split.
You can improve this pattern for your own data. A good rule is to aim for a stripe size of at least 1 GB, and about 8-12 stripes for uniform row keys. For example, if your regions are 30 GB, 12 x 2.5 GB stripes might be a good starting point.
| Setting | Notes |
| —- | —- |
| hbase.store.stripe.initialStripeCount
| |
The number of stripes to create when stripe compaction is enabled. You can use it as follows:
-
For relatively uniform row keys, if you know the approximate target number of stripes from the above, you can avoid some splitting overhead by starting with several stripes (2, 5, 10…). If the early data is not representative of overall row key distribution, this will not be as efficient.
-
For existing tables with a large amount of data, this setting will effectively pre-split your stripes.
-
For keys such as hash-prefixed sequential keys, with more than one hash prefix per region, pre-splitting may make sense.
||
hbase.store.stripe.sizeToSplit
|
The maximum size a stripe grows before splitting. Use this in conjunction with hbase.store.stripe.splitPartCount
to control the target stripe size (sizeToSplit = splitPartsCount * target stripe size
), according to the above sizing considerations.
||
hbase.store.stripe.splitPartCount
|
The number of new stripes to create when splitting a stripe. The default is 2, which is appropriate for most cases. For non-uniform row keys, you can experiment with increasing the number to 3 or 4, to isolate the arriving updates into narrower slice of the region without additional splits being required.
|
MemStore Size Settings
By default, the flush creates several files from one MemStore, according to existing stripe boundaries and row keys to flush. This approach minimizes write amplification, but can be undesirable if the MemStore is small and there are many stripes, because the files will be too small.
In this type of situation, you can set hbase.store.stripe.compaction.flushToL0
to true
. This will cause a MemStore flush to create a single file instead. When at least hbase.store.stripe.compaction.minFilesL0
such files (by default, 4) accumulate, they will be compacted into striped files.
Normal Compaction Configuration and Stripe Compaction
All the settings that apply to normal compactions (see Parameters Used by Compaction Algorithm) apply to stripe compactions. The exceptions are the minimum and maximum number of files, which are set to higher values by default because the files in stripes are smaller. To control these for stripe compactions, use hbase.store.stripe.compaction.minFiles
and hbase.store.stripe.compaction.maxFiles
, rather than hbase.hstore.compaction.min
and hbase.hstore.compaction.max
.
## 73. Bulk Loading
### 73.1. Overview
HBase includes several methods of loading data into tables. The most straightforward method is to either use the TableOutputFormat
class from a MapReduce job, or use the normal client APIs; however, these are not always the most efficient methods.
The bulk load feature uses a MapReduce job to output table data in HBase’s internal data format, and then directly loads the generated StoreFiles into a running cluster. Using bulk load will use less CPU and network resources than simply using the HBase API.
### 73.2. Bulk Load Architecture
The HBase bulk load process consists of two main steps.
#### 73.2.1. Preparing data via a MapReduce job
The first step of a bulk load is to generate HBase data files (StoreFiles) from a MapReduce job using HFileOutputFormat2
. This output format writes out data in HBase’s internal storage format so that they can be later loaded very efficiently into the cluster.
In order to function efficiently, HFileOutputFormat2
must be configured such that each output HFile fits within a single region. In order to do this, jobs whose output will be bulk loaded into HBase use Hadoop’s TotalOrderPartitioner
class to partition the map output into disjoint ranges of the key space, corresponding to the key ranges of the regions in the table.
HFileOutputFormat2
includes a convenience function, configureIncrementalLoad()
, which automatically sets up a TotalOrderPartitioner
based on the current region boundaries of a table.
#### 73.2.2. Completing the data load
After a data import has been prepared, either by using the importtsv
tool with the “importtsv.bulk.output” option or by some other MapReduce job using the HFileOutputFormat
, the completebulkload
tool is used to import the data into the running cluster. This command line tool iterates through the prepared data files, and for each one determines the region the file belongs to. It then contacts the appropriate RegionServer which adopts the HFile, moving it into its storage directory and making the data available to clients.
If the region boundaries have changed during the course of bulk load preparation, or between the preparation and completion steps, the completebulkload
utility will automatically split the data files into pieces corresponding to the new boundaries. This process is not optimally efficient, so users should take care to minimize the delay between preparing a bulk load and importing it into the cluster, especially if other clients are simultaneously loading data through other means.
$ hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable
The -c config-file
option can be used to specify a file containing the appropriate hbase parameters (e.g., hbase-site.xml) if not supplied already on the CLASSPATH (In addition, the CLASSPATH must contain the directory that has the zookeeper configuration file if zookeeper is NOT managed by HBase).
> If the target table does not already exist in HBase, this tool will create the table automatically.
### 73.3. See Also
For more information about the referenced utilities, see ImportTsv and CompleteBulkLoad.
See How-to: Use HBase Bulk Loading, and Why for a recent blog on current state of bulk loading.
### 73.4. Advanced Usage
Although the importtsv
tool is useful in many cases, advanced users may want to generate data programmatically, or import data from other formats. To get started doing so, dig into ImportTsv.java
and check the JavaDoc for HFileOutputFormat.
The import step of the bulk load can also be done programmatically. See the LoadIncrementalHFiles
class for more information.
### 73.5. Bulk Loading Replication
HBASE-13153 adds replication support for bulk loaded HFiles, available since HBase 1.3/2.0. This feature is enabled by setting hbase.replication.bulkload.enabled
to true
(default is false
). You also need to copy the source cluster configuration files to the destination cluster.
Additional configurations are required too:
1.
hbase.replication.source.fs.conf.provider
This defines the class which loads the source cluster file system client configuration in the destination cluster. This should be configured for all the RS in the destination cluster. Default is
org.apache.hadoop.hbase.replication.regionserver.DefaultSourceFSConfigurationProvider
.
2.
hbase.replication.conf.dir
This represents the base directory where the file system client configurations of the source cluster are copied to the destination cluster. This should be configured for all the RS in the destination cluster. Default is
$HBASE_CONF_DIR
.
3.
hbase.replication.cluster.id
This configuration is required in the cluster where replication for bulk loaded data is enabled. A source cluster is uniquely identified by the destination cluster using this id. This should be configured for all the RS in the source cluster configuration file for all the RS. For example: If source cluster FS client configurations are copied to the destination cluster under directory
/home/user/dc1/
, then hbase.replication.cluster.id
should be configured as dc1
and hbase.replication.conf.dir
as /home/user
.
> DefaultSourceFSConfigurationProvider
supports only xml
type files. It loads source cluster FS client configuration only once, so if source cluster FS client configuration files are updated, every peer(s) cluster RS must be restarted to reload the configuration.
## 74. HDFS
As HBase runs on HDFS (and each StoreFile is written as a file on HDFS), it is important to have an understanding of the HDFS Architecture especially in terms of how it stores files, handles failovers, and replicates blocks.
See the Hadoop documentation on HDFS Architecture for more information.
### 74.1. NameNode
The NameNode is responsible for maintaining the filesystem metadata. See the above HDFS Architecture link for more information.
### 74.2. DataNode
The DataNodes are responsible for storing HDFS blocks. See the above HDFS Architecture link for more information.
## 75. Timeline-consistent High Available Reads
> The current Assignment Manager V2 does not work well with region replica, so this feature maybe broken. Use it with caution.
### 75.1. Introduction
HBase, architecturally, always had the strong consistency guarantee from the start. All reads and writes are routed through a single region server, which guarantees that all writes happen in an order, and all reads are seeing the most recent committed data.
However, because of this single homing of the reads to a single location, if the server becomes unavailable, the regions of the table that were hosted in the region server become unavailable for some time. There are three phases in the region recovery process - detection, assignment, and recovery. Of these, the detection is usually the longest and is presently in the order of 20-30 seconds depending on the ZooKeeper session timeout. During this time and before the recovery is complete, the clients will not be able to read the region data.
However, for some use cases, either the data may be read-only, or doing reads against some stale data is acceptable. With timeline-consistent high available reads, HBase can be used for these kind of latency-sensitive use cases where the application can expect to have a time bound on the read completion.
For achieving high availability for reads, HBase provides a feature called region replication. In this model, for each region of a table, there will be multiple replicas that are opened in different RegionServers. By default, the region replication is set to 1, so only a single region replica is deployed and there will not be any changes from the original model. If region replication is set to 2 or more, then the master will assign replicas of the regions of the table. The Load Balancer ensures that the region replicas are not co-hosted in the same region servers and also in the same rack (if possible).
All of the replicas for a single region will have a unique replicaid, starting from 0. The region replica having replica_id==0 is called the primary region, and the others _secondary regions or secondaries. Only the primary can accept writes from the client, and the primary will always contain the latest changes. Since all writes still have to go through the primary region, the writes are not highly-available (meaning they might block for some time if the region becomes unavailable).
### 75.2. Timeline Consistency
With this feature, HBase introduces a Consistency definition, which can be provided per read operation (get or scan).
public enum Consistency {
STRONG,
TIMELINE
}
Consistency.STRONG
is the default consistency model provided by HBase. In case the table has region replication = 1, or in a table with region replicas but the reads are done with this consistency, the read is always performed by the primary regions, so that there will not be any change from the previous behaviour, and the client always observes the latest data.
In case a read is performed with Consistency.TIMELINE
, then the read RPC will be sent to the primary region server first. After a short interval (hbase.client.primaryCallTimeout.get
, 10ms by default), parallel RPC for secondary region replicas will also be sent if the primary does not respond back. After this, the result is returned from whichever RPC is finished first. If the response came back from the primary region replica, we can always know that the data is latest. For this Result.isStale() API has been added to inspect the staleness. If the result is from a secondary region, then Result.isStale() will be set to true. The user can then inspect this field to possibly reason about the data.
In terms of semantics, TIMELINE consistency as implemented by HBase differs from pure eventual consistency in these respects:
-
Single homed and ordered updates: Region replication or not, on the write side, there is still only 1 defined replica (primary) which can accept writes. This replica is responsible for ordering the edits and preventing conflicts. This guarantees that two different writes are not committed at the same time by different replicas and the data diverges. With this, there is no need to do read-repair or last-timestamp-wins kind of conflict resolution.
-
The secondaries also apply the edits in the order that the primary committed them. This way the secondaries will contain a snapshot of the primaries data at any point in time. This is similar to RDBMS replications and even HBase’s own multi-datacenter replication, however in a single cluster.
-
On the read side, the client can detect whether the read is coming from up-to-date data or is stale data. Also, the client can issue reads with different consistency requirements on a per-operation basis to ensure its own semantic guarantees.
-
The client can still observe edits out-of-order, and can go back in time, if it observes reads from one secondary replica first, then another secondary replica. There is no stickiness to region replicas or a transaction-id based guarantee. If required, this can be implemented later though.

hbase.regionserver.storefile.refresh.period
to a non-zero value. See Configuration section below.
#### 75.5.2. Asnyc WAL replication
The second mechanism for propagation of writes to secondaries is done via “Async WAL Replication” feature and is only available in HBase-1.1+. This works similarly to HBase’s multi-datacenter replication, but instead the data from a region is replicated to the secondary regions. Each secondary replica always receives and observes the writes in the same order that the primary region committed them. In some sense, this design can be thought of as “in-cluster replication”, where instead of replicating to a different datacenter, the data goes to secondary regions to keep secondary region’s in-memory state up to date. The data files are shared between the primary region and the other replicas, so that there is no extra storage overhead. However, the secondary regions will have recent non-flushed data in their memstores, which increases the memory overhead. The primary region writes flush, compaction, and bulk load events to its WAL as well, which are also replicated through wal replication to secondaries. When they observe the flush/compaction or bulk load event, the secondary regions replay the event to pick up the new files and drop the old ones.
Committing writes in the same order as in primary ensures that the secondaries won’t diverge from the primary regions data, but since the log replication is asynchronous, the data might still be stale in secondary regions. Since this feature works as a replication endpoint, the performance and latency characteristics is expected to be similar to inter-cluster replication.
Async WAL Replication is disabled by default. You can enable this feature by setting hbase.region.replica.replication.enabled
to true
. Asyn WAL Replication feature will add a new replication peer named region_replica_replication
as a replication peer when you create a table with region replication > 1 for the first time. Once enabled, if you want to disable this feature, you need to do two actions: Set configuration property hbase.region.replica.replication.enabled
to false in hbase-site.xml
(see Configuration section below) _ Disable the replication peer named region_replica_replication
in the cluster using hbase shell or Admin
class:
hbase> disable_peer 'region_replica_replication'
### 75.6. Store File TTL
In both of the write propagation approaches mentioned above, store files of the primary will be opened in secondaries independent of the primary region. So for files that the primary compacted away, the secondaries might still be referring to these files for reading. Both features are using HFileLinks to refer to files, but there is no protection (yet) for guaranteeing that the file will not be deleted prematurely. Thus, as a guard, you should set the configuration property hbase.master.hfilecleaner.ttl
to a larger value, such as 1 hour to guarantee that you will not receive IOExceptions for requests going to replicas.
### 75.7. Region replication for META table’s region
Currently, Async WAL Replication is not done for the META table’s WAL. The meta table’s secondary replicas still refreshes themselves from the persistent store files. Hence the hbase.regionserver.meta.storefile.refresh.period
needs to be set to a certain non-zero value for refreshing the meta store files. Note that this configuration is configured differently than hbase.regionserver.storefile.refresh.period
.
### 75.8. Memory accounting
The secondary region replicas refer to the data files of the primary region replica, but they have their own memstores (in HBase-1.1+) and uses block cache as well. However, one distinction is that the secondary region replicas cannot flush the data when there is memory pressure for their memstores. They can only free up memstore memory when the primary region does a flush and this flush is replicated to the secondary. Since in a region server hosting primary replicas for some regions and secondaries for some others, the secondaries might cause extra flushes to the primary regions in the same host. In extreme situations, there can be no memory left for adding new writes coming from the primary via wal replication. For unblocking this situation (and since secondary cannot flush by itself), the secondary is allowed to do a “store file refresh” by doing a file system list operation to pick up new files from primary, and possibly dropping its memstore. This refresh will only be performed if the memstore size of the biggest secondary region replica is at least hbase.region.replica.storefile.refresh.memstore.multiplier
(default 4) times bigger than the biggest memstore of a primary replica. One caveat is that if this is performed, the secondary can observe partial row updates across column families (since column families are flushed independently). The default should be good to not do this operation frequently. You can set this value to a large number to disable this feature if desired, but be warned that it might cause the replication to block forever.
### 75.9. Secondary replica failover
When a secondary region replica first comes online, or fails over, it may have served some edits from its memstore. Since the recovery is handled differently for secondary replicas, the secondary has to ensure that it does not go back in time before it starts serving requests after assignment. For doing that, the secondary waits until it observes a full flush cycle (start flush, commit flush) or a “region open event” replicated from the primary. Until this happens, the secondary region replica will reject all read requests by throwing an IOException with message “The region’s reads are disabled”. However, the other replicas will probably still be available to read, thus not causing any impact for the rpc with TIMELINE consistency. To facilitate faster recovery, the secondary region will trigger a flush request from the primary when it is opened. The configuration property hbase.region.replica.wait.for.primary.flush
(enabled by default) can be used to disable this feature if needed.
### 75.10. Configuration properties
To use highly available reads, you should set the following properties in hbase-site.xml
file. There is no specific configuration to enable or disable region replicas. Instead you can change the number of region replicas per table to increase or decrease at the table creation or with alter table. The following configuration is for using async wal replication and using meta replicas of 3.
#### 75.10.1. Server side properties
```
false
, replicas do not receive memstore updates from
the primary RegionServer. If you set this to true
, you can still disable
memstore replication on a per-table basis, by setting the table’s
REGION_MEMSTORE_REPLICATION
configuration property to false
. If
memstore replication is disabled, the secondaries will only receive
updates for events like flushes and bulkloads, and will not have access to
data which the primary has not yet flushed. This preserves the guarantee
of row-level consistency, even when the read requests Consistency.TIMELINE
.
One thing to keep in mind also is that, region replica placement policy is only enforced by the `StochasticLoadBalancer` which is the default balancer. If you are using a custom load balancer property in hbase-site.xml (`hbase.master.loadbalancer.class`) replicas of regions might end up being hosted in the same server.
<a name="0250afb4"></a>
#### 75.10.2. Client side properties
Ensure to set the following for all clients (and servers) that will use region replicas.
Note HBase-1.0.x users should use `hbase.ipc.client.allowsInterrupt` rather than `hbase.ipc.client.specificThreadForWriting`.
<a name="31c7d133"></a>
### 75.11. User Interface
In the masters user interface, the region replicas of a table are also shown together with the primary regions. You can notice that the replicas of a region will share the same start and end keys and the same region name prefix. The only difference would be the appended replica_id (which is encoded as hex), and the region encoded name will be different. You can also see the replica ids shown explicitly in the UI.
<a name="5b502d81"></a>
### 75.12. Creating a table with region replication
Region replication is a per-table property. All tables have `REGION_REPLICATION = 1` by default, which means that there is only one replica per region. You can set and change the number of replicas per region of a table by supplying the `REGION_REPLICATION` property in the table descriptor.
<a name="e4f11071"></a>
#### 75.12.1. Shell
create ‘t1’, ‘f1’, {REGION_REPLICATION => 2}
describe ‘t1’ for i in 1..100 put ‘t1’, “r#{i}”, ‘f1:c1’, i end flush ‘t1’
<a name="2e1fc8a4"></a>
#### 75.12.2. Java
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(“test_table”)); htd.setRegionReplication(2); … admin.createTable(htd);
You can also use `setRegionReplication()` and alter table to increase, decrease the region replication for a table.
<a name="d15fb193"></a>
### 75.13. Read API and Usage
<a name="1deebc66"></a>
#### 75.13.1. Shell
You can do reads in shell using a the Consistency.TIMELINE semantics as follows
hbase(main):001:0> get ‘t1’,’r6’, {CONSISTENCY => “TIMELINE”}
You can simulate a region server pausing or becoming unavailable and do a read from the secondary replica:
$ kill -STOP
hbase(main):001:0> get ‘t1’,’r6’, {CONSISTENCY => “TIMELINE”}
Using scans is also similar
hbase> scan ‘t1’, {CONSISTENCY => ‘TIMELINE’}
<a name="f08f68f2"></a>
#### 75.13.2. Java
You can set the consistency for Gets and Scans and do requests as follows.
Get get = new Get(row); get.setConsistency(Consistency.TIMELINE); … Result result = table.get(get);
You can also pass multiple gets:
Get get1 = new Get(row);
get1.setConsistency(Consistency.TIMELINE);
…
ArrayList
And Scans:
Scan scan = new Scan(); scan.setConsistency(Consistency.TIMELINE); … ResultScanner scanner = table.getScanner(scan);
You can inspect whether the results are coming from primary region or not by calling the `Result.isStale()` method:
Result result = table.get(get); if (result.isStale()) { … }
<a name="756a10ff"></a>
### 75.14. Resources
1.
More information about the design and implementation can be found at the jira issue: [HBASE-10070](https://issues.apache.org/jira/browse/HBASE-10070)
2.
HBaseCon 2014 talk: [HBase Read High Availability Using Timeline-Consistent Region Replicas](https://hbase.apache.org/www.hbasecon.com/#2014-PresentationsRecordings) also contains some details and [slides](http://www.slideshare.net/enissoz/hbase-high-availability-for-reads-with-time).
<a name="1e1f68db"></a>
## 76. Storing Medium-sized Objects (MOB)
Data comes in many sizes, and saving all of your data in HBase, including binary data such as images and documents, is ideal. While HBase can technically handle binary objects with cells that are larger than 100 KB in size, HBase’s normal read and write paths are optimized for values smaller than 100KB in size. When HBase deals with large numbers of objects over this threshold, referred to here as medium objects, or MOBs, performance is degraded due to write amplification caused by splits and compactions. When using MOBs, ideally your objects will be between 100KB and 10MB (see the [FAQ](docs_en_#faq)). HBase FIX_VERSION_NUMBER adds support for better managing large numbers of MOBs while maintaining performance, consistency, and low operational overhead. MOB support is provided by the work done in [HBASE-11339](https://issues.apache.org/jira/browse/HBASE-11339). To take advantage of MOB, you need to use [HFile version 3](docs_en_#hfilev3). Optionally, configure the MOB file reader’s cache settings for each RegionServer (see [Configuring the MOB Cache](docs_en_#mob.cache.configure)), then configure specific columns to hold MOB data. Client code does not need to change to take advantage of HBase MOB support. The feature is transparent to the client.
MOB compaction
MOB data is flushed into MOB files after MemStore flush. There will be lots of MOB files after some time. To reduce MOB file count, there is a periodic task which compacts small MOB files into a large one (MOB compaction).
<a name="e4bdeb08"></a>
### 76.1. Configuring Columns for MOB
You can configure columns to support MOB during table creation or alteration, either in HBase Shell or via the Java API. The two relevant properties are the boolean `IS_MOB` and the `MOB_THRESHOLD`, which is the number of bytes at which an object is considered to be a MOB. Only `IS_MOB` is required. If you do not specify the `MOB_THRESHOLD`, the default threshold value of 100 KB is used.
Configure a Column for MOB Using HBase Shell
hbase> create ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400} hbase> alter ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400}
Example 23. Configure a Column for MOB Using the Java API
… HColumnDescriptor hcd = new HColumnDescriptor(“f”); hcd.setMobEnabled(true); … hcd.setMobThreshold(102400L); …
<a name="bdc306e6"></a>
### 76.2. Configure MOB Compaction Policy
By default, MOB files for one specific day are compacted into one large MOB file. To reduce MOB file count more, there are other MOB Compaction policies supported.
daily policy - compact MOB Files for one day into one large MOB file (default policy) weekly policy - compact MOB Files for one week into one large MOB file montly policy - compact MOB Files for one month into one large MOB File
Configure MOB compaction policy Using HBase Shell
hbase> create ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400, MOB_COMPACT_PARTITION_POLICY => ‘daily’} hbase> create ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400, MOB_COMPACT_PARTITION_POLICY => ‘weekly’} hbase> create ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400, MOB_COMPACT_PARTITION_POLICY => ‘monthly’}
hbase> alter ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400, MOB_COMPACT_PARTITION_POLICY => ‘daily’} hbase> alter ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400, MOB_COMPACT_PARTITION_POLICY => ‘weekly’} hbase> alter ‘t1’, {NAME => ‘f1’, IS_MOB => true, MOB_THRESHOLD => 102400, MOB_COMPACT_PARTITION_POLICY => ‘monthly’}
<a name="3de95af6"></a>
### 76.3. Configure MOB Compaction mergeable threshold
If the size of a mob file is less than this value, it’s regarded as a small file and needs to be merged in mob compaction. The default value is 1280MB.
<a name="ef8c390c"></a>
### 76.4. Testing MOB
The utility `org.apache.hadoop.hbase.IntegrationTestIngestWithMOB` is provided to assist with testing the MOB feature. The utility is run as follows:
$ sudo -u hbase hbase org.apache.hadoop.hbase.IntegrationTestIngestWithMOB \ -threshold 1024 \ -minMobDataSize 512 \ -maxMobDataSize 5120
-
`**threshold**` is the threshold at which cells are considered to be MOBs. The default is 1 kB, expressed in bytes.
-
`**minMobDataSize**` is the minimum value for the size of MOB data. The default is 512 B, expressed in bytes.
-
`**maxMobDataSize**` is the maximum value for the size of MOB data. The default is 5 kB, expressed in bytes.
<a name="515fc9ea"></a>
### 76.5. Configuring the MOB Cache
Because there can be a large number of MOB files at any time, as compared to the number of HFiles, MOB files are not always kept open. The MOB file reader cache is a LRU cache which keeps the most recently used MOB files open. To configure the MOB file reader’s cache on each RegionServer, add the following properties to the RegionServer’s `hbase-site.xml`, customize the configuration to suit your environment, and restart or rolling restart the RegionServer.
Example 24. Example MOB Cache Configuration
hbase.mob.file.cache.size
threshold.
The default value is 0.5f, which means that half the files (the least-recently-used
ones) are evicted.
<a name="91330c93"></a>
### 76.6. MOB Optimization Tasks
<a name="efba6e62"></a>
#### 76.6.1. Manually Compacting MOB Files
To manually compact MOB files, rather than waiting for the [configuration](docs_en_#mob.cache.configure) to trigger compaction, use the `compact` or `major_compact` HBase shell commands. These commands require the first argument to be the table name, and take a column family as the second argument. and take a compaction type as the third argument.
hbase> compact ‘t1’, ‘c1’, ‘MOB’ hbase> major_compact ‘t1’, ‘c1’, ‘MOB’ ```
These commands are also available via Admin.compact
and Admin.majorCompact
methods.