区别

Spark是基于内存,MR(Hadoop MapReduce)是基于磁盘。
Spark多个作业之间的数据通信是基于内存,而Hadoop MapReduce是基于磁盘。

分布式计算模拟

网络IO模拟
image.png

Task

  /**
   * The whole job as held by the driver: the input data together with the
   * operator to apply to each element. Extends `Serializable`.
   */
  class Task extends Serializable {
    // Data source: the full input collection before it is split into partitions.
    val datas: List[Int] = List(1, 2, 3, 4)

    // Operator: doubles a single element.
    val logic: Int => Int = num => num * 2
  }
  /**
   * One partition of a job: a slice of the data plus the operator to run on it.
   *
   * Both fields are mutable so the sender can fill them in after construction.
   * They default to an empty list and the identity function rather than `null`,
   * so calling [[compute]] on a freshly created instance yields an empty result
   * instead of throwing a NullPointerException.
   */
  class SubTask extends Serializable {
    // Slice of the data assigned to this sub-task.
    var datas: List[Int] = Nil

    // Operator applied to every element of `datas`.
    var logic: Int => Int = identity

    /**
     * Applies `logic` to each element of `datas`.
     *
     * @return the transformed elements, in the same order as `datas`
     */
    def compute(): List[Int] = datas.map(logic)
  }

Driver

  /**
   * Simulated driver: splits a [[Task]] into two [[SubTask]]s and ships one to
   * each of two socket servers on localhost (ports 9999 and 8888) using Java
   * object serialization.
   */
  object Driver {

    /**
     * Connects to both servers, sends the first two elements of the task data
     * to port 9999 and the last two to port 8888, then prints a completion
     * message.
     *
     * Both connections are opened before anything is sent, so a failed connect
     * to either port means no sub-task is sent. Every socket is closed in a
     * `finally`, so a failed connect or write does not leak a connection.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      val task = new Task()

      // Connect to both servers up front.
      val client1 = new Socket("localhost", 9999)
      try {
        val client2 = new Socket("localhost", 8888)
        try {
          // Partitioning: first half to the first server, second half to the other.
          send(client1, subTaskOf(task, task.datas.take(2)))
          send(client2, subTaskOf(task, task.datas.takeRight(2)))
        } finally {
          client2.close()
        }
      } finally {
        client1.close()
      }

      println("客户端数据发送完毕")
    }

    /** Builds a sub-task carrying the task's operator and the given slice of data. */
    private def subTaskOf(task: Task, slice: List[Int]): SubTask = {
      val subTask = new SubTask()
      subTask.logic = task.logic
      subTask.datas = slice
      subTask
    }

    /** Serializes `subTask` onto the socket's output stream, then closes that stream. */
    private def send(socket: Socket, subTask: SubTask): Unit = {
      val objOut = new ObjectOutputStream(socket.getOutputStream)
      try {
        objOut.writeObject(subTask)
        objOut.flush()
      } finally {
        // Closing the socket's output stream also closes the socket itself.
        objOut.close()
      }
    }
  }

Executor

  /**
   * Simulated compute node listening on port 9999: accepts one connection,
   * reads one serialized [[SubTask]], runs it and prints the result.
   */
  object Executor {

    /**
     * Starts the server, handles a single client and shuts down.
     *
     * The server socket, the client socket and the object stream are each
     * closed in a `finally`, so a failure while accepting, deserializing or
     * computing does not leave the port or connection open.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      // Start the server and wait for data.
      val server = new ServerSocket(9999)
      try {
        println("服务器启动,等待接收数据")
        // Block until a client connects.
        val client: Socket = server.accept()
        try {
          val objIn = new ObjectInputStream(client.getInputStream)
          try {
            // NOTE(security): readObject() deserializes whatever the peer sends.
            // Java deserialization of untrusted data is unsafe; only use this
            // with a trusted sender or add an ObjectInputFilter.
            val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
            val ints: List[Int] = task.compute()
            println("计算节点[9999]计算的结果为:" + ints)
          } finally {
            objIn.close()
          }
        } finally {
          client.close()
        }
      } finally {
        server.close()
      }
    }
  }
  /**
   * Simulated compute node listening on port 8888: accepts one connection,
   * reads one serialized [[SubTask]], runs it and prints the result.
   */
  object Executor2 {

    /**
     * Starts the server, handles a single client and shuts down.
     *
     * The server socket, the client socket and the object stream are each
     * closed in a `finally`, so a failure while accepting, deserializing or
     * computing does not leave the port or connection open.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      // Start the server and wait for data.
      val server = new ServerSocket(8888)
      try {
        println("服务器启动,等待接收数据")
        // Block until a client connects.
        val client: Socket = server.accept()
        try {
          val objIn = new ObjectInputStream(client.getInputStream)
          try {
            // NOTE(security): readObject() deserializes whatever the peer sends.
            // Java deserialization of untrusted data is unsafe; only use this
            // with a trusted sender or add an ObjectInputFilter.
            val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
            val ints: List[Int] = task.compute()
            println("计算节点[8888]计算的结果为:" + ints)
          } finally {
            objIn.close()
          }
        } finally {
          client.close()
        }
      } finally {
        server.close()
      }
    }
  }