28讲如何使用设计模式优化并发编程 - 图128讲如何使⽤设计模式优化并发编程

你好,我是刘超。

28讲如何使用设计模式优化并发编程 - 图2在我们使⽤多线程编程时,很多时候需要根据业务场景设计⼀套业务功能。其实,在多线程编程中,本身就存在很多成熟的功能设计模式,学好它们,⽤好它们,那就是如⻁添翼了。今天我就带你了解⼏种并发编程中常⽤的设计模式。

线程上下⽂设计模式

线程上下⽂是指贯穿线程整个⽣命周期的对象中的⼀些全局信息。例如,我们⽐较熟悉的Spring中的ApplicationContext就是
⼀个关于上下⽂的类,它在整个系统的⽣命周期中保存了配置信息、⽤户信息以及注册的bean等上下⽂信息。这样的解释可能有点抽象,我们不妨通过⼀个具体的案例,来看看到底在什么的场景下才需要上下⽂呢?
在执⾏⼀个⽐较⻓的请求任务时,这个请求可能会经历很多层的⽅法调⽤,假设我们需要将最开始的⽅法的中间结果传递到末尾的⽅法中进⾏计算,⼀个简单的实现⽅式就是在每个函数中新增这个中间结果的参数,依次传递下去。代码如下:



public class ContextTest {

// 上下⽂类
public class Context { private String name; private long id

public long getId() { return id;
}
public void setId(long id) { this.id = id;
}


public String getName() { return this.name;
}


public void setName(String name) { this.name = name;
}
}

// 设置上下⽂名字
public class QueryNameAction {
public void execute(Context context) { try {
Thread.sleep(1000L);
String name = Thread.currentThread().getName(); context.setName(name);
} catch (InterruptedException e) { e.printStackTrace();
}
}
}

// 设置上下⽂ID
public class QueryIdAction {
public void execute(Context context) { try {
Thread.sleep(1000L);
long id = Thread.currentThread().getId(); context.setId(id);
} catch (InterruptedException e) { e.printStackTrace();
}
}
}

// 执⾏⽅法
public class ExecutionTask implements Runnable {

private QueryNameAction queryNameAction = new QueryNameAction(); private QueryIdAction queryIdAction = new QueryIdAction();


@Override
public void run() {
final Context context = new Context(); queryNameAction.execute(context); System.out.println(“The name query successful”); queryIdAction.execute(context); System.out.println(“The id query successful”);

System.out.println(“The Name is “ + context.getName() + “ and id “ + context.getId());
}
}


public static void main(String[] args) {
IntStream.range(1, 5).forEach(i -> new Thread(new ContextTest().new ExecutionTask()).start());
}
}

执⾏结果:



The name query successful The name query successful The name query successful The name query successful The id query successful The id query successful The id query successful The id query successful
The Name is Thread-1 and id 11 The Name is Thread-2 and id 12 The Name is Thread-3 and id 13 The Name is Thread-0 and id 10

然⽽这种⽅式太笨拙了,每次调⽤⽅法时,都需要传⼊Context作为参数,⽽且影响⼀些中间公共⽅法的封装。那能不能设置⼀个全局变量呢?如果是在多线程情况下,需要考虑线程安全,这样的话就⼜涉及到了锁竞争。
除了以上这些⽅法,其实我们还可以使⽤ThreadLocal实现上下⽂。ThreadLocal是线程本地变量,可以实现多线程的数据隔 离。ThreadLocal为每⼀个使⽤该变量的线程都提供⼀份独⽴的副本,线程间的数据是隔离的,每⼀个线程只能访问各⾃内部的副本变量。

ThreadLocal中有三个常⽤的⽅法:set、get、initialValue,我们可以通过以下⼀个简单的例⼦来看看ThreadLocal的使⽤:



private void testThreadLocal() { Thread t = new Thread() {
ThreadLocal mStringThreadLocal = new ThreadLocal();


@Override
public void run() { super.run();
mStringThreadLocal.set(“test”); mStringThreadLocal.get();
}
};


t.start();
}

接下来,我们使⽤ThreadLocal来重新实现最开始的上下⽂设计。你会发现,我们在两个⽅法中并没有通过变量来传递上下
⽂,只是通过ThreadLocal获取了当前线程的上下⽂信息:



