object StreamingJob {def main(args: Array[String]): Unit = {// set up the streaming execution environmentval env = StreamExecutionEnvironment.getExecutionEnvironment// 接收socket文本流val dataSet = env.socketTextStream("192.168.174.128",7777)//进行转换处理统计dataSet.flatMap ( _.split(",")).filter (_.nonEmpty).map ((_, 1)).keyBy(0).sum(1).print()//启动任务执行env.execute("stream word count")}}
主机名以及端口
val dataSet = env.socketTextStream("192.168.174.128",7777) 中192.168.174.128 为主机名(此处为虚拟机中centos的ip),7777 为监听的端口号
要实现Windows下的IDEA连接虚拟机中的主机名,要配置以下条件
- Linux配置主机名映射 https://www.yuque.com/u1046159/vaeckn/flmgod
- 关闭centos的防火墙(否则IDEA运行流处理代码会报错连接超时)
```powershell
查看防火墙状态
firewall-cmd —state
停止firewall
systemctl stop firewalld.service
禁止firewall开机启动
systemctl disable firewalld.service ```
创建指定端口号
然后使用 nc -lk 端口号创建一个端口,在IDEA中启动流处理程序,在端口中输入信息,IDEA就可以统计词频
centos中
IDEA中
其中前面的小数字5,表示当前任务并行执行在flink哪个子任务中(flink运行在分布式系统中),开发环境中的并行度默认为当前电脑的CPU核心数(生产环境中需要配置文件去配置)
关闭端口
使用Ctrl+z停止后,使用 netstat -tunlp | grep 端口号 发现端口仍存在
图中创建了7777端口号
使用 kill -9 进程号 杀死进程,如图使用 kill -9 1718
