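What data can be used in a stream

  • Basic Java types: String, Long, Integer, Boolean, and arrays
  • Composite types: Tuples, POJOs, and Scala case classes

Objects of composite types must be serializable. Flink uses its own serializers for tuples and POJOs and falls back to the bundled Kryo serializer for types it cannot analyze.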
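Whether Flink can use its own POJO serializer (instead of falling back to Kryo) depends on a few rules: the class is public and standalone (not a non-static inner class), it has a public no-argument constructor, and every non-static, non-transient field is either public or reachable through a getter and a setter. The plain-Java sketch below checks those rules with reflection. `PojoCheck`, `looksLikeFlinkPojo`, `WordCount`, `Immutable` and `Inner` are hypothetical names used only for illustration, and the check is a simplification of Flink's real type analysis (for example, it does not verify that each field's own type is serializable).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical helper: a simplified approximation of Flink's POJO rules.
final class PojoCheck {

    // Follows the rules: public, static nested, public no-arg constructor,
    // fields either public or accessible through getter + setter.
    public static class WordCount {
        public String word;
        private long count;

        public WordCount() {}

        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    // Breaks the rules: no no-arg constructor and no setter for its private field.
    public static class Immutable {
        private final String word;
        public Immutable(String word) { this.word = word; }
        public String getWord() { return word; }
    }

    // Breaks the rules: non-static inner class (needs an enclosing instance).
    public class Inner {
        public String word;
    }

    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods) || clazz.isAnonymousClass() || clazz.isLocalClass()) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        try {
            clazz.getConstructor(); // must have a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Walk the class hierarchy and check every instance field.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || f.isSynthetic()) continue;
                if (!Modifier.isPublic(fm) && !hasGetterAndSetter(clazz, f)) return false;
            }
        }
        return true;
    }

    // A non-public field needs a public getX()/isX() and a public setX(value).
    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {
            String name = m.getName();
            if (m.getParameterCount() == 0 && m.getReturnType() == f.getType()
                    && (name.equals("get" + cap) || (f.getType() == boolean.class && name.equals("is" + cap)))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && m.getParameterTypes()[0] == f.getType()
                    && name.equals("set" + cap)) {
                setter = true;
            }
        }
        return getter && setter;
    }
}
```

`WordCount` passes, while `Immutable` (no default constructor, no setter) and `Inner` (non-static inner class) would not be treated as POJOs.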

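How to turn this data into a stream

Data is either bounded or unbounded, so its processing also falls into two categories:

  • Processing unbounded data
  • Processing bounded data

Processing unbounded data

Depending on where the data comes from, Flink provides the following ways to read unbounded data:

  • File reading
  • Text file reading
  • Socket reading
  • Collection reading
  • Custom sources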

File reading

The main API for reading files is readFile():

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                            String filePath,
                                            FileProcessingMode watchType,
                                            long interval)

  • inputFormat: defines how the file is read
  • filePath: the file to read
  • watchType: the reading mode
    • PROCESS_CONTINUOUSLY: checks the file's modification time; whenever it changes, the whole file is read again
    • PROCESS_ONCE: reads the file only once
  • interval: in PROCESS_CONTINUOUSLY mode, how often to check the file for changes, in milliseconds
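To make PROCESS_CONTINUOUSLY concrete, here is a plain-Java sketch of its semantics: poll the file's modification time every `interval` milliseconds and, whenever it has changed, emit the whole file again. This is not Flink's implementation (Flink's file monitoring works on splits and checkpoints its state); `ContinuousFileWatcher`, `pollOnce` and `watch` are hypothetical names. One consequence worth remembering: appending a line to the file causes all of its lines to be emitted again, not only the new one.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of PROCESS_CONTINUOUSLY semantics (not Flink's code).
final class ContinuousFileWatcher {
    private final Path file;
    private FileTime lastSeen; // modification time of the last version we read

    ContinuousFileWatcher(Path file) {
        this.file = file;
    }

    // One polling step: if the modification time changed, emit every line again.
    // Returns true if the file was (re)read.
    boolean pollOnce(Consumer<String> emit) throws IOException {
        FileTime modified = Files.getLastModifiedTime(file);
        if (modified.equals(lastSeen)) {
            return false; // unchanged since the last read
        }
        lastSeen = modified;
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        lines.forEach(emit);
        return true;
    }

    // Poll until interrupted, sleeping intervalMs between checks (like readFile's interval).
    void watch(long intervalMs, Consumer<String> emit) throws IOException, InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            pollOnce(emit);
            Thread.sleep(intervalMs);
        }
    }
}
```

With PROCESS_ONCE, the equivalent would be a single read of the file with no polling loop at all.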

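Also note that readFile() runs in parallel: each parallel instance holds its own FileInputFormat and processes its own portion of the file. If you don't set the parallelism, a local run defaults to the number of CPU cores of the machine. So make sure your format reads only the split assigned to it, otherwise the same data will be read several times! Here is an example: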

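import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileTest {
    public static void main(String[] args) throws Exception {
        String path = ReadFileTest.class.getResource("/test.txt").getPath();
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment
        //  .setParallelism(1)  // optionally set the number of parallel readers
            .readFile(new FileInputFormat<String>() {
                private long handleLen = -1;   // number of bytes this instance must read
                private boolean isEnd = false; // whether this split has been fully read
                private long start = -1;       // offset at which this instance starts reading

                @Override
                public void open(FileInputSplit fileSplit) throws IOException {
                    super.open(fileSplit); // opens the stream positioned at the split start
                    // Reset the end flag on every open: a modified file triggers a new read
                    this.isEnd = false;
                    this.start = fileSplit.getStart();      // where this instance's split begins
                    this.handleLen = fileSplit.getLength(); // how many bytes the split covers
                }

                // Checked before each read: has this split been fully consumed?
                @Override
                public boolean reachedEnd() throws IOException {
                    return isEnd;
                }

                // Reads one record. How much to read per call is up to you, as long as you
                // (normally) stay inside your own split. Here one byte = one record.
                @Override
                public String nextRecord(String reuse) throws IOException {
                    byte read = (byte) this.stream.read();
                    // ">=" instead of "==" so the reader can never run past its split end
                    if (this.stream.getPos() >= start + handleLen) {
                        this.isEnd = true;
                    }
                    return new String(new byte[]{read});
                }
            }, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    return Thread.currentThread().getName() + ">>" + value;
                }
            })
            .print();
        streamExecutionEnvironment.execute();
    }
}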

The output looks like this:

6> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (6/8)>>6
4> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (4/8)>>8
3> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (3/8)>>6
8> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (8/8)>>6
1> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (1/8)>>1
7> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (7/8)>>6
2> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (2/8)>>7
1> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (1/8)>>7
8> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (8/8)>>1
3> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (3/8)>>8
6> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (6/8)>>6
2> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (2/8)>>6
7> Split Reader: Custom File Source -> Map -> Sink: Print to Std. Out (7/8)>>6
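Each reader above emits only the bytes of its own split. The plain-Java sketch below reproduces that idea: it cuts a byte array into `n` contiguous (start, length) ranges, the same pair the example obtains from `FileInputSplit.getStart()` and `getLength()`, and each reader copies only `[start, start + length)`. The even-division scheme is an assumption made for illustration; Flink's `FileInputFormat` also takes file block boundaries and a minimum split size into account. `ByteSplits`, `plan` and `read` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of cutting a file into non-overlapping byte ranges.
final class ByteSplits {
    // A (start, length) pair, like FileInputSplit's getStart()/getLength().
    static final class Split {
        final long start;
        final long length;

        Split(long start, long length) {
            this.start = start;
            this.length = length;
        }
    }

    // Divide totalLen bytes into n contiguous ranges; the first (totalLen % n)
    // ranges get one extra byte, so every byte belongs to exactly one split.
    static List<Split> plan(long totalLen, int n) {
        List<Split> splits = new ArrayList<>();
        long base = totalLen / n;
        long extra = totalLen % n;
        long pos = 0;
        for (int i = 0; i < n; i++) {
            long len = base + (i < extra ? 1 : 0);
            splits.add(new Split(pos, len));
            pos += len;
        }
        return splits;
    }

    // What one parallel reader does: read only the bytes in [start, start + length).
    static byte[] read(byte[] file, Split s) {
        byte[] out = new byte[(int) s.length];
        System.arraycopy(file, (int) s.start, out, 0, (int) s.length);
        return out;
    }
}
```

Concatenating the results of all readers gives back the original bytes exactly once. If every reader ignored its split and read from offset 0 to the end instead, each of the `n` readers would emit the whole file, which is the duplicate reading the warning about splits refers to.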

Text file reading

readTextFile() reads text files with the line as its smallest unit: it keeps reading line after line until it reaches the end of the file:

public static void main(String[] args) throws Exception {
    String path = ReadFileTest.class.getResource("/test.txt").getPath();
    StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
        .getExecutionEnvironment();
    streamExecutionEnvironment
            .readTextFile(path)
            .map(new MapFunction<String, String>() {
                public String map(String value) throws Exception {
                    return Thread.currentThread().getName() + ">>" + value;
                }
            }).print();
    streamExecutionEnvironment.execute();
}

Note: under the hood, readTextFile() goes through the same readFile() machinery, using a TextInputFormat in PROCESS_ONCE mode.
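Because readTextFile() also reads byte splits in parallel, a line can straddle two splits. Delimiter-based formats handle this with a convention along these lines: a reader whose split does not start at a line boundary skips the partial line it lands in, and every reader finishes the line it is in the middle of when it reaches its split end. The plain-Java sketch below implements one such rule, "a line belongs to the split that contains its first byte"; that exact rule, and the name `LineSplitReader.readLines`, are assumptions for illustration, and details such as buffering and `\r\n` handling are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: read the lines "owned" by the byte range [start, end).
// Assumed rule: a line belongs to the split that contains its first byte.
final class LineSplitReader {
    static List<String> readLines(byte[] data, int start, int end) {
        List<String> lines = new ArrayList<>();
        int p = start;
        if (start > 0 && data[start - 1] != '\n') {
            // We landed inside a line owned by the previous split: skip past its newline.
            while (p < data.length && data[p] != '\n') p++;
            p++;
        }
        // Emit every line that starts inside our range, even if it ends past `end`.
        while (p < end && p < data.length) {
            int lineEnd = p;
            while (lineEnd < data.length && data[lineEnd] != '\n') lineEnd++;
            lines.add(new String(data, p, lineEnd - p, StandardCharsets.UTF_8));
            p = lineEnd + 1; // continue after the newline
        }
        return lines;
    }
}
```

Whatever the split size, the readers together emit each line exactly once, without cutting any line in half.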

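Socket reading

Once started, the application connects to localhost:7777, splits every line it receives into words, and keeps a running count per word: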

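import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the socket source: each received line becomes one String record
        DataStreamSource<String> streamSource = see.socketTextStream("localhost", 7777);
        // Each incoming line is computed immediately: split into (word, 1) and summed per word
        DataStream<Tuple2<String, Integer>> sum = streamSource
                .flatMap((String str, Collector<Tuple2<String, Integer>> collector) -> {
                    for (String item : str.split(" ")) {
                        collector.collect(Tuple2.of(item, 1));
                    }
                })
                // Lambdas lose generic type information, so declare the output type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(tuple -> tuple.f0)
                .sum(1);
        // Sink: print the results
        sum.print();
        // Give the job a name and run it
        see.execute("Word Count Job");
    }
}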

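On Windows, search for "win10 nc command" to download a netcat (nc) build, then start it from the command line. Start nc first, because socketTextStream() connects to it as a client, and type your input only after the Flink job has started: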

C:\Users\W650>nc64.exe -L -p 7777
hello yuque
hello codeleven

Output:

6> (yuque,1)
3> (hello,1)
4> (codeleven,1)
3> (hello,2)
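What keyBy(...).sum(1) does for each incoming word can be imitated with a map from word to running total: every word produces one updated (word, count) pair, which is why `hello` first appears as `(hello,1)` and later as `(hello,2)`. The plain-Java sketch below keeps all counts in a single HashMap; in Flink the counts live in keyed state partitioned across parallel instances, which is where the `3>`, `4>` and `6>` prefixes come from. `RunningWordCount` and `onLine` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process imitation of flatMap -> keyBy(word) -> sum(1).
final class RunningWordCount {
    private final Map<String, Integer> counts = new HashMap<>();

    // Process one line from the socket; returns the "(word,count)" updates it would emit.
    List<String> onLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            int total = counts.merge(word, 1, Integer::sum); // running sum per key
            emitted.add("(" + word + "," + total + ")");
        }
        return emitted;
    }
}
```

Feeding it the two lines typed into nc above yields `(hello,1)`, `(yuque,1)`, then `(hello,2)`, `(codeleven,1)`, the same updates Flink printed, just without the parallel-instance prefixes and in a deterministic order.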

Collection reading

fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
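Unlike the other three methods, fromParallelCollection() takes a SplittableIterator: an iterator that can be divided into sub-iterators so that each parallel source instance produces its own share of the elements. The plain-Java sketch below shows the idea for a numeric range. `RangeSplitter.split` is a hypothetical helper, not a Flink class, and the round-robin assignment it uses is just one possible partitioning scheme chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;

// Hypothetical imitation of a splittable iterator over the inclusive range [from, to].
final class RangeSplitter {
    // Split into n iterators; partition i gets from+i, from+i+n, from+i+2n, ... (round-robin).
    static List<Iterator<Long>> split(long from, long to, int n) {
        List<Iterator<Long>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            long first = from + i;
            long count = first > to ? 0 : (to - first) / n + 1; // elements in this partition
            parts.add(LongStream.iterate(first, x -> x + n).limit(count).boxed().iterator());
        }
        return parts;
    }
}
```

For example, splitting 1..10 three ways gives 1, 4, 7, 10 / 2, 5, 8 / 3, 6, 9: every element is produced by exactly one partition.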

Custom sources

By implementing SourceFunction you can read data from practically any external system:

addSource(SourceFunction<OUT> function)
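A SourceFunction has two methods: run(SourceContext), which loops and hands records to ctx.collect(...), and cancel(), which Flink calls from another thread and which must make that loop stop. Since a real SourceFunction needs Flink on the classpath, the sketch below reproduces only this contract in plain Java: `CounterSource` is a hypothetical class and a `java.util.function.Consumer` stands in for the SourceContext. A `volatile` flag is the usual way to make the cancellation visible to the running loop.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a SourceFunction<Long>; the Consumer plays the role of SourceContext.
final class CounterSource {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    // Like SourceFunction.run(ctx): keep emitting records until cancelled.
    void run(Consumer<Long> emit) {
        long next = 0;
        while (running) {
            emit.accept(next++);
        }
    }

    // Like SourceFunction.cancel(): make the loop in run() stop.
    void cancel() {
        running = false;
    }
}
```

In a real SourceFunction, records are usually emitted inside `synchronized (ctx.getCheckpointLock())` so that emitting and checkpointing do not interleave.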