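Which types can be used as data
- Basic Java types: String, Long, Integer, Boolean, Array
- Composite types: Tuples, POJOs and Scala case classes
Objects of composite types must be serializable. Flink ships its own serializers for Tuples and POJOs, and falls back to the bundled Kryo serializer for types it cannot analyze.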
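For reference, Flink treats a class as a POJO when all of the following hold:
- the class is public;
- it has a public no-argument constructor;
- every field is either public or reachable through a getter and setter named after it.

Below is a minimal sketch of such a type. The class name WordWithCount is hypothetical, and the class itself needs no Flink imports:

```java
// Hypothetical POJO: public class, public no-arg constructor, public fields.
public class WordWithCount {
    public String word;  // public field: Flink can access it directly
    public long count;

    // Required by the POJO rules so the serializer can create instances
    public WordWithCount() {}

    // Extra constructors are allowed in addition to the no-arg one
    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
```

A class that breaks these rules can still be used. Flink then handles it as a generic type and serializes it with Kryo, which is usually slower.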
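How to turn this data into a stream
Data is either bounded or unbounded, so processing also falls into two categories:
- Processing unbounded data
- Processing bounded data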
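Processing unbounded data
Depending on where the data comes from, Flink offers the following ways to read unbounded data:
- File reading
- Text file reading
- Socket reading
- Collection reading
- Custom sources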
File reading
The main API for reading files is readFile():
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                            String filePath,
                                            FileProcessingMode watchType,
                                            long interval)
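- inputFormat: defines how the file is read
- filePath: the file to read
- watchType: the read mode
  - PROCESS_CONTINUOUSLY: checks the file's modification time and re-reads the file as soon as it changes
  - PROCESS_ONCE: reads the file only once
- interval: in PROCESS_CONTINUOUSLY mode, the interval between checks for changes, in milliseconds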
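The sketch below makes the two watch modes concrete. It is a plain-Java analogy of PROCESS_CONTINUOUSLY, not Flink's implementation, and the class name ModifiedFileWatcher is hypothetical. Each poll compares the file's modification time with the last one seen. If the file is newer, the poll returns the whole file again.

Calling poll() every interval milliseconds corresponds to the interval parameter. Calling it a single time corresponds to PROCESS_ONCE.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Optional;

// Hypothetical helper illustrating PROCESS_CONTINUOUSLY semantics (not Flink code).
public class ModifiedFileWatcher {
    private final Path path;
    private FileTime lastSeen = null; // modification time at the previous read

    public ModifiedFileWatcher(Path path) {
        this.path = path;
    }

    // Returns the complete file contents if the file is new or was modified
    // since the previous poll; otherwise returns an empty Optional.
    public Optional<String> poll() throws IOException {
        FileTime current = Files.getLastModifiedTime(path);
        if (lastSeen == null || current.compareTo(lastSeen) > 0) {
            lastSeen = current;
            // The whole file is returned again, not just the changed part
            return Optional.of(Files.readString(path));
        }
        return Optional.empty();
    }
}
```

This matches a documented property of PROCESS_CONTINUOUSLY: a modified file is reprocessed in full. Appending to a watched file therefore re-emits the records that were already processed.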
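Also note that readFile() reads in parallel. Each parallel instance holds its own FileInputFormat. If you don't set the parallelism, a local run defaults to the number of CPU cores.

Each instance must therefore read only its own split of the file; otherwise the same data will be read more than once. Here is an example: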
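import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileTest {
    public static void main(String[] args) throws Exception {
        String path = ReadFileTest.class.getResource("/test.txt").getPath();
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment
                // .setParallelism(1) would set the number of parallel reader instances
                .readFile(new FileInputFormat<String>() {
                    private long handleLen = -1;   // number of bytes this instance must process
                    private boolean isEnd = false; // whether this split has been fully read
                    private long start = -1;       // offset where this instance starts reading

                    @Override
                    public void open(FileInputSplit fileSplit) throws IOException {
                        super.open(fileSplit);                  // opens the stream and seeks to the split start
                        this.isEnd = false;                     // reset on every open (a modified file is re-read)
                        this.start = fileSplit.getStart();      // start offset of this instance's split
                        this.handleLen = fileSplit.getLength(); // length of this instance's split
                    }

                    // Checked before each read to decide whether this split is finished
                    @Override
                    public boolean reachedEnd() throws IOException {
                        return isEnd;
                    }

                    // Reads one record. How much to read per call is up to you,
                    // as long as you never read past the end of this split.
                    @Override
                    public String nextRecord(String reuse) throws IOException {
                        int read = this.stream.read();
                        // Stop at the split boundary, or at end of file
                        if (read == -1 || this.stream.getPos() >= start + handleLen) {
                            this.isEnd = true;
                        }
                        // null means "nothing to emit"
                        return read == -1 ? null : new String(new byte[]{(byte) read});
                    }
                }, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        return Thread.currentThread().getName() + ">>" + value;
                    }
                }).print();
        streamExecutionEnvironment.execute();
    }
}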
The output looks like this:
6> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (6/8)>>6
4> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (4/8)>>8
3> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (3/8)>>6
8> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (8/8)>>6
1> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (1/8)>>1
7> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (7/8)>>6
2> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (2/8)>>7
1> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (1/8)>>7
8> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (8/8)>>1
3> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (3/8)>>8
6> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (6/8)>>6
2> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (2/8)>>6
7> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (7/8)>>6
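Each reader in the example stops at start + length because the file is cut into disjoint byte ranges, one FileInputSplit per reader. The plain-Java sketch below shows that when every reader stays inside its own range, each byte is read exactly once.

A few caveats about the sketch:
- The class and method names are hypothetical.
- The splitting rule is a simplification, not Flink's exact FileInputFormat.createInputSplits logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of byte-range splits (not Flink's code).
public class ByteRangeSplits {

    // A split covers the bytes [start, start + length)
    public static final class Split {
        public final long start;
        public final long length;

        public Split(long start, long length) {
            this.start = start;
            this.length = length;
        }
    }

    // Cut a file of fileLength bytes into at most numSplits contiguous ranges
    public static List<Split> createSplits(long fileLength, int numSplits) {
        List<Split> splits = new ArrayList<>();
        long splitSize = (fileLength + numSplits - 1) / numSplits; // ceiling division
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new Split(start, Math.min(splitSize, fileLength - start)));
        }
        return splits;
    }

    // Read only the bytes belonging to this split, like nextRecord() above
    public static String readSplit(byte[] data, Split split) {
        StringBuilder out = new StringBuilder();
        for (long pos = split.start; pos < split.start + split.length; pos++) {
            out.append((char) data[(int) pos]);
        }
        return out.toString();
    }
}
```

With ten bytes and four splits, the ranges are 0-2, 3-5, 6-8 and 9, and concatenating what each split reads reproduces the file once.

Suppose a reader instead kept reading until the end of the file. Split 0 would return all ten bytes, and split 1 would repeat bytes 3 to 9. That is the duplicate reading the warning above refers to.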
Text file reading
readTextFile() reads text files line by line. A line is the smallest unit it reads, and it keeps reading lines until the end of the file:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadFileTest {
    public static void main(String[] args) throws Exception {
        String path = ReadFileTest.class.getResource("/test.txt").getPath();
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
                .getExecutionEnvironment();
        streamExecutionEnvironment
                .readTextFile(path) // emits one record per line
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        return Thread.currentThread().getName() + ">>" + value;
                    }
                }).print();
        streamExecutionEnvironment.execute();
    }
}
Note: under the hood, readTextFile() uses the same readFile() machinery, reading with a TextInputFormat in PROCESS_ONCE mode.
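readTextFile() goes through the same split-based reading, so a split boundary can fall in the middle of a line. A common way to handle this is to give each line to the split that contains its first byte:
- A reader whose split does not start at a line boundary skips the partial first line.
- Every reader finishes its last line, even if that line runs past the split end.

The plain-Java sketch below follows this approach. It is not Flink's DelimitedInputFormat code, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assign each '\n'-terminated line to exactly one split.
public class LineAlignedSplitReader {

    // Returns the lines owned by the split [start, end) of data
    public static List<String> readLines(String data, int start, int end) {
        List<String> lines = new ArrayList<>();
        int pos = start;
        // Not at a line boundary: the partial line belongs to the previous split
        if (pos > 0 && data.charAt(pos - 1) != '\n') {
            int nl = data.indexOf('\n', pos);
            pos = (nl == -1) ? data.length() : nl + 1;
        }
        // Read every line that starts inside the split, even if it ends past `end`
        while (pos < end && pos < data.length()) {
            int nl = data.indexOf('\n', pos);
            int lineEnd = (nl == -1) ? data.length() : nl;
            lines.add(data.substring(pos, lineEnd));
            pos = lineEnd + 1;
        }
        return lines;
    }
}
```

Take "hello yuque\nhello codeleven\nbye\n" split at byte 16. The first split returns both "hello" lines, including the one that crosses the boundary. The second split skips that line's tail and returns only "bye".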
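Socket reading
Once started, the application connects to localhost:7777, splits every line it receives into words, and keeps a running count per word: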
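import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWordCount { // class name is illustrative
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the socket source: connects to localhost:7777 and emits one record per line
        DataStreamSource<String> streamSource = see.socketTextStream("localhost", 7777);
        // Every time a line arrives, the counts are updated automatically
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamSource
                .flatMap((String str, Collector<Tuple2<String, Integer>> collector) -> {
                    for (String item : str.split(" ")) {
                        collector.collect(new Tuple2<>(item, 1));
                    }
                })
                // Lambdas lose their generic types, so declare the output type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(tuple -> tuple.f0)
                .sum(1);
        // Sink: print the results
        sum.print();
        // Give the job a name
        see.execute("Word Count Job");
    }
}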
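On Windows, search for "win10 nc command" to download the netcat tool, then start it from the command line. Start nc before the job so the job has something to connect to, and type input once the job is running: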
C:\Users\W650>nc64.exe -L -p 7777
hello yuque
hello codeleven
Output:
6> (yuque,1)
3> (hello,1)
4> (codeleven,1)
3> (hello,2)
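The job's logic can be traced without Flink. The plain-Java sketch below mimics flatMap, keyBy and sum, and its names are hypothetical. It splits each line on spaces and, for every word, emits the word with its updated running total. Fed the two lines typed above, it emits (hello,1), (yuque,1), (hello,2), (codeleven,1).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded stand-in for flatMap -> keyBy -> sum(1)
public class RunningWordCount {
    private final Map<String, Integer> totals = new HashMap<>(); // per-key state, like the keyed sum

    // Processes one line and returns the "(word,count)" records it would emit
    public List<String> onLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {                // flatMap: one (word, 1) per word
            int count = totals.merge(word, 1, Integer::sum); // sum(1): add to the running total
            emitted.add("(" + word + "," + count + ")");
        }
        return emitted;
    }
}
```

In the real job, lines printed by different parallel subtasks (the "n>" prefix) can interleave in any order. Updates for the same key are still handled by one subtask, in order, which is why (hello,2) always follows (hello,1).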
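Collection reading
- fromCollection(Collection): creates a stream from a Java collection; all elements must have the same type
- fromCollection(Iterator, Class): creates a stream from an iterator; the Class argument gives the element type
- fromElements(T ...): creates a stream from the given objects; all must have the same type
- fromParallelCollection(SplittableIterator, Class): creates a stream from an iterator that is split across parallel source instances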
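The sketch below exercises all four methods. It assumes the Flink DataStream API (1.12 or later, for executeAndCollect) is on the classpath, and the class name CollectionSources is hypothetical. executeAndCollect(limit) runs each pipeline as a small local job and returns its results, which is handy for trying out sources.

The iterator passed to fromCollection(Iterator, Class) must be Serializable, which is why the sketch uses Flink's NumberSequenceIterator rather than a plain collection iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

public class CollectionSources {
    // Runs each source once and returns the collected results as strings
    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> results = new ArrayList<>();

        // fromElements(T...): the arguments themselves are the data
        results.add(env.fromElements("hello", "yuque").executeAndCollect(10).toString());

        // fromCollection(Collection): any java.util.Collection
        results.add(env.fromCollection(Arrays.asList(1, 2, 3)).executeAndCollect(10).toString());

        // fromCollection(Iterator, Class): a serializable iterator over 1..3 (inclusive)
        results.add(env.fromCollection(new NumberSequenceIterator(1, 3), Long.class)
                .executeAndCollect(10).toString());

        // fromParallelCollection: the iterator is split across parallel instances,
        // so the arrival order is not fixed; sort before printing
        List<Long> parallel = new ArrayList<>(
                env.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class)
                        .executeAndCollect(100));
        parallel.sort(null);
        results.add(parallel.toString());
        return results;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```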
Custom sources
By implementing SourceFunction you can read data from any external system:
addSource(SourceFunction<OUT> function)
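Here is a minimal sketch of a custom source built on the SourceFunction interface, which has two methods:
- run() emits records and returns when the source is finished.
- cancel() is called from another thread to stop it.

The class CountingSource is hypothetical: it emits the numbers 0 to limit - 1 and stops early if the job is cancelled. Records are emitted while holding the checkpoint lock, the usual pattern for this interface.

```java
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical bounded source: emits 0 .. limit-1, then returns from run().
public class CountingSource implements SourceFunction<Long> {
    private final long limit;
    private volatile boolean running = true; // flipped by cancel() from another thread

    public CountingSource(long limit) {
        this.limit = limit;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        for (long i = 0; i < limit && running; i++) {
            // Emit under the checkpoint lock so emission and checkpoints do not interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Long> result = env.addSource(new CountingSource(5)).executeAndCollect(10);
        System.out.println(result);
    }
}
```

A source that reads an external system follows the same shape: replace the counting loop with a loop that polls the system while running is true.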