Let's start from the WordCount example in the Flink source code.
```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.util.Preconditions;

public class WordCount {

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }
}
```
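The Tokenizer passed to flatMap above is a nested class in the full example and is not shown here. Its core logic simply lowercases each line, splits it on non-word characters, and emits a (word, 1) pair per token. Below is a minimal plain-Java sketch of that logic with no Flink dependency (the class name TokenizerSketch is hypothetical):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the Tokenizer's core logic: lowercase the line, split on
// runs of non-word characters, and emit a (word, 1) pair per token.
class TokenizerSketch {

    static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        // "\\W+" matches runs of non-word characters, mirroring the
        // example's value.toLowerCase().split("\\W+")
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello, Flink! hello"));
    }
}
```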
The program actually starts executing at env.execute.
Stepping into env.execute, the first thing we see is `final JobClient jobClient = executeAsync(jobName);`. Step into executeAsync to continue.
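Before stepping in, it helps to note the relationship between execute and executeAsync: execute is essentially a blocking wrapper that submits the job asynchronously and then waits on the returned JobClient for the result. The sketch below models that pattern only; the types here are simplified stand-ins, not Flink's real classes:

```java
import java.util.concurrent.CompletableFuture;

// Simplified model of the execute / executeAsync split. The names
// follow Flink's, but JobClient here is a stand-in, not Flink's class.
class ExecuteSketch {

    static class JobClient {
        CompletableFuture<String> getJobExecutionResult() {
            return CompletableFuture.completedFuture("DONE");
        }
    }

    // In Flink, executeAsync selects a PipelineExecutor, submits the
    // pipeline, and returns a JobClient immediately (non-blocking).
    static JobClient executeAsync(String jobName) {
        return new JobClient();
    }

    // env.execute submits asynchronously, then blocks for the result.
    static String execute(String jobName) {
        final JobClient jobClient = executeAsync(jobName);
        return jobClient.getJobExecutionResult().join();
    }

    public static void main(String[] args) {
        System.out.println(execute("WordCount Example"));
    }
}
```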
Inside executeAsync we can see where the program's execution actually begins, and we can trace the detailed steps it performs.
We can see that every executor implements the PipelineExecutor interface; each concrete executor provides its own implementation of its execute method.
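For reference, PipelineExecutor is a small interface: a single execute method that takes the compiled pipeline plus a configuration and returns a future JobClient. The sketch below approximates its shape with stand-in types; the exact signature varies between Flink versions (newer releases also pass a user-code ClassLoader):

```java
import java.util.concurrent.CompletableFuture;

// Stand-in types so this sketch compiles without Flink on the classpath.
interface Pipeline {}
interface Configuration {}
interface JobClient { String getJobID(); }

// Approximate shape of Flink's PipelineExecutor interface: hand it a
// pipeline and a configuration, get back a future JobClient.
interface PipelineExecutor {
    CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception;
}

// A trivial executor showing how a concrete implementation plugs in;
// LocalExecutor, RemoteExecutor, etc. fill this role in Flink.
class NoOpExecutor implements PipelineExecutor {
    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) {
        return CompletableFuture.completedFuture((JobClient) () -> "job-0");
    }
}
```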
In LocalExecutor's execute method we can see the complete job execution flow. Next, we take each of the following methods in turn and analyze its implementation in detail:
1. getJobGraph: how the StreamGraph is translated into a JobGraph
2. startMiniCluster: how the local MiniCluster starts components such as the ResourceManager and the JobMaster
3. submitJob: how the job you wrote is submitted to the cluster
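Putting the three steps together, LocalExecutor.execute roughly does: build the JobGraph from the pipeline, bring up a MiniCluster, and submit the JobGraph to it. The outline below uses simplified stand-in types (not Flink's real classes) just to show how the three calls chain:

```java
import java.util.concurrent.CompletableFuture;

// Hedged outline of the three steps inside LocalExecutor.execute,
// with simplified stand-in types instead of Flink's real classes.
class LocalExecutorSketch {

    static class JobGraph {
        final String name;
        JobGraph(String name) { this.name = name; }
    }

    static class MiniCluster {
        // In Flink, start() brings up the Dispatcher, ResourceManager,
        // and TaskManagers; the JobMaster is created per submitted job.
        void start() {}

        CompletableFuture<String> submitJob(JobGraph jobGraph) {
            return CompletableFuture.completedFuture("submitted:" + jobGraph.name);
        }
    }

    // Step 1: translate the user pipeline into a JobGraph.
    static JobGraph getJobGraph(String pipelineName) {
        return new JobGraph(pipelineName);
    }

    // Step 2: start a local MiniCluster to run the job in-process.
    static MiniCluster startMiniCluster() {
        MiniCluster cluster = new MiniCluster();
        cluster.start();
        return cluster;
    }

    // Step 3: submit the JobGraph to the running cluster.
    static String execute(String pipelineName) {
        JobGraph jobGraph = getJobGraph(pipelineName);
        MiniCluster cluster = startMiniCluster();
        return cluster.submitJob(jobGraph).join();
    }

    public static void main(String[] args) {
        System.out.println(execute("WordCount Example"));
    }
}
```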