public class ContextTest {
// 上下⽂类
public static class Context { private String name; private long id;

public long getId() { return id;
}


public void setId(long id) { this.id = id;
}


public String getName() { return this.name;
}


public void setName(String name) { this.name = name;
}
}

// 复制上下⽂到ThreadLocal中
public final static class ActionContext {


private static final ThreadLocal threadLocal = new ThreadLocal() {
@Override
protected Context initialValue() { return new Context();
}
};


public static ActionContext getActionContext() { return ContextHolder.actionContext;
}


public Context getContext() { return threadLocal.get();
}

// 获取ActionContext单例
public static class ContextHolder {
private final static ActionContext actionContext = new ActionContext();
}
}

// 设置上下⽂名字
public class QueryNameAction { public void execute() {
try { Thread.sleep(1000L);
String name = Thread.currentThread().getName(); ActionContext.getActionContext().getContext().setName(name);
} catch (InterruptedException e) { e.printStackTrace();
}
}
}

// 设置上下⽂ID
public class QueryIdAction { public void execute() {
try { Thread.sleep(1000L);
long id = Thread.currentThread().getId(); ActionContext.getActionContext().getContext().setId(id);
} catch (InterruptedException e) {
e.printStackTrace();


}
}
}

// 执⾏⽅法
public class ExecutionTask implements Runnable {
private QueryNameAction queryNameAction = new QueryNameAction(); private QueryIdAction queryIdAction = new QueryIdAction();

@Override
public void run() {
queryNameAction.execute();//设置线程名System.out.println(“The name query successful”);
queryIdAction.execute();//设置线程ID
System.out.println(“The id query successful”);


System.out.println(“The Name is “ + ActionContext.getActionContext().getContext().getName() + “ and id “ +
}
}


public static void main(String[] args) {
IntStream.range(1, 5).forEach(i -> new Thread(new ContextTest().new ExecutionTask()).start());
}
}
28讲如何使用设计模式优化并发编程 - 图3

运⾏结果:



The name query successful The name query successful The name query successful The name query successful The id query successful The id query successful The id query successful The id query successful
The Name is Thread-2 and id 12 The Name is Thread-0 and id 10 The Name is Thread-1 and id 11 The Name is Thread-3 and id 13

Thread-Per-Message设计模式

Thread-Per-Message设计模式翻译过来的意思就是每个消息⼀个线程的意思。例如,我们在处理Socket通信的时候,通常是

⼀个线程处理事件监听以及I/O读写,如果I/O读写操作⾮常耗时,这个时候便会影响到事件监听处理事件。

这个时候Thread-Per-Message模式就可以很好地解决这个问题,⼀个线程监听I/O事件,每当监听到⼀个I/O事件,则交给另⼀个处理线程执⾏I/O操作。下⾯,我们还是通过⼀个例⼦来学习下该设计模式的实现。

28讲如何使用设计模式优化并发编程 - 图4
//IO处理
public class ServerHandler implements Runnable{ private Socket socket;

public ServerHandler(Socket socket) { this.socket = socket;
}

public void run() { BufferedReader in = null; PrintWriter out = null; String msg = null;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true);
while ((msg = in.readLine()) != null && msg.length()!=0) {//当连接成功后在此等待接收消息(挂起,进⼊阻塞状
System.out.println(“server received : “ + msg); out.print(“received~\n”);
out.flush();
}
} catch (Exception e) { e.printStackTrace();
} finally { try {
in.close();
} catch (IOException e) { e.printStackTrace();
}
try {
out.close();
} catch (Exception e) { e.printStackTrace();
}
try {
socket.close();
} catch (IOException e) { e.printStackTrace();
}
}
}
}

//Socket 启 动 服 务 public class Server {


private static int DEFAULT_PORT = 12345; private static ServerSocket server;

public static void start() throws IOException { start(DEFAULT_PORT);
}


public static void start(int port) throws IOException { if (server != null) {
return;
}


try {
//启动服务
server = new ServerSocket(port);
// 通过⽆线循环监听客户端连接
while (true) {


Socket socket = server.accept();
// 当有新的客户端接⼊时,会执⾏下⾯的代码
long start = System.currentTimeMillis();
new Thread(new ServerHandler(socket)).start();


long end = System.currentTimeMillis();


System.out.println(“Spend time is “ + (end - start));
}
} finally {
if (server != null) {
System.out.println(“服务器已关闭。”); server.close();
}


}


}


public static void main(String[] args) throws InterruptedException{


// 运⾏服务端
new Thread(new Runnable() {


public void run() { try {
Server.start();
} catch (IOException e) { e.printStackTrace();
}
}
}).start();


}
}

以上,我们是完成了⼀个使⽤Thread-Per-Message设计模式实现的Socket服务端的代码。但这⾥是有⼀个问题的,你发现了
吗?

