一、简单转换算子

1、map算子

  1. // 将输入数据 一对一处理
  2. val streamMap = stream.map{x=>x*2}
  3. //用法二、通过Map转换算子把textFile转换为SensorReading类型的DataStream
  4. val dsStream : DataStream[SensorReading] = stream.map(e => {
  5. val datas = e.split(",")
  6. SensorReading(datas(0).trim.toString, datas(1).trim.toLong, datas(2).trim.toDouble)//返回值
  7. })

2、flatMap算子

  1. //把一个集合类型打散变成一个新的集合类型
  2. //如:flatMap(List(1,2,3))(i=>List(i,i)),结果是(1,1,2,2,3,3)
  3. 用法一:flapMap 将输入数据打散成一个List
  4. val stream1 = env.fromCollection(List(
  5. SensorReading("1", 1111111111, 21.11),
  6. SensorReading("2", 1111111112, 21.22),
  7. SensorReading("1", 1111111112, 21.22)
  8. ))
  9. val flapMap = stream1.flatMap(t=> List(t.id, t.timestamps, t.temperature))
  10. .map(t=> (t,t.toString.toDouble*2))
  11. flapMap.print("flapMap transform:")
  12. //用法二:flapMap 将输入的list数据 组合在一起 例如wordcount
  13. //(首先把第一个元素进行处理,再进行第二个元素的处理,最后把所有处理后的元素进行合并)
  14. val flapMap1 = List("a b c d", "b c f e").flatMap(_.split(" "))
  15. print(flapMap1)

3、filter算子

  1. //fliter 过滤出表达式返回true的数据
  2. val fil = stream1.filter(t => t.id == "1" && t.id.toInt%2!=0)
  3. fil.print("fliter transform:")

二、聚合分组算子

1、keyBy算子和滚动聚合算子

image.png
两者需要配合使用

  1. //转换算子四:keyBy DataStream->KeyedStream 分区不分流只是将流对象转换了
  2. //数据形式没改变只是可以进行聚合操作了,其实就是分组了,每个分区内部包含相同key的所有元素
  3. //数据如下:
  4. //s1, 1589897275136, 35.980
  5. //s1, 1581287812378, 35.980
  6. //s2, 1589897275141, 34.543
  7. //s3, 1589891285141, 36.789
  8. //s4, 1589897221736, 33.134
  9. //s4, 1589897221736, 33.134
  10. //s4, 1512736712361, 33.134
  11. val keyDSStream = dsStream.keyBy(_.id).sum(1) // 每组根据第二个参数进行求和,下标为1,这里keyBy的参数可以指定下标或者属性值
  12. keyDSStream.print("------1").setParallelism(1)
  13. dsStream.keyBy(_.id).max(1).print("max")
  14. dsStream.keyBy(_.id).min(1).print("min")
  15. //minBy、maxBy和min、max的区别在于前者会取最小最大的整一条数据,后者只会取指定那个属性值的最小或者最大,其他属性值可能不是那条的
  16. //reduce聚合(比较灵活) 当前传感器最新的温度+10 上一次的时间戳+1
  17. //第一个参数表示已经规约好了的数据,第二个参数表示当前要处理的数据
  18. val reduce: DataStream[SensorReading] = keyBy.reduce((curState, newData)=> SensorReading(curState.id, curState.timestamp+1, newData.temperature+10))
  19. reduce.print("reduce transform:").setParallelism(1)

三、多流转换算子

1、Split算子和Select算子

image.pngimage.png
split算子根据某些特征,把一个DataStream分成多个DataStream(DataStream->SplitStream)。
而select算子,是从一个SplitStream中获取一个或者多个DataStream(SplitStream->DataStream)。
Split算子一般配合select算子进行使用

  1. //多流转换算子 map&select
  2. //map可以对数据进行分组,select可以从分组的dataStream中获取不同的Stream
  3. val splitDStream = dsStream.split(datas => {
  4. if (datas.temperature > 35) Seq("higth") else Seq("low")
  5. })
  6. val low = splitDStream.select("low") // 通过使用select选择哪些流进行处理,可以传多个参数指定多个流
  7. val higth = splitDStream.select("higth")
  8. val all = splitDStream.select("higth","low")

2、合流算子Connect和CoMap

image.png
DataStream,DataStream->ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被连接之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
image.png
ConnectedStreams->DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStream中的每一个Stream分别进行map和flatMap处理。

  1. // 例子一:
  2. val warning = higth.map( data => (data.id,data.temperature))
  3. val connectStream = warning.connect(low) //两个进行connect元素的类型可以不一样
  4. val coMapResultStream = connectStream.map( //传入两个参数,第一个参数表示connect前面的元素,第二个参数表示connect后面的元素
  5. warningData => (warningData._1,warningData._2), //对一个进行处理
  6. lowData => (lowData) //对第二个进行处理
  7. ).print("connect = ") //输出的类型也可以不一样
  8. // 例子二:
  9. //可以把两个stream进行合并,然后通过map或者flatMap操作
  10. val someStream : DataStream[Int] = env.fromCollection(List(1,2,3,4,5,6))
  11. val otherStream : DataStream[String] = env.fromCollection(List("A","B","C","D"))
  12. val connect = someStream.connect(otherStream)
  13. //map方法第一个参数是表示connect前的元素
  14. connect.map(
  15. x => (x,"Int"),
  16. y => (y,"String")
  17. ).print("connect = ").setParallelism(1)
  18. //例子三:union联合算子,可以合并多条流 , 但是流之间的数据结果必须相同
  19. val sec : DataStream[String] = env.fromCollection(List("E"))
  20. val third : DataStream[String] = env.fromCollection(List("F"))
  21. otherStream.union(sec,third).print("UNION = ")
  22. //结果 A B C D E F