CliDriver.run: the entry point
Hive's command-line entry point is the cli.sh script.
Key code in cli.sh:
updateCli() {
  if [ "$USE_DEPRECATED_CLI" == "true" ]; then
    CLASS=org.apache.hadoop.hive.cli.CliDriver
    JAR=hive-cli-*.jar
  else
    export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties"
    CLASS=org.apache.hive.beeline.cli.HiveCli
    JAR=hive-beeline-*.jar
    updateBeelineOpts
  fi
}
cli () {
  updateCli
  execHiveCmd $CLASS $JAR "$@"
}
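When USE_DEPRECATED_CLI is true, this ends up running the main method of the CliDriver class in the hive-cli jar (otherwise the Beeline-based org.apache.hive.beeline.cli.HiveCli is launched instead):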
public static void main(String[] args) throws Exception {
  int ret = new CliDriver().run(args);
  System.exit(ret);
}
main first constructs a CliDriver and then calls its run method:
public int run(String[] args) throws Exception {
  OptionsProcessor oproc = new OptionsProcessor();
  if (!oproc.process_stage1(args)) {
    return 1;
  }
  // NOTE: It is critical to do this here so that log4j is reinitialized
  // before any of the other core hive classes are loaded
  boolean logInitFailed = false;
  String logInitDetailMessage;
  try {
    logInitDetailMessage = LogUtils.initHiveLog4j();
  } catch (LogInitializationException e) {
    logInitFailed = true;
    logInitDetailMessage = e.getMessage();
  }
  CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
  ss.in = System.in;
  try {
    ss.out = new PrintStream(System.out, true, "UTF-8");
    ss.info = new PrintStream(System.err, true, "UTF-8");
    ss.err = new CachingPrintStream(System.err, true, "UTF-8");
  } catch (UnsupportedEncodingException e) {
    return 3;
  }
  if (!oproc.process_stage2(ss)) {
    return 2;
  }
  if (!ss.getIsSilent()) {
    if (logInitFailed) {
      System.err.println(logInitDetailMessage);
    } else {
      SessionState.getConsole().printInfo(logInitDetailMessage);
    }
  }
  // set all properties specified via command line
  HiveConf conf = ss.getConf();
  for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
    conf.set((String) item.getKey(), (String) item.getValue());
    ss.getOverriddenConfigurations().put((String) item.getKey(), (String) item.getValue());
  }
  // read prompt configuration and substitute variables.
  prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT);
  prompt = new VariableSubstitution(new HiveVariableSource() {
    @Override
    public Map<String, String> getHiveVariable() {
      return SessionState.get().getHiveVariables();
    }
  }).substitute(conf, prompt);
  prompt2 = spacesForString(prompt);
  if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
    // Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
    // the session are needed, the corresponding getters and other methods will wait as needed.
    SessionState.beginStart(ss, console);
  } else {
    SessionState.start(ss);
  }
  ss.updateThreadName();
  // execute cli driver work
  try {
    return executeDriver(ss, conf, oproc);
  } finally {
    ss.close();
  }
}
The core of CliDriver.run:
1. Build the Hive query session (CliSessionState) and apply the related configuration
2. Run the driver: executeDriver()
Next comes executeDriver:
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
    throws Exception {
  CliDriver cli = new CliDriver();
  cli.setHiveVariables(oproc.getHiveVariables());
  // use the specified database if specified
  cli.processSelectDatabase(ss);
  // Execute -i init files (always in silent mode)
  cli.processInitFiles(ss);
  if (ss.execString != null) {
    int cmdProcessStatus = cli.processLine(ss.execString);
    return cmdProcessStatus;
  }
  try {
    if (ss.fileName != null) {
      return cli.processFile(ss.fileName);
    }
  } catch (FileNotFoundException e) {
    System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
    return 3;
  }
  // ... non-core code omitted here ...
}
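executeDriver first sets the Hive variables and the current database (and runs any -i init files), then executes the SQL through cli.processLine or cli.processFile. Only cli.processLine(ss.execString) is analyzed here.
Inside cli.processLine, each command is routed according to its first token:
quit or exit: the CLI exits normally
a command starting with !: runs an operating-system command
source: reads an external file and executes the commands in it
list: lists the added jar, file, and archive resources
any other command: handed to a CommandProcessor for preprocessing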
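The routing on the first token can be sketched in a few lines of Java. This is a minimal illustration only: the class CliDispatchSketch and its Route enum are hypothetical names, and Hive's real logic in CliDriver (processLine/processCmd) does more, such as splitting a line into multiple statements.

```java
import java.util.Locale;

// Hypothetical sketch of the first-token routing described above;
// CliDispatchSketch and Route are illustrative names, not Hive's CliDriver API.
class CliDispatchSketch {
    enum Route { EXIT, SHELL, SOURCE, LIST, COMMAND_PROCESSOR }

    static Route route(String cmd) {
        String trimmed = cmd.trim();
        // "!ls -l" -> run an operating-system command
        if (trimmed.startsWith("!")) {
            return Route.SHELL;
        }
        // First whitespace-separated token, compared case-insensitively
        String first = trimmed.split("\\s+")[0].toLowerCase(Locale.ROOT);
        switch (first) {
            case "quit":
            case "exit":
                return Route.EXIT;
            case "source":
                return Route.SOURCE;
            case "list":
                return Route.LIST;
            default:
                return Route.COMMAND_PROCESSOR; // set/dfs/add/delete and ordinary SQL
        }
    }

    public static void main(String[] args) {
        for (String c : new String[] {"quit", "!ls -l", "source /tmp/q.sql", "list jar", "select 1"}) {
            System.out.println(c + " -> " + route(c));
        }
    }
}
```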
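All other commands go through the processLocalCmd method.
processLocalCmd does little more than run that preprocessing step or, for ordinary SQL, call Driver's run method.
Common command types handled during preprocessing:
set: SetProcessor sets Hive configuration parameters and stores them in this process's HiveConf
dfs: DfsProcessor calls Hadoop's shell interface to run Hadoop filesystem commands
add: AddResourceProcessor imports external resources; they are effective only for the current process
delete: the counterpart of add; removes resources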
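As a rough illustration of which component ends up handling a command, the lookup below maps the first token to a processor name. The returned strings name real Hive classes, but the lookup itself is a simplified assumption: Hive's actual selection lives in CommandProcessorFactory and recognizes more command types than these four.

```java
import java.util.Locale;

// Simplified sketch: map a command's first token to the processor that would handle it.
// The class names returned are Hive's, but this lookup is illustrative only.
class ProcessorChoiceSketch {
    static String processorFor(String cmd) {
        String first = cmd.trim().split("\\s+")[0].toLowerCase(Locale.ROOT);
        switch (first) {
            case "set":    return "SetProcessor";            // session configuration
            case "dfs":    return "DfsProcessor";            // Hadoop filesystem shell commands
            case "add":    return "AddResourceProcessor";    // add jar/file/archive
            case "delete": return "DeleteResourceProcessor"; // undo an add
            default:       return "Driver";                  // ordinary SQL: compile + execute
        }
    }

    public static void main(String[] args) {
        System.out.println(processorFor("set hive.exec.parallel=true")); // SetProcessor
        System.out.println(processorFor("SELECT * FROM t"));             // Driver
    }
}
```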
From there, the statement is handed to Driver's run method:
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
CommandProcessorResponse cpr;
try {
cpr = runInternal(command, alreadyCompiled);
} finally {
releaseResources();
}
if(cpr.getResponseCode() == 0) {
return cpr;
}
SessionState ss = SessionState.get();
if(ss == null) {
return cpr;
}
MetaDataFormatter mdf = MetaDataFormatUtils.getFormatter(ss.getConf());
if(!(mdf instanceof JsonMetaDataFormatter)) {
return cpr;
}
/*Here we want to encode the error in machine readable way (e.g. JSON)
* Ideally, errorCode would always be set to a canonical error defined in ErrorMsg.
* In practice that is rarely the case, so the messy logic below tries to tease
* out canonical error code if it can. Exclude stack trace from output when
* the error is a specific/expected one.
* It's written to stdout for backward compatibility (WebHCat consumes it).*/
try {
if(downstreamError == null) {
mdf.error(ss.out, errorMessage, cpr.getResponseCode(), SQLState);
return cpr;
}
ErrorMsg canonicalErr = ErrorMsg.getErrorMsg(cpr.getResponseCode());
if(canonicalErr != null && canonicalErr != ErrorMsg.GENERIC_ERROR) {
/*Some HiveExceptions (e.g. SemanticException) don't set
canonical ErrorMsg explicitly, but there is logic
(e.g. #compile()) to find an appropriate canonical error and
return its code as error code. In this case we want to
preserve it for downstream code to interpret*/
mdf.error(ss.out, errorMessage, cpr.getResponseCode(), SQLState, null);
return cpr;
}
if(downstreamError instanceof HiveException) {
HiveException rc = (HiveException) downstreamError;
mdf.error(ss.out, errorMessage,
rc.getCanonicalErrorMsg().getErrorCode(), SQLState,
rc.getCanonicalErrorMsg() == ErrorMsg.GENERIC_ERROR ?
org.apache.hadoop.util.StringUtils.stringifyException(rc)
: null);
}
else {
ErrorMsg canonicalMsg =
ErrorMsg.getErrorMsg(downstreamError.getMessage());
mdf.error(ss.out, errorMessage, canonicalMsg.getErrorCode(),
SQLState, org.apache.hadoop.util.StringUtils.
stringifyException(downstreamError));
}
}
catch(HiveException ex) {
console.printError("Unable to JSON-encode the error",
org.apache.hadoop.util.StringUtils.stringifyException(ex));
}
return cpr;
}
The key part of Driver.run is the call to runInternal():
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
errorMessage = null;
SQLState = null;
downstreamError = null;
HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, command);
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try {
driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
HiveDriverRunHook.class);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
} catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return createProcessorResponse(12);
}
// Reset the perf logger
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
int ret;
if (!alreadyCompiled) {
ret = compileInternal(command);
if (ret != 0) {
return createProcessorResponse(ret);
}
} else {
// Since we're reusing the compiled plan, we need to update its start time for current run
plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
}
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
HiveTxnManager txnManager = SessionState.get().getTxnMgr();
ctx.setHiveTxnManager(txnManager);
boolean startTxnImplicitly = false;
{
//this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open
//DDL is not allowed in a txn, etc.
//an error in an open txn does a rollback of the txn
if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) {
assert !txnManager.getAutoCommit() : "didn't expect AC=true";
return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())));
}
if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) {
return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
}
if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
//this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
//also, indirectly allows DDL to be executed outside a txn context
startTxnImplicitly = true;
}
if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
}
}
if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
try {
if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
/*here, if there is an open txn, we want to commit it; this behavior matches
* https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
releaseLocksAndCommitOrRollback(true, null);
txnManager.setAutoCommit(true);
}
else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
txnManager.setAutoCommit(false);
}
else {/*didn't change autoCommit value - no-op*/}
}
catch(LockException e) {
return handleHiveException(e, 12);
}
}
if (requiresLock()) {
ret = acquireLocksAndOpenTxn(startTxnImplicitly);
if (ret != 0) {
return rollback(createProcessorResponse(ret));
}
}
ret = execute();
if (ret != 0) {
//if needRequireLock is false, the release here will do nothing because there is no lock
return rollback(createProcessorResponse(ret));
}
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
releaseLocksAndCommitOrRollback(true, null);
}
else if(plan.getOperation() == HiveOperation.ROLLBACK) {
releaseLocksAndCommitOrRollback(false, null);
}
else {
//txn (if there is one started) is not finished
}
} catch (LockException e) {
return handleHiveException(e, 12);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
// Take all the driver run hooks and post-execute them.
try {
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.postDriverRun(hookContext);
}
} catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return createProcessorResponse(12);
}
return createProcessorResponse(ret);
}
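runInternal mainly does the following:
1) compileInternal(): compiles the SQL query
2) acquireLocksAndOpenTxn(startTxnImplicitly): acquires the read/write locks (and opens a transaction if needed)
3) execute(): executes the SQL
4) handles the execution result (commit or rollback, then the post-run hooks)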
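The control flow of these four steps can be modeled with integer status codes, as runInternal does (0 means success). This is a hypothetical sketch: RunInternalSketch and its parameters are illustrative, and it collapses the commit logic to the auto-commit case. Note that a compile failure returns before any lock is taken, so there is nothing to roll back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

// Hypothetical model of runInternal's control flow: each step returns an int status
// (0 = success); a failure after compilation triggers a rollback. Not Hive's API.
class RunInternalSketch {
    final List<String> log = new ArrayList<>();

    int run(IntSupplier compile, boolean requiresLock, IntSupplier acquireLocks, IntSupplier execute) {
        int ret = compile.getAsInt();
        if (ret != 0) {
            log.add("compile failed");
            return ret;                       // no locks held yet, nothing to roll back
        }
        if (requiresLock) {
            ret = acquireLocks.getAsInt();
            if (ret != 0) {
                log.add("rollback");          // release whatever was acquired
                return ret;
            }
        }
        ret = execute.getAsInt();
        if (ret != 0) {
            log.add("rollback");
            return ret;
        }
        log.add("commit");                    // release locks and commit (auto-commit case)
        return 0;
    }

    public static void main(String[] args) {
        RunInternalSketch ok = new RunInternalSketch();
        System.out.println(ok.run(() -> 0, true, () -> 0, () -> 0) + " " + ok.log);   // 0 [commit]
        RunInternalSketch bad = new RunInternalSketch();
        System.out.println(bad.run(() -> 0, true, () -> 0, () -> 2) + " " + bad.log); // 2 [rollback]
    }
}
```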
SQL compilation
compileInternal delegates to Driver's compile method:
public int compile(String command, boolean resetTaskIds) {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
command = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(conf, command);
String queryStr = command;
try {
// command should be redacted to avoid to logging sensitive data
queryStr = HookUtils.redactLogString(conf, command);
} catch (Exception e) {
LOG.warn("WARNING! Query command could not be redacted." + e);
}
//holder for parent command type/string when executing reentrant queries
QueryState queryState = new QueryState();
if (ctx != null) {
close();
}
if (resetTaskIds) {
TaskFactory.resetId();
}
saveSession(queryState);
String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);
SessionState.get().setupQueryCurrentTimestamp();
try {
// Initialize the transaction manager. This must be done before analyze is called.
final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);
// In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
// if compile is being called multiple times, clear the old shutdownhook
ShutdownHookManager.removeShutdownHook(shutdownRunner);
shutdownRunner = new Runnable() {
@Override
public void run() {
try {
releaseLocksAndCommitOrRollback(false, txnManager);
} catch (LockException e) {
LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
e.getMessage());
}
}
};
ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
ctx = new Context(conf);
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
tree = ParseUtils.findRootNonNullToken(tree);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
List<HiveSemanticAnalyzerHook> saHooks =
getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
HiveSemanticAnalyzerHook.class);
// Flush the metastore cache. This assures that we don't pick up objects from a previous
// query running in this same thread. This has to be done after we get our semantic
// analyzer (this is when the connection to the metastore is made) but before we analyze,
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
// Do semantic analysis and plan generation
if (saHooks != null && !saHooks.isEmpty()) {
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
hookCtx.setConf(conf);
hookCtx.setUserName(userName);
hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
hookCtx.setCommand(command);
for (HiveSemanticAnalyzerHook hook : saHooks) {
tree = hook.preAnalyze(hookCtx, tree);
}
sem.analyze(tree, ctx);
hookCtx.update(sem);
for (HiveSemanticAnalyzerHook hook : saHooks) {
hook.postAnalyze(hookCtx, sem.getRootTasks());
}
} else {
sem.analyze(tree, ctx);
}
// Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
// them later.
acidSinks = sem.getAcidFileSinks();
LOG.info("Semantic Analysis Completed");
// validate the plan
sem.validate();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
// get the output schema
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
SessionState.get().getHiveOperation(), schema);
conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
conf.set("mapreduce.workflow.id", "hive_" + queryId);
conf.set("mapreduce.workflow.name", queryStr);
// initialize FetchTask right here
if (plan.getFetchTask() != null) {
plan.getFetchTask().initialize(conf, plan, null, ctx.getOpContext());
}
//do the authorization check
if (!sem.skipAuthorization() &&
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
try {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
doAuthorization(sem, command);
} catch (AuthorizationException authExp) {
console.printError("Authorization failed:" + authExp.getMessage()
+ ". Use SHOW GRANT to get more details.");
errorMessage = authExp.getMessage();
SQLState = "42000";
return 403;
} finally {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
}
}
if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
String explainOutput = getExplainOutput(sem, plan, tree);
if (explainOutput != null) {
LOG.info("EXPLAIN output for queryid " + queryId + " : "
+ explainOutput);
}
}
return 0;
} catch (Exception e) {
ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
errorMessage = "FAILED: " + e.getClass().getSimpleName();
if (error != ErrorMsg.GENERIC_ERROR) {
errorMessage += " [Error " + error.getErrorCode() + "]:";
}
// HIVE-4889
if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {
errorMessage += " " + e.getCause().getMessage();
} else {
errorMessage += " " + e.getMessage();
}
SQLState = error.getSQLState();
downstreamError = e;
console.printError(errorMessage, "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return error.getErrorCode();//todo: this is bad if returned as cmd shell exit
// since it exceeds valid range of shell return values
} finally {
double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00;
dumpMetaCallTimingWithoutEx("compilation");
restoreSession(queryState);
LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
}
}
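1) Parse the SQL into an AST
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
During parsing, Hive uses ANTLR together with its grammar files (the lexer rules in HiveLexer.g and the parser rules split across four files: SelectClauseParser.g, FromClauseParser.g, IdentifiersParser.g, and HiveParser.g) to perform lexical and syntactic analysis of the SQL, finally producing an abstract syntax tree (AST). The code: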
public ASTNode parse(String command, Context ctx, boolean setTokenRewriteStream)
throws ParseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Parsing command: " + command);
}
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
if (ctx != null) {
if ( setTokenRewriteStream) {
ctx.setTokenRewriteStream(tokens);
}
lexer.setHiveConf(ctx.getConf());
}
HiveParser parser = new HiveParser(tokens);
if (ctx != null) {
parser.setHiveConf(ctx.getConf());
}
parser.setTreeAdaptor(adaptor);
HiveParser.statement_return r = null;
try {
r = parser.statement();
} catch (RecognitionException e) {
e.printStackTrace();
throw new ParseException(parser.errors);
}
if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
LOG.debug("Parse Completed");
} else if (lexer.getErrors().size() != 0) {
throw new ParseException(lexer.getErrors());
} else {
throw new ParseException(parser.errors);
}
ASTNode tree = (ASTNode) r.getTree();
tree.setUnknownTokenBoundaries();
return tree;
}
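To show the "tokens in, tree out" idea without an ANTLR dependency, here is a hand-written toy parser for the tiny subset `SELECT col[, col ...] FROM table`. It is not how Hive parses SQL: the node names borrow Hive's TOK_* style, but the tree shape is a simplification (Hive's real tree also contains TOK_INSERT and TOK_DESTINATION nodes). The case-insensitive keyword match mirrors the purpose of Hive's ANTLRNoCaseStringStream.

```java
import java.util.ArrayList;
import java.util.List;

// Toy lexer + recursive-descent parser for SELECT col[, col ...] FROM table.
// Hand-written, not ANTLR; TOK_* names imitate Hive's style, tree shape is simplified.
class ToyParser {
    static final class Node {
        final String text;
        final List<Node> children = new ArrayList<>();
        Node(String text) { this.text = text; }
        Node add(Node child) { children.add(child); return this; }
        // Lisp-style dump, similar in spirit to ANTLR's toStringTree()
        @Override public String toString() {
            if (children.isEmpty()) return text;
            StringBuilder sb = new StringBuilder("(").append(text);
            for (Node c : children) sb.append(' ').append(c);
            return sb.append(')').toString();
        }
    }

    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    private ToyParser(String sql) {
        // Lexing: whitespace-separated words, with commas as their own tokens
        for (String t : sql.replace(",", " , ").trim().split("\\s+")) tokens.add(t);
    }

    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("unexpected end of input");
        return tokens.get(pos++);
    }

    private void expectKeyword(String kw) {
        String t = next();
        // Keywords match case-insensitively
        if (!t.equalsIgnoreCase(kw)) throw new IllegalArgumentException("expected " + kw + ", got " + t);
    }

    private static Node selExpr(String col) { return new Node("TOK_SELEXPR").add(new Node(col)); }

    private Node query() {
        expectKeyword("SELECT");
        Node select = new Node("TOK_SELECT").add(selExpr(next()));
        while (pos < tokens.size() && tokens.get(pos).equals(",")) {
            pos++;                                  // consume the comma
            select.add(selExpr(next()));
        }
        expectKeyword("FROM");
        Node from = new Node("TOK_FROM").add(new Node("TOK_TABNAME").add(new Node(next())));
        if (pos != tokens.size()) throw new IllegalArgumentException("unexpected trailing tokens");
        return new Node("TOK_QUERY").add(from).add(select);
    }

    static Node parse(String sql) { return new ToyParser(sql).query(); }

    public static void main(String[] args) {
        System.out.println(parse("select a, b FROM t"));
        // (TOK_QUERY (TOK_FROM (TOK_TABNAME t)) (TOK_SELECT (TOK_SELEXPR a) (TOK_SELEXPR b)))
    }
}
```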
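2) Convert the AST into query blocks (QueryBlock)
A QueryBlock is the most basic building unit of a SQL statement. It has three parts: the input sources, the computation, and the output. Simply put, a QueryBlock corresponds to one subquery.
3) Generate the logical plan: the Operator Tree
Generating the logical plan essentially means building an Operator Tree. Operators are the execution units of the map and reduce phases; common ones include TableScanOperator, SelectOperator, FilterOperator, JoinOperator, GroupByOperator, and ReduceSinkOperator.
Within the Map and Reduce phases, data moves between Operators as a stream: each Operator finishes processing a row and then passes it to its child Operators.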
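The streaming, row-at-a-time push model can be sketched with a tiny operator hierarchy. The class names echo Hive's operators, but this is not Hive's Operator API (whose process/forward methods take more arguments); OperatorPipelineSketch and its nested classes are hypothetical. Each row travels the whole chain before the next row is read, so no operator needs to buffer the full input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Minimal push-model sketch: each operator handles one row and forwards the result to
// its children. Names echo Hive's operators; this is not Hive's Operator API.
class OperatorPipelineSketch {
    abstract static class Op {
        private final List<Op> children = new ArrayList<>();
        Op addChild(Op child) { children.add(child); return child; }
        abstract void process(Object[] row);
        void forward(Object[] row) { for (Op c : children) c.process(row); }
    }

    static final class ScanOp extends Op {          // stands in for TableScanOperator
        @Override void process(Object[] row) { forward(row); }
    }

    static final class FilterOp extends Op {        // stands in for FilterOperator
        private final Predicate<Object[]> predicate;
        FilterOp(Predicate<Object[]> predicate) { this.predicate = predicate; }
        @Override void process(Object[] row) {
            if (predicate.test(row)) forward(row);   // rows failing the predicate stop here
        }
    }

    static final class SelectOp extends Op {        // stands in for SelectOperator
        private final int[] columns;
        SelectOp(int... columns) { this.columns = columns; }
        @Override void process(Object[] row) {
            Object[] out = new Object[columns.length];
            for (int i = 0; i < columns.length; i++) out[i] = row[columns[i]];
            forward(out);                           // the projected row goes downstream
        }
    }

    static final class CollectOp extends Op {       // plays the role of the final sink
        final List<List<Object>> rows = new ArrayList<>();
        @Override void process(Object[] row) { rows.add(new ArrayList<>(Arrays.asList(row))); }
    }

    // Builds scan -> filter(age > 30) -> select(name) -> sink and pushes rows one at a time.
    static List<List<Object>> run(List<Object[]> table) {
        ScanOp scan = new ScanOp();
        CollectOp sink = new CollectOp();
        scan.addChild(new FilterOp(r -> (Integer) r[1] > 30))
            .addChild(new SelectOp(0))
            .addChild(sink);
        for (Object[] row : table) scan.process(row);
        return sink.rows;
    }

    public static void main(String[] args) {
        List<Object[]> people = List.of(
            new Object[] {"alice", 34}, new Object[] {"bob", 25}, new Object[] {"carol", 41});
        System.out.println(run(people)); // [[alice], [carol]]
    }
}
```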
The Operators are generated by the genPlan method of the SemanticAnalyzer class:
public Operator genPlan(QB qb, boolean skipAmbiguityCheck)
throws SemanticException {
// First generate all the opInfos for the elements in the from clause
// Must be deterministic order map - see HIVE-8707
Map<String, Operator> aliasToOpInfo = new LinkedHashMap<String, Operator>();
// Recurse over the subqueries to fill the subquery part of the plan
for (String alias : qb.getSubqAliases()) {
QBExpr qbexpr = qb.getSubqForAlias(alias);
aliasToOpInfo.put(alias, genPlan(qb, qbexpr));
}
// Recurse over all the source tables
for (String alias : qb.getTabAliases()) {
Operator op = genTablePlan(alias, qb);
aliasToOpInfo.put(alias, op);
}
if (aliasToOpInfo.isEmpty()) {
qb.getMetaData().setSrcForAlias(DUMMY_TABLE, getDummyTable());
TableScanOperator op = (TableScanOperator) genTablePlan(DUMMY_TABLE, qb);
op.getConf().setRowLimit(1);
qb.addAlias(DUMMY_TABLE);
qb.setTabAlias(DUMMY_TABLE, DUMMY_TABLE);
aliasToOpInfo.put(DUMMY_TABLE, op);
}
Operator srcOpInfo = null;
Operator lastPTFOp = null;
if(queryProperties.hasPTF()){
//After processing subqueries and source tables, process
// partitioned table functions
HashMap<ASTNode, PTFInvocationSpec> ptfNodeToSpec = qb.getPTFNodeToSpec();
if ( ptfNodeToSpec != null ) {
for(Entry<ASTNode, PTFInvocationSpec> entry : ptfNodeToSpec.entrySet()) {
ASTNode ast = entry.getKey();
PTFInvocationSpec spec = entry.getValue();
String inputAlias = spec.getQueryInputName();
Operator inOp = aliasToOpInfo.get(inputAlias);
if ( inOp == null ) {
throw new SemanticException(generateErrorMessage(ast,
"Cannot resolve input Operator for PTF invocation"));
}
lastPTFOp = genPTFPlan(spec, inOp);
String ptfAlias = spec.getFunction().getAlias();
if ( ptfAlias != null ) {
aliasToOpInfo.put(ptfAlias, lastPTFOp);
}
}
}
}
// For all the source tables that have a lateral view, attach the
// appropriate operators to the TS
genLateralViewPlans(aliasToOpInfo, qb);
// process join
if (qb.getParseInfo().getJoinExpr() != null) {
ASTNode joinExpr = qb.getParseInfo().getJoinExpr();
if (joinExpr.getToken().getType() == HiveParser.TOK_UNIQUEJOIN) {
QBJoinTree joinTree = genUniqueJoinTree(qb, joinExpr, aliasToOpInfo);
qb.setQbJoinTree(joinTree);
} else {
QBJoinTree joinTree = genJoinTree(qb, joinExpr, aliasToOpInfo);
qb.setQbJoinTree(joinTree);
/*
* if there is only one destination in Query try to push where predicates
* as Join conditions
*/
Set<String> dests = qb.getParseInfo().getClauseNames();
if ( dests.size() == 1 && joinTree.getNoOuterJoin()) {
String dest = dests.iterator().next();
ASTNode whereClause = qb.getParseInfo().getWhrForClause(dest);
if ( whereClause != null ) {
extractJoinCondsFromWhereClause(joinTree, qb, dest,
(ASTNode) whereClause.getChild(0),
aliasToOpInfo );
}
}
if (!disableJoinMerge)
mergeJoinTree(qb);
}
// if any filters are present in the join tree, push them on top of the
// table
pushJoinFilters(qb, qb.getQbJoinTree(), aliasToOpInfo);
srcOpInfo = genJoinPlan(qb, aliasToOpInfo);
} else {
// Now if there are more than 1 sources then we have a join case
// later we can extend this to the union all case as well
srcOpInfo = aliasToOpInfo.values().iterator().next();
// with ptfs, there maybe more (note for PTFChains:
// 1 ptf invocation may entail multiple PTF operators)
srcOpInfo = lastPTFOp != null ? lastPTFOp : srcOpInfo;
}
Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo, aliasToOpInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("Created Plan for Query Block " + qb.getId());
}
if (qb.getAlias() != null) {
rewriteRRForSubQ(qb.getAlias(), bodyOpInfo, skipAmbiguityCheck);
}
setQB(qb);
return bodyOpInfo;
}
The process is roughly:
1) If there are subqueries, call genPlan recursively
2) Iterate over the table aliases (TabAliases) and generate a TableScanOperator for each
3) From qb.getParseInfo().getJoinExpr(), build a QBJoinTree => ReduceSinkOperator + JoinOperator
4) genBodyPlan() produces the FilterOperator: qb.getParseInfo().getWhrForClause() => genFilterPlan() => FilterOperator
5) genSelectAllDesc(curr) produces the SelectOperator
6) Finally, the ReduceSinkOperator + GroupByOperator are produced
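The recursive shape of genPlan can be imitated with a heavily simplified model. Everything here is hypothetical: QueryBlock is a stand-in for Hive's QB, the "plan" is just a list of operator labels, and the relative order of the aggregation and projection steps is a simplification of this sketch rather than a statement about Hive's exact ordering. Subqueries are planned first by recursion, then table scans, then a join when there are several sources, then the body.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of genPlan's order of work.
// QueryBlock stands in for Hive's QB; operators are plain string labels.
class GenPlanSketch {
    static final class QueryBlock {
        final Map<String, QueryBlock> subqueries = new LinkedHashMap<>(); // alias -> nested block
        final List<String> tables = new ArrayList<>();
        String where;        // null if absent
        String groupBy;      // null if absent
        String select = "*";
    }

    static List<String> genPlan(QueryBlock qb) {
        List<String> ops = new ArrayList<>();
        // 1) subqueries first, recursively
        for (Map.Entry<String, QueryBlock> e : qb.subqueries.entrySet()) {
            ops.add("[" + e.getKey() + ": " + String.join(" -> ", genPlan(e.getValue())) + "]");
        }
        // 2) one table scan per source table
        for (String t : qb.tables) ops.add("TS(" + t + ")");
        // 3) more than one source means a join: shuffle the inputs, then join
        if (qb.subqueries.size() + qb.tables.size() > 1) {
            ops.add("RS");
            ops.add("JOIN");
        }
        // 4) body: filter, then aggregation (shuffle + group by), then projection
        if (qb.where != null) ops.add("FIL(" + qb.where + ")");
        if (qb.groupBy != null) {
            ops.add("RS");
            ops.add("GBY(" + qb.groupBy + ")");
        }
        ops.add("SEL(" + qb.select + ")");
        return ops;
    }

    public static void main(String[] args) {
        QueryBlock inner = new QueryBlock();
        inner.tables.add("orders");
        inner.where = "amount > 0";
        QueryBlock outer = new QueryBlock();
        outer.subqueries.put("o", inner);
        outer.groupBy = "user_id";
        outer.select = "user_id, count(*)";
        System.out.println(genPlan(outer));
        // [[o: TS(orders) -> FIL(amount > 0) -> SEL(*)], RS, GBY(user_id), SEL(user_id, count(*))]
    }
}
```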
4) Optimize the logical plan
/**
* Invoke all the transformations one-by-one, and alter the query plan.
*
* @return ParseContext
* @throws SemanticException
*/
public ParseContext optimize() throws SemanticException {
for (Transform t : transformations) {
t.beginPerfLogging();
pctx = t.transform(pctx);
t.endPerfLogging(t.toString());
}
return pctx;
}
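Transform is the abstract class for logical-plan optimizers. Most logical optimizers rewrite the Operator Tree and merge operators in order to reduce the number of MapReduce jobs and the volume of shuffled data. Common ones:
② SimpleFetchOptimizer: optimizes aggregation queries that have no GROUP BY expression
② MapJoinProcessor: MapJoin; requires a hint in the SQL; no longer used as of 0.11
② BucketMapJoinOptimizer: BucketMapJoin
② GroupByOptimizer: map-side aggregation
① ReduceSinkDeDuplication: merges reduce stages with identical partition/sort keys in a linear Operator Tree
① PredicatePushDown: predicate pushdown
① CorrelationOptimizer: exploits correlations within a query to merge correlated jobs (HIVE-2206)
ColumnPruner: column pruning
The optimizers marked ① try to make a single job do as much work as possible (by merging). Those marked ② reduce the amount of shuffled data, or even skip the reduce phase entirely.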
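The optimize() loop simply applies each transformation in turn to the current plan. The toy below mimics that loop on a plan represented as a list of operator labels. The two rules are simplified stand-ins loosely inspired by PredicatePushDown and ReduceSinkDeDuplication; the real optimizers work on the Operator Tree and perform far more involved legality checks (for example, a filter may only move below a projection if the columns it uses survive). Counting the "RS" labels shows the shuffle count dropping from 2 to 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy version of the optimize() loop: each "transform" rewrites a label-list plan.
// The rules are simplified stand-ins, not Hive's PredicatePushDown/ReduceSinkDeDuplication.
class TransformChainSketch {
    // Bubble each FIL leftwards past SEL operators so rows are discarded earlier.
    static final UnaryOperator<List<String>> PUSH_FILTERS = plan -> {
        List<String> out = new ArrayList<>(plan);
        for (int i = 1; i < out.size(); i++) {
            for (int j = i; j > 0 && out.get(j).startsWith("FIL") && out.get(j - 1).startsWith("SEL"); j--) {
                String tmp = out.get(j - 1);
                out.set(j - 1, out.get(j));
                out.set(j, tmp);
            }
        }
        return out;
    };

    // Drop an RS whose key equals the previous RS's key: the earlier shuffle is reused.
    static final UnaryOperator<List<String>> DEDUP_REDUCE_SINKS = plan -> {
        List<String> out = new ArrayList<>();
        String lastRs = null;
        for (String op : plan) {
            if (op.startsWith("RS(")) {
                if (op.equals(lastRs)) continue;
                lastRs = op;
            }
            out.add(op);
        }
        return out;
    };

    // Apply the transforms one by one, like optimize() does with its Transform list.
    static List<String> optimize(List<String> plan, List<UnaryOperator<List<String>>> transforms) {
        for (UnaryOperator<List<String>> t : transforms) plan = t.apply(plan);
        return plan;
    }

    public static void main(String[] args) {
        List<String> plan = List.of("TS(t)", "SEL(k,v)", "FIL(v>0)", "RS(k)", "GBY(k)", "RS(k)", "GBY(k)", "FS");
        System.out.println(optimize(plan, List.of(PUSH_FILTERS, DEDUP_REDUCE_SINKS)));
        // [TS(t), FIL(v>0), SEL(k,v), RS(k), GBY(k), GBY(k), FS]
    }
}
```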
5) Generate the physical plan (turning the Operator Tree into MapReduce jobs)
The compile method of the TaskCompiler class:
public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
Context ctx = pCtx.getContext();
GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
boolean isCStats = pCtx.getQueryProperties().isAnalyzeRewrite();
int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();
if (pCtx.getFetchTask() != null) {
return;
}
optimizeOperatorPlan(pCtx, inputs, outputs);
/*
* In case of a select, use a fetch task instead of a move task.
* If the select is from analyze table column rewrite, don't create a fetch task. Instead create
* a column stats task later.
*/
if (pCtx.getQueryProperties().isQuery() && !isCStats) {
if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
}
LoadFileDesc loadFileDesc = loadFileWork.get(0);
String cols = loadFileDesc.getColumns();
String colTypes = loadFileDesc.getColumnTypes();
TableDesc resultTab = pCtx.getFetchTableDesc();
if (resultTab == null) {
String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
}
FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(), resultTab, outerQueryLimit);
fetch.setSource(pCtx.getFetchSource());
fetch.setSink(pCtx.getFetchSink());
pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf));
// For the FetchTask, the limit optimization requires we fetch all the rows
// in memory and count how many rows we get. It's not practical if the
// limit factor is too big
int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
+ ". Doesn't qualify limit optimiztion.");
globalLimitCtx.disableOpt();
}
if (outerQueryLimit == 0) {
// Believe it or not, some tools do generate queries with limit 0 and than expect
// query to run quickly. Lets meet their requirement.
LOG.info("Limit 0. No query execution needed.");
return;
}
} else if (!isCStats) {
for (LoadTableDesc ltd : loadTableWork) {
Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
mvTask.add(tsk);
// Check to see if we are stale'ing any indexes and auto-update them if we want
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
try {
List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
.generateUpdateTasks();
for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
tsk.addDependentTask(updateTask);
}
} catch (HiveException e) {
console
.printInfo("WARNING: could not auto-update stale indexes, which are not in sync");
}
}
}
boolean oneLoadFile = true;
for (LoadFileDesc lfd : loadFileWork) {
if (pCtx.getQueryProperties().isCTAS()) {
assert (oneLoadFile); // should not have more than 1 load file for
// CTAS
// make the movetask's destination directory the table's destination.
Path location;
String loc = pCtx.getCreateTable().getLocation();
if (loc == null) {
// get the table's default location
Path targetPath;
try {
String[] names = Utilities.getDbTableName(
pCtx.getCreateTable().getTableName());
if (!db.databaseExists(names[0])) {
throw new SemanticException("ERROR: The database " + names[0]
+ " does not exist.");
}
Warehouse wh = new Warehouse(conf);
targetPath = wh.getTablePath(db.getDatabase(names[0]), names[1]);
} catch (HiveException e) {
throw new SemanticException(e);
} catch (MetaException e) {
throw new SemanticException(e);
}
location = targetPath;
} else {
location = new Path(loc);
}
lfd.setTargetDir(location);
oneLoadFile = false;
}
mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf));
}
}
generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs);
/*
* If the query was the result of analyze table column compute statistics rewrite, create
* a column stats task instead of a fetch task to persist stats to the metastore.
*/
if (isCStats) {
genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadTableWork, loadFileWork,
rootTasks, outerQueryLimit);
}
// For each task, set the key descriptor for the reducer
for (Task<? extends Serializable> rootTask : rootTasks) {
GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
}
// If a task contains an operator which instructs bucketizedhiveinputformat
// to be used, please do so
for (Task<? extends Serializable> rootTask : rootTasks) {
setInputFormat(rootTask);
}
optimizeTaskPlan(rootTasks, pCtx, ctx);
decideExecMode(rootTasks, ctx, globalLimitCtx);
if (pCtx.getQueryProperties().isCTAS()) {
// generate a DDL task and make it a dependent task of the leaf
CreateTableDesc crtTblDesc = pCtx.getCreateTable();
crtTblDesc.validate(conf);
// clear the mapredWork output file from outputs for CTAS
// DDLWork at the tail of the chain will have the output
Iterator<WriteEntity> outIter = outputs.iterator();
while (outIter.hasNext()) {
switch (outIter.next().getType()) {
case DFS_DIR:
case LOCAL_DIR:
outIter.remove();
break;
default:
break;
}
}
Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
inputs, outputs, crtTblDesc), conf);
// find all leaf tasks and make the DDLTask as a dependent task of all of
// them
HashSet<Task<? extends Serializable>> leaves = new LinkedHashSet<Task<? extends Serializable>>();
getLeafTasks(rootTasks, leaves);
assert (leaves.size() > 0);
for (Task<? extends Serializable> task : leaves) {
if (task instanceof StatsTask) {
// StatsTask require table to already exist
for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
parentOfStatsTask.addDependentTask(crtTblTask);
}
for (Task<? extends Serializable> parentOfCrtTblTask : crtTblTask.getParentTasks()) {
parentOfCrtTblTask.removeDependentTask(task);
}
crtTblTask.addDependentTask(task);
} else {
task.addDependentTask(crtTblTask);
}
}
}
if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) {
LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
}
if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
for (ExecDriver tsk : mrTasks) {
tsk.setRetryCmdWhenFail(true);
}
List<SparkTask> sparkTasks = Utilities.getSparkTasks(rootTasks);
for (SparkTask sparkTask : sparkTasks) {
sparkTask.setRetryCmdWhenFail(true);
}
}
Interner<TableDesc> interner = Interners.newStrongInterner();
for (Task<? extends Serializable> rootTask : rootTasks) {
GenMapRedUtils.internTableDesc(rootTask, interner);
GenMapRedUtils.deriveFinalExplainAttributes(rootTask, pCtx.getConf());
}
}
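Task is the abstract class for the physical execution plan; examples in the code above include FetchTask, StatsTask, ExecDriver, and SparkTask, as well as the tasks built from MoveWork and DDLWork.
Turning the Operator Tree into MapReduce jobs proceeds in the following stages:
Generate a MoveTask for the output table
Traverse depth-first downward from one of the root nodes of the Operator Tree
A ReduceSinkOperator marks the boundary between Map and Reduce, and between consecutive jobs
Traverse the remaining root nodes; when a JoinOperator is encountered, merge the MapReduceTasks
Generate a StatsTask to update the metadata
Cut the links between the operators on the Map side and those on the Reduce side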
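The role of the ReduceSinkOperator as a boundary can be illustrated by cutting a linear operator chain into jobs. This is a simplified sketch under stated assumptions: the chain is linear (no joins or unions), operators are string labels, and when a second shuffle is needed inside a reduce side the job is closed by writing to a temporary location and the next job re-reads it (the "FS(tmp)"/"TS(tmp)" labels are hypothetical). Under these assumptions, k ReduceSinkOperators yield k jobs, and a chain with none is a map-only job.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch: cut a linear operator chain into MapReduce jobs at each "RS".
// Assumes no joins/unions; "FS(tmp)"/"TS(tmp)" are hypothetical labels for the
// temporary output that bridges two consecutive jobs.
class JobSplitSketch {
    static final class Job {
        final List<String> map;
        final List<String> reduce;
        Job(List<String> map, List<String> reduce) { this.map = map; this.reduce = reduce; }
        @Override public String toString() { return "map=" + map + " reduce=" + reduce; }
    }

    static List<Job> split(List<String> chain) {
        List<Job> jobs = new ArrayList<>();
        List<String> map = new ArrayList<>();
        List<String> reduce = null;                 // null while still on the map side
        for (String op : chain) {
            if (op.equals("RS")) {
                if (reduce != null) {               // second shuffle: close the current job
                    reduce.add("FS(tmp)");
                    jobs.add(new Job(map, reduce));
                    map = new ArrayList<>(List.of("TS(tmp)"));
                }
                map.add("RS");                      // RS ends the map side
                reduce = new ArrayList<>();
            } else if (reduce == null) {
                map.add(op);
            } else {
                reduce.add(op);
            }
        }
        jobs.add(new Job(map, reduce == null ? new ArrayList<String>() : reduce)); // map-only if no RS
        return jobs;
    }

    public static void main(String[] args) {
        for (Job j : split(List.of("TS(t)", "FIL", "RS", "GBY", "RS", "GBY", "FS"))) {
            System.out.println(j);
        }
        // map=[TS(t), FIL, RS] reduce=[GBY, FS(tmp)]
        // map=[TS(tmp), RS] reduce=[GBY, FS]
    }
}
```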
6) Optimize the physical plan
The optimize method of the PhysicalOptimizer class:
/**
* invoke all the resolvers one-by-one, and alter the physical plan.
*
* @return PhysicalContext
* @throws HiveException
*/
public PhysicalContext optimize() throws SemanticException {
for (PhysicalPlanResolver r : resolvers) {
pctx = r.resolve(pctx);
}
return pctx;
}
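PhysicalPlanResolver is the interface for physical optimizers. Common physical optimizers include (among others):
Vectorizer: HIVE-4160, slated for release in 0.13
SortMergeJoinResolver: works together with bucketing; similar to a merge sort
SamplingOptimizer: parallel ORDER BY optimizer, released in 0.12
CommonJoinResolver + MapJoinResolver: MapJoin optimizers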
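The core idea behind converting a common (shuffle) join into a MapJoin can be sketched as a size check: stream the largest input and load the others into memory only if their combined size fits under a threshold. This is a simplified assumption-laden sketch: the threshold plays the role of a setting such as hive.mapjoin.smalltable.filesize, and Hive's real resolvers (CommonJoinResolver together with ConditionalResolverCommonJoin) also account for join types, since for outer joins not every table may serve as the streamed one.

```java
import java.util.Map;
import java.util.Optional;

// Simplified sketch of the map-join decision: stream the largest table and load the
// rest into memory only if their combined size fits under a threshold. Not Hive's code.
class MapJoinChoiceSketch {
    /** Returns the alias to stream as the big table, or empty to keep the shuffle join. */
    static Optional<String> chooseBigTable(Map<String, Long> aliasToBytes, long smallTableThreshold) {
        String big = null;
        long total = 0;
        for (Map.Entry<String, Long> e : aliasToBytes.entrySet()) {
            total += e.getValue();
            if (big == null || e.getValue() > aliasToBytes.get(big)) big = e.getKey();
        }
        if (big == null) return Optional.empty();          // no inputs at all
        long smallSum = total - aliasToBytes.get(big);     // everything that must fit in memory
        return smallSum <= smallTableThreshold ? Optional.of(big) : Optional.empty();
    }

    public static void main(String[] args) {
        long threshold = 25_000_000L;
        System.out.println(chooseBigTable(Map.of("orders", 5_000_000_000L, "dim_city", 2_000_000L), threshold));
        // Optional[orders]: map join, dim_city is loaded into memory
        System.out.println(chooseBigTable(Map.of("a", 4_000_000_000L, "b", 3_000_000_000L), threshold));
        // Optional.empty: fall back to the shuffle join
    }
}
```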
Execution
The execute method of the Driver class:
public int execute() throws CommandNeedRetryException {
    PerfLogger perfLogger = SessionState.getPerfLogger();
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
    boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME));
    int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
    String queryId = plan.getQueryId();
    // Get the query string from the conf file as the compileInternal() method might
    // hide sensitive information during query redaction.
    String queryStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING);
    maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
    try {
        LOG.info("Executing command(queryId=" + queryId + "): " + queryStr);
        // compile and execute can get called from different threads in case of HS2
        // so clear timing in this thread's Hive object before proceeding.
        Hive.get().clearMetaCallTiming();
        plan.setStarted();
        if (SessionState.get() != null) {
            SessionState.get().getHiveHistory().startQuery(queryStr,
                    conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
            SessionState.get().getHiveHistory().logPlanProgress(plan);
        }
        resStream = null;
        SessionState ss = SessionState.get();
        HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), ss.getUserName(),
                ss.getUserIpAddress(), operationId);
        hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
        for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
            if (peh instanceof ExecuteWithHookContext) {
                perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
                ((ExecuteWithHookContext) peh).run(hookContext);
                perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
            } else if (peh instanceof PreExecute) {
                perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
                ((PreExecute) peh).run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
                        Utils.getUGI());
                perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
            }
        }
        int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
        int jobs = mrJobs
                + Utilities.getTezTasks(plan.getRootTasks()).size()
                + Utilities.getSparkTasks(plan.getRootTasks()).size();
        if (jobs > 0) {
            logMrWarning(mrJobs);
            console.printInfo("Query ID = " + plan.getQueryId());
            console.printInfo("Total jobs = " + jobs);
        }
        if (SessionState.get() != null) {
            SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
                    String.valueOf(jobs));
            SessionState.get().getHiveHistory().setIdToTableMap(plan.getIdToTableNameMap());
        }
        String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
        // A runtime that launches runnable tasks as separate Threads through
        // TaskRunners
        // As soon as a task isRunnable, it is put in a queue
        // At any time, at most maxthreads tasks can be running
        // The main thread polls the TaskRunners to check if they have finished.
        DriverContext driverCxt = new DriverContext(ctx);
        driverCxt.prepare(plan);
        ctx.setHDFSCleanup(true);
        this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)
        SessionState.get().setMapRedStats(new LinkedHashMap<String, MapRedStats>());
        SessionState.get().setStackTraces(new HashMap<String, List<List<String>>>());
        SessionState.get().setLocalMapRedErrors(new HashMap<String, List<String>>());
        // Add root Tasks to runnable
        for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
            // This should never happen, if it does, it's a bug with the potential to produce
            // incorrect results.
            assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
            driverCxt.addToRunnable(tsk);
        }
        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
        // Loop while you either have tasks running, or tasks queued up
        while (!destroyed && driverCxt.isRunning()) {
            // Launch up to maxthreads tasks
            Task<? extends Serializable> task;
            while ((task = driverCxt.getRunnable(maxthreads)) != null) {
                TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
                if (!runner.isRunning()) {
                    break;
                }
            }
            // poll the Tasks to see which one completed
            TaskRunner tskRun = driverCxt.pollFinished();
            if (tskRun == null) {
                continue;
            }
            hookContext.addCompleteTask(tskRun);
            Task<? extends Serializable> tsk = tskRun.getTask();
            TaskResult result = tskRun.getTaskResult();
            int exitVal = result.getExitVal();
            if (exitVal != 0) {
                if (tsk.ifRetryCmdWhenFail()) {
                    driverCxt.shutdown();
                    // in case we decided to run everything in local mode, restore the
                    // jobtracker setting to its initial value
                    ctx.restoreOriginalTracker();
                    throw new CommandNeedRetryException();
                }
                Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
                if (backupTask != null) {
                    setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
                    console.printError(errorMessage);
                    errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
                    console.printError(errorMessage);
                    // add backup task to runnable
                    if (DriverContext.isLaunchable(backupTask)) {
                        driverCxt.addToRunnable(backupTask);
                    }
                    continue;
                } else {
                    hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
                    // Get all the failure execution hooks and execute them.
                    for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
                        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
                        ((ExecuteWithHookContext) ofh).run(hookContext);
                        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
                    }
                    setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
                    SQLState = "08S01";
                    console.printError(errorMessage);
                    driverCxt.shutdown();
                    // in case we decided to run everything in local mode, restore the
                    // jobtracker setting to its initial value
                    ctx.restoreOriginalTracker();
                    return exitVal;
                }
            }
            driverCxt.finished(tskRun);
            if (SessionState.get() != null) {
                SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
                        Keys.TASK_RET_CODE, String.valueOf(exitVal));
                SessionState.get().getHiveHistory().endTask(queryId, tsk);
            }
            if (tsk.getChildTasks() != null) {
                for (Task<? extends Serializable> child : tsk.getChildTasks()) {
                    if (DriverContext.isLaunchable(child)) {
                        driverCxt.addToRunnable(child);
                    }
                }
            }
        }
        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
        // in case we decided to run everything in local mode, restore the
        // jobtracker setting to its initial value
        ctx.restoreOriginalTracker();
        if (driverCxt.isShutdown()) {
            SQLState = "HY008";
            errorMessage = "FAILED: Operation cancelled";
            console.printError(errorMessage);
            return 1000;
        }
        // remove incomplete outputs.
        // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
        // remove them
        HashSet<WriteEntity> remOutputs = new LinkedHashSet<WriteEntity>();
        for (WriteEntity output : plan.getOutputs()) {
            if (!output.isComplete()) {
                remOutputs.add(output);
            }
        }
        for (WriteEntity output : remOutputs) {
            plan.getOutputs().remove(output);
        }
        hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
        // Get all the post execution hooks and execute them.
        for (Hook peh : getHooks(HiveConf.ConfVars.POSTEXECHOOKS)) {
            if (peh instanceof ExecuteWithHookContext) {
                perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
                ((ExecuteWithHookContext) peh).run(hookContext);
                perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
            } else if (peh instanceof PostExecute) {
                perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
                ((PostExecute) peh).run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
                        (SessionState.get() != null ? SessionState.get().getLineageState().getLineageInfo()
                                : null), Utils.getUGI());
                perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
            }
        }
        if (SessionState.get() != null) {
            SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
                    String.valueOf(0));
            SessionState.get().getHiveHistory().printRowCount(queryId);
        }
    } catch (CommandNeedRetryException e) {
        throw e;
    } catch (Exception e) {
        ctx.restoreOriginalTracker();
        if (SessionState.get() != null) {
            SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
                    String.valueOf(12));
        }
        // TODO: do better with handling types of Exception here
        errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
        SQLState = "08S01";
        downstreamError = e;
        console.printError(errorMessage + "\n"
                + org.apache.hadoop.util.StringUtils.stringifyException(e));
        return (12);
    } finally {
        if (SessionState.get() != null) {
            SessionState.get().getHiveHistory().endQuery(queryId);
        }
        if (noName) {
            conf.set(MRJobConfig.JOB_NAME, "");
        }
        dumpMetaCallTimingWithoutEx("execution");
        double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00;
        Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
        if (stats != null && !stats.isEmpty()) {
            long totalCpu = 0;
            console.printInfo("MapReduce Jobs Launched: ");
            for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
                console.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
                totalCpu += entry.getValue().getCpuMSec();
            }
            console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
        }
        LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
    }
    plan.setDone();
    if (SessionState.get() != null) {
        try {
            SessionState.get().getHiveHistory().logPlanProgress(plan);
        } catch (Exception e) {
            // ignore
        }
    }
    console.printInfo("OK");
    return (0);
}
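When pollFinished() returns a task, the branches above decide what happens next. The first check, ifRetryCmdWhenFail(), reads a flag set earlier on MapReduce and Spark tasks when the global limit optimization is enabled. Below is a compact restatement of that decision as a pure function. TaskOutcome and FailurePolicy are hypothetical names that only summarize the control flow; they are not Hive APIs.

```java
/** Hypothetical summary of the possible reactions to a finished task. */
enum TaskOutcome { SCHEDULE_CHILDREN, RETRY_WHOLE_COMMAND, RUN_BACKUP_TASK, FAIL_QUERY }

final class FailurePolicy {
    /** Mirrors the order of checks in Driver.execute() after a task finishes. */
    static TaskOutcome onTaskFinished(int exitVal, boolean retryCmdWhenFail, boolean hasBackupTask) {
        if (exitVal == 0) {
            // Success: mark finished, then enqueue children that became launchable.
            return TaskOutcome.SCHEDULE_CHILDREN;
        }
        if (retryCmdWhenFail) {
            // Shut down, restore the tracker, throw CommandNeedRetryException.
            return TaskOutcome.RETRY_WHOLE_COMMAND;
        }
        if (hasBackupTask) {
            // Log the error; the backup task is queued if it is launchable.
            return TaskOutcome.RUN_BACKUP_TASK;
        }
        // Run failure hooks, set SQLState "08S01", return the task's exit value.
        return TaskOutcome.FAIL_QUERY;
    }
}
```

The other exit paths sit outside this decision. A cancelled query returns 1000 with SQLState HY008. An unexpected exception returns 12 with SQLState 08S01.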
The key code:
// Launch up to maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
    TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
    if (!runner.isRunning()) {
        break;
    }
}
// poll the Tasks to see which one completed
TaskRunner tskRun = driverCxt.pollFinished();
if (tskRun == null) {
    continue;
}
hookContext.addCompleteTask(tskRun);
Task<? extends Serializable> tsk = tskRun.getTask();
TaskResult result = tskRun.getTaskResult();
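This loop is a small DAG scheduler:

- Root tasks start in the runnable queue.
- At most maxthreads tasks run at once. The limit comes from hive.exec.parallel.thread.number, read through EXECPARALLETHREADNUMBER.
- The main thread waits for a task to finish.
- A child becomes runnable once all of its parents are done.

In Hive, launchTask typically starts a separate thread only when hive.exec.parallel is enabled, and only for certain task types. Otherwise it runs the task sequentially. The self-contained sketch below reproduces only the scheduling idea with java.util.concurrent. DagTask and DagRunner are hypothetical. Unlike Driver, the sketch blocks on an ExecutorCompletionService instead of polling TaskRunner threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical task node: an id, its DAG edges, and the work to run. */
final class DagTask {
    final String id;
    final Runnable work;
    final List<DagTask> parents = new ArrayList<>();
    final List<DagTask> children = new ArrayList<>();
    boolean queued;   // already placed in the runnable queue (main thread only)
    boolean done;     // finished (main thread only)

    DagTask(String id, Runnable work) {
        this.id = id;
        this.work = work;
    }

    DagTask dependsOn(DagTask parent) {
        parents.add(parent);
        parent.children.add(this);
        return this;
    }

    /** Launchable once it is not yet queued and every parent has finished. */
    boolean isLaunchable() {
        if (queued) {
            return false;
        }
        for (DagTask p : parents) {
            if (!p.done) {
                return false;
            }
        }
        return true;
    }
}

final class DagRunner {
    /** Runs all tasks reachable from roots, at most maxThreads at a time; returns ids in completion order. */
    static List<String> run(List<DagTask> roots, int maxThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        CompletionService<DagTask> finished = new ExecutorCompletionService<>(pool);
        Deque<DagTask> runnable = new ArrayDeque<>();
        for (DagTask root : roots) {
            root.queued = true;
            runnable.add(root);
        }
        List<String> order = new ArrayList<>();
        int running = 0;
        try {
            // Loop while tasks are either running or queued up.
            while (running > 0 || !runnable.isEmpty()) {
                // Launch up to maxThreads tasks.
                while (running < maxThreads && !runnable.isEmpty()) {
                    DagTask next = runnable.poll();
                    finished.submit(() -> {
                        next.work.run();
                        return next;
                    });
                    running++;
                }
                // Block until one task completes.
                DagTask completed = finished.take().get();
                running--;
                completed.done = true;
                order.add(completed.id);
                // Children whose parents have all finished become runnable.
                for (DagTask child : completed.children) {
                    if (child.isLaunchable()) {
                        child.queued = true;
                        runnable.add(child);
                    }
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", ie);
        } catch (ExecutionException ee) {
            throw new IllegalStateException("task failed: " + ee.getCause(), ee.getCause());
        } finally {
            pool.shutdown();
        }
        return order;
    }
}
```

In the example graph, A, B and C are roots, D depends on A and B, and Z depends on D and C. With maxThreads set to 2, any order the runner returns lists D after A and B, and Z after D and C. No more than two tasks run at the same time.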
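Each task is ultimately run through the executeTask method of the Task class. launchTask wraps the task in a TaskRunner, and the TaskRunner calls executeTask: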
public int executeTask() {
    try {
        SessionState ss = SessionState.get();
        this.setStarted();
        if (ss != null) {
            ss.getHiveHistory().logPlanProgress(queryPlan);
        }
        int retval = execute(driverContext);
        this.setDone();
        if (ss != null) {
            ss.getHiveHistory().logPlanProgress(queryPlan);
        }
        return retval;
    } catch (IOException e) {
        throw new RuntimeException("Unexpected error: " + e.getMessage(), e);
    }
}
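Inside executeTask, the actual work is done by the execute(driverContext) method of each concrete Task subclass. For tasks that launch jobs, such as MapReduce tasks, the job is ultimately submitted to the YARN cluster through yarnClient.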
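The relationship between executeTask() and execute(driverContext) is the template method pattern. The base class fixes the bookkeeping around the call, and each subclass supplies its own execute. Below is a minimal sketch with hypothetical MiniTask classes. Here executeTask is declared final to make the pattern explicit. The real Task also logs plan progress and handles IOException, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of Task: executeTask() wraps the subclass-specific execute(). */
abstract class MiniTask {
    final List<String> events = new ArrayList<>();

    /** Fixed skeleton: mark started, run the subclass logic, mark done. */
    final int executeTask() {
        events.add("started");
        int retval = execute();
        events.add("done");
        return retval;
    }

    /** Each concrete task type supplies its own behavior here. */
    protected abstract int execute();
}

/** Hypothetical stand-in for a file-moving task such as MoveTask. */
final class MiniMoveTask extends MiniTask {
    @Override
    protected int execute() {
        events.add("move files");
        return 0;
    }
}

/** Hypothetical stand-in for a job-launching task such as MapRedTask. */
final class MiniMapRedTask extends MiniTask {
    @Override
    protected int execute() {
        events.add("submit job");   // a real MapReduce task would submit a job here
        return 0;
    }
}
```

Calling new MiniMapRedTask().executeTask() returns 0 and records started, submit job and done, in that order. Only the middle step changes between subclasses.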
