7.1 线程安全的同步容器类
1 .通过synchronizedSortedSet静态方法包装出一个同步容器
package com.crazymakercircle.syncontainer;
// 省略import
public class CollectionsDemo
{
public static void main(String[] args) throws InterruptedException
{
// 创建一下基础的有序集合
SortedSet<String> elementSet = new TreeSet<String>();
// 增加元素
elementSet.add("element 1");
elementSet.add("element 2");
// 将 elementSet 包装成一个同步容器
SortedSet sorset = Collections.synchronizedSortedSet(elementSet);
// 输出容器中的元素
System.out.println("SortedSet is :" + sorset);
CountDownLatch latch=new CountDownLatch(5);
for (int i = 0; i < 5; i++)
{
int finalI = i;
ThreadUtil.getCpuIntenseTargetThreadPool()
.submit(() ->{
// 向同步容器中增加一个元素
sorset.add("element " + (3 + finalI));
Print.tco("add element"+ (3 + finalI));
latch.countDown();
});
}
latch.await();
// 输出容器中的元素
System.out.println("SortedSet is :" + sorset);
}
}
- java.util.Collections所提供的同步包装方法
-
7.2 JUC高并发容器
7.3 CopyOnWriteArrayList
7.3.1 CopyOnWriteArrayList的使用
前面讲到,Collections可以将基础容器包装为线程安全的同步容器,但是这些同步容器包装类在进行元素迭代时并不能进行元素添加操作。下面是一个简单的例子: ```java package com.crazymakercircle.lockfree; // 省略import public class CopyOnWriteArrayListTest {
//并发操作的执行目标
public static class CocurrentTarget implements Runnable
{
//并发操作的目标队列
List<String> targetList = null;
public CocurrentTarget(List<String> targetList)
{
this.targetList = targetList;
}
@Override
public void run()
{
Iterator<String> iterator = targetList.iterator();
//迭代操作
while (iterator.hasNext())
{
// 在迭代操作时,进行列表的修改
String threadName = currentThread().getName();
Print.tco("开始往同步队列加入线程名称:" + threadName);
targetList.add(threadName);
}
}
}
//测试同步队列:在迭代操作时,进行列表的修改
@Test
public void testSynchronizedList()
{
List<String> notSafeList = asList("a", "b", "c");
List<String> synList = Collections.synchronizedList(notSafeList);
//创建一个执行目标
CocurrentTarget synchronizedListListDemo =
new CocurrentTarget(synList);
//10个线程并发
for (int i = 0; i < 10; i++)
{
new Thread(synchronizedListListDemo , "线程" + i).start();
}
//主线程等待
sleepSeconds(1000);
}
}
那么,该如何解决此问题呢?可使用CopyOnWriteArrayList替代Collections.synchronizedList同步包装实例,具体的代码如下:
```java
package com.crazymakercircle.lockfree;
// 省略import
public class CopyOnWriteArrayListTest
{
//测试CopyOnWriteArrayList
@Test
public void testcopyOnWriteArrayList()
{
List<String> notSafeList = asList("a", "b", "c");
//创建一个CopyOnWriteArrayList队列
List<String> copyOnWriteArrayList = new CopyOnWriteArrayList();
copyOnWriteArrayList.addAll(notSafeList);
//并发执行目标
CocurrentTarget copyOnWriteArrayListDemo =
new CocurrentTarget(copyOnWriteArrayList);
for (int i = 0; i < 10; i++)
{
new Thread(copyOnWriteArrayListDemo, "线程" + i).start();
}
//主线程等待
sleepSeconds(1000);
}
}
7.3.2 CopyOnWriteArrayList的原理
CopyOnWriteArrayList的核心成员如下:
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
private static final long serialVersionUID = 8673264195747942595L;
/** 对所有的修改器方法进行保护,访问器方法并不需要保护 */
final transient ReentrantLock lock = new ReentrantLock();
/** 内部对象数组,通过 getArray/setArray方法访问 */
private transient volatile Object[] array;
/**
*获取内部对象数组
*/
final Object[] getArray() {
return array;
}
/**
*设置内部对象数组
*/
final void setArray(Object[] a) {
array = a;
}
// 省略其他代码
}
7.3.3 CopyOnWriteArrayList读取操作
/** 操作内存的引用*/
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
//获取元素
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
//返回操作内存
final Object[] getArray() {
return array;
}
7.3.4 CopyOnWriteArrayList写入操作
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
Object[] elements = getArray();
int len = elements.length;
// 复制新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock(); // 释放锁
}
}
7.3.5 CopyOnWriteArrayList的迭代器实现
static final class COWIterator<E> implements ListIterator<E> {
/**对象数组的快照(snapshot)*/
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
public boolean hasNext() {
return cursor < snapshot.length;
}
//下一个元素
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
}
7.4 BlockingQueue
7.4.1 BlockingQueue的特点
7.4.2 阻塞队列的常用方法
public interface BlockingQueue<E> extends Queue<E> {
//将指定的元素添加到此队列的尾部
//在成功时返回true,如果此队列已满,就抛出IllegalStateException
boolean add(E e);
//非阻塞式添加:将指定的元素添加到此队列的尾部(如果立即可行且不会超过该队列的容量)
//如果该队列已满,就直接返回
boolean offer(E e)
//限时阻塞式添加:将指定的元素添加到此队列的尾部
//如果该队列已满,那么在到达指定的等待时间之前,添加线程会阻塞,等待可用的空间,该方法可中断
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//阻塞式添加:将指定的元素添加到此队列的尾部,如果该队列已满,就一直等待(阻塞)
void put(E e) throws InterruptedException;
//阻塞式删除:获取并移除此队列的头部,如果没有元素就等待(阻塞)
//直到有元素,将唤醒等待线程执行该操作
E take() throws InterruptedException;
//非阻塞式删除:获取并移除此队列的头部,如果没有元素就直接返回null(空)
E poll() throws InterruptedException;
//限时阻塞式删除:获取并移除此队列的头部,在指定的等待时间前一直等待获取元素,超过时间,方法将结束
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//获取但不移除此队列的头元素,没有则抛出异常NoSuchElementException
E element();
//获取但不移除此队列的头元素,如果此队列为空,就返回null
E peek();
//从此队列中移除指定元素,返回删除是否成功
boolean remove(Object o);
}
7.4.3 常见的BlockingQueue
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- PriorityBlockingQueue
-
7.4.4 ArrayBlockingQueue的基本使用
```java package com.crazymakercircle.producerandcomsumer.store; // 省略import public class ArrayBlockingQueuePetStore {
public static final int MAX_AMOUNT = 10; //数据区长度
//共享数据区,类定义
static class DataBuffer<T>
{
//使用阻塞队列保存数据
private ArrayBlockingQueue<T> dataList =
new ArrayBlockingQueue<>(MAX_AMOUNT);
// 向数据区增加一个元素,委托给阻塞队列
public void add(T element) throws Exception
{
dataList.add(element); //直接委托
}
/**
* 从数据区取出一个商品,委托给阻塞队列
*/
public T fetch() throws Exception
{
return dataList.take(); //直接委托
}
}
public static void main(String[] args) throws InterruptedException
{
Print.cfo("当前进程的ID是" + JvmUtil.getProcessID());
System.setErr(System.out);
//共享数据区,实例对象
DataBuffer<IGoods> dataBuffer = new DataBuffer<>();
//生产者执行的操作
Callable<IGoods> produceAction = () ->
{
//首先生成一个随机的商品
IGoods goods = Goods.produceOne();
//将商品加上共享数据区
dataBuffer.add(goods);
return goods;
};
//消费者执行的操作
Callable<IGoods> consumerAction = () ->
{
// 从PetStore获取商品
IGoods goods = null;
goods = dataBuffer.fetch();
return goods;
};
// 同时并发执行的线程数
final int THREAD_TOTAL = 20;
// 线程池,用于多线程模拟测试
<a name="DV7Lo"></a>
## 7.4.5 ArrayBlockingQueue构造器和成员
1. ArrayBlockingQueue构造器
```java
//默认非公平阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(capacity);
//公平阻塞队列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(capacity,true);
//只带一个capacity参数的构造器
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//带两个参数的构造器
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); //根据fair参数构造公平锁/获取非公平锁
notEmpty = lock.newCondition(); //有元素加入,队列为非空
notFull = lock.newCondition(); //有元素被取出,队列为未满
}
ArrayBlockingQueue内部的成员变量 ```java public class ArrayBlockingQueue
extends AbstractQueue implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/**获取、删除元素的索引,主要用于take、poll、peek、remove方法 */
int takeIndex;
/**添加元素的索引,主要用于 put、offer、add方法*/
int putIndex;
/** 队列元素的个数 */
int count;
/** 控制并发访问的显式锁 */
final ReentrantLock lock;
/**notEmpty条件对象,用于通知take线程(消费队列),可执行删除操作 */
private final Condition notEmpty;
/**notFull条件对象,用于通知put线程(生产队列),可执行添加操作 */
private final Condition notFull;
/**
迭代器
*/
transient Itrs itrs = null;
}