并发
Thread.yield():
声明该线程已经执行完任务中最重要的部分,现在是切换到其他任务的好时机;
Executor:
Java SE5的java.util.concurrent包中包含执行器(Executor)来帮助管理Thread对象,简化了并发编程;Executor管理异步任务的执行,无须显示地管理线程的声明周期;
使用方法如下:
ExecutorService exec = Executors.newCachedThreadPool();for(int i = 0; i <5; i++)exec.execute(new LiftOff());exec.shutdown();
上述的newCachedThreadPool()会为每一个任务都创建一个线程,通常使用这个方法;除此之外,还有:
newCachedThreadPool(params: number):创建固定个数的线程newSingleThreadPool:创建单个线程,其他线程需等待该线程完成后再加入
从任务中产生返回值:
Runnable是执行工作的独立任务,但是它不返回任何值,如果我们希望在任务结束时返回一个值,那么我们需要用到Callable接口来替代Runnable接口;
使用方法如下:
import java.util.ArrayList;import java.util.concurrent.*;class TaskWithResult implements Callable<String> {private int id;public TaskWithResult(int id) {this.id = id;}public String call() {return "result of TaskWithResult " + id;}}public class CallableDemo {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();ArrayList<Future<String>> results = new ArrayList<>();for (int i = 0; i <10; i++)results.add(exec.submit(new TaskWithResult(i))); // 利用exec.submit调用Callable的call方法for (Future<String> fs : results)try {System.out.println(fs.get());} catch (InterruptedException e) {System.out.println(e);return;} catch (ExecutionException e) {System.out.println(e);} finally {exec.shutdown();}}}
睡眠:
调用sleep()决定线程睡眠时间;
优先级:
优先级意味着更高的执行频率,并不会导致低优先级死锁的发生;
使用方法:setPriority()和getPriority()
JDK有十个优先级;
后台线程:
所谓后台(daemon)线程,是指在程序运行的时候在后台提供一种通用服务的线程,并且这种线程并不属于程序中不可或缺的一部分;当所有的非后台线程结束时,程序就终止了,同时会杀死进场中的所有后台线程;
import java.util.concurrent.TimeUnit;import static java.lang.Thread.currentThread;public class SimpleDaemons implements Runnable {public void run() {try {while(true) {TimeUnit.MILLISECONDS.sleep(100);System.out.println(Thread.currentThread() + " " + this);}} catch(InterruptedException e) {System.out.println("sleep() interrupted");}}public static void main(String[] args) throws Exception {for(int i = 0; i < 10; i++) {Thread daemon = new Thread(new SimpleDaemons());daemon.setDaemon(true); // Must call before start();daemon.start();}System.out.println("All daemons started");TimeUnit.MILLISECONDS.sleep(175);}}
必须在线程启动之前调用setDaemon()方法才能把它设置为后台线程;
加入一个线程:join()
作用:
主线程生成并起动了子线程,而子线程里要进行大量的耗时的运算,当主线程处理完其他的事务后,需要用到子线程的处理结果,这个时候就要用到join()方法了。
主线程等待子线程的终止。也就是在子线程调用join()*方法后面的代码,只有等到子线程结束了才能执行。(Waits for this thread to die.)
解决共享资源竞争:
Java提供了synchronized关键字,为防止资源冲突提供了支持。
synchronized void f() { /* ... */ }
- 使用synchronized关键字时,将域设置为private时必须的,否则synchronized关键字无法阻止其他任务直接访问域,因此发生冲突;
- 每个访问临界共享资源的方法都必须被同步,否则他们就不会正确地工作;
也可以使用显式的Lock对象:
- 必须显式创建、锁定和释放,不够优雅,但对于某些问题来说,更加灵活
private int currentEvenValue = 0;private Lock lock = new ReentrantLock();lock.lock();try {++currentEvenValue;Thread.yield(); // 无效,因为已经上锁++currentEvenValue;return currentEvenValue;} finally {lock.unlock();}
注意:return语句必须在try子句中出现,以确保unlock()不会过早地发生,从而将数据暴露给了第二个任务!
尽管Lock的try-finaly所需的代码比synchronized要复杂,但是使用Lock的方式可以在事务出错的时候去做正确的清理工作,而不是仅仅像synchronized一样抛出一个异常。
PS:一般使用synchronized关键字即可;在解决特殊问题时才使用Lock,
原子性与易变性:
在Java线程中,有一个不正确的知识是“原子操作不需要进行同步控制”。千万不要依赖原子性去实现同步;
volatile:关键字确保了变量的可见性,如果将一个域声明为volatile,那么只要对这个域产生了写操作,那么所有的读操作久都可以看到这个修改。即便使用了本地缓存,情况也确实如此,volatile域会被立即写入到主存当中,而读取操作就发生在主存当中。
原子性和易变性是不同的概念。在非volatile域上的原子操作不必刷新到主存中,因此其他读取该域的任务也不必看到这个新值。如果多个任务在同时访问某个域,那么这个域就应该是volatile的,否则这个域就应该经由同步来访问。同步也会导致向主存中刷新,因此如果一个域完全由synchronized方法或语句块来保护,那就不必将其设置为volatile的。
注意:当一个域的值依赖于其他域的时候,volatile就无法工作了,例如Range类的lower和upper边界,lower必须<=upper,所以单独对lower使用volatile就没有用了。
使用volatile而不是synchronized的唯一安全情况就是类中只有一个可变的域。
我们的第一选择应该是synchronized关键字,这是最安全的方式,尝试其他方式都是有风险的。
基本上,如果一个域可能会被多个任务同时访问,或者这些任务重至少有一个是写入任务,那么就应该将这个域设置为volatile的。如果将一个域定义为volatile,那么它就会告诉编译器不要执行任何移除读取和写入操作的优化,这些操作的目的是用线程中的局部变量维护对这个域的精确同步。
什么时候该使用同步?
Brian同步规则:如果你正在写一个变量,它可能接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且, 读写线程都必须用相同的监视器锁同步。
原子类:
Java SE5引入了诸如AtomicInteger、AtomicLong、AtomicReference等特殊的原子性变量类。他们提供下面形式的原子性条件更新操作:
boolean compareAndSet(expectedValue, updateValue)
原子类对于常规编程来说很少用上,但是在涉及性能调优时很有帮助。
临界区:
有时,我们只是希望防止多个线程同时访问方法内部的部分代码而不是访问整个方法。通过这种方式分离出来的代码段被称为临界区,它也使用synchronized关键字建立。
synchronized(syncObject) {}
在这里,synchronized被用来指定某个对象,次对象的锁被用来对花括号内的代码进行同步控制,这也被称为同步控制块;在进入此段代码之前,必须得到syncObject对象的锁。
线程状态:
- 新建(new):当线程被创建时短暂处于这种状态。此时它已经分配了必须的系统资源,病执行了初始化。此刻线程已经有资格获得CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。
- 就绪(Runnable):在这种状态下,只要调度器把时间片分配给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。只要调度器能分配时间片给线程,他就可以运行;
- 阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间。直到线程重新进入了就绪状态,它才有可能执行操作。
- 死亡(Dead):处于死亡或者终止状态的线程将不再是可调度的,并且再也不会得到CPU时间片,她的任务已经结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回。
线程本地存储:
防止任务在共享资源上的冲突第二种方式就是根除对变量的共享,也就是通过线程本地存储,为使用相同变量的每个不同线程都创建不同的存储,如果有5个线程都需要使用变量x所表示的对象,那线程本地存储就会生成5个用于x的不同的存储块。
public class TestThreadLocal {
private static final ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new TestThread(i)).start();
}
}
static class TestThread implements Runnable {
private int index;
public TestThread(int index) {
this.index = index;
}
public void run() {
System.out.println("线程" + index + "的累加前:" + value.get());
for (int i = 0; i < 5; i++) {
value.set(value.get() + i);
}
System.out.println("线程" + index + "的累加后:" + value.get());
}
}
}
新类库中的构件:
CountDownLatch:
CountDownLatch是在Java1.5中引入的,它能够使一个线程等待其他线程完成各自的工作之后再执行。例如应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1.当计数器的值到达0时,他表示所有的线程已经完成了任务,然后再闭锁上等待的线程就可以恢复执行任务了。
CountDownLatch的伪代码如下所示:
`//Main thread start``//Create CountDownLatch for N threads``//Create and start N threads``//Main thread wait on latch``//N threads completes there tasks are returns``//Main thread resume execution`
CountDownLatch.java类中定义的构造函数:
`//Constructs a CountDownLatch initialized with the given count.``public` `void` `CountDownLatch(``int` `count) {...}`
构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
使用场景:
出在java实时系统中CountDownLatch都有哪些使用场景:
- 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
- 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
- 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
CyclicBarrier:
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
假设场景:
假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家都等待。这和CountDownLatch是不同的。
两者区别:- CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.
- 而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
| 构造方法 |
|---|
CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在每个 barrier 上执行预定义的操作。 |
CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行 |
| 方法 | |
|---|---|
int |
await() 在所有参与者)都已经在此 barrier 上调用 await 方法之前,将一直等待。 |
int |
await(long timeout, TimeUnit unit) 在所有参与者)都已经在此屏障上调用 await 方法之前,将一直等待。 |
int |
getNumberWaiting() 返回当前在屏障处等待的参与者数目。 |
int |
getParties() 返回要求启动此 barrier 的参与者数目。 |
boolean |
isBroken() 查询此屏障是否处于损坏状态。 |
void |
reset() 将屏障重置为其初始状态。 |
Semaphre:
Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。
public class TestSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire();
System.out.println("Accessing: " + NO);
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release();
System.out.println("-----------------" + semp.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(run);
}
// 退出线程池
exec.shutdown();
}
}
- DelayQueue
类介绍
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
DelayQueue 是 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
放入DelayQueue的对象需要实现Delayed接口。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
Demo:
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author lujianing01@58.com
* @Description:
* @date 2016/6/21
*/
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();
//生产者
producer(delayQueue);
//消费者
consumer(delayQueue);
while (true){
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 每100毫秒创建一个对象,放入延迟队列,延迟时间1毫秒
* @param delayQueue
*/
private static void producer(final DelayQueue<DelayedElement> delayQueue){
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
DelayedElement element = new DelayedElement(1000,"test");
delayQueue.offer(element);
}
}
}).start();
/**
* 每秒打印延迟队列中的对象个数
*/
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("delayQueue size:"+delayQueue.size());
}
}
}).start();
}
/**
* 消费者,从延迟队列中获得数据,进行处理
* @param delayQueue
*/
private static void consumer(final DelayQueue<DelayedElement> delayQueue){
new Thread(new Runnable() {
@Override
public void run() {
while (true){
DelayedElement element = null;
try {
element = delayQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()+"---"+element);
}
}
}).start();
}
}
class DelayedElement implements Delayed {
private final long delay; //延迟时间
private final long expire; //到期时间
private final String msg; //数据
private final long now; //创建时间
public DelayedElement(long delay, String msg) {
this.delay = delay;
this.msg = msg;
expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间
now = System.currentTimeMillis();
}
/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DelayedElement{");
sb.append("delay=").append(delay);
sb.append(", expire=").append(expire);
sb.append(", msg='").append(msg).append('\'');
sb.append(", now=").append(now);
sb.append('}');
return sb.toString();
}
}
PriorityBlockingQueue:
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现,研究过数组方式存放最小堆节点的都知道,直接遍历队列元素是无序的。
