Collections提供了一系列的方法来个给容器加锁,返回一个新的容器:
同步类容器的状态都是串行化的,他们虽然实现了线程安全,但是严重的降低了并发性,在多线程环境中,严重降低了程序的吞吐量。而并发类容器是专门针对并发设计的,比如使用 ConcurrentHashMap
代替基于散列的 HashMap
;使用 CopyOnWriteArrayList
代替 Voctor
,并发的 CopyOnWriteArraySet
,以及并发的 Queue 等等。
1. ConcurrentMap
Concurrentmap 接口下有俩个重要的实现
- Concurrenthashmap
- ConcurrentSkipListMap(支持并发排序功能,弥补 ConcurrentHashMap)
ConcurrentHashMap 内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的 Hashtable,Segment 通过继承
ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment。只要多个修改操作发生在不同的段上,它们就可以并发进行。把一个整体分成了 16 个段(Segment)。也就是最高支持 16 个线程的并发修改操作,这也是在多线程场景时减小锁的粒度从而降低锁竞争的一种方案。并且代码中大多共享变量使用 volatile
关键字声明,目的是第一时间获取修改的内容,性能非常好。
另:ConcurrentSkipListMap 相当于 TreeMap(可以排序);
ConcurrentHashMap<String, Object> chm = new ConcurrentHashMap<String, Object>();
chm.put("k1", "v1");
chm.put("k2", "v2");
chm.put("k3", "v3");
chm.putIfAbsent("k3", "v4"); // 如果k3存在,则不加入value
for (Map.Entry<String, Object> me : chm.entrySet()) {
System.out.println("key:" + me.getKey() + ",value:" + me.getValue());
}
Java 8 引入了红黑树:
2. Copy-On-Write 容器
简称 COW,是一种程序设计中的优化策略。
Coplon Write 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器,原容器进行垃圾回收。这样做的好处是我们可以对 Copywrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 Copyonwrite 容器也是一种读写分离的思想,读和写不同的容器。
- JDK 里的 COW 容器有两种:
CopyonwritearrayList
和CopyonwritearraySet
。 - 适用场景:读多写少。(原因是copy容器的时候耗时)
3. Queue
在并发队列上 JDK 提供了两套实现,无论哪种都继承自 Queue。
- 以
ConcurrentLinkedQueue
为代表的高性能队列 - 以
BlockingQueue
接口为代表的阻塞队列
3.1 ConcurrentLinkedQueue
ConcurrentLinkedQueue
:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常 concurrentLinkedQueue
性能好于 BlockingQueue
。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许 null 元素。
ConcurrentLinkedQueue 重要方法:
- 加入元素:
add()
和offer()
在 Concurrentlinkedqueue 中,这俩个方法没有任何区别) - 取元素:
poll()
和peek()
,两者都是取头元素节点,区别在于前者会除元素,后者不会。
//高性能无阻塞无界队
Concurrentlinkedqueue <String> q =new Concurrentlinkedqueuestring> ();
q.add("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
system.out.println(q.poll()); // a 从头部出元素,并把元素从队列删除
System.out.println(q.size()); // 4
System.out.println(q.peek()); // b
System.out.println(q.size()); // 4
3.2 BlockingQueue 接口
3.2.1 ArrayBlockingQueue
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用。
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
array.add("a");
array.add("b");
array.add("c");
array.put("d");
array.offer("e");
System.out.println(array.offer("f", 2, TimeUnit.SECONDS)); // 2秒之内能加入返回true,否则返回false
3.2.2 LinkedblockingQueue
基于链表的阻塞队列,同 ArayBlockingQueue
类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成), Linkedblockingqueue
之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。他是一个无界队列。
LinkedBlockingQueue<String> arr = new LinkedBlockingQueue<>(); // 可以加数字参数定容也可以不加
arr.add("a");
arr.add("b");
arr.add("c");
arr.offer("d");
List<String> list = new ArrayList<String>();
System.out.println(arr.drainTo(list, 3)); // 把arr中的前三个值放入list
System.out.println(list.size());
for (String string : list) {
System.out.println(string);
}
3.2.3 SynchronousQueue
一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。此队列不能添加元素。
final SynchronousQueue queue = new SynchronousQueue();
new Thread(() -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
queue.add("zcq");
}).start();
注意:此队列的add方法并不是不能用,但是必须现有take,再有add。元素并不是加到SynchronizeQueue队列里(没有容积),而是直接扔给了take取值的线程。
**
3.2.4 PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Computor 对象来决定,也就是说传入队列的对象必须实现 Comparable 接口),在实现 PriorityBlockingQueue
时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。
public class UsePriorityBlockingQueue {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<Task>();
Task t1 = new Task(3, "id3");
Task t2 = new Task(1, "id1");
Task t3 = new Task(2, "id2");
queue.add(t1);
queue.add(t2);
queue.add(t3);
System.out.println(queue);
System.out.println(queue.take().getName());
System.out.println(queue);
System.out.println(queue.take().getName());
}
}
@Data
class Task implements Comparable<Task> {
private int id;
private String name;
public Task(int id, String name) {
super();
this.id = id;
this.name = name;
}
@Override
public int compareTo(Task task) {
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
}
}
PriorityBlockingQueue
在 take()
方法执行的时候才进行排序(选择排序法)。
3.2.5 DelayQueue
带有延迟时间的 Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。Delayqueue 中的元素必须实现 Delayed
接口,DelayQueue,是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
例子:网民上网,按照不同时间依次下机
- 创建网民对象
@Data
public class WangMin implements Delayed {
private String name;
private String id;
private long endTime;
private TimeUnit timeUnit = TimeUnit.SECONDS;
public WangMin(String name, String id, long endTime) {
this.name = name;
this.id = id;
this.endTime = endTime;
}
/**
* 判断是否到了截止时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return endTime - System.currentTimeMillis();
}
/**
* 相互比较排序
* @return
*/
@Override
public int compareTo(Delayed delayed) {
WangMin w = (WangMin) delayed;
return this.getDelay(timeUnit) - w.getDelay(timeUnit) > 0 ? 1 : 0;
}
}
创建网吧对象
public class WangBa implements Runnable { private DelayQueue<WangMin> queue = new DelayQueue(); private boolean yingye = true; public void shangji(String name, String id, int money) { WangMin man = new WangMin(name, id, 1000 * money + System.currentTimeMillis()); System.out.println("网民:" + name + ";身份证:" + id + ",交了" + money + "元钱。开始上机。。。"); queue.add(man); } public void xiaji(WangMin man) { System.out.println("网民:" + man.getName() + ";身份证:" + man.getId() + ",时间到了下机。。。"); } @Override public void run() { while (yingye) { try { WangMin man = queue.take(); xiaji(man); } catch (InterruptedException e) { e.printStackTrace(); } } } }
main 方法
System.out.println("网吧开始营业 =======");
WangBa wangba = new WangBa();
Thread shangwang = new Thread(wangba);
shangwang.start();
wangba.shangji("路人甲", "123", 1);
wangba.shangji("路人乙", "1234", 10);
wangba.shangji("路人丙", "12345", 5);
- 结果
网吧开始营业 ======= 网民:路人甲;身份证:123,交了1元钱。开始上机。。。 网民:路人乙;身份证:1234,交了10元钱。开始上机。。。 网民:路人丙;身份证:12345,交了5元钱。开始上机。。。 网民:路人甲;身份证:123,时间到了下机。。。 网民:路人丙;身份证:12345,时间到了下机。。。 网民:路人乙;身份证:1234,时间到了下机。。。