一、探讨单例模式

单例模式:保证系统中一个类只产生一个对象实例

1、可以省略new操作花费的时间
2、减少CG压力,缩短CG停顿时间

单例模式与并行没有直接的关系,只是因为太常见了所以讨论它。。

简单的实现:(饿汉式)

  1. public class Singleton{
  2. private Singleton(){
  3. System.out.println("Singleton is create")
  4. }
  5. private static Singleton instance = new Singleton();
  6. public static Singleton getInstance(){
  7. return instance;
  8. }
  9. }

注意:
1、构造函数设为private
2、instance对象必须是private 并static的,如果不是private,则 安全性无法保证,可能会变成null。因为工厂方法getInstance()必须是static的,所以instance也必须是是static。

这种创建方式不足的地方是,无法保证对象真的是在getInstance()方法第一次被调用时创建的。

如你的单例是这样的:

  1. public class Singleton{
  2. public static int STATUS = 1;
  3. private Singleton(){
  4. System.out.println("Singleton is create")
  5. }
  6. private static Singleton instance = new Singleton();
  7. public static Singleton getInstance(){
  8. return instance;
  9. }
  10. }

任何地方引用这个STATUS都会导致instance实例被创建。

如果想要精确的控制创建时间,一种延迟加载的策略。他只会在instance第一次使用的时候创建对象

具体实现:(懒汉式)

  1. public class Singleton{
  2. private Singleton(){
  3. System.out.println("Singleton is create")
  4. }
  5. private static Singleton instance = null;
  6. public static synchronized Singleton getInstance(){ // 加锁控制其并发
  7. if(instance == null){
  8. instance = new Singleton();
  9. }
  10. return instance;
  11. }
  12. }

结合二者优势的单例:

  1. public class StaticSingleton{
  2. private StaticSingleton(){
  3. System.out.println("Singleton is create")
  4. }
  5. private static class SingletonHolder {
  6. private static StaticSingleton instance = new StaticSingleton(); // 类初始化时创建
  7. }
  8. public static StaticSingleton getInstance(){
  9. return SingletonHolder.instance;
  10. }
  11. }

getInstance()方法没有锁,在高并发环境下性能优越。在getInstance()方法第一次调用时,StaticSingleton的实例才会被创建。因为这种方法巧妙的使用了内部类和类的初始化方式。内部类SingletonHolder是private,无法从外部访问。只能在getInstance()方法中利用虚拟机的类初始化机制创建单例。

二、不变模式

不变模式:一旦一个对象被创建,他的内部永远不会发生改变。没有线程可以对其修改状态和数据,内部状态也不会自行改变。

不变模式比只读属性更具有一致性和不变性。只读属性本身的状态可以修改。如,存活时间。

不变模式需要满足一下条件:

  • 当对象创建后,其内部状态和数据不在发生任何变化。
  • 对象需要被共享,被多线程频繁访问。

实现需要注意一下四点:

  • 去除setter方法及所有修改自身属性的方法
  • 将所有属性设为私有,并用final标记。
  • 确保没有子类可以重载修改它的行为
  • 有一个可以创建完整对象的构造函数
  1. public final class Product){ //确保无子类
  2. private final String no;
  3. private final String name;
  4. private final double price;
  5. public Product(String no,String name,double price){
  6. super();
  7. this.no = no;
  8. this.name = name;
  9. this.price = price;
  10. }
  11. public String getNo(){
  12. return no;
  13. }
  14. public String getName(){
  15. return name;
  16. }
  17. public String getPrice(){
  18. return price;
  19. }
  20. }

三、生产者-消费者模式

image.png

消费者线程与生产者线程不直接通信,而是在共享内存缓冲区中获取任务。这样生产者和消费者进行解耦。
其中BlockingQueue充当了共享缓冲区。
详细的不再赘述,详细可以看回书中P215。

四、Future模式

如果我们调用的方法执行的很慢,那么我们就要进行等待。但有时我们并不着急要结果。因此,我们可以让被吊用着立即返回。让它在后台慢慢处理。虽然它无法立即给出我所需要的数据,但它会返回一个契约给你,你可以凭借这个契约去重新获取你所需要的信息。

1、Future模式的主要角色

追要参与者:

参与者 作用
Main 系统启动,调用Client发出请求
Client 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData
Data 返回数据的接口
FutureData Future数据构造很快,但是是一个虚拟的数据,需要装配RealData
RealData 真实数据,其构造是比较慢的

2、Future模式的简单实现

Data:

  1. public interface Data {
  2. public String getResult();
  3. }

