Procedure v2 …aims to provide a unified way to build…multi-step procedures with a rollback/roll-forward ability in case of failure (e.g. create/delete table) — Matteo Bertozzi, the author of Pv2.

With Pv2 you can build and run state machines. It was built by Matteo to make distributed state transitions in HBase resilient in the face of process failures. Prior to Pv2, state transition handling was spread about the codebase, with implementation varying by transition-type and context. Pv2 was inspired by FATE, of Apache Accumulo.

Early Pv2 aspects have been shipping in HBase for a good while now, but it has continued to evolve as it takes on more involved scenarios. What we have now is powerful but intricate in operation and incomplete, in need of cleanup and hardening. This doc gives an overview of the system so you can make use of it (and help with its polishing).

This system has the awkward name of Pv2 because HBase already had the notion of a Procedure used in snapshots (see hbase-server org.apache.hadoop.hbase.procedure as opposed to hbase-procedure org.apache.hadoop.hbase.procedure2). Pv2 supersedes and is to replace Procedure.

180. Procedures

A Procedure is a transform made on an HBase entity. Examples of HBase entities would be Regions and Tables.
Procedures are run by a ProcedureExecutor instance. Procedure current state is kept in the ProcedureStore.
The ProcedureExecutor has but a primitive view on what goes on inside a Procedure. From its PoV, Procedures are submitted and then the ProcedureExecutor keeps calling #execute(Object) until the Procedure is done. Execute may be called multiple times in the case of failure or restart, so Procedure code must be idempotent, yielding the same result each time it runs. Procedure code can also implement rollback so steps can be undone on failure. A call to execute() can result in one of the following possibilities:

  • execute() returns

    • null: indicates we are done.

    • this: indicates there is more to do, so persist current procedure state and re-execute().

    • Array of sub-procedures: indicates a set of procedures that need to be run to completion before we can proceed (after which we expect the framework to call our execute again).

  • execute() throws exception

    • suspend: indicates execution of procedure is suspended and can be resumed due to some external event. The procedure state is persisted.

    • yield: procedure is added back to scheduler. The procedure state is not persisted.

    • interrupted: currently same as yield.

    • Any exception not listed above: Procedure state is changed to FAILED (after which we expect the framework will attempt rollback).
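The outcomes listed above can be sketched as a toy driver loop. This is not the HBase ProcedureExecutor: ToyProcedure, ToyYieldException, Steps, Parent and drive are hypothetical names made up for illustration, persistence and rollback are left out, and suspend is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ExecuteLoopSketch {
  /** Hypothetical stand-in for the framework's "yield" signal. */
  static class ToyYieldException extends Exception {}

  /** Hypothetical stand-in for a Procedure; not the HBase class. */
  abstract static class ToyProcedure {
    ToyProcedure parent;      // set by the driver when this is a sub-procedure
    int pendingChildren = 0;  // children that have not finished yet

    /** Returns null when done, {this} when more remains, or children to run first. */
    abstract ToyProcedure[] execute() throws Exception;
  }

  /** Runs a fixed number of steps, returning "this" until the last one. */
  static class Steps extends ToyProcedure {
    final String name;
    final List<String> log;
    int remaining;

    Steps(String name, int remaining, List<String> log) {
      this.name = name;
      this.remaining = remaining;
      this.log = log;
    }

    @Override
    ToyProcedure[] execute() {
      log.add(name + ":step");
      remaining--;
      return remaining > 0 ? new ToyProcedure[] { this } : null;
    }
  }

  /** Spawns two children on its first call and finishes on its second. */
  static class Parent extends ToyProcedure {
    final List<String> log;
    boolean spawned = false;

    Parent(List<String> log) {
      this.log = log;
    }

    @Override
    ToyProcedure[] execute() {
      if (!spawned) {
        spawned = true;
        log.add("parent:spawn");
        return new ToyProcedure[] { new Steps("a", 2, log), new Steps("b", 1, log) };
      }
      log.add("parent:finish");
      return null;
    }
  }

  /** Drives a procedure tree to completion; returns false if any execute() failed. */
  static boolean drive(ToyProcedure root) {
    Deque<ToyProcedure> runQueue = new ArrayDeque<>();
    runQueue.add(root);
    while (!runQueue.isEmpty()) {
      ToyProcedure p = runQueue.poll();
      ToyProcedure[] result;
      try {
        result = p.execute();
      } catch (ToyYieldException e) {
        runQueue.add(p);  // yield: back on the scheduler, nothing persisted
        continue;
      } catch (Exception e) {
        return false;     // any other exception: FAILED (rollback would follow)
      }
      if (result == null) {
        // Done: if this was the last outstanding child, re-run the parent.
        if (p.parent != null && --p.parent.pendingChildren == 0) {
          runQueue.add(p.parent);
        }
      } else if (result.length == 1 && result[0] == p) {
        runQueue.add(p);  // more to do: a real executor would persist state here
      } else {
        p.pendingChildren = result.length;
        for (ToyProcedure child : result) {
          child.parent = p;
          runQueue.add(child);
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    System.out.println(drive(new Parent(log)) + " " + log);
  }
}
```

The parent is only re-queued once its pendingChildren count drops to zero, which mirrors the "run to completion before we can proceed" rule for sub-procedures.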

The ProcedureExecutor stamps the framework's notion of Procedure State into the Procedure itself; e.g. it marks Procedures as INITIALIZING on submit. It moves the state to RUNNABLE when it goes to execute. When done, a Procedure gets marked FAILED or SUCCESS depending on the outcome. Here is the list of all states as of this writing:

  • INITIALIZING: Procedure in construction, not yet added to the executor.

  • RUNNABLE: Procedure added to the executor, and ready to be executed.

  • WAITING: The procedure is waiting on children (subprocedures) to be completed.

  • WAITING_TIMEOUT: The procedure is waiting on a timeout or an external event.

  • ROLLEDBACK: The procedure failed and was rolled back.

  • SUCCESS: The procedure execution completed successfully.

  • FAILED: The procedure execution failed, may need to rollback.

After each execute, the Procedure state is persisted to the ProcedureStore. Hooks are invoked on Procedures so they can preserve custom state. Post-fault, the ProcedureExecutor re-hydrates its pre-crash state by replaying the content of the ProcedureStore. This makes the Procedure Framework resilient against process failure.

180.1. Implementation

In implementation, Procedures tend to divide transforms into finer-grained tasks. While some of these work items are handed off to sub-procedures, the bulk are done as processing steps in-Procedure; each invocation of execute is used to perform a single step, and then the Procedure relinquishes control, returning to the framework. The Procedure does its own tracking of where it is in the processing.

What comprises a sub-task, or step in the execution, is up to the Procedure author, but generally it is a small piece of work that cannot be further decomposed and that moves the processing forward toward its end state. Having procedures made of many small steps rather than a few large ones allows the Procedure framework to give insight into where we are in the processing. It also allows the framework to be more fair in its execution. As stated above, each step may be called multiple times (failure/restart), so steps must be implemented to be idempotent.
It is easy to confuse the state that the Procedure itself is keeping with that of the Framework itself. Try to keep them distinct.

180.2. Rollback

Rollback is called when the procedure or one of its sub-procedures has failed. The rollback step is supposed to clean up the resources created during the execute() step. In case of failure and restart, rollback() may be called multiple times, so again the code must be idempotent.
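As a minimal sketch of what idempotent execute and rollback steps can look like, the example below models the resources of a create-table style operation as entries in a set. FakeStore, executeCreate and rollbackCreate are hypothetical names; nothing here is the HBase API.

```java
import java.util.Set;
import java.util.TreeSet;

final class IdempotentRollbackSketch {
  /** Hypothetical stand-in for the filesystem and meta table. */
  static final class FakeStore {
    final Set<String> entries = new TreeSet<>();
  }

  /** Creates the resources; adding to a set makes a repeated call a no-op. */
  static void executeCreate(FakeStore store, String table) {
    store.entries.add("dir:" + table);
    store.entries.add("meta:" + table);
  }

  /** Undoes executeCreate; removing a missing entry is harmless, so retries are safe. */
  static void rollbackCreate(FakeStore store, String table) {
    store.entries.remove("meta:" + table);
    store.entries.remove("dir:" + table);
  }

  public static void main(String[] args) {
    FakeStore store = new FakeStore();
    executeCreate(store, "t1");
    executeCreate(store, "t1");   // replayed after a simulated restart
    System.out.println(store.entries);
    rollbackCreate(store, "t1");
    rollbackCreate(store, "t1");  // rollback replayed as well
    System.out.println(store.entries);
  }
}
```

The point of the design is that each step checks or overwrites rather than assuming it is running for the first time.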

180.3. Metrics

There are hooks for collecting metrics on submit of the procedure and on finish.

  • updateMetricsOnSubmit()

  • updateMetricsOnFinish()

Individual procedures can override these methods to collect procedure-specific metrics. The default implementations of these methods try to get an object implementing the ProcedureMetrics interface, which encapsulates the following set of generic metrics:

  • SubmittedCount (Counter): Total number of procedure instances submitted of a type.

  • Time (Histogram): Histogram of runtime for procedure instances.

  • FailedCount (Counter): Total number of failed procedure instances.

Individual procedures can implement this interface and define this generic set of metrics.

180.4. Baggage

Procedures can carry baggage. One example is the step the procedure last attained (see previous section); procedures persist the enum that marks where they are currently. Other examples might be the Region or Server name the Procedure is currently working on. After each call to execute, Procedure#serializeStateData is called. Procedures can persist whatever state they need.
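A minimal sketch of carrying baggage across a restart follows. It assumes a hypothetical ToyAssign procedure and uses plain java.io streams for the round trip; the serializer and wire format that HBase actually hands to Procedure#serializeStateData are not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class BaggageSketch {
  /** Hypothetical step enum marking where the procedure currently is. */
  enum Step { PREPARE, OPEN_REGION, FINISH }

  /** Hypothetical procedure carrying its step, region and server as baggage. */
  static final class ToyAssign {
    Step step = Step.PREPARE;
    String regionName = "";
    String serverName = "";

    /** Writes the baggage; would be called after each execute(). */
    byte[] serializeStateData() {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        out.writeUTF(step.name());
        out.writeUTF(regionName);
        out.writeUTF(serverName);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return bytes.toByteArray();
    }

    /** Rebuilds a procedure from persisted baggage, as on reload after a crash. */
    static ToyAssign deserializeStateData(byte[] data) {
      ToyAssign p = new ToyAssign();
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
        p.step = Step.valueOf(in.readUTF());
        p.regionName = in.readUTF();
        p.serverName = in.readUTF();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return p;
    }
  }

  public static void main(String[] args) {
    ToyAssign before = new ToyAssign();
    before.step = Step.OPEN_REGION;
    before.regionName = "region-1";
    before.serverName = "rs-1";
    ToyAssign after = ToyAssign.deserializeStateData(before.serializeStateData());
    System.out.println(after.step + " " + after.regionName + " " + after.serverName);
  }
}
```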

180.5. Result/State and Queries

(From Matteo’s ProcedureV2 and Notification Bus doc)
In the case of asynchronous operations, the result must be kept around until the client asks for it. Once we receive a “get” of the result we can schedule the delete of the record. For some operations the result may be “unnecessary”, especially in case of failure (e.g. if the create table fails, we can query the operation result or we can just do a list table to see if it was created), so in some cases we can schedule the delete after a timeout. On the client side the operation will return a “Procedure ID”; this ID can be used to wait until the procedure is completed and get the result/exception.

  1. Admin.doOperation() { long procId = master.doOperation(); master.waitCompletion(procId); }

If the master goes down while performing the operation, the backup master will pick up the half-in-progress operation and complete it. The client will not notice the failure.

181. Subprocedures

Subprocedures are Procedure instances created and returned by the #execute(Object) method of a procedure instance (the parent procedure). As subprocedures are of type Procedure, they can instantiate their own subprocedures. As this is recursive, a procedure stack is maintained by the framework. The framework makes sure that the parent procedure does not proceed until all subprocedures and their subprocedures in a procedure stack have finished successfully.

182. ProcedureExecutor

ProcedureExecutor uses ProcedureStore and ProcedureScheduler and executes procedures submitted to it. Some of the basic operations supported are:

  • abort(procId): aborts the specified procedure if it is not finished

  • submit(Procedure): submits procedure for execution

  • retrieve: list of get methods to get Procedure instances and results

  • register/unregister listeners: for listening on Procedure related notifications

When ProcedureExecutor starts, it loads procedure instances persisted in ProcedureStore from the previous run. All unfinished procedures are resumed from the last stored state.

183. Nonces

You can pass the nonce that came in with the RPC to the Procedure on submit at the executor. This nonce will then be serialized along w/ the Procedure on persist. After a crash, on reload, the nonce will be put back into a map of nonces to pid in case a client tries to run the same procedure a second time (it will be rejected). See the base Procedure and how nonce is a base data member.
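The nonce bookkeeping can be sketched as below. ToyExecutor, submit, persisted and reload are hypothetical names, and signalling a rejected duplicate with a REJECTED sentinel is an assumption made to keep the example small; it is not how the real executor reports it.

```java
import java.util.HashMap;
import java.util.Map;

final class NonceSketch {
  /** Hypothetical executor that remembers which nonce started which procedure. */
  static final class ToyExecutor {
    static final long REJECTED = -1L;  // assumption: sentinel for a duplicate submit
    private final Map<Long, Long> nonceToPid = new HashMap<>();
    private long nextPid = 1;

    /** Returns a fresh pid, or REJECTED if this nonce was already used. */
    long submit(long nonce) {
      if (nonceToPid.containsKey(nonce)) {
        return REJECTED;
      }
      long pid = nextPid++;
      nonceToPid.put(nonce, pid);
      return pid;
    }

    /** What would be persisted with each procedure: pid -> nonce. */
    Map<Long, Long> persisted() {
      Map<Long, Long> pidToNonce = new HashMap<>();
      for (Map.Entry<Long, Long> e : nonceToPid.entrySet()) {
        pidToNonce.put(e.getValue(), e.getKey());
      }
      return pidToNonce;
    }

    /** Rebuilds the nonce map from persisted procedures, as after a crash. */
    static ToyExecutor reload(Map<Long, Long> pidToNonce) {
      ToyExecutor executor = new ToyExecutor();
      for (Map.Entry<Long, Long> e : pidToNonce.entrySet()) {
        executor.nonceToPid.put(e.getValue(), e.getKey());
        executor.nextPid = Math.max(executor.nextPid, e.getKey() + 1);
      }
      return executor;
    }
  }

  public static void main(String[] args) {
    ToyExecutor executor = new ToyExecutor();
    System.out.println(executor.submit(42L));
    ToyExecutor reloaded = ToyExecutor.reload(executor.persisted());
    System.out.println(reloaded.submit(42L));  // client retry after the crash
  }
}
```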

184. Wait/Wake/Suspend/Yield

‘suspend’ means stop processing a procedure because we can make no more progress until a condition changes; e.g. we sent an RPC and need to wait on the response. The way this works is that a Procedure throws a suspend exception from down in its guts as a GOTO to the end of the current processing step. Suspend also puts the Procedure back on the scheduler. The problem is that we do some accounting on our way out even on suspend, so exiting can take time (we have to update state in the WAL).

RegionTransitionProcedure#reportTransition is called on receipt of a report from a RS. For Assign and Unassign, this report, which is the response from the server we sent an RPC to, wakes up the suspended Assign/Unassign.
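The suspend-then-wake flow can be sketched roughly as follows. All names (ToySuspendedException, ToyEvent, ToyAssign, runAll) are hypothetical, and the sketch simplifies by parking the suspended procedure on the event's wait list until the report arrives; how the real framework re-queues a suspended procedure and updates the WAL is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class SuspendWakeSketch {
  /** Hypothetical stand-in for the suspend exception. */
  static class ToySuspendedException extends RuntimeException {}

  /** Hypothetical event a procedure waits on, e.g. a RegionServer report. */
  static final class ToyEvent {
    private boolean ready = false;
    private final List<ToyAssign> waiters = new ArrayList<>();

    /** Parks the procedure and returns true unless the event already fired. */
    boolean suspendIfNotReady(ToyAssign p) {
      if (ready) {
        return false;
      }
      waiters.add(p);
      return true;
    }

    /** Marks the event as fired and puts all waiters back on the scheduler. */
    void wake(Deque<ToyAssign> scheduler) {
      ready = true;
      scheduler.addAll(waiters);
      waiters.clear();
    }
  }

  /** Hypothetical assign: sends one RPC, then waits for the report. */
  static final class ToyAssign {
    final ToyEvent regionOpened = new ToyEvent();
    int rpcsSent = 0;
    boolean done = false;

    void execute() {
      if (rpcsSent == 0) {
        rpcsSent++;  // idempotent: the RPC is not re-sent on re-execution
      }
      if (regionOpened.suspendIfNotReady(this)) {
        throw new ToySuspendedException();  // jump out of the current step
      }
      done = true;
    }
  }

  /** Runs everything currently on the scheduler; suspended procedures drop out. */
  static void runAll(Deque<ToyAssign> scheduler) {
    ToyAssign p;
    while ((p = scheduler.poll()) != null) {
      try {
        p.execute();
      } catch (ToySuspendedException e) {
        // parked on its event until wake() is called
      }
    }
  }

  public static void main(String[] args) {
    Deque<ToyAssign> scheduler = new ArrayDeque<>();
    ToyAssign assign = new ToyAssign();
    scheduler.add(assign);
    runAll(scheduler);
    System.out.println("done after first run: " + assign.done);
    assign.regionOpened.wake(scheduler);  // the report from the server arrives
    runAll(scheduler);
    System.out.println("done after wake: " + assign.done);
  }
}
```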

185. Locking

Procedure Locks are not about concurrency! They are about giving a Procedure read/write access to an HBase Entity such as a Table or Region so that it is possible to shut out other Procedures from making modifications to an HBase Entity's state while the current one is running.

Locking is optional, up to the Procedure implementor but if an entity is being operated on by a Procedure, all transforms need to be done via Procedures using the same locking scheme else havoc.

Two ProcedureExecutor Worker threads can actually end up both processing the same Procedure instance. If it happens, the threads are meant to be running different parts of the one Procedure, making changes that do not stamp on each other. (This gets awkward around the procedure framework's notion of ‘suspend’. More on this below.)

Locks optionally may be held for the life of a Procedure. For example, if moving a Region, you probably want to have exclusive access to the HBase Region until the Region move completes (or fails). This is done via holdLock(Object). If holdLock(Object) returns true, the procedure executor will call acquireLock() once and thereafter not call releaseLock(Object) until the Procedure is done. (Normally, it calls acquire/release around each invocation of execute(Object).)

Locks may also live for the life of a procedure; e.g. once an Assign Procedure starts, we do not want another procedure meddling w/ the region under assignment. Procedures that hold the lock for the life of the procedure have Procedure#holdLock return true. AssignProcedure does this, as do Split and Move (if in the middle of a Region move, you do not want it splitting).

Some locks have a hierarchy. For example, taking a region lock also takes (read) lock on its containing table and namespace to prevent another Procedure obtaining an exclusive lock on the hosting table (or namespace).
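The lock hierarchy can be illustrated with a small single-threaded sketch. ToyLocks and its methods are hypothetical and ignore queuing, fairness and thread safety; the only point is that a region lock also takes shared locks on its table and namespace, which blocks an exclusive table lock.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class LockHierarchySketch {
  /** Hypothetical lock table keyed by entity name; not the HBase scheduler. */
  static final class ToyLocks {
    private final Map<String, Integer> sharedHolders = new HashMap<>();
    private final Set<String> exclusiveHeld = new HashSet<>();

    private boolean tryShared(String entity) {
      if (exclusiveHeld.contains(entity)) {
        return false;
      }
      sharedHolders.merge(entity, 1, Integer::sum);
      return true;
    }

    private void releaseShared(String entity) {
      // Drop the mapping entirely when the last shared holder leaves.
      sharedHolders.computeIfPresent(entity, (key, n) -> n == 1 ? null : n - 1);
    }

    private boolean tryExclusive(String entity) {
      if (exclusiveHeld.contains(entity) || sharedHolders.containsKey(entity)) {
        return false;
      }
      exclusiveHeld.add(entity);
      return true;
    }

    /** Exclusive on the region, shared on its table and namespace. */
    boolean tryRegionLock(String namespace, String table, String region) {
      String ns = "ns:" + namespace;
      String tbl = "table:" + table;
      if (!tryShared(ns)) {
        return false;
      }
      if (!tryShared(tbl)) {
        releaseShared(ns);
        return false;
      }
      if (!tryExclusive("region:" + region)) {
        releaseShared(tbl);
        releaseShared(ns);
        return false;
      }
      return true;
    }

    void releaseRegionLock(String namespace, String table, String region) {
      exclusiveHeld.remove("region:" + region);
      releaseShared("table:" + table);
      releaseShared("ns:" + namespace);
    }

    /** Exclusive on the table, shared on its namespace. */
    boolean tryTableExclusiveLock(String namespace, String table) {
      String ns = "ns:" + namespace;
      if (!tryShared(ns)) {
        return false;
      }
      if (!tryExclusive("table:" + table)) {
        releaseShared(ns);
        return false;
      }
      return true;
    }
  }

  public static void main(String[] args) {
    ToyLocks locks = new ToyLocks();
    System.out.println(locks.tryRegionLock("default", "t1", "r1"));
    System.out.println(locks.tryTableExclusiveLock("default", "t1"));
    locks.releaseRegionLock("default", "t1", "r1");
    System.out.println(locks.tryTableExclusiveLock("default", "t1"));
  }
}
```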

186. Procedure Types

186.1. StateMachineProcedure

One can consider each call to the #execute(Object) method as transitioning from one state to another in a state machine. The abstract class StateMachineProcedure is a wrapper around the base Procedure class which provides constructs for implementing a state machine as a Procedure. After each state transition the current state is persisted so that, in case of crash/restart, the state transition can be resumed from the state the procedure was in before the crash/restart. Individual procedures need to define initial and terminus states; the hooks executeFromState() and setNextState() are provided for state transitions.
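The shape of such a state machine can be sketched as below. This is a toy model, not a subclass of the real StateMachineProcedure: CreateState, Flow, ToyCreateTable and the method signatures are assumptions for illustration, and the persisted state is simply passed to the constructor on resume.

```java
import java.util.ArrayList;
import java.util.List;

final class StateMachineSketch {
  /** Hypothetical states of a create-table style procedure. */
  enum CreateState { PRE_CREATE, WRITE_LAYOUT, ADD_TO_META }

  /** Whether another transition remains after the current one. */
  enum Flow { HAS_MORE_STATE, NO_MORE_STATE }

  static final class ToyCreateTable {
    private CreateState current;
    final List<String> log = new ArrayList<>();

    /** Starts at the initial state, or at a persisted state after a restart. */
    ToyCreateTable(CreateState resumeFrom) {
      this.current = resumeFrom;
    }

    CreateState getCurrentState() {
      return current;
    }

    void setNextState(CreateState next) {
      current = next;  // a real framework would persist this after the step
    }

    /** Performs the work for one state and picks the next one. */
    Flow executeFromState(CreateState state) {
      log.add(state.name());
      switch (state) {
        case PRE_CREATE:
          setNextState(CreateState.WRITE_LAYOUT);
          return Flow.HAS_MORE_STATE;
        case WRITE_LAYOUT:
          setNextState(CreateState.ADD_TO_META);
          return Flow.HAS_MORE_STATE;
        case ADD_TO_META:
          return Flow.NO_MORE_STATE;  // terminus state reached
        default:
          throw new IllegalStateException("unhandled state " + state);
      }
    }

    /** One execute() call performs one transition; true while more remain. */
    boolean execute() {
      return executeFromState(current) == Flow.HAS_MORE_STATE;
    }
  }

  public static void main(String[] args) {
    ToyCreateTable fresh = new ToyCreateTable(CreateState.PRE_CREATE);
    while (fresh.execute()) {
      // each loop iteration stands for one call from the executor
    }
    System.out.println(fresh.log);
  }
}
```

Resuming after a crash amounts to constructing the procedure with the last persisted state, so only the remaining transitions run.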

186.2. RemoteProcedureDispatcher

A new RemoteProcedureDispatcher (+ subclass RSProcedureDispatcher) primitive takes care of running the ‘remote’ component of Procedure-based Assignments. This dispatcher knows about ‘servers’. It aggregates assignments on a time/count basis so it can send procedures in batches rather than one per RPC. Procedure status comes back on the back of the RegionServer heartbeat reporting online/offline regions (no more notifications via ZK). The response is passed to the AMv2 to ‘process’. It will check against the in-memory state. If there is a mismatch, it fences out the RegionServer on the assumption that something went wrong on the RS side. Timeouts trigger retries (Not Yet Implemented!). The Procedure machine ensures only one operation at a time on any one Region/Table using entity locking and smarts about what is serial and what can be run concurrently. (Locking was zk-based, where you’d put a znode in zk for a table, but has now been converted to be procedure-based as part of this project.)
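Batching on a time/count basis can be sketched as follows. ToyDispatcher is hypothetical and uses an explicit clock value passed by the caller rather than timers or threads; the thresholds and the string form of a "sent RPC" are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchingDispatcherSketch {
  /** Hypothetical dispatcher that groups operations per server. */
  static final class ToyDispatcher {
    private final int maxBatch;
    private final long maxDelayMs;
    private final Map<String, List<String>> pending = new HashMap<>();
    private final Map<String, Long> firstQueuedAt = new HashMap<>();
    final List<String> sentRpcs = new ArrayList<>();

    ToyDispatcher(int maxBatch, long maxDelayMs) {
      this.maxBatch = maxBatch;
      this.maxDelayMs = maxDelayMs;
    }

    /** Queues an operation; flushes at once when the count threshold is hit. */
    void add(String server, String op, long nowMs) {
      pending.computeIfAbsent(server, s -> new ArrayList<>()).add(op);
      firstQueuedAt.putIfAbsent(server, nowMs);
      if (pending.get(server).size() >= maxBatch) {
        flush(server);
      }
    }

    /** Flushes every server whose oldest queued operation has waited long enough. */
    void tick(long nowMs) {
      for (String server : new ArrayList<>(firstQueuedAt.keySet())) {
        if (nowMs - firstQueuedAt.get(server) >= maxDelayMs) {
          flush(server);
        }
      }
    }

    /** Sends one "RPC" carrying the whole batch for a server. */
    private void flush(String server) {
      List<String> batch = pending.remove(server);
      firstQueuedAt.remove(server);
      if (batch != null && !batch.isEmpty()) {
        sentRpcs.add(server + ":" + batch);
      }
    }
  }

  public static void main(String[] args) {
    ToyDispatcher dispatcher = new ToyDispatcher(2, 100L);
    dispatcher.add("rs1", "open r1", 0L);
    dispatcher.add("rs1", "open r2", 10L);  // count threshold reached
    dispatcher.add("rs2", "open r3", 20L);
    dispatcher.tick(120L);                  // time threshold reached for rs2
    System.out.println(dispatcher.sentRpcs);
  }
}
```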

187. References

  • Matteo had a slide deck on what the Procedure Framework would look like and the problems it addresses, initially attached to the Pv2 issue.

  • A good doc by Matteo on the problem and how Pv2 addresses it w/ roadmap (from the Pv2 JIRA). We should go back to the roadmap to do the Notification Bus, conversion of log splitting to Pv2, etc.