从flink源码中案例开始讲解

  1. public class WordCount {
  2. public static void main(String[] args) throws Exception {
  3. final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
  4. // set up the execution environment
  5. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  6. // make parameters available in the web interface
  7. env.getConfig().setGlobalJobParameters(params);
  8. // get input data
  9. DataSet<String> text = null;
  10. if (params.has("input")) {
  11. // union all the inputs from text files
  12. for (String input : params.getMultiParameterRequired("input")) {
  13. if (text == null) {
  14. text = env.readTextFile(input);
  15. } else {
  16. text = text.union(env.readTextFile(input));
  17. }
  18. }
  19. Preconditions.checkNotNull(text, "Input DataSet should not be null.");
  20. } else {
  21. // get default test text data
  22. System.out.println("Executing WordCount example with default input data set.");
  23. System.out.println("Use --input to specify file input.");
  24. text = WordCountData.getDefaultTextLineDataSet(env);
  25. }
  26. DataSet<Tuple2<String, Integer>> counts =
  27. // split up the lines in pairs (2-tuples) containing: (word,1)
  28. text.flatMap(new Tokenizer())
  29. // group by the tuple field "0" and sum up tuple field "1"
  30. .groupBy(0)
  31. .sum(1);
  32. // emit result
  33. if (params.has("output")) {
  34. counts.writeAsCsv(params.get("output"), "\n", " ");
  35. // execute program
  36. env.execute("WordCount Example");
  37. } else {
  38. System.out.println("Printing result to stdout. Use --output to specify output path.");
  39. counts.print();
  40. }
  41. }

程序的最终开始执行是从 env.execute开始
image.png
从 env.execute 点击 ,我们首先看 final JobClient jobClient = executeAsync(jobName); 从这个方法继续点击
image.png
从这个方法中,我们可以看到程序开始执行的位置
image.png
从这个方法中查看详细的执行步骤
image.png
可以看出,所有的执行器是 PipelineExecutor 接口, 具体的执行器,需要继承该方法
image.png
LocalExecutor的** execute 方法中,可以看到详细的作业执行流程,接下来,我们分别从这几个方法中入手,详细分析具体的实现步骤细节**
image.png

1.getJobGraph 方法中详细 将 streamGraph 如何转化为 JobGraph
2.startMiniCluster 方法是如何 启动ResourceManager, JobMaster 等 信息
3.submitJob如何提交自己写的代码作业