装饰性花园-随机人数的统计
volatile boolean 作为总任务开关示例
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Entrance implements Runnable {
private Random random = new Random();
private static List<Entrance> entrances = new ArrayList<Entrance>();
private int number = 0;//单个门口通过的人数
// Doesn't need synchronization to read:
private final int id;
private static volatile boolean canceled = false;
// Atomic operation on a volatile field:
public static void cancel() {
canceled = true;
}
public Entrance(int id) {
this.id = id;
entrances.add(this);
}
public void run() {
while (!canceled) {
synchronized (this) {
number = number + random.nextInt(100);
}
System.out.println(this + " Total: " + sumEntrances());//实时统计总数
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("sleep interrupted");
}
}
System.out.println("Stopping " + this);
}
public synchronized int getValue() {
return number;
}
public String toString() {
return "Entrance " + id + ": " + getValue();
}
public static int sumEntrances() {
int sum = 0;
for (Entrance entrance : entrances)
sum += entrance.getValue();
return sum;
}
}
public class OrnamentalGarden {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new Entrance(i));
// Run for a while, then stop and collect the data:
TimeUnit.SECONDS.sleep(2);
Entrance.cancel();//所有线程的总开关
exec.shutdown();
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS))
System.out.println("Some tasks were not terminated!");
//System.out.println("Total: " + Entrance.getTotalCount());
System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
}
}
中断
三种中断的情况
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class SleepBlocked implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(100);//模拟的是耗时的任务
} catch (InterruptedException e) {
System.out.println("SleepBlocked Task Interrupted form outside...");
return;
}
System.out.println("Exist SleepBlocked run()...");
}
}
/**
* IO阻塞无法通过外部的形式进行中断
* IOException属于当前线程中断的一种形式
*/
class IOBlocked implements Runnable {
private InputStream is;
public IOBlocked(InputStream inputStream) {
is = inputStream;
}
@Override
public void run() {
System.out.println("waiting for read");
try {
while (true) {
int temp = is.read();//虽然发送了取消得指令,但依旧在此阻塞
if((char)temp == 'q'){
throw new IOException("quit");
//break;
}
}
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted from IO");
} else {
throw new RuntimeException(e);
}
}
System.out.println("Exiting IOBlocked run()...");
}
}
class SynchronizedBlocked implements Runnable {
private static AtomicInteger atomicInteger = new AtomicInteger();
private static volatile boolean flag = false;
public synchronized void f() {
/*while (true) {
Thread.yield();
}*/
while (!flag) { //跳出死循环得处理
//Thread.yield();
atomicInteger.addAndGet(2);
if (atomicInteger.get() == 50) {
flag = true;
}
}
}
public SynchronizedBlocked() {
new Thread() {
public void run() {
f();
System.out.println("======构造器中其他得业务逻辑处理====");
}
}.start();
}
@Override
public void run() {
System.out.println("Trying to call f()");
f();
System.out.println("Existing SynchronizedBlocked run()... ");
}
}
public class Interrupting {
private static ExecutorService service = Executors.newCachedThreadPool();
public static void test(Runnable r) throws InterruptedException {
Future<?> future = service.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interruping " + r.getClass().getName());
//取消任务的执行
future.cancel(true);
System.out.println("Interruped " + r.getClass().getName());
TimeUnit.SECONDS.sleep(10);
//关闭新任务提交的通道 其他的继续执行
service.shutdown();
System.out.println("线程池关闭任务");
}
public static void main(String[] args) throws Exception {
// test(new SleepBlocked());
// test(new IOBlocked(System.in));
test(new SynchronizedBlocked());
/* TimeUnit.SECONDS.sleep(3);
System.exit(0);*/
}
}
被互斥阻塞
证明方法参数属于线程私有、synchronized方法的锁是实例锁
import java.util.concurrent.TimeUnit;
public class MutliLock {
/**
* f1 f2方法上的锁 是同一把实例锁;
* count方法参数属于线程私有
*/
public synchronized void f1(int count) throws InterruptedException {
if(count-- > 0) {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName()+"->"+"f1() calling f2() with count " + count);
f2(count);
}
}
public synchronized void f2(int count) throws InterruptedException {
if(count-- > 0) {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName()+"->"+"f2() calling f1() with count " + count);
f1(count);
}
}
public static void main(String[] args) throws Exception {
final MutliLock multiLock = new MutliLock();
new Thread() {
public void run() {
try {
multiLock.f1(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
new Thread() {
public void run() {
try {
multiLock.f1(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
一个柜台俩窗口进行卖票
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* MutliLock2 模拟的是柜台 一个柜台俩窗口
* f1() f2()模拟的是窗口。
* 开四个线程 :4个人在卖
*/
public class MutliLock2 {
private static AtomicInteger count = new AtomicInteger(20);
//类锁
public synchronized static void f1() {
if (count.decrementAndGet() > 0) {
System.out.println(Thread.currentThread().getName() + " f1() sell product with count :" + count);
//模拟的查询数据库或者网络波动
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
}
}
}
public synchronized static void f2() {
if (count.decrementAndGet() > 0) {
System.out.println(Thread.currentThread().getName() + " f2() sell product with count :" + count);
//模拟的查询数据库或者网络波动
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
MutliLock2 mutliLock = new MutliLock2();
new Thread("Thread 1-1->") {
@Override
public void run() {
while (MutliLock2.count.get() > 0) {
mutliLock.f2();
}
}
}.start();
new Thread("Thread 1-2->") {
@Override
public void run() {
while (MutliLock2.count.get() > 0) {
mutliLock.f1();
}
}
}.start();
MutliLock2 mutliLock2 = new MutliLock2();
new Thread("Thread 2-1->") {
@Override
public void run() {
while (MutliLock2.count.get() > 0) {
mutliLock2.f2();
}
}
}.start();
new Thread("Thread 2-2->") {
@Override
public void run() {
while (MutliLock2.count.get() > 0) {
mutliLock2.f1();
}
}
}.start();
}
}
检查中断
正确清理中断发生异常时的资源try {} finally {}
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.TimeUnit;
//标识类
class NeedsCleanup {
private final int id;
public NeedsCleanup(int ident) {
id = ident;
System.out.println("NeedsCleanup " + id);
}
public void cleanup() {
System.out.println("Cleaning up " + id);
}
}
class Blocked3 implements Runnable {
private volatile double d = 0.0;
public void run() {
try {
while (!Thread.interrupted()) {
NeedsCleanup n1 = new NeedsCleanup(1);
try {
System.out.println("Sleeping");
TimeUnit.SECONDS.sleep(2);
NeedsCleanup n2 = new NeedsCleanup(2);
try {
System.out.println("Calculating");
for (int i = 1; i < 2500000; i++)
d = d + (Math.PI + Math.E) / d;
System.out.println("Finished time-consuming operation");
} finally {
n2.cleanup();
}
}
finally {
n1.cleanup();
}
}
System.out.println("Exiting via while() test");
} catch (InterruptedException e) {
System.out.println("Exiting via InterruptedException");
}
}
}
public class InterruptingIdiom {
public static void main(String[] args) throws Exception {
Thread t = new Thread(new Blocked3());
t.start();
TimeUnit.MILLISECONDS.sleep(2010);
t.interrupt();
}
}