使⽤这种设计模式,如果遇到⼤的⾼并发,就会出现严重的性能问题。如果针对每个I/O请求都创建⼀个线程来处理,在有⼤量请求同时进来时,就会创建⼤量线程,⽽此时JVM有可能会因为⽆法处理这么多线程,⽽出现内存溢出的问题。

退⼀步讲,即使是不会有⼤量线程的场景,每次请求过来也都需要创建和销毁线程,这对系统来说,也是⼀笔不⼩的性能开销。

⾯对这种情况,我们可以使⽤线程池来代替线程的创建和销毁,这样就可以避免创建⼤量线程⽽带来的性能问题,是⼀种很好的调优⽅法。

Worker-Thread设计模式

这⾥的Worker是⼯⼈的意思,代表在Worker Thread设计模式中,会有⼀些⼯⼈(线程)不断轮流处理过来的⼯作,当没有⼯作时,⼯⼈则会处于等待状态,直到有新的⼯作进来。除了⼯⼈⻆⾊,Worker Thread设计模式中还包括了流⽔线和产品。

这种设计模式相⽐Thread-Per-Message设计模式,可以减少频繁创建、销毁线程所带来的性能开销,还有⽆限制地创建线程所带来的内存溢出⻛险。

我们可以假设⼀个场景来看下该模式的实现,通过Worker Thread设计模式来完成⼀个物流分拣的作业。

假设⼀个物流仓库的物流分拣流⽔线上有8个机器⼈,它们不断从流⽔线上获取包裹并对其进⾏包装,送其上⻋。当仓库中的商品被打包好后,会投放到物流分拣流⽔线上,⽽不是直接交给机器⼈,机器⼈会再从流⽔线中随机分拣包裹。代码如下:



//包裹类
public class Package { private String name; private String address;

public String getName() { return name;
}


public void setName(String name) { this.name = name;
}


public String getAddress() { return address;
}


public void setAddress(String address) { this.address = address;
}


public void execute() { System.out.println(Thread.currentThread().getName()+” executed “+this);
}
}


//流⽔线
public class PackageChannel {
private final static int MAX_PACKAGE_NUM = 100;


private final Package[] packageQueue; private final Worker[] workerPool; private int head;
private int tail; private int count;

public PackageChannel(int workers) { this.packageQueue = new Package[MAX_PACKAGE_NUM]; this.head = 0;
this.tail = 0;
this.count = 0;
this.workerPool = new Worker[workers];
this.init();
}


private void init() {
for (int i = 0; i < workerPool.length; i++) { workerPool[i] = new Worker(“Worker-“ + i, this);
}
}


/*
push switch to start all of worker to work
*/
public void startWorker() { Arrays.asList(workerPool).forEach(Worker::start);
}


public synchronized void put(Package packagereq) { while (count >= packageQueue.length) {
try { this.wait();
} catch (InterruptedException e) { e.printStackTrace();
}
}
this.packageQueue[tail] = packagereq; this.tail = (tail + 1) % packageQueue.length; this.count++;
this.notifyAll();
}


public synchronized Package take() { while (count <= 0) {
try { this.wait();
} catch (InterruptedException e) { e.printStackTrace();
}
}
Package request = this.packageQueue[head];
this.head = (this.head + 1) % this.packageQueue.length; this.count—;
this.notifyAll(); return request;
}


}


//机器⼈
public class Worker extends Thread{
private static final Random random = new Random(System.currentTimeMillis()); private final PackageChannel channel;

public Worker(String name, PackageChannel channel) { super(name);
this.channel = channel;
}


@Override
public void run() { while (true) {
channel.take().execute();


try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) { e.printStackTrace();
}
}
}


}


public class Test {
public static void main(String[] args) {
//新建8个⼯⼈
final PackageChannel channel = new PackageChannel(8);
//开始⼯作channel.startWorker();
//为流⽔线添加包裹
for(int i=0; i<100; i++) {
Package packagereq = new Package(); packagereq.setAddress(“test”); packagereq.setName(“test”); channel.put(packagereq);
}
}
}

我们可以看到,这⾥有8个⼯⼈在不断地分拣仓库中已经包装好的商品。

总结

平时,如果需要传递或隔离⼀些线程变量时,我们可以考虑使⽤上下⽂设计模式。在数据库读写分离的业务场景中,则经常会
⽤到ThreadLocal实现动态切换数据源操作。但在使⽤ThreadLocal时,我们需要注意内存泄漏问题,在之前的第25讲中,我们已经讨论过这个问题了。

