并发的多面性
更快的管理
如果有一台多处理器,就可以在这些处理器中分配任务,但是并发通常时提高在运行在单处理器上的程序的性能
并且单位时间内只有一个线程执行
如果程序中的某个任务应为该程序控制范围之外的某些条件而导致不能够继续运行,就说这个线程阻塞了
改进代码设计
Java的线程机制是抢占式的,这表示调度机制会在周期性的中断线程,切换到另外一个线程,从而为每个线程都提供时间片,使得每个线程都会分配到数量合理的时间去驱动它的任务
基本的线程机制
并发编程使我们可以将程序分为多个分离的,独立运行的任务。一个线程就是在进程中的一个单一的顺序控制流,因此进程可以拥有多个并发执行的任务
定义任务
描述任务,只需要实现Runnable接口即可
public class ListOff implements Runnable {
//描述接口只需要实现这个接口
private int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public ListOff(){}
public ListOff(int countDown) {
this.countDown = countDown;
}
public String status(){
return "#" + id + "(" + (countDown > 0 ? countDown : "ListOff! ") + ")";
}
@Override
public void run() {
while (countDown-- > 0) {
System.out.println(status());
Thread.yield();
//“让一下”,可以让别的线程抢占,但是抢到抢不到另说
}
}
}
可以在main方法中直接调用run方法
public class MainThread {
public static void main(String[] args) {
final ListOff listOff = new ListOff();
listOff.run();//直接调用该方法
}
}
Thread类
public class BasicThread {
public static void main(String[] args) {
final Thread thread = new Thread(new ListOff());
thread.start();//执行线程必须的初始化
System.out.println("waiting for ListOff");
//从输出中看,虽然开始线程的运行,但是先打印出来的使main方法中的输出语句
}
}
可以添加更多的线程去驱动更多的任务
public class MoreBasicThreads {
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new LiftOff()).start();//每一个线程都拥有一个LiftOff实例,所以不会发生资源竞争的问题
//所以不会产生多资源问题
}
System.out.println("等待发射......");
}
}
public static void main(String[] args) {
final Thread thread = new Thread(new LiftOff());
for (int i = 0; i < 5; i++) {//一个线程五个资源:IllegalThreadStateException
thread.start();
}
System.out.println("等待发射......");
}
}
输出说明不同任务的执行在线程被换进换出时混在了一起,线程调度器会在这些处理器之间默默的分发线程
使用Executor
Executor在客户端和任务执行之间提供了一个间接层,与客户端直接执行任务不同,这个中介将执行任务。Executor允许管理异步任务的执行,而无需显示的管理线程的声明周期
ExecutorService:(具有服务声明周期的Executor)知道如何构建恰当的上下文来执行Runnable。
CachedThreadPool:为每一个任务都创建一个线程(创建与所需数量相同的线程)
ExecutorService对象是使用静态的Executor方法创建的
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();//底层是用Thread实现
//为每个任务都创建一个线程,并且如果以前创建的线程可用,会重用他们
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();//先执行Shutdown,如果有未完成的任务,会把任务完成,之后不允许再次提交任务
//如果在ShutDown之后再提交,会报错
}
}
FixedThreadPool使用了有限的线程集来执行所提交的任务
public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(5);
//一次分配五个线程(可以分配任意个线程)
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());//在线程中执行任务
}
exec.shutdown();
}
}
在任何线程池中,如果已经创建的线程可用,都可以自动复用
SingleThreadExecutor:只有一个线程,就像是数量为1的FixedThreadPool,用这种方式,并不需要在共享资源上处理同步
线程池内部实现方式:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
从任务中产生返回值
Runnable是执行工作的独立任务,他没有任何返回值,如果希望在完成时能够返回一个值,那么可以实现Callable接口而不是Runnable。
Callable是一个具有类型参数的泛型,它的类型参数是从方法call()中返回的值,并且必须使用ExecutorService.submit()方法调用它。
public Thread(Runnable target) {//Callable不能再线程中提交任务
init(null, target, "Thread-" + nextThreadNum(), 0);
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
@Override
public String call() throws InterruptedException {
TimeUnit.SECONDS.sleep(2);
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
System.out.println("------------");
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> result = new ArrayList<>();
for (int i = 0; i < 10; i++) {
result.add(exec.submit(new TaskWithResult(i)));
}
for (Future<String> fs : result) {
try {
System.out.println("0000000000000");
System.out.println(fs.get());//调用get方法获取结果
//System.out.println("1111111111111");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
exec.shutdown();
}
}
System.out.println("==============");
}
}
class TaskResult implements Callable<String> {
private final int id;
private static final Random rand = new Random(47);
public TaskResult(int id){
this.id = id;
}
@Override
public String call() throws Exception {
int timeout = rand.nextInt(10);
TimeUnit.SECONDS.sleep(timeout);//随机睡眠时间
return "当前线程名字 : " + Thread.currentThread().getName() + " " +
"执行了: " + timeout + " id : " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
futures.add(exec.submit(new TaskResult(i)));
}
long start = System.currentTimeMillis();
for (Future<String> future : futures) {
try {
System.out.println(future.get());//只要线程完成了就会立即获取
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.println(System.currentTimeMillis() - start / 1000);
exec.shutdown();
}
}
class TaskResult2 implements Callable<String> {
private final int i;
TaskResult2(int i) {
this.i = i;
}
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(5);//模拟任务耗时的时间
return "当前线程名字:" + Thread.currentThread().getName() + " " + i;
}
}
public class CallableDemo2 {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<String> future = exec.submit(new TaskResult2(1));
try {
System.out.println("11111111111111111111");
//System.out.println(future.get());//等待结果的获取 阻塞程序的执行
System.out.println(future.get(1, TimeUnit.SECONDS));//只等待1秒钟获取结果
System.out.println("22222222222222222222");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
exec.shutdown(); //并不会阻塞当前程序的执行;如果任务没有执行完成的话,将等待任务执行完成之后再关闭
System.out.println("这是main方法的程序......");
}
}
class TaskResult3 implements Callable<String> {
private final int i;
TaskResult3(int i) {
this.i = i;
}
@Override
public String call() throws Exception {
System.out.println("--------before sleep---------");
TimeUnit.SECONDS.sleep(i);//模拟任务耗时的时间
System.out.println("--------after sleep---------");
return "当前线程名字:" + Thread.currentThread().getName() + " " + i;
}
}
public class CallableDemo3 {
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<String> future = exec.submit(new TaskResult3(3));
try {
System.out.println("---------1---------");
System.out.println(future.get());
System.out.println("---------2---------");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
exec.shutdown();
System.out.println("其他程序继续执行");
}
}
休眠
影响任务行为的一种简单的方法是调用sleep();这将是任务终止执行给定的时间
public class SleepingTask extends LiftOff {
public void run() {
try {
while (countDown-- > 0) {
System.out.print(status());
TimeUnit.SECONDS.sleep(2);
}
} catch (InterruptedException e) {
System.err.println("Interrupted");
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new SleepingTask());
exec.shutdown();
}
}
优先级
调度器倾向于让优先权最高的线程先执行,但是,这并不是意味着优先权低的得不到执行,(优先权并不会导致死锁)可以使用getpriority()来读取现有线程的优先权,并且任何时刻都可以通过setPriority()来修改它
public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d; // No optimization
private int priority;
public SimplePriorities(int priority) {
this.priority = priority;
}
public String toString() {
return Thread.currentThread() + ": " + countDown;
}
public void run() {
Thread.currentThread().setPriority(priority);
//驱动对该任务的引用
while (true) {
for (int i = 1; i < 100000; i++) {
d += (Math.PI + Math.E) / (double) i;
if (i % 1000 == 0)
Thread.yield();
}
System.out.println(this);
if (--countDown == 0) return;
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(
new SimplePriorities(Thread.MIN_PRIORITY));
exec.execute(
new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}
让步
yield():干一会,停一下,然后让别的线程可以抢一下,并不是说一定能够抢占成功
后台线程
后台线程指在程序运行时再后台提供的一种服务的线程。当所有的非后台线程结束的时候,程序也就终止了,并且会杀死程序中所有的后台线程,所以说只要还有非后台线程还在运行,程序就不会终止
public class SimpleDaemons implements Runnable {
@Override
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
}
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("sleep() interrupted");
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true);//将其设置成为后台线程
daemon.start();
}
System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(150);//如果它的休眠时间和它执行的休眠时间差不多的话,可能它执行不完
}
}
必须要在线程启动之前将其设置为后台线程
上面的代码创建了显式的线程,可以设置他们的后台标志。
通过编写ThreadFactory可以定制由Executor创建的线程的属性(后台,优先级,名称)
public class DaemonFromFactory implements Runnable {
@Override
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
TimeUnit.MILLISECONDS.sleep(200);
System.out.println("是否执行了一半"); //第二此任务只会执行一半
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
//使用线程池来创建后台线程
for (int i = 0; i < 10; i++) {
exec.execute(new DaemonFromFactory());
}
System.out.println("All Daemon started");
TimeUnit.MILLISECONDS.sleep(500);
}
}
可以通过isDaemon()方法来确定线程是否是一个后台线程。如果是一个后台线程,那么它创建的任何线程都将被自动的设置成后台线程
class Daemon implements Runnable{
Thread[] t = new Thread[10];
@Override
public void run() {
for (int i = 0; i < t.length; i++) {
t[i] = new Thread(new DaemonSpawn());
//DaemonSpawn并没有显式的说明他是后台线程,但是已经将Daemon设置成为了后台线程
//所以DaemonSpawn也被设置成为了后台线程
t[i].start();
System.out.println("Daemon " + i + " started");
}
for (int i = 0; i < t.length; i++) {
System.out.println("t[+" + i + "].isDaemon()=" + t[i].isDaemon() + ",");
}
while (true) {
Thread.yield();
}
}
}
class DaemonSpawn implements Runnable{
@Override
public void run() {
while (true) {
Thread.yield();
}
}
}
public class Daemons {
public static void main(String[] args) throws InterruptedException {
Thread d = new Thread(new Daemon());
d.setDaemon(true);//将Daemon设置成为后台线程
d.start();
System.out.println("d.isDaemon() = " + d.isDaemon() + " ");
TimeUnit.SECONDS.sleep(1);
}
}
后台线程在不执行finally子句的情况下就会终止其run()方法
class ADaemon implements Runnable{
@Override
public void run() {
try {
System.out.println("Starting ADaemon");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("Exiting via InterruptedException");
}finally {
System.out.println("This should always run? ");
//并没有执行finally子句,线程就结束了
}
}
}
public class DaemonsDontRunFinally {
public static void main(String[] args) {
Thread t = new Thread(new ADaemon());
t.setDaemon(true);
t.start();
}
}
当最后一个非后台线程终止时,后台线程会突然终止,一次,一旦main方法退出,所有的后台线程都将会关闭
编码的变体
在非常简单的情况下,可以直接从Thread继承这种可替换的方式
public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread(){
super(Integer.toString(++threadCount));//分配一个新的Thread对象
start();
}
@Override
public String toString() {
return "#" + getName() + "(" + countDown + ")";
}
public void run(){
while (true) {
System.out.println(this);
if (--countDown == 0) {
return;
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new SimpleThread();
}
}
}
另一种实现Runnable方法
public class SelfManaged implements Runnable {
private int countDown = 5;
private Thread t = new Thread(this);//分配一个新的Thread对象
public SelfManaged(){
t.start();
}
@Override
public String toString() {
return Thread.currentThread().getName() + "(" + countDown + ")";
}
@Override
public void run() {
while (true) {
System.out.println(this);
if (countDown-- == 0) {
return;
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new SelfManaged();
}
}
}
两者还是由区别的,实现Runnable接口可以继承另外的类,但是继承了Thread就不能在继承别的类,因为Java不允许多继承
术语
对Thread类实际没有任何控制权,实际操作是:创建任务,通过某种方式将一个线程附着在任务上,使得这个线程可以驱动任务,Thread类自身不执行任何操作,它只是驱动赋予它的任务(任务:比如Runnable接口中的run方法中的业务逻辑)
加入一个线程
一个线程可以在其他线程之上调用join方法,他会等待一段时间,直到这个线程结束(插队,谁调用join方法谁先执行),对join的调用可以被中断,在调用的线程上调用interrupt()方法,这时需要try-catch子句
被加入的线程一定会被挂起?
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();//在构造器中间直接开始执行线程
}
public void run(){
try {
sleep(duration);
} catch (InterruptedException e) {
//被打断的信息在try catch之后它的被打断的信息就会被清除,所以isInterrupted在catch之后会返回到false
System.out.println(getName() + " was interrupted " + "isInterrupted(): " + isInterrupted());
return;
}
System.out.println(getName() + "has awakened");//被打断,所以这一行不会执行
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();//在构造器中直接开始线程
}
public void run(){
try {
sleeper.join();//插队,先执行,sleeper执行之后才会执行joiner
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println(getName() + " join completed");
}
}
public class Joining {
public static void main(String[] args) {
//Sleeper sleepy = new Sleeper("Sleepy", 1500);
Sleeper grumpy = new Sleeper("Grumpy", 1500);
//Joiner dopey = new Joiner("Dopey", sleepy);
Joiner doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
}
package stu;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Aoo implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("Aoo 线程 持有时间片");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Boo implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Boo 线程 获得时间片");
}
}
public class SleepJoin {
public static void main(String[] args) throws InterruptedException {
final Thread aoo = new Thread(new Aoo());
System.out.println("Aoo 线程启动");
aoo.start();
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(200);
System.out.println("主线程");
}
aoo.join();
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println("主线程 持有时间片");
}
}
}
joiner任务耗时的话,会继续执行,和sleeper执行完成或者中断没有任何关系
捕获异常
由于线程的本质特性,所以不能捕获从线程逃逸的异常,除非使用特殊的步骤捕获这种错误的异常
public class ExceptionThread implements Runnable{
@Override
public void run() {
throw new RuntimeException();//异常从线程中逃逸,没有被捕获,会打印错误信息
}
public static void main(String[] args) {
//即使将try-catch将其包裹也没有用
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
}
Thread.UncaughtExceptionHandler允许你在每个Thread对象上都附着一个异常处理器
Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时调用它
class ExceptionThread2 implements Runnable {
@Override
public void run() {
Thread t = Thread.currentThread();//当前的线程
System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
//System.out.println(this + " creating new Thread");
Thread t = new Thread(r);//创建线程
//System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());//设置线程处理器
//System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread2());//执行任务
}
}
共享资源受限
不正确的访问资源
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
public void cancel(){
canceled = true;
}
public boolean isCanceled(){
return canceled;
}
}
canceled的标志的boolean类型的,它是原子性的赋值和返回值这样的操作在发生时没有中断的可能,因此时看不到这些简单操作时的中间状态
public class EvenChecker implements Runnable{
private IntGenerator generator;
private final int id;
public EvenChecker(IntGenerator g, int ident) {
generator = g;
id = ident;
}
@Override
public void run() {
while (!generator.isCanceled()) {
int val = generator.next();
if (val % 2 != 0) {
System.out.println(val + "not even");
generator.cancel();
}
}
}
public static void test(IntGenerator gp, int count) {
System.out.println("Press Control-C to exit");
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < count; i++) {
exec.execute(new EvenChecker(gp,i));
}
exec.shutdown();
}
public static void test(IntGenerator gp) { //重载test方法,这时候只有一个IntGenerator类型的gp
//所以他们访问的资源时同一个
test(gp, 10);
}
}
test重载之后,十个线程访问的是同一个gp,并且上述代码通过使代码依赖于非任务对象,可以消除潜在的竞争条件()
public class EvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
@Override
public int next() {
++currentEvenValue;//此处的操作每次都分三步走的,所以多线程访问的时候并不安全
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new EvenGenerator());
}
}
解决共享资源竞争
防止多个线程在访问同一个资源出问题的方法:当这个资源被某一个任务使用时,就给他加上锁,第一个访问某项资源的任务必须锁定这项资源,在被解锁之前,其他的任务不能访问这项资源
基本上所有的并发在解决线程冲突的时候都是采用顺序访问的方式,一段时间只有一个任务可以运行这段代码
Java提供了关键字synchronized,当任务执行被他保护的代码时,它会检查锁是否可用,然后来获取锁,执行代码,然后释放锁
如果某个任务正在执行被标记为synchronized的方法时,如果他没有冲该方法返回,那么在这个线程从该方法返回之前,其他要调用这个方法的线程都会被阻塞
public class InstanceSynchronized {
public synchronized void f(){
try {
System.out.println("进入到f()方法");
TimeUnit.SECONDS.sleep(3);
System.out.println("f()方法执行结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void g(){
try {
System.out.println("进入到g()方法");
TimeUnit.SECONDS.sleep(5);
System.out.println("g()方法执行结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final InstanceSynchronized in = new InstanceSynchronized();
new Thread(in::f).start();
//使用实例来调用两个方法,现在称synchornized为实例锁
new Thread(in::g).start();
f方法执行结束,释放了锁之后,这个实例才能调用被标记为synchornized的g方法
}
}
在实例调用任意的synchronized方法的时候,这个实例都会被加上锁,这是这个实例上的其他synchronized方法只有等前一个调用完毕了之后释放了锁之后才能够被调用
注意,在使用并发时,将域设置成为private是十分重要的,否则,synchronized关键字就不能防止其他的任务直接访问域,这样会产生冲突
public class SynchronizedWithPrivateDemo {
int num = 0;
synchronized int getNum() {
num++;
num++;
return num;
}
}
class Demo {
public static void main(String[] args) {
SynchronizedWithPrivateDemo demo = new SynchronizedWithPrivateDemo();
new Thread(() -> {
int res = demo.getNum();
/* try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println(Thread.currentThread().getName() + "->" + res);
}, "get-1").start();
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
demo.num = 9999; //可以随意的将num的值更改,这并不是线程安全的
}, "update-1").start();
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
int res = demo.getNum();
System.out.println(Thread.currentThread().getName() + "->" + res);
}, "get-2").start();
}
}
如果正在写一个变量,它可能接下来被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么必须使用同步,并且,读写线程都必须用相同的监视器锁同步(final修饰的线程一定是线程安全的)
同步控制EvenGenerator
public class SynchronizedEvenGenerator extends IntGenerator{
private int currentEvenValue = 0;
@Override
public synchronized int next() { //上锁,其他的线程就进不来了
++currentEvenValue;
Thread.yield();//已经上锁了,其他的线程抢不到
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenGenerator());
}
}
使用显示的Lock对象
Lock对象和Synchronized相比,必须显示的创建Lock对象,显示的锁定和释放
public class MutexEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();//创建锁
ReentrantLock,不公平锁,可重入锁
@Override
public int next() {
lock.lock();//锁住
try {
++currentEvenValue;
Thread.yield();
++currentEvenValue;
return currentEvenValue;//先执行finally,在执行return
} finally {
lock.unlock();//释放锁
}
}
public static void main(String[] args) {
EvenChecker.test(new MutexEvenGenerator());
}
}
有了显示的Lock对象,就可以使用finally子句将系统维护在正确的状态
注意:return必须放在try语句中,为了防止unlock过早的发生,从而导致第二个任务进来
synchronized和Lock对象的区别:使用synchronized关键字的时候。需要写的代码量更少,并且用户出现错误的可能性更低,但是synchronized关键字不能尝试着获取锁且最终获取锁会失败,或者尝试着获取锁一段时间然后放弃它
public class AttemptLocking {
private ReentrantLock lock = new ReentrantLock();
public void untimed() throws InterruptedException {
TimeUnit.SECONDS.sleep(2);
boolean captured = lock.tryLock();
try {
System.out.println("tryLock()" + captured);
}finally {
if (captured) {
lock.unlock();
}
}
}
public void timed() {
boolean captured = false;
try {
System.out.println("tryLock(2,Times.SECONDS): " + captured);
}finally {
if (captured) {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
final AttemptLocking al = new AttemptLocking();
al.untimed();
al.timed();
new Thread(){ //可以将ReentrantLock看成是一个对象锁,谁先获得,将阻塞其他线程对该对象锁的获取
{setDaemon(true);}
public void run(){
al.lock.lock();
System.out.println("acquired");
}
}.start();
Thread.yield();
al.untimed();
al.timed();
}
}
ReentrantLock允许尝试获取锁,如果没有获取锁的话,可以离开去做一些其他的事情,而不是一直等待这个锁被释放
原子性与易变性
没有中间操作的操作,比如boolean,Atomicxxx的类都是原子类,这些代码不需要被同步
volitile关键字确保了应用中的可视性,如果将一个域声明为volatile的,那么只要对这个域进行了写操作,所有的读操作都会看到这个修改,修改volatile之后会被立即写入主存中,volatile一般配合着原子操作一起工作
public class AtomicityTest implements Runnable {
private int i = 0;
public int getValue(){
return i;
}
private synchronized void evenIncrement(){
i++;//这种操作都属于分布操作,所以可能会被读取到中间值
//必须将getValue也给他设置成为synchronized的
i++;
}
@Override
public void run() {
while (true) {
evenIncrement();
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomicityTest at = new AtomicityTest();
exec.execute(at);
while (true) {
int val = at.getValue();
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
public class SerialNumberGenerator {//产生序列数字的类
private static volatile int serialNumber = 0;
public static int nextSerialNumber() {
return serialNumber++;
}
}
如果一个域可能会被多个任务同时访问,或者这些任务中至少有一个是写入任务,那么就应该将这个域设置成为volatile的
class CircularSet {
private int[] array;
private int len;
private int index = 0;
public CircularSet(int size) {
array = new int[size];
len = size;
for (int i = 0; i < size; i++)
array[i] = -1;
}
public synchronized void add(int i) {
array[index] = i;
index = ++index % len;
}
public synchronized boolean contains(int val) {
for (int i = 0; i < len; i++)
if (array[i] == val) return true;
return false;
}
}
public class SerialNumberChecker {
private static final int SIZE = 10;
private static CircularSet serials =
new CircularSet(1000);
private static ExecutorService exec =
Executors.newCachedThreadPool();
static class SerialChecker implements Runnable {
public void run() {
while (true) {
int serial =
SerialNumberGenerator.nextSerialNumber();//产生多线程问题的源头
if (serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < SIZE; i++)
exec.execute(new SerialChecker());
if (args.length > 0) {
TimeUnit.SECONDS.sleep(new Integer(args[0]));
System.out.println("No duplicates detected");
System.exit(0);
}
}
}
CircularSet:持有所产生的所有序列数,另外还包括一个内嵌的SerialChecker类,它可以保证序列数是唯一的,创建多个任务来竞争序列数,最终会获得重复的序列数
原子类
AtomicInteger(乐观锁的原理),AtomicLong,AtomicReference等特殊的原子性变量类,提供以下形式的原子性条件更新操作:boolean comparaAndSet(expectedValue,updateValue);
类似boolean,进行原子操作,没有中间状态
public class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);//给定了初始值
private int getValue(){return i.get();}//获取当前值
private void evenIncrement(){
i.addAndGet(2);}//以原子方式将给定值添加到当前值,返回更新后的值
@Override
public void run() {
while (true) {
evenIncrement();
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
final AtomicIntegerTest ait = new AtomicIntegerTest();
exec.execute(ait);
while (true) {
int val = ait.getValue();
if (val % 2 != 0) {//原子操作没有中间变换的状态,所以不会出错的
System.out.println(val);
System.exit(0);
}
}
}
}
public class AtomicEvenGenerator extends IntGenerator{
private AtomicInteger currentEvenValue = new AtomicInteger(0);
@Override
public int next() {
return currentEvenValue.addAndGet(2);
}
public static void main(String[] args) {
EvenChecker.test(new AtomicEvenGenerator());
}
}
临界区
有时,只希望防止多个线程同时访问方法内部的部分代码而不是访问整个方法。通过这种方式分离出来的代码被称为临界区,它也使用synchronized关键字建立,这里synchronized被用来指定某个对象,此对象的锁被用来对花括号内的代码进行同步控制
通过使用同步控制块,而不是对整个方法进行同步控制,可以使多个任务访问对象的时间新能进行显著提高
如何将一个线程不安全类,变成线程安全的:
class Pair { // Not thread-safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() {
this(0, 0);
}
public int getX() {
return x;
}
public int getY() {
return y;
}
public void incrementX() {
x++;
}
public void incrementY() {
y++;
}
public String toString() {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException
extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Arbitrary invariant -- both variables must be equal:
public void checkState() {
if (x != y)
throw new PairValuesNotEqualException();
}
}
// Protect a Pair inside a thread-safe class:
abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage =
Collections.synchronizedList(new ArrayList<Pair>());//线程安全的容器持有一个线程不安全的类
//重新构建了一个pair的备份,该方法时线程安全的
public synchronized Pair getPair() {//读的时候保证线程安全
return new Pair(p.getX(), p.getY());
}
//仅仅为线程安全的容器中存入pair
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException ignore) {
}
}
//对x,y不断地生成一个点(x,y),并存储在线程安全的容器中
public abstract void increment();
}
// Synchronize the entire method:
class PairManager1 extends PairManager {//"写"的时候保证线程安全
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}
// Use a critical section:
class PairManager2 extends PairManager {
public void increment() {
Pair temp;
synchronized (this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
class PairManipulator implements Runnable {
private PairManager pm;
public PairManipulator(PairManager pm) {
this.pm = pm;
}
public void run() {
while (true) //不断的生成一个平面轴上的点(x,y)
pm.increment();
}
//pm.getPair() 拿到increment()生成的pair备份
public String toString() {
return "Pair: " + pm.getPair() +
" checkCounter = " + pm.checkCounter.get();//用来统计操作的
}
}
class PairChecker implements Runnable {
private PairManager pm;
public PairChecker(PairManager pm) {
this.pm = pm;
}
public void run() {
while (true) {
pm.checkCounter.incrementAndGet();//不断的递增,不断的记录操作的次数
pm.getPair().checkState();//同步的获取一个pair的备份,判断x,y是否相等
}
}
}
public class CriticalSection {
static void
testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator
pm1 = new PairManipulator(pman1),
pm2 = new PairManipulator(pman2);
PairChecker
pcheck1 = new PairChecker(pman1),
pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
System.out.println("Sleep interrupted");
}
System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
System.exit(0);
}
public static void main(String[] args) {
PairManager
pman1 = new PairManager1(),
pman2 = new PairManager2();
testApproaches(pman1, pman2);
}
}
在其他对象上同步
synchronized块必须给定一个在其上同步的对象,并且最合理的方式是:使用其方法正在被调用的对象:synchronized(this)。有时必须在另一个对象上同步,必须保证所有相关的任务都是在同一个对象上同步的
class DualSynch {
private Object syncObject = new Object();
public void f(){
synchronized(this) {//把他放在这里和直接放在方法里作用是一样的
//锁的是SyncObject中的实例
for (int i = 0; i < 5; i++) {
try {
TimeUnit.SECONDS.sleep(1);//不休眠不好看出来
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f()");
Thread.yield();
}
}
}
public void g() {
synchronized (syncObject) {//这个锁的是Object的实例,所以和上面的方法互不相干
for (int i = 0; i < 5; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("g()");
Thread.yield();
}
}
}
}
public class SyncObject {
public static void main(String[] args) {
final DualSynch dualSynch = new DualSynch();
new Thread(){
public void run() {
dualSynch.f();
}
}.start();
new Thread(){
public void run(){
dualSynch.g();
}
}.start();
}
}
上述代码中的两个方法是相互独立的:一个是对象锁,一个是实例锁
线程本地存储
防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每一个不同的线程都创建不同的存储
创建和管理线程本地存储可以由ThreadLocal类来实现
class Accessor implements Runnable {
private final int id;
public Accessor(int idn) {
id = idn;
}
public void run() {
while (!Thread.currentThread().isInterrupted()) {//这个代码中并没有打断操作,所以是个死循环
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString() {
return "#" + id + ": " +
ThreadLocalVariableHolder.get();
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value =
new ThreadLocal<Integer>() {
private Random rand = new Random(47);//每个线程都有专门的存储区域
protected synchronized Integer initialValue() {
return rand.nextInt(10000);
}
};
public static void increment() {
value.set(value.get() + 1);
}//获取值之后加1
public static int get() {
return value.get();
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new Accessor(i));//id相同的都只加一
TimeUnit.SECONDS.sleep(3);
exec.shutdownNow();
}
}
如何证明ThreadLocal持有的内容是该线程原有的内容
ThreadLocal对象通常当作静态域存储
increament()和get()方法都不是synchronized的,因为ThreadLocal保证不会出现竞争条件
终结任务
首先装饰性花园记录每一个门的人数和总人数
class Count {
private int count = 0;
private Random rand = new Random(47);
public synchronized int increment() {
return ++count;
}
public synchronized int value() {
return count;
}
}
class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entrances =
new ArrayList<>();
private int number = 0;//表示的是每个入口的人数
private final int id;
private static volatile boolean canceled = false;
public static void cancel() {
canceled = true;
}
public Entrance(int id) {
this.id = id;
entrances.add(this);
}
public void run() {
while (!canceled) {//单个入口的进入人数,以不断的递增1的方式统计,没有步进
synchronized (this) {
++number;
}
print(this + " Total: " + count.increment());//总人数
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
print("sleep interrupted");
}
}
print("Stopping " + this);
}
public synchronized int getValue() {
return number;
}
public String toString() {
return "Entrance " + id + ": " + getValue();
}
public static int getTotalCount() {
return count.value();
}
public static int sumEntrances() {
int sum = 0;
for (Entrance entrance : entrances)
sum += entrance.getValue();
return sum;
}
}
public class OrnamentalGarden {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new Entrance(i));
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
exec.shutdown();
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS))//250毫秒没有终止的话,就会打印这个信息
print("Some tasks were not terminated!");
print("Total: " + Entrance.getTotalCount());
print("Sum of Entrances: " + Entrance.sumEntrances());
}
}
Entrance.canceled是一个volatile布尔标志,他只能被读取和赋值,所以不需要同步对其的访问,就可以安全的操作他
在阻塞时终结
中断
中断什么情况下才有用?
Thread类中的interrupt()方法,可以中介阻塞的任务
如果使用Executor,通过调用sumbit()而不是excutor()来启动任务,就可以持有该任务的上下文
class SleepBlocked implements Runnable {
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
print("InterruptedException");
}
print("Exiting SleepBlocked.run()");
}
}
class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream is) {
in = is;
}
public void run() {
try {
print("Waiting for read():");
in.read();
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
print("Interrupted from blocked I/O");
} else {
throw new RuntimeException(e);
}
}
print("Exiting IOBlocked.run()");
}
}
class SynchronizedBlocked implements Runnable {
public synchronized void f() {
while (true)
Thread.yield();
}
public SynchronizedBlocked() {
new Thread() {
public void run() {
f();
}
}.start();
}
public void run() {
print("Trying to call f()");
f();
print("Exiting SynchronizedBlocked.run()");
}
}
public class Interrupting {
private static ExecutorService exec =
Executors.newCachedThreadPool();
static void test(Runnable r) throws InterruptedException {
Future<?> f = exec.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
print("Interrupting " + r.getClass().getName());
f.cancel(true);//取消任务的执行
print("Interrupt sent to " + r.getClass().getName());
}
public static void main(String[] args) throws Exception {
// test(new SleepBlocked()); //成功打断,没有问题
// test(new IOBlocked(System.in));//无法打断
test(new SynchronizedBlocked());
TimeUnit.SECONDS.sleep(3);
print("Aborting with System.exit(0)");
System.exit(0);
}
}
I/O和在synchronized块上的等待时不可中断的,不能中断正在试图获取synchornized锁或者试图执行I/O操作的线程
I/O阻塞无法通过外部的形式进行打断,I/OException属于当前线程中断的一种形式
class IOBlocked implements Runnable {
private InputStream is;
public IOBlocked(InputStream inputStream) {
is = inputStream;
}
@Override
public void run() {
System.out.println("waiting for read");
try {
while (true) {
int temp = is.read();//虽然发送了取消得指令,但依旧在此阻塞
if((char)temp == 'q'){
throw new IOException("quit");
//break;
}
}
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted from IO");
} else {
throw new RuntimeException(e);
}
}
System.out.println("Exiting IOBlocked run()...");
}
}
如何跳出死循环
private static AtomicInteger atomicInteger = new AtomicInteger();
private static volatile boolean flag = false;
public synchronized void f(){
/* while (true) {
Thread.yield();
}*/
while (!flag) { //跳出死循环的处理
//Thread.yield();
atomicInteger.addAndGet(2);
if(atomicInteger.get()==50){
flag = true;
}
}
}
public class CloseResource {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InputStream socketInput =
new Socket("localhost", 8080).getInputStream();
exec.execute(new IOBlocked(socketInput));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(100);
print("Shutting down all threads");
exec.shutdownNow();
TimeUnit.SECONDS.sleep(1);
print("Closing " + socketInput.getClass().getName());
socketInput.close();
TimeUnit.SECONDS.sleep(1);
print("Closing " + System.in.getClass().getName());
System.in.close();
}
}
在shutdownNow()被调用之后以及在两个输入流上调用close()之前的延迟强调的是一旦底层资源被关闭,任务将接触阻塞
被互斥所阻塞
如果尝试在一个对象上调用其synchronized方法,如果这个对象的锁已经被其他任务获得,那么调用任务将被挂起,知道这个锁可以被获得
同一个互斥可以如何能被同一个任务多次获得
public class MultiLock {
public synchronized void f1(int count) {
if (count-- > 0) {
System.out.println("f1() calling f2() with count " + count);
f2(count);
}
}
public synchronized void f2(int count) {
if (count-- > 0) {
System.out.println("f2() calling f1() with count " + count);
f1(count);
}
}
public static void main(String[] args) {
final MultiLock multiLock = new MultiLock();
new Thread(){
public void run(){
multiLock.f1(5);
}
}.start();
}
}
一个任务应该能够调用同一个对象的其他的synchronized方法,并且这个任务已经持有锁了
如果将if方法换成while的话,从四开始f1调用f2 而且他还在循环,不断的调用
ReentrantLock上阻塞的任务具有可以被打断的特性
class BlockedMutex {
private Lock lock = new ReentrantLock();
public BlockedMutex(){
//lock.lock();//在构造器中就获取了一把锁,并且没有被释放,这样接下来的获取锁操作就会被阻塞
}
public void f() {
try {
//此方法获取可以被打断的锁,
// lock.lockInterruptibly();
//这个获取的锁是不会被打断的
lock.lock();
System.out.println("lock acquired in f()");
} catch (Exception e) {
System.out.println("Interrupted from lock acquisition");
}
}
}
class Block2 implements Runnable {
BlockedMutex blocked = new BlockedMutex();
@Override
public void run() {
System.out.println("Waiting for f() in BlockedMutex");
blocked.f();
System.out.println("Broken out of blocked call");
}
}
public class Interrupting2 {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Block2());
t.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("Issuing t.interrupt()");
t.interrupt();
}
}
检查中断
当在线程上调用interrupt()是,中断发生的唯一时刻是任务要进入到阻塞操作中,或者已经在阻塞操作内部了(除了不可被中断的I/O操作和synchronized方法之外)
如果在run跑的时候没有任何阻塞操作,那将如何退出?
线程之间的协作
如何使任务彼此之间可以协作,使得多个任务可以一起工作去解决某个问题
当任务协作的时候,关键使这些任务之间的握手,为了实现这些握手,我们使用了相同的基础特性“互斥”
在互斥之上,我们为任务添加了一种途径,可以将自身挂起,直到外部条件发生变化比如volatile boolean,这中握手可以通过Object中的wait()方法和notify(),notifyAll()方法来安全的实现
wait()和notifyAll
wait()等待某个条件发生变化,这中条件一般是由另一个任务来改变,wait在等待外部条件产生变化的时候会将任务挂起,只有notify或notifyAll发生的时候,这个任务才会被唤醒并去检查锁产生的变化。可以说wait提供了一种在任务之间对活动同步的方式
sleep和yield方法都并没有释放锁,但是,当一个任务在方法中遇见了对wait的调用的时候,线程的执行将会被挂起,对象上的锁将会被释放,因为wait方法会释放锁,这就表明另一个任务可以获得这个锁,因此在该对象中的其他synchronized方法可以在wait期间被调用,因为此时未被锁
对于wait而言:在wait期间锁使被释放的,并且可以通过notify或者notifyAll,或者 零时间到期,从wait中恢复执行
wait,notify,notifyAll方法都是属于Object的一部分,而不是属于Thread的一部分
class Car {
private boolean waxOn = false;
public synchronized void waxed() {
waxOn = true;
notifyAll();
}
public synchronized void buffed() {
waxOn = false;
notifyAll();
}
public synchronized void waitForWaxing()
throws InterruptedException {
while (waxOn == false)
wait();
}
public synchronized void waitForBuffing()
throws InterruptedException {
while (waxOn == true)
wait();
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) {
car = c;
}
public void run() {
try {
while (!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) {
car = c;
}
public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
错误的信号
假设T1通知T2使以以下的方式实现的:
应该将while循环放在synchronized的下面,这样避免T1通知T2时T2已经进入下一次循环,错过了这次信号
notify和notifyAll
因为可能会有多个任务在单个Car对象上处于wait状态,因此调用notifyAll比调用notify更加的安全
并且notifyAll还是同一把锁上面的正在wait的线程
class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.println(Thread.currentThread() + " ");
}
} catch (InterruptedException e) {
}
}
synchronized void prod() {
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
static Blocker blocker = new Blocker();
@Override
public void run() {
blocker.waitingCall();
}
}
class Task2 implements Runnable {
static Blocker blocker = new Blocker();
@Override
public void run() {
blocker.waitingCall();
}
}
public class NotifyVsNotifyAll {
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Task());
}
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
@Override
public void run() {
if (prod) {
System.out.println("\nnotify()");
Task.blocker.prod();
prod = false;
} else {
System.out.println("\nnotifyall");
Task.blocker.prodAll();
prod = true;
}
}
}, 400, 400);
TimeUnit.SECONDS.sleep(5);
timer.cancel();
System.out.println("\nTimer canceled");
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("Task2.blocker.prodAll");
Task2.blocker.prodAll();//blocker是static的,所以只有一份,调不调这个无所谓
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("\nShutting Down");
exec.shutdown();
}
}
因为循环的原因,释放某一个线程之后,这个线程又走循环,再次阻塞,所以再次notifyAll的时候该线程再次被恢复执行
生产者和消费者实例
class Meal {
private final int orderNum;
public Meal(int orderNum) {
this.orderNum = orderNum;
}
public String toString() {
return "Meal " + orderNum;
}
}
class WaitPerson implements Runnable {
private final Restaurant restaurant;
public WaitPerson(Restaurant r) {
restaurant = r;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null)//第一次之后就、 由于厨师那里一直在count++,所里这里不会为空了
//会一直打印信息
wait();
}
print("Waitperson got " + restaurant.meal);
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll();
}
}
} catch (InterruptedException e) {
print("WaitPerson interrupted");
}
}
}
class Chef implements Runnable {
private final Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) {
restaurant = r;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null)
wait();
}
if (++count == 10) { //不断的循环,对meal进行++
print("Out of food, closing");
restaurant.exec.shutdownNow();//对当前线程设置成为打断状态
}
printnb("Order up! ");
synchronized (restaurant.waitPerson) {
//设置Meal初始是0
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
print("Chef interrupted");
}
}
}
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
final WaitPerson waitPerson = new WaitPerson(this);
final Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
}
使用显式的Lock和Condition对象
使用互斥并且允许将任务挂起的类是Condition,可以通过在Condition上调用await来挂起一个任务
class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;
public void waxed() {
lock.lock();
try {
waxOn = true; // Ready to buff
condition.signalAll();
} finally {
lock.unlock();
}
}
public void buffed() {
lock.lock();
try {
waxOn = false; // Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}
public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while(waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}
public void waitForBuffing() throws InterruptedException{
lock.lock();
try {
while(waxOn == true)
condition.await();
} finally {
lock.unlock();
}
}
}
class WaxOn2 implements Runnable {
private Car car;
public WaxOn2(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff2 implements Runnable {
private Car car;
public WaxOff2(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic2 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff2(car));
exec.execute(new WaxOn2(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
对lock的调用都必须紧跟一个try-finally子句,来保证在所有的情况下都可以释放锁
Lock和Condition只有在更加困难或者复杂的多线程问题中是必须的
生产者消费者与队列
使用同步队列来解决任务协作问题,同步队列在任何时刻,都只允许一个任务接口插入或者移除元素
class LiftOffRunner implements Runnable {
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> rockets) {
this.rockets = rockets;
}
public void add(LiftOff lo) {
try {
rockets.put(lo);//生产者:在队列中添加元素
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
LiftOff rocket = rockets.take();//消费者:从队列中取出元素
rocket.run();
}
} catch (InterruptedException e) {
System.out.println("Waking from take()");
}
System.out.println("Exiting LiftOffRunner");
}
}
public class TestBlockingQueues {
/*static void getKey() {
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (IOException e) {
throw new RuntimeException();
}
}
static void getKey(String msg) {
System.out.println(msg);
getKey();
}*/
static void test(String msg, BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 5; i++) {
runner.add(new LiftOff(5));
}
//getKey("press 'Enter'(" + msg + ")");
t.interrupt();
System.out.println("Finish" + msg + "test");
}
public static void main(String[] args) {
//test("LinkedBlockingQueue",new LinkedBlockingQueue<>());//无界的阻塞队列
//test("ArrayBlockQueue",new ArrayBlockingQueue<>(3));//有容量的阻塞队列,要初始化他的容量
test("SynchronousQueue",new SynchronousQueue<>());//同步队列,一个一个来
}
}
容量为null的时候,消费者挂起。容量满的时候,生产者挂起。待消费一个任务的时候,再去添加任务。
土司BlockingQueue
class Toast {
public enum Status {DRY, BUTTERED, JAMMED}
private Status status = Status.DRY;
private final int id;
public Toast(int idn) {
id = idn;
}
public void butter() {//抹上黄油
status = Status.BUTTERED;
}
public void jam() {//抹上果酱
status = Status.JAMMED;
}
public Status getStatus() {
return status;
}
public int getId() {
return id;
}
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> {
}
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) {
toastQueue = tq;
}
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(
100 + rand.nextInt(500));
Toast t = new Toast(count++);//生产吐司实例
print(t);
toastQueue.put(t);//将吐司放进队列
}
} catch (InterruptedException e) {
print("Toaster interrupted");
}
print("Toaster off");
}
}
class Butterer implements Runnable {
private ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = dryQueue.take();//从吐司队列中取出生产后的土司
t.butter();//抹上黄油
print(t);
butteredQueue.put(t);//将从吐司队列中取出的,并且抹好了黄油的吐司放进黄油队列
}
} catch (InterruptedException e) {
print("Butterer interrupted");
}
print("Butterer off");
}
}
// Apply jam to buttered toast:
class Jammer implements Runnable {
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = butteredQueue.take();//从黄油队列中取出吐司
t.jam();//再抹上果酱
print(t);
finishedQueue.put(t);//将从黄油队列中取出的,抹好果酱的放在完成队列
}
} catch (InterruptedException e) {
print("Jammer interrupted");
}
print("Jammer off");
}
}
class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = finishedQueue.take();//从完成队列中取出吐司
if (t.getId() != counter++ ||
t.getStatus() != Toast.Status.JAMMED) {//确保是按照顺序过来的或者确保已经是抹好果酱的
print(">>>> Error: " + t);
System.exit(1);
} else
print("Chomp! " + t);
}
} catch (InterruptedException e) {
print("Eater interrupted");
}
print("Eater off");
}
}
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
注意:这个实例没有任何的显示的同步,(Lock和synchronized),因为同步队列和系统隐式的管理了,每片吐司任何时刻只能由一个任务在操作
死锁
public class DeadLockDemo {
private final Object left = new Object();
private final Object right = new Object();
public void tom() {
synchronized (left) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized (right) {
System.out.println("tom 拿到了筷子");//拿到右边的筷子
}
}
public void kobe() {
synchronized (right) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized (left) {
System.out.println("kobe 拿到了筷子");//拿到左边的筷子
}
}
public static void main(String[] args) {
final DeadLockDemo deadLockDemo = new DeadLockDemo();
new Thread(deadLockDemo::tom);
new Thread(deadLockDemo::kobe);
}
}
新类库中的构件
CountDownLarch(底层由Node实现)
用来同步一个或者多个任务,强制他们等待其他任务执行的一组操作完成,并且他被设计为只触发一次,计数值不能被重置。CountDownLatch的典型用法是将一个程序分为n个互相独立的可解决的任务,并且创建值为0的CountDownLatch,当每个任务完成的时候,都会在这个锁村器上调用countDoun,等待问题被解决的任务在这个锁存器上调用await,将他们自己拦住,直到锁存器计数结束
class TaskPortion implements Runnable {
private static int count = 0;
private final int id = count++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
doWork();
latch.countDown();//countDown--,直到为0,所有的任务才完成
} catch (InterruptedException e) {
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));//随机休眠,模拟工作时间
System.out.println(this + "completed");
}
@Override
public String toString() {
return String.format("%1$-3d", id);
}
}
class WaitingTask implements Runnable {
private static int count = 0;
private final int id = count++;
private CountDownLatch latch;
WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();//如果上面的任务没有完成的话,会一直在这里等待
System.out.println("Latch barrier passed for" + this);
} catch (InterruptedException e) {
System.out.println(this + "interruption");
}
}
@Override
public String toString() {
return String.format("Waiting %1$-3d", id);
}
}
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) {
final ExecutorService exec = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i < 10; i++) {
exec.execute(new WaitingTask(latch));
}
for (int i = 0; i < SIZE ; i++) { //如果不是SIZE的个数的话(小于SIZE) WaitingTask会一直在那里等待
exec.execute(new TaskPortion(latch));
}
System.out.println("Launched all tasks");
exec.shutdown();
}
}
CyclicBarrier
适用于以下情况:创建一组任务,他们并行的执行工作,然后在进行下一个步骤之前等待,直到所有任务都完成(看起来有些像join())和CountDownLatch的区别:后者是只触发一次的时间,而CyclicBarrier可以多次重用
自己写一个例子
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;//计数
private int strides = 0;//初始步数为0
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) { //传进来CyclicBarrier
barrier = b;
}
public synchronized int getStrides() {//获取步数
return strides;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
strides += rand.nextInt(3);//随机前进0,1,2步
}
barrier.await();//等待所有的马都随机获取完了步数再继续
}
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++)
s.append("*");
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75;
private final List<Horse> horses = new ArrayList<>();
private final ExecutorService exec =
Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause) {//传进来马的个数和每次所有的马获取步数后的休眠时间
barrier = new CyclicBarrier(nHorses, () -> {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++)
s.append("=");
print(s);
for (Horse horse : horses)
print(horse.tracks());//打印每批马的轨迹
for (Horse horse : horses)
if (horse.getStrides() >= FINISH_LINE) {//当马的步数大于75的时候就停止
print(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
print("barrier-action sleep interrupted");
}
});
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
new HorseRace(nHorses, pause);
}
}
DelayQueue
是一个无界的阻塞队列,队列中的对象只能等到他到期时候才能从队列中取出,先取出时间最短的
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;//初始延迟时间
private final long trigger;//触发时间
protected static List<DelayedTask> sequence =
new ArrayList<>();
public DelayedTask(int delayInMilliseconds) {//在构造器中传进来初始延迟时间,设置触发时间,添加到集合中
delta = delayInMilliseconds;
trigger = System.nanoTime() +
NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit) {
return unit.convert(
trigger - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed arg) {//比较触发时间
DelayedTask that = (DelayedTask) arg;
if (trigger < that.trigger) return -1;
if (trigger > that.trigger) return 1;
return 0;
}
public void run() {
printnb(this + " ");
}
public String toString() {
return String.format("[%1$-4d]", delta) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask { //末端哨兵机制
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
public void run() { //将集合中元素id和延迟时间打印出来
for (DelayedTask pt : sequence) {
printnb(pt.summary() + " ");
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while (!Thread.interrupted())
q.take().run(); //取出来元素并且打印出来
} catch (InterruptedException e) {
}
print("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue =
new DelayQueue<>();
for (int i = 0; i < 20; i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
PriorityBlockingQueue
优先级队列,具有可阻塞的读取操作
class PrioritizedTask implements
Runnable, Comparable<PrioritizedTask> {
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;//优先级
protected static List<PrioritizedTask> sequence =
new ArrayList<>();
public PrioritizedTask(int priority) {//从构造器中设置优先级,并且将其添加到队列中
this.priority = priority;
sequence.add(this);
}
public int compareTo(PrioritizedTask arg) {//比较优先级 正序
return priority < arg.priority ? 1 :
(priority > arg.priority ? -1 : 0);
}
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));//模拟耗时操作
} catch (InterruptedException e) {
// Acceptable way to exit
}
print(this);
}
public String toString() {
return String.format("[%1$-3d]", priority) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // Lowest priority in this program
exec = e;
}
public void run() {
int count = 0;
for (PrioritizedTask pt : sequence) {//遍历队列打印优先级
printnb(pt.summary());
if (++count % 5 == 0)
print();
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(
Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e;
}
public void run() {
for (int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));//先加20个
Thread.yield();
}
try {
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10)); //在加10个
}
for (int i = 0; i < 10; i++)//在加10个
queue.add(new PrioritizedTask(i));
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch (InterruptedException e) {
}
print("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(
PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
public void run() {
try {
while (!Thread.interrupted())
q.take().run();
} catch (InterruptedException e) {
}
print("Finished PrioritizedTaskConsumer");
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue =
new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
该队列的阻塞特性提供了所有的必需的同步,这里不需要任何显式的同步,如果这个队列中没有元素的话会直接阻塞消费者
Semaphore
正常的锁在任何时刻都只允许一个任务访问一项资源,但是计数信号量允许n个任务同时访问这个资源
class A {
private static int count = 0;
private final int id = count++;
@Override
public String toString() {
return "A{" + "id=" + id + '}';
}
}
public class Pool<T> {
private final boolean[] checkOut;
private final Semaphore semaphore;
private final List<T> items = new ArrayList<>();
private int size;
public Pool(Class<T> t, int size) {
this.size = size;
checkOut = new boolean[size];
semaphore = new Semaphore(size, true);
for (int i = 0; i < size; i++) {
try {
items.add(t.newInstance());
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
}
System.out.println(items);
}
private T getItem() {
for (int i = 0; i < size; i++) {
if (!checkOut[i]) {//将获取的元素更改为true
checkOut[i] = true;
return items.get(i);
}
}
return null;
}
private boolean releaseItem(T t) {
int i = items.indexOf(t);
if (i == -1) {
return false;
}
if (checkOut[i]) { //将回来的更改为false
checkOut[i] = false;
return true;
}
return false;
}
public T checkOut() throws InterruptedException {
semaphore.acquire();//获取凭证
return getItem();//获取元素
}
public T checkIn(T t) {
if (releaseItem(t)) {//如果有这个元素
semaphore.release();//回收凭证
}
return null;
}
public static void main(String[] args) throws InterruptedException {
final Pool<A> aPool = new Pool<>(A.class, 5);
//TimeUnit.SECONDS.sleep(5);
for (int i = 0; i <5; i++) {
new Thread(()->{
A a;
try {
a = aPool.checkOut();
System.out.println(a);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
//A item = aPool.checkOut();
//System.out.println(item);
}
}
Exchanger
两个任务之间交换对象的栅栏。典型应用场景:一个任务在创建对象,这些对象的生产代价很高昂,另一个任务在消费这些对象,通过这种方式,可以有更多对象在创建的同时被消费
并且没有消费者的时候会阻塞
class Fat {
private volatile double d;
private static int count;
private final int id = count++;
public Fat() {
for (int i = 0; i < 10000; i++) {
d += (Math.PI + Math.E);
}
}
public void operation() {
System.out.println(this);
}
@Override
public String toString() {
return "Fat id : " + id;
}
}
class ExchangerProducer<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
ExchangerProducer(Exchanger<List<T>> exchanger, Generator<T> gen, List<T> holder) {
this.exchanger = exchanger;
generator = gen;
this.holder = holder;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
for (int i = 0; i < ExchangerDemo.size; i++) {
holder.add(generator.next());
}
holder = exchanger.exchange(holder);//把自己集合中的元素交给别人
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ExchangeConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
ExchangeConsumer(Exchanger<List<T>> exchanger, List<T> holder) {
this.exchanger = exchanger;
this.holder = holder;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
holder = exchanger.exchange(holder);//把别人给自己的元素加到自己集合里面
for (T x : holder) {
value = x;
holder.remove(x);//移除完再去交换
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final value : " + value);
}
}
public class ExchangerDemo {
static int size = 0;
static int delay = 5;
public static void main(String[] args) throws InterruptedException {
final ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Fat>> xc = new Exchanger<>();
List<Fat> produceList = new CopyOnWriteArrayList<>();
List<Fat> consumerList = new CopyOnWriteArrayList<>();
exec.execute(new ExchangerProducer<>(xc, BasicGenerator.creat(Fat.class), produceList));
exec.execute(new ExchangeConsumer<>(xc, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdown();
}
}
CopyOnWriteArrayList,这个特定的List允许在列表遍历时调用remove方法。
性能调优
比较各类互斥技术
abstract class Incrementable {
protected long counter = 0;
public abstract void increment();
}
class SynchronizingTest extends Incrementable {
public synchronized void increment() {
++counter;
}
}
class LockingTest extends Incrementable {
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
++counter;
} finally {
lock.unlock();
}
}
}
public class SimpleMicroBenchmark {
static long test(Incrementable incr) {
long start = System.nanoTime();
for (long i = 0; i < 10000000L; i++)
incr.increment();
return System.nanoTime() - start;
}
public static void main(String[] args) {
long synchTime = test(new SynchronizingTest());
long lockTime = test(new LockingTest());
System.out.printf("synchronized: %1$10d\n", synchTime);
System.out.printf("Lock: %1$10d\n", lockTime);
System.out.printf("Lock/synchronized = %1$.3f",
(double) lockTime / (double) synchTime);
}
}
输出结果:synchronized: 201171900
Lock: 156996600
Lock/synchronized : 0.780
复杂一点:
abstract class Accumulator {
public static long cycles = 50000L;
// Number of Modifiers and Readers during each test:
private static final int N = 4;
public static ExecutorService exec =
Executors.newFixedThreadPool(N * 2);
private static CyclicBarrier barrier =
new CyclicBarrier(N * 2 + 1);
protected volatile int index = 0;
protected volatile long value = 0;
protected long duration = 0;
protected String id = "error";
protected final static int SIZE = 10000000;
protected static int[] preLoaded = new int[SIZE];
static {
// Load the array of random numbers:
Random rand = new Random(47);
for (int i = 0; i < SIZE; i++)
preLoaded[i] = rand.nextInt();
}
public abstract void accumulate();
public abstract long read();
private class Modifier implements Runnable {
public void run() {
for (long i = 0; i < cycles; i++)
accumulate();
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private class Reader implements Runnable {
private volatile long value;
public void run() {
for (long i = 0; i < cycles; i++)
value = read();
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public void timedTest() {
long start = System.nanoTime();
for (int i = 0; i < N; i++) {
exec.execute(new Modifier());
exec.execute(new Reader());
}
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
duration = System.nanoTime() - start;
printf("%-13s: %13d\n", id, duration);
}
public static void
report(Accumulator acc1, Accumulator acc2) {
printf("%-22s: %.2f\n", acc1.id + "/" + acc2.id,
(double) acc1.duration / (double) acc2.duration);
}
}
class BaseLine extends Accumulator {
{
id = "BaseLine";
}
public void accumulate() {
value += preLoaded[index++];
if (index >= SIZE) index = 0;
}
public long read() {
return value;
}
}
class SynchronizedTest extends Accumulator {
{
id = "synchronized";
}
public synchronized void accumulate() {
value += preLoaded[index++];
if (index >= SIZE) index = 0;
}
public synchronized long read() {
return value;
}
}
class LockTest extends Accumulator {
{
id = "Lock";
}
private Lock lock = new ReentrantLock();
public void accumulate() {
lock.lock();
try {
value += preLoaded[index++];
if (index >= SIZE) index = 0;
} finally {
lock.unlock();
}
}
public long read() {
lock.lock();
try {
return value;
} finally {
lock.unlock();
}
}
}
class AtomicTest extends Accumulator {
{
id = "Atomic";
}
private AtomicInteger index = new AtomicInteger(0);
private AtomicLong value = new AtomicLong(0);
public void accumulate() {
// Oops! Relying on more than one Atomic at
// a time doesn't work. But it still gives us
// a performance indicator:
int i = index.getAndIncrement();
value.getAndAdd(preLoaded[i]);
if (++i >= SIZE)
index.set(0);
}
public long read() {
return value.get();
}
}
public class SynchronizationComparisons {
static BaseLine baseLine = new BaseLine();
static SynchronizedTest synch = new SynchronizedTest();
static LockTest lock = new LockTest();
static AtomicTest atomic = new AtomicTest();
static void test() {
print("============================");
printf("%-12s : %13d\n", "Cycles", Accumulator.cycles);
baseLine.timedTest();
synch.timedTest();
lock.timedTest();
atomic.timedTest();
Accumulator.report(synch, baseLine);
Accumulator.report(lock, baseLine);
Accumulator.report(atomic, baseLine);
Accumulator.report(synch, lock);
Accumulator.report(synch, atomic);
Accumulator.report(lock, atomic);
}
public static void main(String[] args) {
int iterations = 5; // Default
if (args.length > 0) // Optionally change iterations
iterations = new Integer(args[0]);
// The first time fills the thread pool:
print("Warmup");
baseLine.timedTest();
// Now the initial test doesn't include the cost
// of starting the threads for the first time.
// Produce multiple data points:
for (int i = 0; i < iterations; i++) {
test();
Accumulator.cycles *= 2;
}
Accumulator.exec.shutdown();
}
}
以synchronized关键字入手,只有在性能调优时才替换为Lock对象这种做法,是有实际意义的
虚拟机锁优化
免锁容器
免锁容器背后的通用策略:对容器的修改可以与读取操作同时发生,只需要能看到完成修改之后的结果就可以,修改是在容器数据结构的某个部分的一个单独副本上执行的,并且这个副本在修改过程中是不可视的,修改完成后,被修改的结构才会主动的和主数据进行交换,之后就可以看到修改后的结果
乐观锁
如果主要是从免锁容器中读取,那么他就会比synchronized快许多,因为获取锁和释放锁的操作被取消了
public abstract class Tester<C> {
static int testReps = 10;
static int testCycles = 1000;
static int containerSize = 1000;
abstract C containerInitializer();
abstract void startReadersAndWriters();
C testContainer;
String testId;
int nReaders;
int nWriters;
volatile long readResult = 0;
volatile long readTime = 0;
volatile long writeTime = 0;
CountDownLatch endLatch;
static ExecutorService exec =
Executors.newCachedThreadPool();
Integer[] writeData;
Tester(String testId, int nReaders, int nWriters) {
this.testId = testId + " " +
nReaders + "r " + nWriters + "w";
this.nReaders = nReaders;
this.nWriters = nWriters;
writeData = Generated.array(Integer.class,
new RandomGenerator.Integer(), containerSize);
// for (int i = 0; i < testReps; i++) {
runTest();
readTime = 0;
writeTime = 0;
// }
}
void runTest() {
endLatch = new CountDownLatch(nReaders + nWriters);
testContainer = containerInitializer();
startReadersAndWriters();
try {
endLatch.await();
} catch (InterruptedException ex) {
System.out.println("endLatch interrupted");
}
System.out.printf("%-27s %14d %14d\n",
testId, readTime, writeTime);
if (readTime != 0 && writeTime != 0)
System.out.printf("%-27s %14d\n",
"readTime + writeTime =", readTime + writeTime);
}
abstract class TestTask implements Runnable {
abstract void test();
abstract void putResults();
long duration;
public void run() {
long startTime = System.nanoTime();
test();
duration = System.nanoTime() - startTime;
synchronized (Tester.this) {
putResults();
}
endLatch.countDown();
}
}
public static void initMain(String[] args) {
if (args.length > 0)
testReps = new Integer(args[0]);
if (args.length > 1)
testCycles = new Integer(args[1]);
if (args.length > 2)
containerSize = new Integer(args[2]);
System.out.printf("%-27s %14s %14s\n",
"Type", "Read time", "Write time");
}
}
abstract class ListTest extends Tester<List<Integer>> {
ListTest(String testId, int nReaders, int nWriters) {
super(testId, nReaders, nWriters);
}
class Reader extends TestTask {
long result = 0;
void test() {
for (long i = 0; i < testCycles; i++)
for (int index = 0; index < containerSize; index++)
result += testContainer.get(index);
}
void putResults() {
readResult += result;
readTime += duration;
}
}
class Writer extends TestTask {
void test() {
for (long i = 0; i < testCycles; i++)
for (int index = 0; index < containerSize; index++)
testContainer.set(index, writeData[index]);
}
void putResults() {
writeTime += duration;
}
}
void startReadersAndWriters() {
for (int i = 0; i < nReaders; i++)
exec.execute(new Reader());
for (int i = 0; i < nWriters; i++)
exec.execute(new Writer());
}
}
class SynchronizedArrayListTest extends ListTest {
List<Integer> containerInitializer() {
return Collections.synchronizedList(
new ArrayList<Integer>(
new CountingIntegerList(containerSize)));
}
SynchronizedArrayListTest(int nReaders, int nWriters) {
super("Synched ArrayList", nReaders, nWriters);
}
}
class CopyOnWriteArrayListTest extends ListTest {
List<Integer> containerInitializer() {
return new CopyOnWriteArrayList<Integer>(
new CountingIntegerList(containerSize));
}
CopyOnWriteArrayListTest(int nReaders, int nWriters) {
super("CopyOnWriteArrayList", nReaders, nWriters);
}
}
public class ListComparisons {
public static void main(String[] args) {
Tester.initMain(args);
new SynchronizedArrayListTest(10, 0);
new SynchronizedArrayListTest(9, 1);
new SynchronizedArrayListTest(5, 5);
new CopyOnWriteArrayListTest(10, 0);
new CopyOnWriteArrayListTest(9, 1);
new CopyOnWriteArrayListTest(5, 5);
Tester.exec.shutdown();
}
}
比较各种Map实现:
abstract class MapTest
extends Tester<Map<Integer, Integer>> {
MapTest(String testId, int nReaders, int nWriters) {
super(testId, nReaders, nWriters);
}
class Reader extends TestTask {
long result = 0;
void test() {
for (long i = 0; i < testCycles; i++)
for (int index = 0; index < containerSize; index++)
result += testContainer.get(index);
}
void putResults() {
readResult += result;
readTime += duration;
}
}
class Writer extends TestTask {
void test() {
for (long i = 0; i < testCycles; i++)
for (int index = 0; index < containerSize; index++)
testContainer.put(index, writeData[index]);
}
void putResults() {
writeTime += duration;
}
}
void startReadersAndWriters() {
for (int i = 0; i < nReaders; i++)
exec.execute(new Reader());
for (int i = 0; i < nWriters; i++)
exec.execute(new Writer());
}
}
class SynchronizedHashMapTest extends MapTest {
Map<Integer, Integer> containerInitializer() {
return Collections.synchronizedMap(
new HashMap<Integer, Integer>(
MapData.map(
new CountingGenerator.Integer(),
new CountingGenerator.Integer(),
containerSize)));
}
SynchronizedHashMapTest(int nReaders, int nWriters) {
super("Synched HashMap", nReaders, nWriters);
}
}
class ConcurrentHashMapTest extends MapTest {
Map<Integer, Integer> containerInitializer() {
return new ConcurrentHashMap<Integer, Integer>(
MapData.map(
new CountingGenerator.Integer(),
new CountingGenerator.Integer(), containerSize));
}
ConcurrentHashMapTest(int nReaders, int nWriters) {
super("ConcurrentHashMap", nReaders, nWriters);
}
}
public class MapComparisons {
public static void main(String[] args) {
Tester.initMain(args);
new SynchronizedHashMapTest(10, 0);
new SynchronizedHashMapTest(9, 1);
new SynchronizedHashMapTest(5, 5);
new ConcurrentHashMapTest(10, 0);
new ConcurrentHashMapTest(9, 1);
new ConcurrentHashMapTest(5, 5);
Tester.exec.shutdown();
}
}
乐观锁
public class FastSimulation2 {
static AtomicInteger ato = new AtomicInteger(10);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
int oldValue = ato.get();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("原来获取的oldValue为:" + oldValue + "查看是否被其他的线程修改了" + ato.get());
int newValue = oldValue + 3;
if (!ato.compareAndSet(oldValue, newValue)) {
System.out.println(Thread.currentThread().getName() + "将要去修改的旧值为:" + oldValue);
} else {
System.out.println(Thread.currentThread().getName() + "修改成功了 " + ato.get());
}
}).start();
new Thread(() ->{
int oldValue = ato.get();
int newValue = oldValue + 3;
if (ato.compareAndSet(oldValue, newValue)) {
System.out.println(oldValue);
} else {
System.out.println(Thread.currentThread().getName() + "修改成功了");
}
}).start();
TimeUnit.MILLISECONDS.sleep(100);
}
}
假设有B,C两个线程都要去修改A中的值,都是将其加3,B线程会在修改前查看是否还是原来值,如果还是原来的值就对其进行修改,此时线程C进来看到A中已经不是原来的值了,于是就不对其进行修改
ReadWriteLock
ReadWriteLock对数据结构相对不频繁的写入,可以与多个读取者,只要他们不试图写入即可,如果写的这个锁已经被其他的任务持有,那么任何读取者都不能访问,直到这个写锁被释放