FutureData:

  1. public class FutureData implements Data {
  2. protected RealData realData = null;
  3. protected boolean isReady = false;
  4. public synchronized void setRealData(RealData realData){
  5. if(!isReady){
  6. this.realData = realData;
  7. isReady = true;
  8. notifyAll(); // RealData已经被注入,通知getResult()方法
  9. }
  10. }
  11. @Override
  12. public synchronized String getResult() {
  13. while (!isReady){
  14. try {
  15. wait();
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. return realData.result;
  21. }
  22. }

RealData:

  1. public class RealData implements Data{
  2. protected final String result;
  3. public RealData(String result) {
  4. StringBuilder stringBuilder = new StringBuilder();
  5. for (int i = 0; i < 10; i++) {
  6. stringBuilder.append(result);
  7. try {
  8. Thread.sleep(200);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. this.result = stringBuilder.toString();
  14. }
  15. @Override
  16. public String getResult() {
  17. return result;
  18. }
  19. }

Client:

  1. public class Client {
  2. public Data request(final String queryStr) {
  3. final FutureData futureData = new FutureData();
  4. new Thread() {
  5. public void run() {
  6. RealData realData = new RealData(queryStr);
  7. futureData.setRealData(realData);
  8. }
  9. }.start();
  10. return futureData;
  11. }
  12. //------------------------- 启动类 ------------------------------------------
  13. public static void main(String[] args) {
  14. Client client = new Client();
  15. Data data = client.request("name");
  16. try {
  17. System.out.println("处理其他逻辑...");
  18. Thread.sleep(1000);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. System.out.println("数据 = " + data.getResult());
  23. }
  24. }

3、JDK中的Future模式

image.png

其实就是我们所了解的带返回值的线程。

案例:
RealData:

  1. public class RealData implements Callable {
  2. private String para;
  3. public RealData(String para){
  4. this.para = para;
  5. }
  6. @Override
  7. public Object call() throws Exception {
  8. StringBuilder sb = new StringBuilder();
  9. for (int i = 0; i < 10; i++) {
  10. sb.append(para);
  11. try {
  12. Thread.sleep(100);
  13. }catch (InterruptedException ignored){
  14. }
  15. }
  16. return sb.toString();
  17. }
  18. }

FutureMain:

  1. public class FutureMain {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. FutureTask<String> future = new FutureTask<>(new RealData("a"));
  4. ExecutorService executorService = Executors.newFixedThreadPool(1);
  5. executorService.submit(future);
  6. System.out.println("请求完毕");
  7. try{
  8. System.out.println("做其他的逻辑处理");
  9. Thread.sleep(2000);
  10. }catch (InterruptedException ignored){
  11. }
  12. System.out.println("数据:"+future.get());
  13. }
  14. }

除此之外,还有一些常用的API:
image.png

4、Guava对Future模式的支持

Guava中,增强了Future模式,增加了对Future模式完成时的回调接口,使Future完成时可以自动通知应用进行后续处理。

  1. public class FutureDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. // 将一个普通线程池包装成一个包含通知功能的Future线程池
  4. ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
  5. // 拥有完成时的通知功能的Future
  6. ListenableFuture future = service.submit(new RealData("x"));
  7. // 回调函数,当Future执行完成后,则执行以下回调代码
  8. future.addListener(() -> {
  9. System.out.println("异步处理");
  10. try{
  11. System.out.println(future.get());
  12. }catch (Exception e){
  13. e.printStackTrace();
  14. }
  15. },MoreExecutors.directExecutor());
  16. System.out.println("main task done...");
  17. Thread.sleep(3000);
  18. }
  19. }

这个过程没有阻塞,可以更好的提升系统的并行度

另一种增强了对异常单的处理:

  1. public static void main(String[] args) throws InterruptedException {
  2. // 将一个普通线程池包装成一个包含通知功能的Future线程池
  3. ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
  4. // 拥有完成时的通知功能的Future
  5. ListenableFuture future = service.submit(new RealData("x"));
  6. // 使用Futures工具类将FutureCallback接口注册到给定的Future中
  7. Futures.addCallback(future, new FutureCallback<String>() {
  8. @Override
  9. public void onSuccess(String s) {
  10. System.out.println("异步处理成功,"+s);
  11. }
  12. @Override
  13. public void onFailure(Throwable throwable) {
  14. System.out.println("处理失败:"+throwable);
  15. }
  16. });
  17. System.out.println("main task done...");
  18. Thread.sleep(3000);
  19. }

五、并行流水线

不是所有的计算都可以并发计算的。假如有一个计算,第五章 并行模式与算法 - 图4 如果没有计算出B+C,就算不出(B+C)* B。遇到这种情况,我们可以采用流水线的思想。

逻辑联想:
假如我们做一个玩具小熊,我们不可能找来四个人,一个人组装身体,一个人安装四肢和头部,一个人穿衣服,一个人打包发货。因为这些步骤都是需要依赖前一步的步骤的。但是如果我们此时要做的不是一个玩偶。而是10000个。那么我们就可以一个人只负责一个步骤,做完后交给后面的人继续完成下一个步骤。流水线满载时,每次只需要一步就可以产生一个玩偶。
image.png

下面将上面的运算第五章 并行模式与算法 - 图6拆分成三个步骤:
P1:A=B+C
P2:D=A*B
P3:D=D/2

示例代码:
Msg:(信息的载体)

  1. public class Msg {
  2. public double i;
  3. public double j;
  4. public String orgStr = null;
  5. }

Plus:(流程的第一个步骤)

  1. public class Plus implements Runnable{
  2. public static BlockingQueue<Msg> bq = new LinkedBlockingDeque<>();
  3. @Override
  4. public void run() {
  5. while (true){
  6. try {
  7. Msg msg = bq.take();
  8. msg.j = msg.i+msg.j;
  9. Multiply.bq.add(msg);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }
  15. }

Multiply:(流程的第二个步骤)

  1. public class Multiply implements Runnable{
  2. public static BlockingQueue<Msg> bq = new LinkedBlockingDeque<>();
  3. @Override
  4. public void run() {
  5. while (true){
  6. try{
  7. Msg msg = bq.take();
  8. msg.i = msg.i* msg.j;
  9. Div.bq.add(msg);
  10. }catch (InterruptedException e){
  11. }
  12. }
  13. }
  14. }

Div:(流程的第三个步骤)

  1. public class Div implements Runnable{
  2. public static BlockingQueue<Msg> bq = new LinkedBlockingDeque<>();
  3. @Override
  4. public void run() {
  5. while (true){
  6. try {
  7. Msg msg = bq.take();
  8. msg.i = msg.i/2;
  9. System.out.println(msg.orgStr+"="+msg.i);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }
  15. }

启动Main:

  1. public class PStreamMain {
  2. public static void main(String[] args) {
  3. new Thread(new Plus()).start();
  4. new Thread(new Multiply()).start();
  5. new Thread(new Div()).start();
  6. for (int i = 0; i <= 10; i++) {
  7. for (int j = 0; j <= 10; j++) {
  8. Msg msg = new Msg();
  9. msg.i = i;
  10. msg.j = j;
  11. msg.orgStr = "(("+i+"+"+j+")*"+i+")/2";
  12. Plus.bq.add(msg);
  13. }
  14. }
  15. }
  16. }

六、并行搜索

有序数据,可以通过二分查找法去搜索数据。对于无序数据,如果使用串行程序,只需要遍历一下数组就可以得到结果。但如果要使用并行方式,则需要额外的增加一些线程间的通信机制。

一种简单的策略就是将原始数据按期望的线程数进行分割。如果我们计划使用两个线程进行搜索。那么我们可以把一个数组或集合分成两个。当有一个线程找到数据后,返回即可。

先定义一个整型数组,我们需要查找数组内的元素:
static int[] arr;

定义线程池,和存放结果的result。在result中,我们会保存符合条件的元素在arr数组中的下标。默认为-1,表示没有找到给定元素。

  1. static ExecutorService poo = Executors.newCachedThreadPool();
  2. static final int Thread_Num = 2;
  3. static AtomicInteger result = new AtomicInteger(-1);

并发搜索需要指定线程搜索的起始与结束位置:

  1. public static int search(int searchValue,int beginPos,int endPos){
  2. int i = 0;
  3. // 开始与结束
  4. for(i=beginPos;i<endPos;i++){
  5. if(result.get()>=0){ // 如果其他线程找到了,就返回
  6. return result.get();
  7. }
  8. if(arr[i]==searchValue){
  9. if(!result.compareAndSet(-1,i)){ // 如果值替换失败了,说明其他的线程找到了,也返回
  10. return result.get();
  11. }
  12. return i; // 返回下标
  13. }
  14. }
  15. return -1;
  16. }

定义一个线程进行查找,它会调用前面的search()方法

  1. public static class SearchTask implements Callale<Integer>{
  2. int begin,end,searchValue;
  3. public SearchTask(int searchValue,int begin,int end){
  4. this.begin = begin;
  5. this.end = end;
  6. this.searchValue = searchValue;
  7. }
  8. public Integer call(){
  9. int re = search(searchValue,begin,end));
  10. return re;
  11. }
  12. }

最后是pSearch()方法并行查找函数,它会根据线程数量对arr数组进行划分,并建立对应的任务提交给线程池处理。

  1. public static int pSearch(int searchValue) throws Exception{
  2. int subArrSize = arr.length/Thread_Num+1;
  3. List<Future<Integer>> re = new ArratList<>();
  4. for(int i = 0;i<arr.length;i+=subArrSize){
  5. int end = i+subArrSize;
  6. if(end>=arr.length)end=arr.length;
  7. re.add(pool.submit(new SearchTask(searchValue,i,end)));
  8. }
  9. for(Future<Integer> fu:re){
  10. if(fu.get()>=0)return fu.get();
  11. }
  12. return -1;
  13. }

七、并行排序

1、分离数据相关性,奇偶交换排序

相信大家都听过冒泡排序算法,这里就不在赘述。奇偶交换排序就是基于这种思想的

对奇偶交换排序来说,他们分为两个阶段,奇交换和偶交换。奇交换比较的是奇数索引及其相邻的后续元素。而欧交换比较的是偶数索引及其相邻的后续元素。奇交换和偶交换会成对出现。

八、并行算法:矩阵算法

暂时先跳过七、八、九章节

九、准备好了再通知:网络NIO