一、简单转换算子
1、map算子
// 将输入数据 一对一处理
val streamMap = stream.map{x=>x*2}
//用法二、通过Map转换算子把textFile转换为SensorReading类型的DataStream
val dsStream : DataStream[SensorReading] = stream.map(e => {
val datas = e.split(",")
SensorReading(datas(0).trim.toString, datas(1).trim.toLong, datas(2).trim.toDouble)//返回值
})
2、flatMap算子
//把一个集合类型打散变成一个新的集合类型
//如:flatMap(List(1,2,3))(i=>List(i,i)),结果是(1,1,2,2,3,3)
用法一:flapMap 将输入数据打散成一个List
val stream1 = env.fromCollection(List(
SensorReading("1", 1111111111, 21.11),
SensorReading("2", 1111111112, 21.22),
SensorReading("1", 1111111112, 21.22)
))
val flapMap = stream1.flatMap(t=> List(t.id, t.timestamps, t.temperature))
.map(t=> (t,t.toString.toDouble*2))
flapMap.print("flapMap transform:")
//用法二:flapMap 将输入的list数据 组合在一起 例如wordcount
//(首先把第一个元素进行处理,再进行第二个元素的处理,最后把所有处理后的元素进行合并)
val flapMap1 = List("a b c d", "b c f e").flatMap(_.split(" "))
print(flapMap1)
3、filter算子
//fliter 过滤出表达式返回true的数据
val fil = stream1.filter(t => t.id == "1" && t.id.toInt%2!=0)
fil.print("fliter transform:")
二、聚合分组算子
1、keyBy算子和滚动聚合算子
两者需要配合使用
//转换算子四:keyBy DataStream->KeyedStream 分区不分流只是将流对象转换了
//数据形式没改变只是可以进行聚合操作了,其实就是分组了,每个分区内部包含相同key的所有元素
//数据如下:
//s1, 1589897275136, 35.980
//s1, 1581287812378, 35.980
//s2, 1589897275141, 34.543
//s3, 1589891285141, 36.789
//s4, 1589897221736, 33.134
//s4, 1589897221736, 33.134
//s4, 1512736712361, 33.134
val keyDSStream = dsStream.keyBy(_.id).sum(1) // 每组根据第二个参数进行求和,下标为1,这里keyBy的参数可以指定下标或者属性值
keyDSStream.print("------1").setParallelism(1)
dsStream.keyBy(_.id).max(1).print("max")
dsStream.keyBy(_.id).min(1).print("min")
//minBy、maxBy和min、max的区别在于前者会取最小最大的整一条数据,后者只会取指定那个属性值的最小或者最大,其他属性值可能不是那条的
//reduce聚合(比较灵活) 当前传感器最新的温度+10 上一次的时间戳+1
//第一个参数表示已经规约好了的数据,第二个参数表示当前要处理的数据
val reduce: DataStream[SensorReading] = keyBy.reduce((curState, newData)=> SensorReading(curState.id, curState.timestamp+1, newData.temperature+10))
reduce.print("reduce transform:").setParallelism(1)
三、多流转换算子
1、Split算子和Select算子
split算子根据某些特征,把一个DataStream分成多个DataStream(DataStream->SplitStream)。
而select算子,是从一个SplitStream中获取一个或者多个DataStream(SplitStream->DataStream)。
Split算子一般配合select算子进行使用
//多流转换算子 map&select
//map可以对数据进行分组,select可以从分组的dataStream中获取不同的Stream
val splitDStream = dsStream.split(datas => {
if (datas.temperature > 35) Seq("higth") else Seq("low")
})
val low = splitDStream.select("low") // 通过使用select选择哪些流进行处理,可以传多个参数指定多个流
val higth = splitDStream.select("higth")
val all = splitDStream.select("higth","low")
2、合流算子Connect和CoMap
DataStream,DataStream->ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被连接之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
ConnectedStreams->DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStream中的每一个Stream分别进行map和flatMap处理。
// 例子一:
val warning = higth.map( data => (data.id,data.temperature))
val connectStream = warning.connect(low) //两个进行connect元素的类型可以不一样
val coMapResultStream = connectStream.map( //传入两个参数,第一个参数表示connect前面的元素,第二个参数表示connect后面的元素
warningData => (warningData._1,warningData._2), //对一个进行处理
lowData => (lowData) //对第二个进行处理
).print("connect = ") //输出的类型也可以不一样
// 例子二:
//可以把两个stream进行合并,然后通过map或者flatMap操作
val someStream : DataStream[Int] = env.fromCollection(List(1,2,3,4,5,6))
val otherStream : DataStream[String] = env.fromCollection(List("A","B","C","D"))
val connect = someStream.connect(otherStream)
//map方法第一个参数是表示connect前的元素
connect.map(
x => (x,"Int"),
y => (y,"String")
).print("connect = ").setParallelism(1)
//例子三:union联合算子,可以合并多条流 , 但是流之间的数据结果必须相同
val sec : DataStream[String] = env.fromCollection(List("E"))
val third : DataStream[String] = env.fromCollection(List("F"))
otherStream.union(sec,third).print("UNION = ")
//结果 A B C D E F