第一个例子:批处理统计wordcount
package start

import org.apache.flink.api.scala._ // bring the Scala implicit conversions into scope, otherwise execution fails
import org.apache.flink.api.scala.ExecutionEnvironment

/**
 * Example 1: batch word count.
 *
 * Reads a text file as a DataSet, splits each line into words,
 * and prints the per-word counts.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create a batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. Read the input file; yields a DataSet[String] of lines.
    //    Fix: was `var inpurtPath` — never reassigned, so `val`, and the typo is corrected.
    val inputPath = "C:\\ideaworkshop\\FlinkDemo\\NetWorkFlowAnalysis\\src\\main\\resources\\file.txt"
    val inputDataSet = env.readTextFile(inputPath)

    // 3. Transform: split into words, pair with 1, group by word, sum the counts.
    //    Fix: was `var words` — never reassigned, so `val`.
    val words = inputDataSet
      .flatMap(_.split(" "))
      .map((_, 1)) // map each word to a (word, 1) tuple
      .groupBy(0)  // group by the first tuple field (the word)
      .sum(1)      // sum the second tuple field (the count)

    // 4. Output the result.
    words.print()
  }
}
第二个例子:流处理统计wordcount
package start

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Example 2: streaming word count.
 *
 * Listens on a local socket (port 7777), splits incoming lines into
 * words, and prints a running count per word.
 */
object StreamWordCountDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Attach a socket text source on localhost:7777 — a DataStream[String] of lines.
    val lines = env.socketTextStream("localhost", 7777)

    // 3. Tokenize, drop empty tokens, and maintain a running count per word.
    val counts = lines
      .flatMap(line => line.split(" "))
      .filter(word => word.nonEmpty)
      .map(word => (word, 1))
      .keyBy(0) // key by the word (first tuple field)
      .sum(1)   // running sum of the second tuple field

    counts.print()
    println("--------------------------------------------")

    // Unlike the batch API, a streaming job must be started explicitly;
    // "streams wordcount" is the job name shown in the dashboard.
    env.execute("streams wordcount")
  }
}

// On Windows, netcat ("nc -l -p 7777") can serve as the socket input for this job.
/**
 * Console output looks like the sample below. The leading number is the subtask
 * (thread) that produced the record; parallelism can be set globally with
 * env.setParallelism(12) or per-operator with .setParallelism(...).
 *   1> (Hello,1)
 *   3> (Tony,1)
 *   2> (World,1)
 *   1> (Hello,2)
 */
第二个例子的扩展:通过ParameterTool接收程序输入的参数
package start

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

/**
 * Example 2 (extended): streaming word count whose socket host/port are
 * supplied on the command line, e.g. `--host localhost --port 7777`.
 */
object StreamWordCountDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the streaming execution environment (single-threaded for readable output).
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2. Parse `--host` / `--port` from the program arguments.
    //    Fix: `get("host")` returns null when the flag is missing, which only
    //    fails later with a confusing error; `getRequired` fails fast with a
    //    clear message naming the missing parameter.
    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = parameterTool.getRequired("host")
    val port: Int = parameterTool.getInt("port")

    // 3. Attach a socket text source — a DataStream[String] of lines.
    val textDataStream = env.socketTextStream(host, port)

    // 4. Tokenize, drop empty tokens, and keep a running count per word.
    val words = textDataStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    words.print()
    println("--------------------------------------------")

    // Streaming jobs must be started explicitly.
    env.execute("streams wordcount")
  }
}
第三个例子:来源Source的读取,从集合读取数据、从文本读取数据、从kafka读取数据、从自定义source读取数据。
package datasource

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.util.Random

/** One sensor record: sensor id, event timestamp (epoch millis), temperature. */
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/**
 * Example 3: the different kinds of data sources — an in-memory collection,
 * a text file, Kafka (commented out), and a custom SourceFunction.
 */
object SourceDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Bounded stream from an in-memory collection.
    val stream1 = env.fromCollection(List(
      SensorReading("s1", 11111181, 35.980),
      SensorReading("s2", 11111181, 35.980),
      SensorReading("s3", 11111181, 35.980),
      SensorReading("s4", 11111181, 35.980)
    ))
    stream1.print("stream1").setParallelism(1)
    println("-----------------------------------------------")

    // 2. Bounded stream from a text file.
    val stream2 = env.readTextFile("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\sensor.txt")
    stream2.print("stream2")

    // 3. Unbounded stream from Kafka (disabled — requires a running broker).
    /*
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9200")
    properties.setProperty("group.id", "consumer-group")
    // FlinkKafkaConsumer011: "011" is the Kafka version; reads String records
    // from the "sensor" topic using SimpleStringSchema as the deserializer.
    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    stream3.print("stream3")
    */

    // 4. Unbounded stream from a custom source (see SensorSource below).
    val stream4 = env.addSource(new SensorSource())
    stream4.print("stream4")

    env.execute("Source Test")
  }
}

/**
 * Custom data source: extends SourceFunction (with the output type as the
 * type parameter) and implements run() and cancel(). Continuously emits
 * randomly perturbed temperature readings for ten simulated sensors.
 */
class SensorSource() extends SourceFunction[SensorReading] {

  // Fix: cancel() is invoked by the runtime from a different thread than the
  // one executing run(), so the flag must be @volatile for the write to be
  // visible and the emit loop to actually stop.
  @volatile var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val random = new Random()
    // Fix: was `var` — never reassigned, so `val`.
    // Base state: ten sensors with a current timestamp and a temperature near 60.
    val curTemp = 1.to(10).map(e => ("sensor" + e, System.currentTimeMillis(), 60 + random.nextGaussian()))

    // Emit readings until cancelled, perturbing each base temperature with
    // Gaussian noise on every emission.
    while (running) {
      curTemp.foreach(t => {
        ctx.collect(SensorReading(t._1, System.currentTimeMillis(), t._3 + random.nextGaussian()))
        // NOTE(review): this sleeps per element (2s between individual sensor
        // emissions), matching the original behavior — possibly intended per batch.
        Thread.sleep(2000)
      })
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
