• 节点间的参数传递
    https://www.cnblogs.com/xing901022/p/6501448.html
  • shell action 传递参数示例
    https://www.yuque.com/huioo/wdag30/of5v7u#44d0z
  • java action 传递参数示例
    http://www.myexception.cn/open-source/1306509.html

    output写

    参考代码如下:“src\sharelib\oozie\src\main\java\org\apache\oozie\action\hadoop\ShellMain.java”
    执行shell action,将输出完整写入到“oozie.action.output.properties”属性指定的文件中。

    1. /**
    2. * Execute the shell command
    3. *
    4. * @param actionConf
    5. * @return command exit value
    6. * @throws IOException
    7. */
    8. private int execute(Configuration actionConf) throws Exception {
    9. String exec = getExec(actionConf);
    10. List<String> args = getShellArguments(actionConf);
    11. ArrayList<String> cmdArray = getCmdList(exec, args.toArray(new String[args.size()]));
    12. ProcessBuilder builder = new ProcessBuilder(cmdArray);
    13. Map<String, String> envp = getEnvMap(builder.environment(), actionConf);
    14. ··· ···
    15. boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);
    16. // Execute the Command
    17. Process p = builder.start();
    18. Thread[] thrArray = handleShellOutput(p, captureOutput);
    19. ··· ···
    20. }
    21. /**
    22. * Print the output written by the Shell execution in its stdout/stderr.
    23. * Also write the stdout output to a file for capturing.
    24. *
    25. * @param p process
    26. * @param captureOutput indicates if STDOUT should be captured or not.
    27. * @return Array of threads (one for stdout and another one for stderr
    28. * processing
    29. * @throws IOException thrown if an IO error occurrs.
    30. */
    31. protected Thread[] handleShellOutput(Process p, boolean captureOutput)
    32. throws IOException {
    33. BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
    34. BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));
    35. OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);
    36. thrStdout.setDaemon(true);
    37. thrStdout.start();
    38. OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);
    39. thrStderr.setDaemon(true);
    40. thrStderr.start();
    41. return new Thread[]{ thrStdout, thrStderr };
    42. }
    43. /**
    44. * Thread to write output to LM stdout/stderr. Also write the content for
    45. * capture-output.
    46. */
    47. class OutputWriteThread extends Thread {
    48. BufferedReader reader = null;
    49. boolean isStdout = false;
    50. boolean needCaptured = false;
    51. public OutputWriteThread(BufferedReader reader, boolean isStdout, boolean needCaptured) {
    52. this.reader = reader;
    53. this.isStdout = isStdout;
    54. this.needCaptured = needCaptured;
    55. }
    56. @Override
    57. public void run() {
    58. String line;
    59. BufferedWriter os = null;
    60. try {
    61. if (needCaptured) {
    62. File file = new File(System.getProperty(
    63. // oozie.action.output.properties
    64. LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS
    65. ));
    66. os = new BufferedWriter(new FileWriter(file));
    67. }
    68. while ((line = reader.readLine()) != null) {
    69. if (isStdout) { // For stdout
    70. // 1. Writing to LM STDOUT
    71. System.out.println("Stdoutput " + line);
    72. // 2. Writing for capture output
    73. if (os != null) {
    74. os.write(line); //
    75. os.newLine(); // Writes a line separator.
    76. }
    77. ··· ···
    78. os.close();
    79. ··· ···

    actionData写output

    参考代码:“src\sharelib\oozie\src\main\java\org\apache\oozie\action\hadoop\LauncherMapper.java”
    actionData读取写入的内容,将上面shell的数据捕获。在action node中使用。 ```java private Map actionData; ··· ··· public LauncherMapper() {

    1. actionData = new HashMap<String,String>();

    }

  1. ··· ···
  2. if (errorMessage == null) {
  3. handleActionData();
  4. if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
  5. System.out.println();
  6. System.out.println("Oozie Launcher, capturing output data:");
  7. System.out.println("=======================");
  8. System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
  9. System.out.println();
  10. System.out.println("=======================");
  11. System.out.println();
  12. }
  13. ··· ···
  14. finally {
  15. uploadActionDataToHDFS();
  16. }
  17. ··· ···
  18. private void handleActionData() throws IOException, LauncherException {
  19. ··· ···
  20. // output data
  21. String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);
  22. if (outputProp != null) {
  23. File actionOutputData = new File(outputProp);
  24. if (actionOutputData.exists()) {
  25. int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
  26. actionData.put(ACTION_DATA_OUTPUT_PROPS,
  27. getLocalFileContentStr(actionOutputData, "Output", maxOutputData));
  28. }
  29. }
  30. ··· ···
  31. }
  32. public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException {
  33. StringBuffer sb = new StringBuffer();
  34. FileReader reader = new FileReader(file);
  35. char[] buffer = new char[2048];
  36. int read;
  37. int count = 0;
  38. while ((read = reader.read(buffer)) > -1) {
  39. count += read;
  40. if (maxLen > -1 && count > maxLen) {
  41. throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]");
  42. }
  43. sb.append(buffer, 0, read);
  44. }
  45. reader.close();
  46. return sb.toString();
  47. }
  1. <a name="noDK2"></a>
  2. ## Apache oozie
  3. oozie源码,上面是cdh版本4.0.0版本,下面是Apache oozie版本4.0.0版本。处于同一个文件中。
  4. ```java
  5. if (errorMessage == null) {
  6. File outputData = new File(System.getProperty("oozie.action.output.properties"));
  7. if (outputData.exists()) {
  8. URI actionDirUri = new Path(actionDir, ACTION_OUTPUT_PROPS).toUri();
  9. FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
  10. fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir,
  11. ACTION_OUTPUT_PROPS));
  12. reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1);
  13. int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
  14. if (outputData.length() > maxOutputData) {
  15. String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]",
  16. outputData.length(), maxOutputData);
  17. failLauncher(0, msg, null);
  18. }
  19. System.out.println();
  20. System.out.println("Oozie Launcher, capturing output data:");
  21. System.out.println("=======================");
  22. Properties props = new Properties();
  23. props.load(new FileReader(outputData));
  24. props.store(System.out, "");
  25. System.out.println();
  26. System.out.println("=======================");
  27. System.out.println();
  28. }

actionData读取

参考代码如下:“core\src\main\java\org\apache\oozie\action\hadoop\JavaActionExecutor.java”
actionData读取,载入content。“props = PropertiesUtils.readProperties(reader, maxActionOutputLen);”

  1. import org.apache.oozie.util.PropertiesUtils;
  2. ··· ···
  3. /**
  4. * Get the output data of an action. Subclasses should override this method
  5. * to get action specific output data.
  6. *
  7. * @param actionFs the FileSystem object
  8. * @param runningJob the runningJob
  9. * @param action the Workflow action
  10. * @param context executor context
  11. *
  12. */
  13. protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
  14. throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
  15. Properties props = null;
  16. if (getCaptureOutput(action)) {
  17. props = new Properties();
  18. if (LauncherMapperHelper.hasOutputData(runningJob)) {
  19. Path actionOutput = LauncherMapperHelper.getOutputDataPath(context.getActionDir());
  20. InputStream is = actionFs.open(actionOutput);
  21. BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  22. props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
  23. reader.close();
  24. }
  25. }
  26. context.setExecutionData(SUCCEEDED, props);
  27. }
  28. protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
  29. Element eConf = XmlUtils.parseXml(action.getConf());
  30. Namespace ns = eConf.getNamespace();
  31. Element captureOutput = eConf.getChild("capture-output", ns);
  32. return captureOutput != null;
  33. }
  34. ··· ···

