第一个例子:批处理统计wordcount
package start
import org.apache.flink.api.scala._//把scala隐式转换也引入,否则执行会报错
import org.apache.flink.api.scala.ExecutionEnvironment
/**
 * Batch word count: reads a text file, splits each line into words and
 * prints the total count per word using the DataSet API.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create a batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Read the input file into a DataSet[String].
    //    NOTE(review): path is hard-coded and Windows-specific — consider taking it from args.
    val inputPath = "C:\\ideaworkshop\\FlinkDemo\\NetWorkFlowAnalysis\\src\\main\\resources\\file.txt"
    val inputDataSet = env.readTextFile(inputPath)
    // 3. Split on spaces, pair each word with 1, group by the word
    //    (tuple field 0) and sum the counts (tuple field 1).
    val words = inputDataSet.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // 4. print() triggers execution for the DataSet API.
    words.print()
  }
}
第二个例子:流处理统计wordcount
package start
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 流式计算wordCount
*/
/**
 * Streaming word count: listens on a local socket (port 7777), splits each
 * incoming line into words and emits a running count per word.
 */
object StreamWordCountDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Connect to the socket text stream on localhost:7777 (DataStream[String]).
    val textDataStream = env.socketTextStream("localhost", 7777)
    // 3. Tokenize, drop empty tokens, key by the word and keep a running sum.
    val resultDataStream = textDataStream
      .flatMap(line => line.split(" "))
      .filter(word => word.nonEmpty)
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    resultDataStream.print()
    println("--------------------------------------------")
    // Unlike the batch API, a streaming job only starts once execute() is called.
    env.execute("streams wordcount")
  }
}
//在window上可以通过netcat工具使用"nc -l -p 7777"命令指定端口,作为这里流式处理的输入
/**
在控制台输出格式如下,前面的数字表示在哪个线程执行,可以通过env.setParallelism(12)设置并行线程数,
也可以在每一步后面setParallelism参数
1> (Hello,1)
3> (Tony,1)
2> (World,1)
1> (Hello,2)
**/
第二个例子的扩展:通过ParameterTool接收程序输入的参数
package start
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
/**
* 流式计算wordCount
*/
/**
 * Streaming word count with host/port supplied on the command line:
 * run with `--host localhost --port 7777`.
 */
object StreamWordCountDemo {
  def main(args: Array[String]): Unit = {
    // 1. Streaming environment; parallelism 1 keeps the console output ordered.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2. Parse `--host` and `--port` from the program arguments.
    //    getRequired fails fast with a clear error when --host is missing,
    //    instead of silently passing a null host into socketTextStream
    //    (get(...) returns null for absent keys).
    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = parameterTool.getRequired("host")
    val port: Int = parameterTool.getInt("port")
    // 3. Connect to the socket text stream (DataStream[String]).
    val textDataStream = env.socketTextStream(host, port)
    // 4. Tokenize, drop empty tokens, key by the word and keep a running sum.
    val words = textDataStream.flatMap(_.split(" ")).filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0).sum(1)
    words.print()
    println("--------------------------------------------")
    // A streaming job only starts once execute() is called.
    env.execute("streams wordcount")
  }
}
第三个例子:来源Source的读取,从集合读取数据、从文本读取数据、从kafka读取数据、从自定义source读取数据。
package datasource
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.util.Random
//定义的样例类,相当于定义数据类型
// Immutable record for one sensor sample: sensor id, event timestamp (ms), temperature.
// `final` because case classes should not be extended by other case classes.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)
/**
* dataSource的种类
*/
/**
 * Demonstrates the kinds of DataStream sources: an in-memory collection,
 * a text file, Kafka (commented out), and a custom SourceFunction.
 */
object SourceDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. Bounded stream built from an in-memory collection.
val stream1 = env.fromCollection(List(
SensorReading("s1", 11111181, 35.980),
SensorReading("s2", 11111181, 35.980),
SensorReading("s3", 11111181, 35.980),
SensorReading("s4", 11111181, 35.980)
))
stream1.print("stream1").setParallelism(1)
println("-----------------------------------------------")
// 2. Bounded stream read line-by-line from a text file.
// NOTE(review): path is hard-coded and Windows-specific.
val stream2 = env.readTextFile("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\sensor.txt")
stream2.print("stream2")
// 3. Kafka source (disabled). The "011" suffix is the Kafka client version;
// it reads String records from the "sensor" topic using the given deserializer.
/*val properties = new Properties()//通过properties加载kafka的配置
properties.setProperty("bootstrap.servers","localhost:9200")
properties.setProperty("group.id","consumer-group")
//FlinkKafkaConsumer011的11表示版本号,读取的是String类型的数据,从sensor的topic读取数据,反序列化的类
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor",new SimpleStringSchema(),properties))
stream3.print("stream3")*/
// 4. Custom source (see SensorSource below): unbounded simulated readings.
val stream4 = env.addSource(new SensorSource())
stream4.print("stream4")
// Jobs only start running once execute() is called.
env.execute("Source Test")
}
}
/**
* 自定义dataSource,继承SourceFunction接口(指定输出类型作为泛型),重写run方法和cancel方法
*/
/**
 * Custom data source: implements SourceFunction[SensorReading] and, until
 * cancelled, emits a reading for each of 10 simulated sensors, adding
 * Gaussian noise to a per-sensor base temperature, sleeping 2s per record.
 */
class SensorSource() extends SourceFunction[SensorReading] {
  // cancel() is invoked by the runtime on a different thread than run(),
  // so the flag must be @volatile for the write to be visible to the loop.
  @volatile var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val random = new Random()
    // Fixed base state per sensor: (id, creation time, base temperature ~ N(60, 1)).
    // Never reassigned, hence val (the original used var).
    val curTemp = 1.to(10).map(e => ("sensor" + e, System.currentTimeMillis(), 60 + random.nextGaussian()))
    // Keep producing records until cancel() flips the flag.
    while (running) {
      curTemp.foreach { t =>
        // Emit the base temperature plus fresh noise, stamped with the current time.
        ctx.collect(SensorReading(t._1, System.currentTimeMillis(), t._3 + random.nextGaussian()))
        Thread.sleep(2000)
      }
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}