1. object StreamingJob {
  2. def main(args: Array[String]): Unit = {
  3. // set up the streaming execution environment
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment
  5. // 接收socket文本流
  6. val dataSet = env.socketTextStream("192.168.174.128",7777)
  7. //进行转换处理统计
  8. dataSet.flatMap ( _.split(","))
  9. .filter (_.nonEmpty)
  10. .map ((_, 1))
  11. .keyBy(0)
  12. .sum(1)
  13. .print()
  14. //启动任务执行
  15. env.execute("stream word count")
  16. }
  17. }

主机名以及端口

val dataSet = env.socketTextStream("192.168.174.128",7777)192.168.174.128 为主机名(此处为虚拟机中centos的ip),7777 为监听的端口号

要实现Windows下的IDEA连接虚拟机中的主机名,要配置以下条件

停止firewall

systemctl stop firewalld.service

禁止firewall开机启动

systemctl disable firewalld.service ```

创建指定端口号

然后使用 nc -lk 端口号创建一个端口,在IDEA中启动流处理程序,在端口中输入信息,IDEA就可以统计词频

centos中
image.png

IDEA中
image.png

其中前面的小数字5,表示当前任务并行执行在flink哪个子任务中(flink运行在分布式系统中),开发环境中的并行度默认为当前电脑的CPU核心数(生产环境中需要配置文件去配置)

关闭端口

使用Ctrl+z停止后,使用 netstat -tunlp | grep 端口号 发现端口仍存在
image.png
图中创建了7777端口号
使用 kill -9 进程号 杀死进程,如图使用 kill -9 1718
image.png