Apache MapReduce is a software framework used to analyze large amounts of data. It is provided by Apache Hadoop. MapReduce itself is out of the scope of this document. A good place to get started with MapReduce is https://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html. MapReduce version 2 (MR2) is now part of YARN.

This chapter discusses specific configuration steps you need to take to use MapReduce on data within HBase. In addition, it discusses other interactions and issues between HBase and MapReduce jobs. Finally, it discusses Cascading, an alternative API for MapReduce.

mapred and mapreduce

There are two MapReduce packages in HBase, as in MapReduce itself: org.apache.hadoop.hbase.mapred and org.apache.hadoop.hbase.mapreduce. The former supports the old-style API and the latter the new one. The latter has more facilities, though you can usually find an equivalent in the older package. Pick the package that goes with your MapReduce deployment. When in doubt or starting over, pick org.apache.hadoop.hbase.mapreduce. The notes below refer to o.a.h.h.mapreduce; substitute o.a.h.h.mapred if that is what you are using.

48. HBase, MapReduce, and the CLASSPATH

By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.

To give the MapReduce jobs the access they need, you could add hbase-site.xml to $HADOOP_HOME/conf and add HBase jars to the $HADOOP_HOME/lib directory. You would then need to copy these changes across your cluster. Or you could edit $HADOOP_HOME/conf/hadoop-env.sh and add HBase dependencies to the HADOOP_CLASSPATH variable. Neither of these approaches is recommended, because each pollutes your Hadoop install with HBase references and requires you to restart the Hadoop cluster before Hadoop can use the HBase data.

The recommended approach is to let HBase add its dependency jars and use HADOOP_CLASSPATH or -libjars.

Since HBase 0.90.x, HBase adds its dependency JARs to the job configuration itself. The dependencies only need to be available on the local CLASSPATH; from there they are picked up and bundled into the fat job jar deployed to the MapReduce cluster. A basic trick is to pass the full HBase classpath (all HBase and dependent jars as well as configurations) to the MapReduce job runner and let the HBase utility pick out what it needs from the full classpath and add it to the MapReduce job configuration. See the source at TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) for how this is done.

The following example runs the bundled HBase RowCounter MapReduce job against a table named usertable. It sets HADOOP_CLASSPATH to the jars HBase needs to run in a MapReduce context (including configuration files such as hbase-site.xml). Be sure to use the correct version of the HBase JAR for your system; replace the VERSION string in the command line below with the version of your local HBase install. The backticks (` symbols) cause the shell to execute the sub-command and set HADOOP_CLASSPATH to the output of hbase classpath. This example assumes you use a BASH-compatible shell.

  $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar \
    org.apache.hadoop.hbase.mapreduce.RowCounter usertable

The above command launches a row-counting MapReduce job against the HBase cluster that your local configuration points to, running on the MapReduce cluster that your Hadoop configuration points to.

The main for the hbase-mapreduce.jar is a Driver that lists a few basic mapreduce tasks that ship with hbase. For example, presuming your install is hbase 2.0.0-SNAPSHOT:

  $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar
  An example program must be given as the first argument.
  Valid program names are:
    CellCounter: Count cells in HBase table.
    WALPlayer: Replay WAL files.
    completebulkload: Complete a bulk data load.
    copytable: Export a table from local cluster to peer cluster.
    export: Write table data to HDFS.
    exportsnapshot: Export the specific snapshot to a given FileSystem.
    import: Import data written by Export.
    importtsv: Import data in TSV format.
    rowcounter: Count rows in HBase table.
    verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

You can use the above listed shortnames for mapreduce jobs as in the below re-run of the row counter job (again, presuming your install is hbase 2.0.0-SNAPSHOT):

  $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar \
    rowcounter usertable

You might find the output of the more selective hbase mapredcp tool of interest; it lists the minimum set of jars needed to run a basic MapReduce job against an HBase install. It does not include configuration, so you will probably need to add that if you want your MapReduce job to find the target cluster. You will probably also have to add pointers to extra jars once you start to do anything of substance. Specify the extras by passing the system property -Dtmpjars when you run hbase mapredcp.

For jobs that do not package their dependencies or call TableMapReduceUtil#addDependencyJars, the following command structure is necessary:

  $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...
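
The tr ':' ',' step in the command above is there because HADOOP_CLASSPATH is a colon-separated list while -libjars takes a comma-separated one. For launchers that assemble the argument in code rather than in the shell, the same conversion might look like the following minimal sketch. The class name LibJarsSketch and the sample jar paths are hypothetical, and the sketch assumes a Unix-style ':' path separator.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper only; not part of HBase or Hadoop. */
final class LibJarsSketch {

    /** Converts a colon-separated classpath into a comma-separated -libjars value. */
    static String toLibJars(String classpath) {
        List<String> jars = new ArrayList<>();
        for (String entry : classpath.split(":")) {
            if (!entry.isEmpty()) {   // unlike tr, drop empty entries such as a trailing ':'
                jars.add(entry);
            }
        }
        return String.join(",", jars);
    }

    public static void main(String[] args) {
        // Hypothetical output of `hbase mapredcp`
        String mapredcp = "/opt/hbase/lib/hbase-client.jar:/opt/hbase/lib/hbase-common.jar";
        System.out.println("-libjars " + toLibJars(mapredcp));
    }
}
```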


The example may not work if you are running HBase from its build directory rather than an installed location. You may see an error like the following:

  java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper

If this occurs, try modifying the command as follows, so that it uses the HBase JARs from the target/ directory within the build environment.

  $ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar:`${HBASE_BUILD_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar rowcounter usertable


Notice to MapReduce users of HBase between 0.96.1 and 0.98.4

Some MapReduce jobs that use HBase fail to launch. The symptom is an exception similar to the following:

  Exception in thread "main" java.lang.IllegalAccessError: class
      com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass
      com.google.protobuf.LiteralByteString
      at java.lang.ClassLoader.defineClass1(Native Method)
      at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
      at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
      at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
      at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
      at java.security.AccessController.doPrivileged(Native Method)
      at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818)
      at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433)
      at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186)
      at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147)
      at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270)
      at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100)
      ...

This is caused by an optimization introduced in HBASE-9867 that inadvertently introduced a classloader dependency.

This affects both jobs that use the -libjars option and “fat jar” jobs, which package their runtime dependencies in a nested lib folder.

In order to satisfy the new classloader requirements, hbase-protocol.jar must be included in Hadoop’s classpath. See HBase, MapReduce, and the CLASSPATH for current recommendations for resolving classpath errors. The following is included for historical purposes.

This can be resolved system-wide by including a reference to the hbase-protocol.jar in Hadoop’s lib directory, via a symlink or by copying the jar into the new location.

This can also be achieved on a per-job launch basis by including it in the HADOOP_CLASSPATH environment variable at job submission time. When launching jobs that package their dependencies, all three of the following job launching commands satisfy this requirement:

  $ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
  $ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
  $ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass

For jars that do not package their dependencies, the following command structure is necessary:

  $ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...

See also HBASE-10304 for further discussion of this issue.

49. MapReduce Scan Caching

TableMapReduceUtil now restores the option to set scanner caching (the number of rows which are cached before returning the result to the client) on the Scan object that is passed in. This functionality was lost due to a bug in HBase 0.95 (HBASE-11558), which is fixed for HBase 0.98.5 and 0.96.3. The priority order for choosing the scanner caching is as follows:

  1. Caching settings which are set on the scan object.

  2. Caching settings which are specified via the configuration option hbase.client.scanner.caching, which can either be set manually in hbase-site.xml or via the helper method TableMapReduceUtil.setScannerCaching().

  3. The default value HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING, which is set to 100.

Optimizing the caching settings is a balance between the time the client waits for a result and the number of sets of results the client needs to receive. If the caching setting is too large, the client could end up waiting for a long time or the request could even time out. If the setting is too small, the scan needs to return results in several pieces. If you think of the scan as a shovel, a bigger cache setting is analogous to a bigger shovel, and a smaller cache setting is equivalent to more shoveling in order to fill the bucket.

The list of priorities mentioned above allows you to set a reasonable default, and override it for specific operations.
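
The priority order and the trade-off described above can be modelled in a few lines of plain Java. The sketch below is not HBase code: the class ScanCachingSketch and its methods are hypothetical, the default of 100 is the value quoted in the list above, and representing "not set" as a non-positive number is an assumption of the sketch. It resolves the effective caching value and estimates how many fetches a scan of a given size needs.

```java
/** Illustrative model only; none of these names are part of the HBase API. */
final class ScanCachingSketch {

    /** Default quoted above for HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING. */
    static final int DEFAULT_CACHING = 100;

    /**
     * Resolves the effective caching value using the priority order above.
     * A value <= 0 means "not set" (an assumption of this sketch).
     */
    static int effectiveCaching(int scanCaching, int configCaching) {
        if (scanCaching > 0) {
            return scanCaching;       // 1. set on the Scan object
        }
        if (configCaching > 0) {
            return configCaching;     // 2. hbase.client.scanner.caching
        }
        return DEFAULT_CACHING;       // 3. the default
    }

    /** Number of fetches needed to read totalRows rows at `caching` rows per fetch. */
    static long fetches(long totalRows, int caching) {
        return (totalRows + caching - 1) / caching;   // ceiling division
    }

    public static void main(String[] args) {
        int perJob = effectiveCaching(500, 200);      // the Scan-level value wins
        int fallback = effectiveCaching(-1, -1);      // nothing set, default applies
        System.out.println("caching " + perJob + " -> " + fetches(1_000_000L, perJob) + " fetches");
        System.out.println("caching " + fallback + " -> " + fetches(1_000_000L, fallback) + " fetches");
    }
}
```

For one million rows this gives 2,000 fetches at a caching value of 500 and 10,000 at the default of 100, which is the "bigger shovel" effect in numbers. Each fetch carries more rows, though, so the time per fetch grows accordingly.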

See the API documentation for Scan for more details.

50. Bundled HBase MapReduce Jobs

The HBase JAR also serves as a Driver for some bundled MapReduce jobs. To learn about the bundled MapReduce jobs, run the following command.

  $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar
  An example program must be given as the first argument.
  Valid program names are:
    copytable: Export a table from local cluster to peer cluster
    completebulkload: Complete a bulk data load.
    export: Write table data to HDFS.
    import: Import data written by Export.
    importtsv: Import data in TSV format.
    rowcounter: Count rows in HBase table

Each valid program name is a bundled MapReduce job. To run one of the jobs, model your command after the following example.

  $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar rowcounter myTable

51. HBase as a MapReduce Job Data Source and Data Sink

HBase can be used as a data source (TableInputFormat) and as a data sink (TableOutputFormat or MultiTableOutputFormat) for MapReduce jobs. When writing MapReduce jobs that read or write HBase, it is advisable to subclass TableMapper and/or TableReducer. See the do-nothing pass-through classes IdentityTableMapper and IdentityTableReducer for basic usage. For a more involved example, see RowCounter or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.

If you run MapReduce jobs that use HBase as a source or sink, you need to specify the source and sink table and column names in your configuration.

When you read from HBase, TableInputFormat requests the list of regions from HBase and creates either one map per region or mapreduce.job.maps maps, whichever is smaller. If your job only has two maps, raise mapreduce.job.maps to a number greater than the number of regions. Maps will run on the adjacent TaskTracker/NodeManager if you are running a TaskTracker/NodeManager and a RegionServer on each node.

When writing to HBase, it may make sense to avoid the Reduce step and write back into HBase from within your map. This approach works when your job does not need the sort and collation that MapReduce does on the map-emitted data. On insert, HBase ‘sorts’, so there is no point double-sorting (and shuffling data around your MapReduce cluster) unless you need to. If you do not need the Reduce, your map might emit counts of records processed for reporting at the end of the job, or you might set the number of Reduces to zero and use TableOutputFormat. If running the Reduce step makes sense in your case, you should typically use multiple reducers so that load is spread across the HBase cluster.

A new HBase partitioner, the HRegionPartitioner, can run as many reducers as the number of existing regions. The HRegionPartitioner is suitable when your table is large and your upload will not greatly alter the number of existing regions upon completion. Otherwise use the default partitioner.
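
To make the idea of region-aligned partitioning concrete, here is a minimal plain-Java model. It is a sketch under stated assumptions, not the HRegionPartitioner source: the class RegionPartitionSketch and its methods are hypothetical, row keys are modelled as ASCII strings rather than byte arrays, and surplus regions are folded onto reducers with a simple modulo, which may differ from what the real class does.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative model of region-aligned partitioning; not the HRegionPartitioner source. */
final class RegionPartitionSketch {

    /**
     * Returns the index of the region that contains rowKey.
     * startKeys must be sorted and begin with "" (the first region has an empty start key).
     */
    static int regionIndex(List<String> startKeys, String rowKey) {
        int index = 0;
        for (int i = 0; i < startKeys.size(); i++) {
            if (startKeys.get(i).compareTo(rowKey) > 0) {
                break;        // this region starts after rowKey
            }
            index = i;        // last region whose start key is <= rowKey
        }
        return index;
    }

    /** Maps a row key to a reducer; with fewer reducers than regions, indexes wrap around. */
    static int partition(List<String> startKeys, String rowKey, int numReducers) {
        return regionIndex(startKeys, rowKey) % numReducers;
    }

    public static void main(String[] args) {
        List<String> startKeys = Arrays.asList("", "g", "n", "t");   // four hypothetical regions
        for (String row : new String[] {"apple", "melon", "n", "zebra"}) {
            System.out.println(row + " -> reducer " + partition(startKeys, row, 4) + " of 4");
        }
    }
}
```

With as many reducers as regions, each reducer in this model writes to exactly one region, which is why the approach suits uploads that leave the region layout largely unchanged.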

52. Writing HFiles Directly During Bulk Import

If you are importing into a new table, you can bypass the HBase API and write your content directly to the filesystem, formatted into HBase data files (HFiles). Your import will run faster, perhaps an order of magnitude faster. For more on how this mechanism works, see Bulk Loading.

53. RowCounter Example

The included RowCounter MapReduce job uses TableInputFormat and does a count of all rows in the specified table. To run it, use the following command:

  $ ./bin/hadoop jar hbase-X.X.X.jar

This will invoke the HBase MapReduce Driver class. Select rowcounter from the choice of jobs offered. This will print rowcounter usage advice to standard output. Specify the tablename, column to count, and output directory. If you have classpath errors, see HBase, MapReduce, and the CLASSPATH.

54. Map-Task Splitting

54.1. The Default HBase MapReduce Splitter

When TableInputFormat is used to source an HBase table in a MapReduce job, its splitter will make a map task for each region of the table. Thus, if there are 100 regions in the table, there will be 100 map-tasks for the job - regardless of how many column families are selected in the Scan.
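
The one-map-task-per-region rule can be illustrated with a small plain-Java model. This is a sketch, not the TableInputFormatBase source: the class RegionSplitSketch is hypothetical, keys are modelled as ASCII strings with "" meaning unbounded, and the clipping of splits to the Scan's row range is an assumption of the sketch that should be checked against getSplits. Column families do not appear anywhere in the computation, which matches the point made above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model of one-split-per-region planning; not the TableInputFormatBase source. */
final class RegionSplitSketch {

    /** Regions and splits are {startKey, endKey} pairs; "" means unbounded. */
    static List<String[]> splits(List<String[]> regions, String scanStart, String scanStop) {
        List<String[]> result = new ArrayList<>();
        for (String[] region : regions) {
            String regionStart = region[0];
            String regionEnd = region[1];
            // A region overlaps the scan if the scan starts before the region ends
            // and stops after the region starts.
            boolean startsBeforeRegionEnd = scanStart.isEmpty() || regionEnd.isEmpty()
                    || scanStart.compareTo(regionEnd) < 0;
            boolean stopsAfterRegionStart = scanStop.isEmpty()
                    || scanStop.compareTo(regionStart) > 0;
            if (!(startsBeforeRegionEnd && stopsAfterRegionStart)) {
                continue;   // region lies entirely outside the scan's row range
            }
            // Clip the split to the scan's row range.
            String splitStart = (scanStart.isEmpty() || regionStart.compareTo(scanStart) >= 0)
                    ? regionStart : scanStart;
            String splitStop = (!regionEnd.isEmpty()
                    && (scanStop.isEmpty() || regionEnd.compareTo(scanStop) <= 0))
                    ? regionEnd : scanStop;
            result.add(new String[] {splitStart, splitStop});
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> regions = Arrays.asList(
                new String[] {"", "g"}, new String[] {"g", "n"},
                new String[] {"n", "t"}, new String[] {"t", ""});
        // Full-table scan: one split, and so one map task, per region.
        System.out.println(splits(regions, "", "").size() + " splits for a full scan");
        // Row-range scan: only the overlapping regions yield a split.
        for (String[] s : splits(regions, "h", "p")) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```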

54.2. Custom Splitters

For those interested in implementing custom splitters, see the method getSplits in TableInputFormatBase. That is where the logic for map-task assignment resides.

55. HBase MapReduce Examples

55.1. HBase MapReduce Read Example

The following is an example of using HBase as a MapReduce source in read-only manner. Specifically, there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper. The job would be defined as follows…

  Configuration config = HBaseConfiguration.create();
  Job job = Job.getInstance(config, "ExampleRead");
  job.setJarByClass(MyReadJob.class);     // class that contains mapper

  Scan scan = new Scan();
  scan.setCaching(500);        // rows fetched per RPC; small values are bad for MapReduce jobs
  scan.setCacheBlocks(false);  // don't set to true for MR jobs
  // set other scan attrs
  // ...

  TableMapReduceUtil.initTableMapperJob(
    tableName,        // input HBase table name
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper
    null,             // mapper output key
    null,             // mapper output value
    job);
  job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

  boolean b = job.waitForCompletion(true);
  if (!b) {
    throw new IOException("error with job!");
  }

…and the mapper instance would extend TableMapper

  public static class MyMapper extends TableMapper<Text, Text> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
        throws InterruptedException, IOException {
      // process data for the row from the Result instance.
    }
  }

55.2. HBase MapReduce Read/Write Example

The following is an example of using HBase both as a source and as a sink with MapReduce. This example will simply copy data from one table to another.

  Configuration config = HBaseConfiguration.create();
  Job job = Job.getInstance(config, "ExampleReadWrite");
  job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

  Scan scan = new Scan();
  scan.setCaching(500);        // rows fetched per RPC; small values are bad for MapReduce jobs
  scan.setCacheBlocks(false);  // don't set to true for MR jobs
  // set other scan attrs

  TableMapReduceUtil.initTableMapperJob(
    sourceTable,      // input table
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper class
    null,             // mapper output key
    null,             // mapper output value
    job);
  TableMapReduceUtil.initTableReducerJob(
    targetTable,      // output table
    null,             // reducer class
    job);
  job.setNumReduceTasks(0);   // map-only job

  boolean b = job.waitForCompletion(true);
  if (!b) {
    throw new IOException("error with job!");
  }

An explanation is required of what TableMapReduceUtil is doing, especially with the reducer. TableOutputFormat is being used as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as well as setting the reducer output key to ImmutableBytesWritable and reducer value to Writable. These could be set by the programmer on the job and conf, but TableMapReduceUtil tries to make things easier.

The following is the example mapper, which creates a Put matching the input Result and emits it. Note: this is what the CopyTable utility does.

  public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      // this example is just copying the data from the source table...
      context.write(row, resultToPut(row, value));
    }

    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
      Put put = new Put(key.get());
      // Result#rawCells() and Put#add(Cell) replace the older raw()/KeyValue calls
      for (Cell cell : result.rawCells()) {
        put.add(cell);
      }
      return put;
    }
  }

There isn’t actually a reducer step, so TableOutputFormat takes care of sending the Put to the target table.

This is just an example; developers could choose not to use TableOutputFormat and connect to the target table themselves.

55.3. HBase MapReduce Read/Write Example With Multi-Table Output

TODO: example for MultiTableOutputFormat.

55.4. HBase MapReduce Summary to HBase Example

The following example uses HBase as a MapReduce source and sink with a summarization step. This example will count the number of distinct instances of a value in a table and write those summarized counts in another table.

  Configuration config = HBaseConfiguration.create();
  Job job = Job.getInstance(config, "ExampleSummary");
  job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

  Scan scan = new Scan();
  scan.setCaching(500);        // rows fetched per RPC; small values are bad for MapReduce jobs
  scan.setCacheBlocks(false);  // don't set to true for MR jobs
  // set other scan attrs

  TableMapReduceUtil.initTableMapperJob(
    sourceTable,           // input table
    scan,                  // Scan instance to control CF and attribute selection
    MyMapper.class,        // mapper class
    Text.class,            // mapper output key
    IntWritable.class,     // mapper output value
    job);
  TableMapReduceUtil.initTableReducerJob(
    targetTable,           // output table
    MyTableReducer.class,  // reducer class
    job);
  job.setNumReduceTasks(1);   // at least one, adjust as required

  boolean b = job.waitForCompletion(true);
  if (!b) {
    throw new IOException("error with job!");
  }

In this example mapper a column with a String-value is chosen as the value to summarize upon. This value is used as the key to emit from the mapper, and an IntWritable represents an instance counter.

  public static class MyMapper extends TableMapper<Text, IntWritable> {
    public static final byte[] CF = Bytes.toBytes("cf");
    public static final byte[] ATTR1 = Bytes.toBytes("attr1");

    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      // Bytes.toString decodes as UTF-8 rather than the platform default charset
      String val = Bytes.toString(value.getValue(CF, ATTR1));
      text.set(val);     // we can only emit Writables...
      context.write(text, ONE);
    }
  }

In the reducer, the “ones” are counted (just like any other MR example that does this), and then a Put is emitted.

  public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    public static final byte[] CF = Bytes.toBytes("cf");
    public static final byte[] COUNT = Bytes.toBytes("count");

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }
      Put put = new Put(Bytes.toBytes(key.toString()));
      put.addColumn(CF, COUNT, Bytes.toBytes(i));   // addColumn replaces the older add(family, qualifier, value)

      context.write(null, put);
    }
  }

55.5. HBase MapReduce Summary to File Example

This is very similar to the summary example above, except that it uses HBase as a MapReduce source but HDFS as the sink. The differences are in the job setup and in the reducer. The mapper remains the same.

  Configuration config = HBaseConfiguration.create();
  Job job = Job.getInstance(config, "ExampleSummaryToFile");
  job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

  Scan scan = new Scan();
  scan.setCaching(500);        // rows fetched per RPC; small values are bad for MapReduce jobs
  scan.setCacheBlocks(false);  // don't set to true for MR jobs
  // set other scan attrs

  TableMapReduceUtil.initTableMapperJob(
    sourceTable,        // input table
    scan,               // Scan instance to control CF and attribute selection
    MyMapper.class,     // mapper class
    Text.class,         // mapper output key
    IntWritable.class,  // mapper output value
    job);
  job.setReducerClass(MyReducer.class);    // reducer class
  job.setNumReduceTasks(1);    // at least one, adjust as required
  FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

  boolean b = job.waitForCompletion(true);
  if (!b) {
    throw new IOException("error with job!");
  }

As stated above, the previous Mapper can run unchanged with this example. As for the Reducer, it is a “generic” Reducer instead of one extending TableReducer and emitting Puts.

  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }
      context.write(key, new IntWritable(i));
    }
  }

55.6. HBase MapReduce Summary to HBase Without Reducer

It is also possible to perform summaries without a reducer - if you use HBase as the reducer.

An HBase target table would need to exist for the job summary. The Table method incrementColumnValue would be used to atomically increment values. From a performance perspective, it might make sense to keep, for each map task, a Map of keys and the amounts by which they are to be incremented, and to make one update per key during the cleanup method of the mapper. However, your mileage may vary depending on the number of rows to be processed and the number of unique keys.
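
The per-task Map described above can be sketched in plain Java as follows. This is an illustrative model only: the class InMapperSummarySketch and the CounterSink interface are hypothetical, and CounterSink merely stands in for an atomic increment such as the Table method incrementColumnValue. In a real job the map and cleanup methods would live in a TableMapper subclass.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative model of per-task aggregation; not HBase or Hadoop code. */
final class InMapperSummarySketch {

    /** Hypothetical stand-in for an atomic increment against the summary table. */
    interface CounterSink {
        void increment(String key, long amount);
    }

    /** Counts accumulated by this task and not yet written out. */
    private final Map<String, Long> pending = new HashMap<>();

    /** Called once per input row, like the mapper's map method: only local state changes. */
    void map(String value) {
        pending.merge(value, 1L, Long::sum);
    }

    /** Called once at the end of the task, like the mapper's cleanup method: one update per key. */
    void cleanup(CounterSink sink) {
        pending.forEach(sink::increment);
        pending.clear();
    }

    public static void main(String[] args) {
        InMapperSummarySketch task = new InMapperSummarySketch();
        for (String v : new String[] {"a", "b", "a", "c", "a"}) {
            task.map(v);
        }
        Map<String, Long> table = new TreeMap<>();   // in-memory stand-in for the target table
        task.cleanup((key, amount) -> table.merge(key, amount, Long::sum));
        System.out.println(table);   // prints {a=3, b=1, c=1}
    }
}
```

Five input rows result in three updates here, one per distinct key. The saving shrinks as the number of unique keys approaches the number of rows, and the Map must fit in the task's memory.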

In the end, the summary results are in HBase.

55.7. HBase MapReduce Summary to RDBMS

Sometimes it is more appropriate to generate summaries to an RDBMS. For these cases, it is possible to generate summaries directly to an RDBMS via a custom reducer. The setup method can connect to an RDBMS (the connection information can be passed via custom parameters in the context) and the cleanup method can close the connection.

It is critical to understand that the number of reducers for the job affects the summarization implementation, and you’ll have to design this into your reducer: specifically, whether it is designed to run as a singleton (one reducer) or as multiple reducers. Neither is right or wrong; it depends on your use case. Recognize that the more reducers that are assigned to the job, the more simultaneous connections to the RDBMS will be created. This will scale, but only to a point.

  public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Connection c = null;

    public void setup(Context context) {
      // create DB connection...
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // do summarization
      // in this example the keys are Text, but this is just an example
    }

    public void cleanup(Context context) {
      // close db connection
    }
  }

In the end, the summary results are written to your RDBMS table/s.

56. Accessing Other HBase Tables in a MapReduce Job

Although the framework currently allows one HBase table as input to a MapReduce job, other HBase tables can be accessed as lookup tables, etc., in a MapReduce job by creating a Table instance in the setup method of the Mapper.

  public class MyMapper extends TableMapper<Text, LongWritable> {
    private Connection connection;
    private Table myOtherTable;

    public void setup(Context context) throws IOException {
      // Create a Connection to the cluster and save it, or reuse an existing Connection
      connection = ConnectionFactory.createConnection(context.getConfiguration());
      myOtherTable = connection.getTable(TableName.valueOf("myOtherTable"));
    }

    public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      // process Result...
      // use 'myOtherTable' for lookups
    }

    public void cleanup(Context context) throws IOException {
      myOtherTable.close();   // release the lookup table and its connection
      connection.close();
    }
  }

57. Speculative Execution

It is generally advisable to turn off speculative execution for MapReduce jobs that use HBase as a source. This can either be done on a per-Job basis through properties, or on the entire cluster. Especially for longer running jobs, speculative execution will create duplicate map-tasks which will double-write your data to HBase; this is probably not what you want.

See spec.ex for more information.

58. Cascading

Cascading is an alternative API for MapReduce, which actually uses MapReduce, but allows you to write your MapReduce code in a simplified way.

The following example shows a Cascading Flow which “sinks” data into an HBase cluster. The same hBaseTap API could be used to “source” data as well.

  // read data from the default filesystem
  // emits two fields: "offset" and "line"
  Tap source = new Hfs( new TextLine(), inputFileLhs );

  // store data in an HBase cluster
  // accepts fields "num", "lower", and "upper"
  // will automatically scope incoming fields to their proper familyname, "left" or "right"
  Fields keyFields = new Fields( "num" );
  String[] familyNames = {"left", "right"};
  Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" ) };
  Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );

  // a simple pipe assembly to parse the input into fields
  // a real app would likely chain multiple Pipes together for more complex processing
  Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), " " ) );

  // "plan" a cluster executable Flow
  // this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
  Flow parseFlow = new FlowConnector( properties ).connect( source, hBaseTap, parsePipe );

  // start the flow, and block until complete
  parseFlow.complete();

  // open an iterator on the HBase table we stuffed data into
  TupleEntryIterator iterator = parseFlow.openSink();

  while(iterator.hasNext())
    {
    // print out each tuple from HBase
    System.out.println( "iterator.next() = " + iterator.next() );
    }

  iterator.close();