Apache Spark is a software framework that is used to process data in memory in a distributed manner, and is replacing MapReduce in many use cases.

Spark itself is out of scope for this document; please refer to the Spark site for more information on the Spark project and subprojects. This document will focus on four main interaction points between Spark and HBase. Those interaction points are:

Basic Spark

The ability to have an HBase Connection at any point in your Spark DAG.

Spark Streaming

The ability to have an HBase Connection at any point in your Spark Streaming application.

Spark Bulk Load

The ability to write directly to HBase HFiles for bulk insertion into HBase.

SparkSQL/DataFrames

The ability to write SparkSQL that draws on tables that are represented in HBase.

The following sections will walk through examples of all these interaction points.

104. Basic Spark

This section discusses Spark HBase integration at the lowest and simplest levels. All the other interaction points are built upon the concepts that will be described here.

At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext takes in HBase configurations and pushes them to the Spark executors. This allows us to have an HBase Connection per Spark Executor in a static location.

For reference, Spark Executors can be on the same nodes as the Region Servers or on different nodes; there is no dependence on co-location. Think of every Spark Executor as a multi-threaded client application. This allows any Spark Tasks running on the executors to access the shared Connection object.

Example 31. HBaseContext Usage Example

This example shows how HBaseContext can be used to do a foreachPartition on an RDD in Scala:

  val sc = new SparkContext("local", "test")
  val config = new HBaseConfiguration()

  ...

  val hbaseContext = new HBaseContext(sc, config)

  rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
    val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
    it.foreach((putRecord) => {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      bufferedMutator.mutate(put)
    })
    bufferedMutator.flush()
    bufferedMutator.close()
  })

Here is the same example implemented in Java:

  JavaSparkContext jsc = new JavaSparkContext(sparkConf);

  try {
    List<byte[]> list = new ArrayList<>();
    list.add(Bytes.toBytes("1"));
    ...
    list.add(Bytes.toBytes("5"));

    JavaRDD<byte[]> rdd = jsc.parallelize(list);
    Configuration conf = HBaseConfiguration.create();

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    hbaseContext.foreachPartition(rdd,
        new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
          public void call(Tuple2<Iterator<byte[]>, Connection> t)
              throws Exception {
            Table table = t._2().getTable(TableName.valueOf(tableName));
            BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));

            while (t._1().hasNext()) {
              byte[] b = t._1().next();
              Result r = table.get(new Get(b));
              if (r.getExists()) {
                mutator.mutate(new Put(b));
              }
            }

            mutator.flush();
            mutator.close();
            table.close();
          }
        });
  } finally {
    jsc.stop();
  }

All functionality between Spark and HBase will be supported both in Scala and in Java, with the exception of SparkSQL, which will support any language that is supported by Spark. For the remainder of this documentation, we will focus on Scala examples.

The examples above illustrate how to do a foreachPartition with a connection. A number of other Spark base functions are supported out of the box:

bulkPut

For massively parallel sending of puts to HBase

bulkDelete

For massively parallel sending of deletes to HBase

bulkGet

For massively parallel sending of gets to HBase to create a new RDD

mapPartition

To do a Spark Map function with a Connection object to allow full access to HBase

hBaseRDD

To simplify a distributed scan to create a RDD

For examples of all these functionalities, see the HBase-Spark Module.
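As a quick illustration of the API shape, the sketch below shows how bulkPut and hBaseRDD might be invoked against the hbaseContext and table from the example above. This is an outline only, not an excerpt from the module; the exact method signatures are assumptions and may vary slightly between versions of the HBase-Spark module.

  // Sketch only: reuses sc, config, hbaseContext and table "t1" from the example above.
  // bulkPut sends one Put per RDD record, in parallel, from the executors.
  hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](
    rdd,
    TableName.valueOf("t1"),
    (putRecord) => {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    })

  // hBaseRDD runs a distributed scan and returns the results as an RDD of
  // (ImmutableBytesWritable, Result) pairs.
  val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("t1"), new Scan())
  println(scanRdd.count())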

105. Spark Streaming

Spark Streaming is a micro-batching stream processing framework built on top of Spark. HBase and Spark Streaming make great companions in that HBase can provide the following benefits alongside Spark Streaming:

  • A place to grab reference data or profile data on the fly

  • A place to store counts or aggregates in a way that supports Spark Streaming's promise of exactly-once processing.

The HBase-Spark module’s integration points with Spark Streaming are similar to its normal Spark integration points, in that the following commands are possible straight off a Spark Streaming DStream.

bulkPut

For massively parallel sending of puts to HBase

bulkDelete

For massively parallel sending of deletes to HBase

bulkGet

For massively parallel sending of gets to HBase to create a new RDD

mapPartition

To do a Spark Map function with a Connection object to allow full access to HBase

hBaseRDD

To simplify a distributed scan to create a RDD

Example 32. bulkPut Example with DStreams

Below is an example of bulkPut with DStreams. It is very close in feel to the RDD bulk put.

  val sc = new SparkContext("local", "test")
  val config = new HBaseConfiguration()

  val hbaseContext = new HBaseContext(sc, config)
  val ssc = new StreamingContext(sc, Milliseconds(200))

  val rdd1 = ...
  val rdd2 = ...

  val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
      Array[Byte], Array[Byte])])]]()

  queue += rdd1
  queue += rdd2

  val dStream = ssc.queueStream(queue)

  dStream.hbaseBulkPut(
    hbaseContext,
    TableName.valueOf(tableName),
    (putRecord) => {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    })

There are three inputs to the hbaseBulkPut function:

  1. The hbaseContext that carries the configuration broadcast information that links us to the HBase Connections in the executors.

  2. The name of the table we are putting data into.

  3. A function that will convert a record in the DStream into an HBase Put object.

106. Bulk Load

There are two options for bulk loading data into HBase with Spark. There is the basic bulk load functionality that will work for cases where your rows have millions of columns and cases where your columns are not consolidated and partitioned before the map side of the Spark bulk load process.

There is also a thin record bulk load option with Spark. This second option is designed for tables that have fewer than 10k columns per row. The advantage of this second option is higher throughput and less overall load on the Spark shuffle operation.

Both implementations work more or less like the MapReduce bulk load process in that a partitioner partitions the rowkeys based on region splits and the row keys are sent to the reducers in order, so that HFiles can be written out directly from the reduce phase.

In Spark terms, the bulk load will be implemented around a Spark repartitionAndSortWithinPartitions followed by a Spark foreachPartition.

First, let's look at an example of using the basic bulk load functionality.

Example 33. Bulk Loading Example

The following example shows bulk loading with Spark.

  val sc = new SparkContext("local", "test")
  val config = new HBaseConfiguration()

  val hbaseContext = new HBaseContext(sc, config)

  val stagingFolder = ...
  val rdd = sc.parallelize(Array(
    (Bytes.toBytes("1"),
      (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
    (Bytes.toBytes("3"),
      (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

  rdd.hbaseBulkLoad(TableName.valueOf(tableName),
    t => {
      val rowKey = t._1
      val family:Array[Byte] = t._2(0)._1
      val qualifier = t._2(0)._2
      val value = t._2(0)._3

      val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

      Seq((keyFamilyQualifier, value)).iterator
    },
    stagingFolder.getPath)

  val load = new LoadIncrementalHFiles(config)
  load.doBulkLoad(new Path(stagingFolder.getPath),
    conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

The hbaseBulkLoad function takes three required parameters:

  1. The name of the table we intend to bulk load into.

  2. A function that will convert a record in the RDD to a tuple key value pair, with the tuple key being a KeyFamilyQualifier object and the value being the cell value. The KeyFamilyQualifier object will hold the RowKey, Column Family, and Column Qualifier. The shuffle will partition on the RowKey but will sort by all three values.

  3. The temporary path for the HFiles to be written out to.

Following the Spark bulk load command, use HBase's LoadIncrementalHFiles object to load the newly created HFiles into HBase.

Additional Parameters for Bulk Loading with Spark

You can set the following attributes with additional parameter options on hbaseBulkLoad.

  • Max file size of the HFiles

  • A flag to exclude HFiles from compactions

  • Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding

Example 34. Using Additional Parameters

  val sc = new SparkContext("local", "test")
  val config = new HBaseConfiguration()

  val hbaseContext = new HBaseContext(sc, config)

  val stagingFolder = ...
  val rdd = sc.parallelize(Array(
    (Bytes.toBytes("1"),
      (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
    (Bytes.toBytes("3"),
      (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

  val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
  val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")

  familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)

  rdd.hbaseBulkLoad(TableName.valueOf(tableName),
    t => {
      val rowKey = t._1
      val family:Array[Byte] = t._2(0)._1
      val qualifier = t._2(0)._2
      val value = t._2(0)._3

      val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

      Seq((keyFamilyQualifier, value)).iterator
    },
    stagingFolder.getPath,
    familyHBaseWriterOptions,
    compactionExclude = false,
    HConstants.DEFAULT_MAX_FILE_SIZE)

  val load = new LoadIncrementalHFiles(config)
  load.doBulkLoad(new Path(stagingFolder.getPath),
    conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

Now let's look at how you would call the thin record bulk load implementation.

Example 35. Using thin record bulk load

  val sc = new SparkContext("local", "test")
  val config = new HBaseConfiguration()

  val hbaseContext = new HBaseContext(sc, config)

  val stagingFolder = ...
  val rdd = sc.parallelize(Array(
    ("1",
      (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
    ("3",
      (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

  rdd.hbaseBulkLoadThinRows(hbaseContext,
    TableName.valueOf(tableName),
    t => {
      val rowKey = t._1

      val familyQualifiersValues = new FamiliesQualifiersValues
      t._2.foreach(f => {
        val family:Array[Byte] = f._1
        val qualifier = f._2
        val value:Array[Byte] = f._3

        familyQualifiersValues += (family, qualifier, value)
      })
      (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
    },
    stagingFolder.getPath,
    new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
    compactionExclude = false,
    20)

  val load = new LoadIncrementalHFiles(config)
  load.doBulkLoad(new Path(stagingFolder.getPath),
    conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

Note that the big difference in using bulk load for thin rows is that the function returns a tuple, with the first value being the row key and the second value being an object of FamiliesQualifiersValues, which will contain all the values for this row for all column families.

107. SparkSQL/DataFrames

The HBase-Spark Connector (in the HBase-Spark Module) leverages the DataSource API (SPARK-3247) introduced in Spark-1.2.0. It bridges the gap between the simple HBase key-value store and complex relational SQL queries, and enables users to perform complex data analytics on top of HBase using Spark. An HBase DataFrame is a standard Spark DataFrame, and is able to interact with any other data sources such as Hive, ORC, Parquet, JSON, etc. The HBase-Spark Connector applies critical techniques such as partition pruning, column pruning, predicate pushdown and data locality.

To use the HBase-Spark connector, users need to define the Catalog for the schema mapping between HBase and Spark tables, prepare the data and populate the HBase table, then load the HBase DataFrame. After that, users can run integrated queries and access records in the HBase table with SQL. The following illustrates the basic procedure.

107.1. Define catalog

  def catalog = s"""{
         |"table":{"namespace":"default", "name":"table1"},
         |"rowkey":"key",
         |"columns":{
           |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
           |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
           |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
           |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
           |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
           |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
           |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
           |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
           |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
         |}
       |}""".stripMargin

Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this catalog. One is the rowkey definition and the other is the mapping between table columns in Spark and the column family and column qualifier in HBase. The above defines a schema for an HBase table named table1, with row key as key and a number of columns (col1 - col8). Note that the rowkey also has to be defined in detail as a column (col0), which has a specific cf (rowkey).

107.2. Save the DataFrame

  case class HBaseRecord(
    col0: String,
    col1: Boolean,
    col2: Double,
    col3: Float,
    col4: Int,
    col5: Long,
    col6: Short,
    col7: String,
    col8: Byte)

  object HBaseRecord
  {
    def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""
      HBaseRecord(s,
        i % 2 == 0,
        i.toDouble,
        i.toFloat,
        i,
        i.toLong,
        i.toShort,
        s"String$i: $t",
        i.toByte)
    }
  }

  val data = (0 to 255).map { i => HBaseRecord(i, "extra")}

  sc.parallelize(data).toDF.write.options(
    Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
    .format("org.apache.hadoop.hbase.spark")
    .save()

The data prepared by the user is a local Scala collection with 256 HBaseRecord objects. The sc.parallelize(data) function distributes the data to form an RDD. toDF returns a DataFrame. The write function returns a DataFrameWriter used to write the DataFrame to external storage systems (e.g. HBase here). Given a DataFrame with the specified schema catalog, the save function will create an HBase table with 5 regions and save the DataFrame inside.

107.3. Load the DataFrame

  def withCatalog(cat: String): DataFrame = {
    sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.hadoop.hbase.spark")
      .load()
  }
  val df = withCatalog(catalog)

In the withCatalog function, sqlContext is a variable of SQLContext, which is the entry point for working with structured data (rows and columns) in Spark. read returns a DataFrameReader that can be used to read data in as a DataFrame. The option function adds input options for the underlying data source to the DataFrameReader, and the format function specifies the input data source format for the DataFrameReader. The load() function loads input in as a DataFrame. The data frame df returned by the withCatalog function can be used to access the HBase table, as shown in the Language Integrated Query and SQL Query sections below.

107.4. Language Integrated Query

  val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
    $"col0" === "row005" ||
    $"col0" <= "row005")
    .select("col0", "col1", "col4")
  s.show

DataFrame can do various operations, such as join, sort, select, filter, orderBy and so on. df.filter above filters rows using the given SQL expression. select selects a set of columns: col0, col1 and col4.
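The other operations compose in the usual Spark fashion. The following is a small sketch (reusing df and the catalog columns defined above, not an excerpt from the module) that filters, projects and sorts:

  // Sketch only: filter on the int column col4, keep two columns, sort descending.
  val sorted = df.filter($"col4" > 100)
    .select("col0", "col4")
    .orderBy($"col4".desc)
  sorted.show(10)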

107.5. SQL Query

  df.registerTempTable("table1")
  sqlContext.sql("select count(col1) from table1").show

registerTempTable registers df DataFrame as a temporary table using the table name table1. The lifetime of this temporary table is tied to the SQLContext that was used to create df. sqlContext.sql function allows the user to execute SQL queries.
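Any SQL accepted by the SQLContext can be run against the temporary table. As a further illustrative sketch (not taken from the module itself), an aggregation grouped by col1 would look like:

  // Sketch only: group the rows registered as table1 above by the boolean column col1.
  sqlContext.sql("select col1, count(*) from table1 group by col1").show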

107.6. Others

Example 36. Query with different timestamps

In HBaseSparkConf, four parameters related to timestamp can be set. They are TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS respectively. Users can query records with different timestamps or time ranges with MIN_TIMESTAMP and MAX_TIMESTAMP. Note that concrete values should be used in place of tsSpecified and oldMs in the examples below.

The example below shows how to load the df DataFrame with different timestamps. tsSpecified is specified by the user. HBaseTableCatalog defines the HBase and Relation schema. writeCatalog defines the catalog for the schema mapping.

  val df = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
    .format("org.apache.hadoop.hbase.spark")
    .load()

The example below shows how to load df DataFrame with different time ranges. oldMs is specified by the user.

  val df = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
      HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
    .format("org.apache.hadoop.hbase.spark")
    .load()
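MAX_VERSIONS is not shown above. Assuming it follows the same option pattern as the timestamp settings (option values are passed as strings), a sketch of limiting the number of versions read would look like:

  // Sketch only: assumes HBaseSparkConf.MAX_VERSIONS is set the same way as the timestamp options.
  val df = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
      HBaseSparkConf.MAX_VERSIONS -> "3"))
    .format("org.apache.hadoop.hbase.spark")
    .load()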

After loading df DataFrame, users can query data.

  df.registerTempTable("table")
  sqlContext.sql("select count(col1) from table").show

Example 37. Native Avro support

The HBase-Spark Connector supports different data formats like Avro, JSON, etc. The use case below shows how Spark supports Avro. Users can persist Avro records into HBase directly. Internally, the Avro schema is converted to a native Spark Catalyst data type automatically. Note that both key and value parts in an HBase table can be defined in Avro format.

  1. Define catalog for the schema mapping:
  def catalog = s"""{
        |"table":{"namespace":"default", "name":"Avrotable"},
        |"rowkey":"key",
        |"columns":{
          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
          |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
        |}
      |}""".stripMargin

catalog is a schema for an HBase table named Avrotable, with row key as key and one column col1. The rowkey also has to be defined in detail as a column (col0), which has a specific cf (rowkey).

  2. Prepare the Data:
  // Case class holding the row key and the serialized Avro payload.
  case class AvroHBaseRecord(col0: String, col1: Array[Byte])

  object AvroHBaseRecord {
    val schemaString =
      s"""{"namespace": "example.avro",
         |  "type": "record", "name": "User",
         |  "fields": [
         |    {"name": "name", "type": "string"},
         |    {"name": "favorite_number", "type": ["int", "null"]},
         |    {"name": "favorite_color", "type": ["string", "null"]},
         |    {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
         |    {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
         |  ] }""".stripMargin

    val avroSchema: Schema = {
      val p = new Schema.Parser
      p.parse(schemaString)
    }

    def apply(i: Int): AvroHBaseRecord = {
      val user = new GenericData.Record(avroSchema)
      user.put("name", s"name${"%03d".format(i)}")
      user.put("favorite_number", i)
      user.put("favorite_color", s"color${"%03d".format(i)}")
      val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
      favoriteArray.add(s"number${i}")
      favoriteArray.add(s"number${i+1}")
      user.put("favorite_array", favoriteArray)
      import collection.JavaConverters._
      val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
      user.put("favorite_map", favoriteMap)
      val avroByte = AvroSedes.serialize(user, avroSchema)
      AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
    }
  }

  val data = (0 to 255).map { i =>
    AvroHBaseRecord(i)
  }

schemaString is defined first, then it is parsed to get avroSchema. avroSchema is used to generate AvroHBaseRecord. data prepared by users is a local Scala collection which has 256 AvroHBaseRecord objects.

  3. Save DataFrame:
  sc.parallelize(data).toDF.write.options(
    Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()

Given a data frame with the specified schema catalog, the above will create an HBase table with 5 regions and save the data frame inside.

  4. Load the DataFrame:
  def avroCatalog = s"""{
        |"table":{"namespace":"default", "name":"avrotable"},
        |"rowkey":"key",
        |"columns":{
          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
          |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
        |}
      |}""".stripMargin

  def withCatalog(cat: String): DataFrame = {
    sqlContext
      .read
      .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
  }
  val df = withCatalog(avroCatalog)

In the withCatalog function, read returns a DataFrameReader that can be used to read data in as a DataFrame. The option function adds input options for the underlying data source to the DataFrameReader. There are two options: one is to set avroSchema as AvroHBaseRecord.schemaString, and the other is to set HBaseTableCatalog.tableCatalog as avroCatalog. The load() function loads input in as a DataFrame. The data frame df returned by the withCatalog function can be used to access the HBase table.

  5. SQL Query:
  df.registerTempTable("avrotable")
  val c = sqlContext.sql("select count(1) from avrotable")
  c.show

After loading the df DataFrame, users can query the data. registerTempTable registers the df DataFrame as a temporary table using the table name avrotable. The sqlContext.sql function allows the user to execute SQL queries.