当主线程处理每次请求都⾮常耗时时,就可能出现阻塞问题,这时候我们可以考虑将主线程业务分⼯到新的业务线程中,从⽽提⾼系统的并⾏处理能⼒。⽽ Thread-Per-Message 设计模式以及 Worker-Thread 设计模式则都是通过多线程分⼯来提⾼系统并⾏处理能⼒的设计模式。

思考题

除了以上这些多线程的设计模式,平时你还使⽤过其它的设计模式来优化多线程业务吗?

期待在留⾔区看到你的答案。也欢迎你点击“请朋友读”,把今天的内容分享给身边的朋友,邀请他⼀起讨论。

28讲如何使用设计模式优化并发编程 - 图5

  1. 精选留⾔

28讲如何使用设计模式优化并发编程 - 图6nightmare
⼀个注册逻辑,下⾯有注册实现数组,注册实现⾥⾯有队列,并且本身实现runable ,注册⻔⾯依次从注册实现数组获取⼀个注册实现 并把请求放到注册实现的队列中,请求由⼀个注册实现来完成,请求由唯⼀的注册实现来完成,不会有并发问题 ⽽且如果 注册实现有复杂业务 还可以加上 work thread模式来优化
2019-07-27 09:40
28讲如何使用设计模式优化并发编程 - 图7QQ怪
⽐起Worker-Thread 设计模式类似⼯⼚⻋间⼯⼈的⼯作模式,还有⽤的⽐较多的是⽣产者和消费者模式,与之前的不同的是,
⽣产者和消费者模式核⼼是⼀个任务队列,⽣产者⽣产任务到任务队列中,消费者从队列消费任务,优点是解耦和平衡两者之前的速度差异。
2019-07-27 14:58
28讲如何使用设计模式优化并发编程 - 图8陆离
⽼师,讲到并发这⾥,我想问⼀个前⾯讲过的synchronized锁升级的⼏个问题。
1.当锁由⽆锁状态升级到偏向锁时,除了将Mark Work中的线程ID替换是否还有其他操作?替换完线程ID就代表获取到了锁吗

2.当锁状态为偏向锁状态时, 其他线程竞争锁只是CAS替换线程ID吗?如果之前的线程还没有执⾏完呢?
3.针对第2个问题,假设线程T1获取到了偏向锁,将线程ID设为T1。线程T2尝试获取偏向锁时,先检测锁的Mark Word线程ID 是否为T2,如果不是,会CAS替换,这个时候的期望值为null,更新值为T2,失败后进⼊偏向锁撤销。stop-the-world后检测T1是 否存活,如果否清空Mark work线程ID,锁恢复为⽆锁状态,唤醒T2,接着尝试获取锁。流程是这样的吗?
4.当锁升级为轻量级锁时,获取锁的标志是锁指针指向线程的锁记录,当有其他线程尝试CAS获取锁时,期望值是⽆锁时,Ma rk word中为hash age 01这样的内容吗?
5.当线程释放轻量锁时,需要将锁记录替换回Mark Word中,这种情况下锁还未释放为什么会有失败?
6.当锁升级为重量锁后,开始使⽤monitor对象,为什么Mark Word中还会把内容替换为指向线程锁记录的指针?这个时候还需要使⽤Mark word吗?
期待⽼师及同学的解答
2019-07-27 09:49

28讲如何使用设计模式优化并发编程 - 图9undifined
⽼师,有⼀个问题没想明⽩,就是异步的请求处理中,每⼀个线程接收将请求交给处理的线程后,怎么拿到返回结果并返回给
⽤户呢
2019-07-27 08:24
作者回复
可以通过Future模式拿到返回结果,虽然是异步执⾏,如果要等待返回结果,则主线程还是在阻塞等待。
2019-07-27 09:40

28讲如何使用设计模式优化并发编程 - 图10
讲真,我是觉得设计模式是优化代码组织结构的,性能提升只是因为你的实现途径导致了你适合⽤某种设计模式,so感觉这样标题怪怪的。

如果要这么说的话,mapreduce或者说javase引⼊的forkjoin,流⽔线模式,cow就都是了。
2019-07-27 08:05
作者回复
是的,有些设计模式更多的是优化代码逻辑结构,但还是有很多设计模式也起到了优化性能效果。例如,⽂中的Worker-Threa
d 设计模式其实就是⼀种线程池的优化⽅式,在并发量⼤时,⼀般的代码逻辑要么是串⾏执⾏,要么使⽤创建线程并发执⾏, 在⼤量并发时两者都可能会出现性能瓶颈,⽽这种Worker-Thread 这样的设计⽅式则即可以并发执⾏,⼜避免创建过多的线程导致性能瓶颈。
2019-07-27 10:01