actionData读取方式

参考代码:“core\src\main\java\org\apache\oozie\util\PropertiesUtils.java”
actionData读取加载方式。

  • https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html
    public class Properties extends Hashtable<Object,Object>
    load(Reader reader)
    Reads a property list (key and element pairs) from the input character stream in a simple line-oriented format.
    https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#load-java.io.Reader-

    1. import java.util.Properties;
    2. ··· ···
    3. public static Properties stringToProperties(String str) {
    4. ParamChecker.notNull(str, "str");
    5. try {
    6. StringReader sr = new StringReader(str);
    7. Properties props = new Properties();
    8. props.load(sr);
    9. sr.close();
    10. return props;
    11. }
    12. catch (IOException ex) {
    13. throw new RuntimeException(ex);
    14. }
    15. }
    16. public static Properties readProperties(Reader reader, int maxDataLen) throws IOException {
    17. String data = IOUtils.getReaderAsString(reader, maxDataLen);
    18. return stringToProperties(data);
    19. }

    “IOUtils.getReaderAsString(reader, maxDataLen);”

    1. ··· ···
    2. /**
    3. * Return a reader as string. <p/>
    4. *
    5. * @param reader reader to read into a string.
    6. * @param maxLen max content length allowed, if -1 there is no limit.
    7. * @return the reader content.
    8. * @throws IOException thrown if the resource could not be read.
    9. */
    10. public static String getReaderAsString(Reader reader, int maxLen) throws IOException {
    11. ParamChecker.notNull(reader, "reader");
    12. StringBuffer sb = new StringBuffer();
    13. char[] buffer = new char[2048];
    14. int read;
    15. int count = 0;
    16. while ((read = reader.read(buffer)) > -1) {
    17. count += read;
    18. if (maxLen > -1 && count > maxLen) {
    19. throw new IllegalArgumentException(XLog.format("stream exceeds limit [{0}]", maxLen));
    20. }
    21. sb.append(buffer, 0, read);
    22. }
    23. reader.close();
    24. return sb.toString();
    25. }
    26. ··· ···