测试版本

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>javabook.apache</artifactId>
  7. <groupId>org.example</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>javabook.apache.flink</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-java</artifactId>
  20. <version>1.13.1</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.apache.flink</groupId>
  24. <artifactId>flink-clients_2.12</artifactId>
  25. <version>1.13.1</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.apache.flink</groupId>
  29. <artifactId>flink-streaming-java_2.12</artifactId>
  30. <version>1.13.1</version>
  31. <scope>provided</scope>
  32. </dependency>
  33. </dependencies>
  34. </project>

测试代码

package online.javabook.flink.quickstart;

import online.javabook.flink.quickstart.flatmapper.MyFlatMapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * nc -lkv -p 7777
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception{

        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // paras
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        // inputDataStream
        DataStream<String> inputDataStream = env.socketTextStream(host, port);

        // outputDataStream
        DataStream<Tuple2<String, Integer>> outputDataStream = inputDataStream.flatMap( new MyFlatMapper())
                .keyBy(0)
                .sum(1);

        outputDataStream.print();
        env.execute();
    }
}

上传代码

image.png

监听数据

[java@node101 ~]$ nc -lkv -p 7777
Ncat: Version 7.70 ( https://nmap.org/ncat )
Ncat: Listening on :::7777
Ncat: Listening on 0.0.0.0:7777
hello world

image.png

Flink命令

命令行提交job

[java@node101 bin]$ ./flink run -c online.javabook.flink.quickstart.SocketWordCount -p 3 /target/javabook.apache.flink-1.0-SNAPSHOT.jar --host node101 --port 7777

取消运行中的iob

[java@node101 bin]$ ./flink cancle [jobId]

查看运行中的iob

## 运行状态的job
[java@node101 bin]$ ./flink list

## 所有状态的job
[java@node101 bin]$ ./flink list -a