Example 1: batch WordCount

    package start

    import org.apache.flink.api.scala._ // also import the Scala implicit conversions, otherwise the program reports an error
    import org.apache.flink.api.scala.ExecutionEnvironment

    object WordCount {
      def main(args: Array[String]): Unit = {
        // 1. Create a batch execution environment
        val env = ExecutionEnvironment.getExecutionEnvironment
        // 2. Read data from a file, producing a DataSet of strings
        val inputPath = "C:\\ideaworkshop\\FlinkDemo\\NetWorkFlowAnalysis\\src\\main\\resources\\file.txt"
        val inputDataSet = env.readTextFile(inputPath)
        // 3. Transform the data: split each line into words
        val words = inputDataSet.flatMap(_.split(" "))
          .map((_, 1)) // map each word to a (word, 1) tuple
          .groupBy(0)  // group by the first tuple element (the word)
          .sum(1)      // sum the second tuple element (the count)
        // 4. Output the result
        words.print()
      }
    }
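
For reference, here is a minimal, self-contained variant that is not part of the original project (the object name and the input strings are made up for illustration). It feeds the same pipeline from an in-memory collection via env.fromElements, so it runs without any file on disk; for this input, print() would emit (Hello,2), (World,1) and (Tony,1) in some order.

    package start

    import org.apache.flink.api.scala._

    // Hypothetical example: the same WordCount pipeline with an in-memory input
    object WordCountFromElements {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // The input strings below are made up for illustration
        val input = env.fromElements("Hello World", "Hello Tony")
        input.flatMap(_.split(" "))
          .map((_, 1))
          .groupBy(0)
          .sum(1)
          .print()
      }
    }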

Example 2: streaming WordCount

    package start

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    /**
     * Streaming WordCount
     */
    object StreamWordCountDemo {
      def main(args: Array[String]): Unit = {
        // 1. Create a stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. Receive a socket text stream on local port 7777, producing a DataStream of strings
        val textDataStream = env.socketTextStream("localhost", 7777)
        // 3. Transform and aggregate
        val resultDataStream = textDataStream.flatMap(_.split(" ")).filter(_.nonEmpty)
          .map((_, 1))
          .keyBy(0).sum(1)
        resultDataStream.print()
        println("--------------------------------------------")
        // One step more than the batch job: start the execution, with the job name "streams wordcount"
        env.execute("streams wordcount")
      }
    }
    // On Windows you can use the netcat tool ("nc -l -p 7777") to open the port that serves as the input of this streaming job
    /**
     * The console output looks like the following. The leading number is the parallel subtask (thread) that produced
     * the record. The parallelism can be set globally with env.setParallelism(12), or per operator by calling
     * setParallelism after each step.
     * 1> (Hello,1)
     * 3> (Tony,1)
     * 2> (World,1)
     * 1> (Hello,2)
     */
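
As a sketch of the two ways of setting parallelism mentioned in the comment above (the object name and the parallelism values are chosen only for illustration, not taken from the original):

    package start

    import org.apache.flink.streaming.api.scala._

    // Hypothetical example: global vs. per-operator parallelism
    object ParallelismDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4) // default parallelism for all operators of this job
        env.socketTextStream("localhost", 7777)
          .flatMap(_.split(" ")).filter(_.nonEmpty)
          .map((_, 1)).setParallelism(2) // override the parallelism of the map operator only
          .keyBy(0)
          .sum(1)
          .print().setParallelism(1)     // print with a single thread so the output is not interleaved
        env.execute("parallelism demo")
      }
    }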

Extension of Example 2: receiving program arguments via ParameterTool

    package start

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

    /**
     * Streaming WordCount, with host and port taken from the program arguments
     */
    object StreamWordCountDemo {
      def main(args: Array[String]): Unit = {
        // 1. Create a stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // 2. Parse the program arguments, so host and port can be passed as --host localhost --port 7777
        val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
        val host: String = parameterTool.get("host")
        val port: Int = parameterTool.getInt("port")
        // 3. Receive a socket text stream from the given host and port, producing a DataStream of strings
        val textDataStream = env.socketTextStream(host, port)
        // 4. Transform and aggregate
        val words = textDataStream.flatMap(_.split(" ")).filter(_.nonEmpty)
          .map((_, 1))
          .keyBy(0).sum(1)
        words.print()
        println("--------------------------------------------")
        env.execute("streams wordcount")
      }
    }
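
If the job should also start when no arguments are supplied, ParameterTool's overloads with default values can be used. A minimal sketch follows (the object name and the defaults localhost/7777 are assumptions for illustration):

    package start

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala._

    // Hypothetical example: fall back to defaults when --host/--port are not supplied
    object StreamWordCountWithDefaults {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val params = ParameterTool.fromArgs(args)
        val host = params.get("host", "localhost") // default host (assumed value)
        val port = params.getInt("port", 7777)     // default port (assumed value)
        env.socketTextStream(host, port)
          .flatMap(_.split(" ")).filter(_.nonEmpty)
          .map((_, 1))
          .keyBy(0)
          .sum(1)
          .print()
        env.execute("stream wordcount with defaults")
      }
    }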

Example 3: data sources (reading from a collection, a text file, Kafka, and a custom source)

    package datasource

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import scala.util.Random

    // Case class describing one record; this defines the data type of the streams below
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    /**
     * The different kinds of data sources
     */
    object SourceDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 1. Read from a collection defined in the code: a bounded stream
        val stream1 = env.fromCollection(List(
          SensorReading("s1", 11111181, 35.980),
          SensorReading("s2", 11111181, 35.980),
          SensorReading("s3", 11111181, 35.980),
          SensorReading("s4", 11111181, 35.980)
        ))
        stream1.print("stream1").setParallelism(1)
        println("-----------------------------------------------")
        // 2. Read from a text file: a bounded stream
        val stream2 = env.readTextFile("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\sensor.txt")
        stream2.print("stream2")
        // 3. Read from Kafka
        /*val properties = new Properties() // Kafka configuration supplied via Properties
        properties.setProperty("bootstrap.servers", "localhost:9092") // default Kafka broker address
        properties.setProperty("group.id", "consumer-group")
        // The 011 in FlinkKafkaConsumer011 is the Kafka version; it reads String records from the "sensor" topic using the given deserialization schema
        val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
        stream3.print("stream3")*/
        // 4. Read from a custom source
        val stream4 = env.addSource(new SensorSource())
        stream4.print("stream4")
        env.execute("Source Test")
      }
    }
    /**
     * Custom data source: implement the SourceFunction interface (with the output type as the type parameter)
     * and override the run and cancel methods.
     */
    class SensorSource() extends SourceFunction[SensorReading] {
      // volatile so that cancel(), called from another thread, is seen by the run loop
      @volatile var running: Boolean = true

      override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
        val random = new Random()
        // Ten sensors with an initial temperature around 60
        val curTemp = 1.to(10).map(e => ("sensor" + e, System.currentTimeMillis(), 60 + random.nextGaussian()))
        // Keep producing records until the source is cancelled
        while (running) {
          curTemp.foreach(
            // Emit each reading through the source context
            t => {
              ctx.collect(SensorReading(t._1, System.currentTimeMillis(), t._3 + random.nextGaussian()))
              Thread.sleep(2000)
            }
          )
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    }
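
A small usage sketch of the custom source (the object name and the string formatting are illustrative additions, assuming SensorSource and SensorReading as defined above):

    package datasource

    import org.apache.flink.streaming.api.scala._

    // Hypothetical example: consume the custom source and format each reading before printing
    object SensorSourceUsage {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.addSource(new SensorSource())
          .map(r => s"${r.id} @ ${r.timestamp}: ${r.temperature}") // turn each SensorReading into a readable line
          .print("formatted")
        env.execute("Sensor Source Usage")
      }
    }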