一、探讨单例模式
单例模式:保证系统中一个类只产生一个对象实例
1、可以省略new操作花费的时间
2、减少CG压力,缩短CG停顿时间
单例模式与并行没有直接的关系,只是因为太常见了所以讨论它。。
简单的实现:(饿汉式)
public class Singleton{private Singleton(){System.out.println("Singleton is create")}private static Singleton instance = new Singleton();public static Singleton getInstance(){return instance;}}
注意:
1、构造函数设为private
2、instance对象必须是private 并static的,如果不是private,则 安全性无法保证,可能会变成null。因为工厂方法getInstance()必须是static的,所以instance也必须是是static。
这种创建方式不足的地方是,无法保证对象真的是在getInstance()方法第一次被调用时创建的。
如你的单例是这样的:
public class Singleton{public static int STATUS = 1;private Singleton(){System.out.println("Singleton is create")}private static Singleton instance = new Singleton();public static Singleton getInstance(){return instance;}}
任何地方引用这个STATUS都会导致instance实例被创建。
如果想要精确的控制创建时间,一种延迟加载的策略。他只会在instance第一次使用的时候创建对象
具体实现:(懒汉式)
public class Singleton{private Singleton(){System.out.println("Singleton is create")}private static Singleton instance = null;public static synchronized Singleton getInstance(){ // 加锁控制其并发if(instance == null){instance = new Singleton();}return instance;}}
结合二者优势的单例:
public class StaticSingleton{private StaticSingleton(){System.out.println("Singleton is create")}private static class SingletonHolder {private static StaticSingleton instance = new StaticSingleton(); // 类初始化时创建}public static StaticSingleton getInstance(){return SingletonHolder.instance;}}
getInstance()方法没有锁,在高并发环境下性能优越。在getInstance()方法第一次调用时,StaticSingleton的实例才会被创建。因为这种方法巧妙的使用了内部类和类的初始化方式。内部类SingletonHolder是private,无法从外部访问。只能在getInstance()方法中利用虚拟机的类初始化机制创建单例。
二、不变模式
不变模式:一旦一个对象被创建,他的内部永远不会发生改变。没有线程可以对其修改状态和数据,内部状态也不会自行改变。
不变模式比只读属性更具有一致性和不变性。只读属性本身的状态可以修改。如,存活时间。
不变模式需要满足一下条件:
- 当对象创建后,其内部状态和数据不在发生任何变化。
- 对象需要被共享,被多线程频繁访问。
实现需要注意一下四点:
- 去除setter方法及所有修改自身属性的方法
- 将所有属性设为私有,并用final标记。
- 确保没有子类可以重载修改它的行为
- 有一个可以创建完整对象的构造函数
public final class Product){ //确保无子类private final String no;private final String name;private final double price;public Product(String no,String name,double price){super();this.no = no;this.name = name;this.price = price;}public String getNo(){return no;}public String getName(){return name;}public String getPrice(){return price;}}
三、生产者-消费者模式

消费者线程与生产者线程不直接通信,而是在共享内存缓冲区中获取任务。这样生产者和消费者进行解耦。
其中BlockingQueue充当了共享缓冲区。
详细的不再赘述,详细可以看回书中P215。
四、Future模式
如果我们调用的方法执行的很慢,那么我们就要进行等待。但有时我们并不着急要结果。因此,我们可以让被吊用着立即返回。让它在后台慢慢处理。虽然它无法立即给出我所需要的数据,但它会返回一个契约给你,你可以凭借这个契约去重新获取你所需要的信息。
1、Future模式的主要角色
追要参与者:
| 参与者 | 作用 |
|---|---|
| Main | 系统启动,调用Client发出请求 |
| Client | 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData |
| Data | 返回数据的接口 |
| FutureData | Future数据构造很快,但是是一个虚拟的数据,需要装配RealData |
| RealData | 真实数据,其构造是比较慢的 |
2、Future模式的简单实现
Data:
public interface Data {public String getResult();}
FutureData:
public class FutureData implements Data {protected RealData realData = null;protected boolean isReady = false;public synchronized void setRealData(RealData realData){if(!isReady){this.realData = realData;isReady = true;notifyAll(); // RealData已经被注入,通知getResult()方法}}@Overridepublic synchronized String getResult() {while (!isReady){try {wait();} catch (InterruptedException e) {e.printStackTrace();}}return realData.result;}}
RealData:
public class RealData implements Data{protected final String result;public RealData(String result) {StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i < 10; i++) {stringBuilder.append(result);try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}this.result = stringBuilder.toString();}@Overridepublic String getResult() {return result;}}
Client:
public class Client {public Data request(final String queryStr) {final FutureData futureData = new FutureData();new Thread() {public void run() {RealData realData = new RealData(queryStr);futureData.setRealData(realData);}}.start();return futureData;}//------------------------- 启动类 ------------------------------------------public static void main(String[] args) {Client client = new Client();Data data = client.request("name");try {System.out.println("处理其他逻辑...");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("数据 = " + data.getResult());}}
3、JDK中的Future模式

其实就是我们所了解的带返回值的线程。
案例:
RealData:
public class RealData implements Callable {private String para;public RealData(String para){this.para = para;}@Overridepublic Object call() throws Exception {StringBuilder sb = new StringBuilder();for (int i = 0; i < 10; i++) {sb.append(para);try {Thread.sleep(100);}catch (InterruptedException ignored){}}return sb.toString();}}
FutureMain:
public class FutureMain {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> future = new FutureTask<>(new RealData("a"));ExecutorService executorService = Executors.newFixedThreadPool(1);executorService.submit(future);System.out.println("请求完毕");try{System.out.println("做其他的逻辑处理");Thread.sleep(2000);}catch (InterruptedException ignored){}System.out.println("数据:"+future.get());}}
除此之外,还有一些常用的API:
4、Guava对Future模式的支持
Guava中,增强了Future模式,增加了对Future模式完成时的回调接口,使Future完成时可以自动通知应用进行后续处理。
public class FutureDemo {public static void main(String[] args) throws InterruptedException {// 将一个普通线程池包装成一个包含通知功能的Future线程池ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));// 拥有完成时的通知功能的FutureListenableFuture future = service.submit(new RealData("x"));// 回调函数,当Future执行完成后,则执行以下回调代码future.addListener(() -> {System.out.println("异步处理");try{System.out.println(future.get());}catch (Exception e){e.printStackTrace();}},MoreExecutors.directExecutor());System.out.println("main task done...");Thread.sleep(3000);}}
这个过程没有阻塞,可以更好的提升系统的并行度
另一种增强了对异常单的处理:
public static void main(String[] args) throws InterruptedException {// 将一个普通线程池包装成一个包含通知功能的Future线程池ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));// 拥有完成时的通知功能的FutureListenableFuture future = service.submit(new RealData("x"));// 使用Futures工具类将FutureCallback接口注册到给定的Future中Futures.addCallback(future, new FutureCallback<String>() {@Overridepublic void onSuccess(String s) {System.out.println("异步处理成功,"+s);}@Overridepublic void onFailure(Throwable throwable) {System.out.println("处理失败:"+throwable);}});System.out.println("main task done...");Thread.sleep(3000);}
五、并行流水线
不是所有的计算都可以并发计算的。假如有一个计算, 如果没有计算出B+C,就算不出(B+C)* B。遇到这种情况,我们可以采用流水线的思想。
逻辑联想:
假如我们做一个玩具小熊,我们不可能找来四个人,一个人组装身体,一个人安装四肢和头部,一个人穿衣服,一个人打包发货。因为这些步骤都是需要依赖前一步的步骤的。但是如果我们此时要做的不是一个玩偶。而是10000个。那么我们就可以一个人只负责一个步骤,做完后交给后面的人继续完成下一个步骤。流水线满载时,每次只需要一步就可以产生一个玩偶。
下面将上面的运算拆分成三个步骤:
P1:A=B+C
P2:D=A*B
P3:D=D/2
示例代码:
Msg:(信息的载体)
public class Msg {public double i;public double j;public String orgStr = null;}
Plus:(流程的第一个步骤)
public class Plus implements Runnable{public static BlockingQueue<Msg> bq = new LinkedBlockingDeque<>();@Overridepublic void run() {while (true){try {Msg msg = bq.take();msg.j = msg.i+msg.j;Multiply.bq.add(msg);} catch (InterruptedException e) {e.printStackTrace();}}}}
Multiply:(流程的第二个步骤)
public class Multiply implements Runnable{public static BlockingQueue<Msg> bq = new LinkedBlockingDeque<>();@Overridepublic void run() {while (true){try{Msg msg = bq.take();msg.i = msg.i* msg.j;Div.bq.add(msg);}catch (InterruptedException e){}}}}
Div:(流程的第三个步骤)
public class Div implements Runnable{public static BlockingQueue<Msg> bq = new LinkedBlockingDeque<>();@Overridepublic void run() {while (true){try {Msg msg = bq.take();msg.i = msg.i/2;System.out.println(msg.orgStr+"="+msg.i);} catch (InterruptedException e) {e.printStackTrace();}}}}
启动Main:
public class PStreamMain {public static void main(String[] args) {new Thread(new Plus()).start();new Thread(new Multiply()).start();new Thread(new Div()).start();for (int i = 0; i <= 10; i++) {for (int j = 0; j <= 10; j++) {Msg msg = new Msg();msg.i = i;msg.j = j;msg.orgStr = "(("+i+"+"+j+")*"+i+")/2";Plus.bq.add(msg);}}}}
六、并行搜索
有序数据,可以通过二分查找法去搜索数据。对于无序数据,如果使用串行程序,只需要遍历一下数组就可以得到结果。但如果要使用并行方式,则需要额外的增加一些线程间的通信机制。
一种简单的策略就是将原始数据按期望的线程数进行分割。如果我们计划使用两个线程进行搜索。那么我们可以把一个数组或集合分成两个。当有一个线程找到数据后,返回即可。
先定义一个整型数组,我们需要查找数组内的元素:static int[] arr;
定义线程池,和存放结果的result。在result中,我们会保存符合条件的元素在arr数组中的下标。默认为-1,表示没有找到给定元素。
static ExecutorService poo = Executors.newCachedThreadPool();static final int Thread_Num = 2;static AtomicInteger result = new AtomicInteger(-1);
并发搜索需要指定线程搜索的起始与结束位置:
public static int search(int searchValue,int beginPos,int endPos){int i = 0;// 开始与结束for(i=beginPos;i<endPos;i++){if(result.get()>=0){ // 如果其他线程找到了,就返回return result.get();}if(arr[i]==searchValue){if(!result.compareAndSet(-1,i)){ // 如果值替换失败了,说明其他的线程找到了,也返回return result.get();}return i; // 返回下标}}return -1;}
定义一个线程进行查找,它会调用前面的search()方法
public static class SearchTask implements Callale<Integer>{int begin,end,searchValue;public SearchTask(int searchValue,int begin,int end){this.begin = begin;this.end = end;this.searchValue = searchValue;}public Integer call(){int re = search(searchValue,begin,end));return re;}}
最后是pSearch()方法并行查找函数,它会根据线程数量对arr数组进行划分,并建立对应的任务提交给线程池处理。
public static int pSearch(int searchValue) throws Exception{int subArrSize = arr.length/Thread_Num+1;List<Future<Integer>> re = new ArratList<>();for(int i = 0;i<arr.length;i+=subArrSize){int end = i+subArrSize;if(end>=arr.length)end=arr.length;re.add(pool.submit(new SearchTask(searchValue,i,end)));}for(Future<Integer> fu:re){if(fu.get()>=0)return fu.get();}return -1;}
七、并行排序
1、分离数据相关性,奇偶交换排序
相信大家都听过冒泡排序算法,这里就不在赘述。奇偶交换排序就是基于这种思想的
对奇偶交换排序来说,他们分为两个阶段,奇交换和偶交换。奇交换比较的是奇数索引及其相邻的后续元素。而欧交换比较的是偶数索引及其相邻的后续元素。奇交换和偶交换会成对出现。
