28 | Immutability模式:如何利用不变性解决并发问题?

一种解决并发问题的设计模式:不变性(Immutability)模式。所谓不变性,简单来讲,就是对象一旦被创建之后,状态就不再发生变化。换句话说,就是变量一旦被赋值,就不允许修改了(没有写操作);没有修改操作,也就是保持了不变性。
实现一个具备不可变性的类,还是挺简单的。将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了。更严格的做法是这个类本身也是 final 的,也就是不允许继承。因为子类可以覆盖父类的方法,有可能改变不可变性,所以推荐你在实际工作中,使用这种更严格的做法。
Java SDK 里很多类都具备不可变性,只是由于它们的使用太简单,最后反而被忽略了。例如经常用到的 String 和 Long、Integer、Double 等基础类型的包装类都具备不可变性,这些对象的线程安全性都是靠不可变性来保证的。如果你仔细翻看这些类的声明、属性和方法,你会发现它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的。

总结

具备不变性的对象,只有一种状态,这个状态由对象内部所有的不变属性共同决定。其实还有一种更简单的不变性对象,那就是无状态。无状态对象内部没有属性,只有方法。除了无状态的对象,你可能还听说过无状态的服务、无状态的协议等等。无状态有很多好处,最核心的一点就是性能。在多线程领域,无状态对象没有线程安全问题,无需同步处理,自然性能很好;在分布式领域,无状态意味着可以无限地水平扩展,所以分布式领域里面性能的瓶颈一定不是出在无状态的服务节点上。

利用享元模式避免创建重复对象

面向对象相关的设计模式,享元模式(Flyweight Pattern)。利用享元模式可以减少创建对象的数量,从而减少内存占用。Java 语言里面 Long、Integer、Short、Byte 等这些基本数据类型的包装类都用到了享元模式。
享元模式本质上其实就是一个对象池,利用享元模式创建对象的逻辑也很简单:创建之前,首先去对象池里看看是不是存在;如果已经存在,就利用对象池里的对象;如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里。
Long 这个类并没有照搬享元模式,Long 内部维护了一个静态的对象池,仅缓存了[-128,127]之间的数字,这个对象池在 JVM 启动的时候就创建好了,而且这个对象池一直都不会变化,也就是说它是静态的。

29 | Copy-on-Write模式:不是延时策略的COW

使用 Copy-on-Write 更多地体现的是一种延时策略,只有在真正需要复制的时候才复制,而不是提前复制好。 Java 提供的 Copy-on-Write 容器,由于在修改的同时会复制整个容器,所以在提升读操作性能的同时,是以内存复制为代价的。
CopyOnWriteArrayList 和 CopyOnWriteArraySet 这两个 Copy-on-Write 容器在修改的时候会复制整个数组。如果是修改非常少、数组数量也不大,并且对读性能要求苛刻的场景,使用 Copy-on-Write 容器效果就非常好了。

类似Dubbo的RPC框架路由实现

image.png
路由表读多写少,ConcurrentHashMap>整体结构,String为服务名(接口),value为对应的服务列表(ip、端口);

  1. //路由信息
  2. public final class Router{
  3. private final String ip;
  4. private final Integer port;
  5. private final String iface;
  6. //构造函数
  7. public Router(String ip, Integer port, String iface){
  8. this.ip = ip;
  9. this.port = port;
  10. this.iface = iface;
  11. }
  12. //重写equals方法
  13. public boolean equals(Object obj){
  14. if (obj instanceof Router) {
  15. Router r = (Router)obj;
  16. return iface.equals(r.iface) && ip.equals(r.ip) && port.equals(r.port);
  17. }
  18. return false;
  19. }
  20. public int hashCode() {
  21. //省略hashCode相关代码
  22. }
  23. }
  24. //路由表信息
  25. public class RouterTable {
  26. //Key:接口名
  27. //Value:路由集合
  28. ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> rt = new ConcurrentHashMap<>();
  29. //根据接口名获取路由表
  30. public Set<Router> get(String iface){
  31. return rt.get(iface);
  32. }
  33. //删除路由
  34. public void remove(Router router) {
  35. Set<Router> set = rt.get(router.iface);
  36. if (set != null) {
  37. set.remove(router);
  38. }
  39. }
  40. //增加路由
  41. public void add(Router router) {
  42. Set<Router> set = rt.computeIfAbsent(route.iface, r -> new CopyOnWriteArraySet<>());
  43. set.add(router);
  44. }
  45. }

30 | 线程本地存储模式:没有共享,就没有伤害

使用

  1. private static final ThreadLocal<GusteauHolder> THREAD_LOCAL = new ThreadLocal<>();
  2. public static void init() {
  3. GusteauHolder gusteauHolder = new GusteauHolder();
  4. gusteauHolder.setTimestamp(System.currentTimeMillis());
  5. THREAD_LOCAL.set(gusteauHolder);
  6. }
  7. private static GusteauHolder get() {
  8. return THREAD_LOCAL.get();
  9. }
  10. public static void cleanup() {
  11. THREAD_LOCAL.remove();
  12. }
  13. public static void setTenant(Long tenantId) {
  14. get().setTenantId(tenantId);
  15. }
  16. public static Long getTenant() {
  17. return get().getTenantId();
  18. }
  19. public static void setAdmin(Long adminId) {
  20. get().setAdminId(adminId);
  21. }
  22. public static Long getAdmin() {
  23. return get().getAdminId();
  24. }
  25. public static void addLogParam(String paramName, Object paramValue) {
  26. get().addLogParam(paramName, paramValue);
  27. }
  28. public static Map<String, Object> getOpsLogParams() {
  29. return get().getOpsLogParams();
  30. }

