- 节点间的参数传递
https://www.cnblogs.com/xing901022/p/6501448.html - shell action 传递参数示例
https://www.yuque.com/huioo/wdag30/of5v7u#44d0z java action 传递参数示例
http://www.myexception.cn/open-source/1306509.htmloutput写
参考代码如下:“src\sharelib\oozie\src\main\java\org\apache\oozie\action\hadoop\ShellMain.java”
执行shell action,将输出完整写入到“oozie.action.output.properties”属性指定的文件中。/*** Execute the shell command** @param actionConf* @return command exit value* @throws IOException*/private int execute(Configuration actionConf) throws Exception {String exec = getExec(actionConf);List<String> args = getShellArguments(actionConf);ArrayList<String> cmdArray = getCmdList(exec, args.toArray(new String[args.size()]));ProcessBuilder builder = new ProcessBuilder(cmdArray);Map<String, String> envp = getEnvMap(builder.environment(), actionConf);··· ···boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);// Execute the CommandProcess p = builder.start();Thread[] thrArray = handleShellOutput(p, captureOutput);··· ···}/*** Print the output written by the Shell execution in its stdout/stderr.* Also write the stdout output to a file for capturing.** @param p process* @param captureOutput indicates if STDOUT should be captured or not.* @return Array of threads (one for stdout and another one for stderr* processing* @throws IOException thrown if an IO error occurrs.*/protected Thread[] handleShellOutput(Process p, boolean captureOutput)throws IOException {BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);thrStdout.setDaemon(true);thrStdout.start();OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);thrStderr.setDaemon(true);thrStderr.start();return new Thread[]{ thrStdout, thrStderr };}/*** Thread to write output to LM stdout/stderr. Also write the content for* capture-output.*/class OutputWriteThread extends Thread {BufferedReader reader = null;boolean isStdout = false;boolean needCaptured = false;public OutputWriteThread(BufferedReader reader, boolean isStdout, boolean needCaptured) {this.reader = reader;this.isStdout = isStdout;this.needCaptured = needCaptured;}@Overridepublic void run() {String line;BufferedWriter os = null;try {if (needCaptured) {File file = new File(System.getProperty(// oozie.action.output.propertiesLauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS));os = new BufferedWriter(new FileWriter(file));}while ((line = reader.readLine()) != null) {if (isStdout) { // For stdout// 1. Writing to LM STDOUTSystem.out.println("Stdoutput " + line);// 2. Writing for capture outputif (os != null) {os.write(line); //os.newLine(); // Writes a line separator.}··· ···os.close();··· ···
actionData写output
参考代码:“src\sharelib\oozie\src\main\java\org\apache\oozie\action\hadoop\LauncherMapper.java”
actionData读取写入的内容,将上面shell的数据捕获。在action node中使用。 ```java private MapactionData; ··· ··· public LauncherMapper() { actionData = new HashMap<String,String>();
}
··· ···if (errorMessage == null) {handleActionData();if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {System.out.println();System.out.println("Oozie Launcher, capturing output data:");System.out.println("=======================");System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));System.out.println();System.out.println("=======================");System.out.println();}··· ···finally {uploadActionDataToHDFS();}··· ···private void handleActionData() throws IOException, LauncherException {··· ···// output dataString outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);if (outputProp != null) {File actionOutputData = new File(outputProp);if (actionOutputData.exists()) {int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);actionData.put(ACTION_DATA_OUTPUT_PROPS,getLocalFileContentStr(actionOutputData, "Output", maxOutputData));}}··· ···}public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException {StringBuffer sb = new StringBuffer();FileReader reader = new FileReader(file);char[] buffer = new char[2048];int read;int count = 0;while ((read = reader.read(buffer)) > -1) {count += read;if (maxLen > -1 && count > maxLen) {throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]");}sb.append(buffer, 0, read);}reader.close();return sb.toString();}
<a name="noDK2"></a>## Apache oozieoozie源码,上面是cdh版本4.0.0版本,下面是Apache oozie版本4.0.0版本。处于同一个文件中。```javaif (errorMessage == null) {File outputData = new File(System.getProperty("oozie.action.output.properties"));if (outputData.exists()) {URI actionDirUri = new Path(actionDir, ACTION_OUTPUT_PROPS).toUri();FileSystem fs = FileSystem.get(actionDirUri, getJobConf());fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir,ACTION_OUTPUT_PROPS));reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1);int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);if (outputData.length() > maxOutputData) {String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]",outputData.length(), maxOutputData);failLauncher(0, msg, null);}System.out.println();System.out.println("Oozie Launcher, capturing output data:");System.out.println("=======================");Properties props = new Properties();props.load(new FileReader(outputData));props.store(System.out, "");System.out.println();System.out.println("=======================");System.out.println();}
actionData读取
参考代码如下:“core\src\main\java\org\apache\oozie\action\hadoop\JavaActionExecutor.java”
actionData读取,载入content。“props = PropertiesUtils.readProperties(reader, maxActionOutputLen);”
import org.apache.oozie.util.PropertiesUtils;··· ···/*** Get the output data of an action. Subclasses should override this method* to get action specific output data.** @param actionFs the FileSystem object* @param runningJob the runningJob* @param action the Workflow action* @param context executor context**/protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {Properties props = null;if (getCaptureOutput(action)) {props = new Properties();if (LauncherMapperHelper.hasOutputData(runningJob)) {Path actionOutput = LauncherMapperHelper.getOutputDataPath(context.getActionDir());InputStream is = actionFs.open(actionOutput);BufferedReader reader = new BufferedReader(new InputStreamReader(is));props = PropertiesUtils.readProperties(reader, maxActionOutputLen);reader.close();}}context.setExecutionData(SUCCEEDED, props);}protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {Element eConf = XmlUtils.parseXml(action.getConf());Namespace ns = eConf.getNamespace();Element captureOutput = eConf.getChild("capture-output", ns);return captureOutput != null;}··· ···
actionData读取方式
参考代码:“core\src\main\java\org\apache\oozie\util\PropertiesUtils.java”
actionData读取加载方式。
https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html
public class Properties extends Hashtable<Object,Object>load(Reader reader)
Reads a property list (key and element pairs) from the input character stream in a simple line-oriented format.
https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#load-java.io.Reader-import java.util.Properties;··· ···public static Properties stringToProperties(String str) {ParamChecker.notNull(str, "str");try {StringReader sr = new StringReader(str);Properties props = new Properties();props.load(sr);sr.close();return props;}catch (IOException ex) {throw new RuntimeException(ex);}}public static Properties readProperties(Reader reader, int maxDataLen) throws IOException {String data = IOUtils.getReaderAsString(reader, maxDataLen);return stringToProperties(data);}
“IOUtils.getReaderAsString(reader, maxDataLen);”
··· ···/*** Return a reader as string. <p/>** @param reader reader to read into a string.* @param maxLen max content length allowed, if -1 there is no limit.* @return the reader content.* @throws IOException thrown if the resource could not be read.*/public static String getReaderAsString(Reader reader, int maxLen) throws IOException {ParamChecker.notNull(reader, "reader");StringBuffer sb = new StringBuffer();char[] buffer = new char[2048];int read;int count = 0;while ((read = reader.read(buffer)) > -1) {count += read;if (maxLen > -1 && count > maxLen) {throw new IllegalArgumentException(XLog.format("stream exceeds limit [{0}]", maxLen));}sb.append(buffer, 0, read);}reader.close();return sb.toString();}··· ···
