image.png

阻塞命令式编程

原有的tomcat业务处理是业务线程池中的一个线程处理,request和response都是在这个线程进行业务处理,执行我们编写代码命令。后面的命令需要等到前面的命令产生的数据输入。阻塞式顺序执行。中间的操作如果调用别的服务,或者新开线程处理,也需要阻塞等待数据返回。最后客户端等待所有命令执行完毕,获得执行结果返回结束阻塞。

使用Mq异步解耦的方法

A服务不阻塞等待B服务,而是发送Mq让B服务异步消费,自己直接返回客户端。但是如果客户端需要B服务处理的结果,那么通知客户端成为问题,所以之前都是异步处理不用实时给客户返回的任务。不适合客户端需要实时等待处理结果的业务
如果实在要使用:

  1. 那么 http 协议想要获取B服务的处理结果,只有通过轮询,长轮询的方法去向服务器取结果。
    1. 轮询:ajax定时调用后端开放的查询api
    2. 长轮询:服务器端接收请求,hold住请求,知道有数据才返回,感受上就相当于服务器向客户端推送。
  2. 异步回调的协议:
    1. websocket,ws://192.168.5.6/xxoo

ServerSentEvent轮询

实现的结果,服务器给客户端推消息

  1. <script>
  2. var sse = new EventSource("/url")
  3. //事件返回
  4. sse.onmessage = function(data){
  5. data.data//可以取到数据
  6. }
  7. </script>
@requestMapping(value ="/sse",produces="text/event-stream;charset="utf-8")
public object xxoo(){
    return new data();
}
@RestController
@RequestMapping(path = "/sse")
public class SseRest {  

    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    @GetMapping(path = "/subscribe")
    public SseEmitter subscribe(String id) {
        // 超时时间设置为1小时
        SseEmitter sseEmitter = new SseEmitter(3600000L);
        sseCache.put(id, sseEmitter);
        // 超时回调 触发
        sseEmitter.onTimeout(() -> sseCache.remove(id));
        // 结束之后的回调触发
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }

    @GetMapping(path = "/push")
    public String push(String id, String content) throws IOException {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            // 发送消息
            sseEmitter.send(content);
        }
        return "over";
    }

    @GetMapping(path = "over")
    public String over(String id) {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            // 执行完毕,断开连接
            sseEmitter.complete();
            sseCache.remove(id);
        }
        return "over";
    }
    @GetMapping(path = "/push-all")
    public String pushAll(String content) throws IOException {
        for (String s : sseCache.keySet()) {
            SseEmitter sseEmitter = sseCache.get(s);
            if (sseEmitter != null) {
                // 发送消息
                sseEmitter.send(content);
            }
        }

        return "over";
    }
}

异步回调和事件(消息)通知的区别

异步回调

A线程开一个新线程B处理业务方法C,并注册了一个回调函数D,A继续执行或执行结束,该回调函数D最后是在另外一个B线程执行。本质上C和D之间还是阻塞的,我们并不能用这种回调的方式解决阻塞问题。

事件通知

不同的线程通过订阅和发布事件通道中的事件,来执行具体的业务逻辑。使用了观察者模式

tomcat 下的响应式编程如何实现

业务线程池不处理具体的servlet业务。通过事件通道,基于事件通知由新建的业务线程池进行处理,出于容错机制,我们可以资源隔离,不同的线程池都不阻塞等待,处理不同的业务。

响应式系统的主流框架

image.png
image.png

project reactor

  1. netty
  2. servlet3.1
  3. spring webflux
  4. spring data reactive repositories

什么样的服务是可以异步MQ处理

  1. 边界明确:和主业务不是一个服务。
  2. 不需要实时向客户端返回结果。

    微服务内的响应式web

    响应式web,业务线程收到请求,发布事件,直接返回,不再阻塞。
    微服务内需要达到响应式,需要解决以下两个问题

    服务调用阻塞

    通过事件/消息驱动,异步的发布和订阅事件的方法reactor解耦

    数据库连接阻塞

    支持reactor,可以主动推消息的数据库,MongoDB等

image.png

image.png

servlet3.0/3.1

3.0 我们先通过request.startAsync()获取到该请求对应的AsyncContext,然后调用AsyncContext的start()方法进行异步处理,处理完毕后需要调用complete()方法告知Servlet容器。
3.1 IO 也是非阻塞的,添加读写监听器。readListener onAllDataRead 。

webFlux的使用

引入webFlux包,使用的web容器就变成了netty了。

package com.mashibing.admin;

import java.util.Random;
import java.util.stream.IntStream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.WebSession;

import com.mashibing.admin.pojo.Person;
import com.mashibing.admin.service.PersonService;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// 注解式
// 函数式
@RestController
@RequestMapping("/person")
public class PersonController {

    @Autowired
    PersonService personSrv;


    @GetMapping("")
    Mono<Object> get(String name){


        System.out.println("线程 get" + Thread.currentThread().getName());
    System.out.println("---1");

        // 异步
    Mono<Object> mono = Mono.create(sink -> {

        // 组装数据序列
        System.out.println("线程 create" + Thread.currentThread().getName());
            sink.success(personSrv.getPerson());
        })


        .doOnSubscribe(sub -> {

        // 1 订阅    
            System.out.println("xxx");
        })

        .doOnNext(data -> {

            // 得到数据

            System.out.println("data:" + data);
        })

        .doOnSuccess(onSuccess -> {

            // 整体完成
            System.out.println("onSuccess");
        });


    System.out.println("---2");

    // SpringMvc  值 在这个环节准备好
    // 得到一个包装 数据序列 -> 包含特征  -> 容器 拿到这个序列 -> 执行序列里的方法

    // Ajax   a()  -> b(c()) ->

    // 1,  写回调接口 , 让b调
    // 2,  直接传方法过去


    // 看起来 像是异步,实质上,阻塞的过程 在容器内部
    return mono;

    }




    @GetMapping("xxoo")
    // ServerHttpRequest webFlux 中特有
    // 拓展思维,SpringCloud Gateway
    Mono<Object> get2(ServerHttpRequest request,String name
            ,
            WebSession session
            ){

        //System.out.println("name:" + name);

        if(StringUtils.isEmpty(session.getAttribute("code"))) {

            System.out.println("我要set了~");
            session.getAttributes().put("code", 250);
        }

        System.out.println("code = " + session.getAttribute("code"));

        return Mono.just("么么哒");
    }


    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> sse(){



        // 1. 封装对象
        Flux<String> flux = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {

            try {
                Thread.sleep(new Random().nextInt(3000));
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            return "xxoo" + i;
        }))

        .doOnSubscribe(sub -> {

            System.out.println("sub 了");

        })    

        .doOnComplete(() -> {

            System.out.println("doOnComplete");

        })

        .doOnNext(data -> {

            System.out.println("有data了~" + data);
        })

                ;

        // 2. 对象 连带里面的方法 给了容器
        return flux;

    }




}