内部源码

  1. class Thread {
  2. //内部持有ThreadLocalMap
  3. ThreadLocal.ThreadLocalMap threadLocals;
  4. }
  5. class ThreadLocal<T> {
  6. public T get() {
  7. //首先获取线程持有的
  8. //ThreadLocalMap
  9. ThreadLocalMap map = Thread.currentThread().threadLocals;
  10. //在ThreadLocalMap中
  11. //查找变量
  12. Entry e = map.getEntry(this);
  13. return e.value;
  14. }
  15. static class ThreadLocalMap {
  16. //内部是数组而不是Map
  17. Entry[] table;
  18. //根据ThreadLocal查找Entry
  19. Entry getEntry(ThreadLocal key) {
  20. //省略查找逻辑
  21. }
  22. //Entry定义
  23. static class Entry extends WeakReference<ThreadLocal> {
  24. Object value;
  25. }
  26. }
  27. }

ThreadLocal 与内存泄露

在线程池中使用 ThreadLocal,如果不谨慎就可能导致内存泄露。
因为线程池的生命周期和应用一样长,只要线程没有被销毁,Thread 持有的 ThreadLocalMap 一直都不会被回收,再加上 ThreadLocalMap 中的 Entry 对 ThreadLocal 是弱引用(WeakReference),所以只要 ThreadLocal 结束了自己的生命周期是可以被回收掉的。但是 Entry 中的 Value 却是被 Entry 强引用的,所以即便 Value 的生命周期结束了,Value 也是无法被回收的,从而导致内存泄露。
那在线程池中,正确使用 ThreadLocal :

  1. ExecutorService es;
  2. ThreadLocal tl;
  3. es.execute(()->{
  4. //ThreadLocal增加变量
  5. tl.set(obj);
  6. try {
  7. // 省略业务逻辑代码
  8. }finally {
  9. //手动清理ThreadLocal
  10. tl.remove();
  11. }
  12. });

InheritableThreadLocal 与继承性

在子线程中是通过 ThreadLocal 来访问父线程的线程变量 V 的。

用途

  1. Spring 使用 ThreadLocal 来传递事务信息
  2. 登陆用户的信息

35 | 两阶段终止模式:如何优雅地终止线程?

Java 语言的 Thread 类中曾经提供了一个 stop() 方法,用来终止线程,可是早已不建议使用了,原因是这个方法用的就是一剑封喉的做法,被终止的线程没有机会料理后事。
28-37 并发设计模式 - 图2
Java 线程的状态转换
28-37 并发设计模式 - 图3
我们要想终止一个线程,首先要把线程的状态从休眠状态转换到 RUNNABLE 状态。如何做到呢?这个要靠 Java Thread 类提供的 interrupt() 方法,它可以将休眠状态的线程转换到 RUNNABLE 状态。
线程转换到 RUNNABLE 状态之后,我们如何再将其终止呢?RUNNABLE 状态转换到终止状态,优雅的方式是让 Java 线程自己执行完 run() 方法,所以一般我们采用的方法是设置一个标志位,然后线程会在合适的时机检查这个标志位,如果发现符合终止条件,则自动退出 run() 方法。这个过程其实就是我们前面提到的第二阶段:响应终止指令。
综合上面这两点,我们能总结出终止指令,其实包括两方面内容:interrupt() 方法和线程终止的标志位。

示例

下面是自己维护了结束标志位terminated,当然可以不维护,但是JVM 的异常处理会清除线程的中断状态,所以如果内部抛出异常没有继续外抛异常的话,中断标志位就丢失了。

  1. class Proxy {
  2. //线程终止标志位
  3. volatile boolean terminated = false;
  4. boolean started = false;
  5. //采集线程
  6. Thread rptThread;
  7. //启动采集功能
  8. synchronized void start(){
  9. //不允许同时启动多个采集线程
  10. if (started) {
  11. return;
  12. }
  13. started = true;
  14. terminated = false;
  15. rptThread = new Thread(()->{
  16. while (!terminated){
  17. //省略采集、回传实现
  18. report();
  19. //每隔两秒钟采集、回传一次数据
  20. try {
  21. Thread.sleep(2000);
  22. } catch (InterruptedException e){
  23. //重新设置线程中断状态
  24. Thread.currentThread().interrupt();
  25. }
  26. }
  27. //执行到此处说明线程马上终止
  28. started = false;
  29. });
  30. rptThread.start();
  31. }
  32. //终止采集功能
  33. synchronized void stop(){
  34. //设置中断标志位
  35. terminated = true;
  36. //中断线程rptThread
  37. rptThread.interrupt();
  38. }
  39. }