并发集合介绍和使用
package com.eagle.demo;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 在并发程序中
* List 推荐使用 {@link CopyOnWriteArrayList}
* Set 推荐使用 {@link CopyOnWriteArraySet}
* Map 推荐使用 {@link java.util.concurrent.ConcurrentHashMap}
*/
public class SafeCollection {
public static void main(String[] args) {
// notsafeList();
// safeList1();
// safeList2();
safeList3();
}
/**
* 单线程使用
*/
public static void notsafeList() {
List<String> list = new ArrayList<>();
createData(list);
}
/**
* Vector 使用 synchronized
* 重锁,不推荐
*/
public static void safeList1() {
List<String> list = new Vector<>();
createData(list);
}
/**
* 重锁,不推荐
*/
public static void safeList2() {
List<String> list = Collections.synchronizedList(new ArrayList<>());
createData(list);
}
/**
* 高并发使用,推荐
*/
public static void safeList3() {
List<String> list = new CopyOnWriteArrayList<>();
createData(list);
}
private static void createData(List<String> list) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, "----线程" + i + "---").start();
}
}
}
推荐使用 CopyOnWriteArrayList (使用轻量Lock锁)
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
private static final long serialVersionUID = 8673264195747942595L;
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
}
从CopyOnWriteArrayList 源码中可以看到,使用锁 ReentrantLock 进行控制。
在企业中开发多线程时推荐使用Lock锁。(ReentrantLock 是 Lock的实现类)
实例运用
场景:从kafka中间件中读取到内容以后,需要先向mysql 中保存一份,在向es中保存一份,最后在日志中记录一份。(使用并发并有顺序保存数据)
package com.eagle.demo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
public static void main(String[] args) {
SaveData saveData = new SaveData();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
saveData.sendSql();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
saveData.sendEs();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
saveData.sendLog();
}
}, "C").start();
}
}
class SaveData {
// 1 存数据库 2存es 3log记录
private int flag = 1;
private ReentrantLock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void sendSql() {
lock.lock();
try {
while (flag != 1) {
condition1.await();
}
System.out.println("线程号:" + Thread.currentThread().getName() + "---向 数据库 发送数据");
flag = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void sendEs() {
lock.lock();
try {
while (flag != 2) {
condition2.await();
}
System.out.println("线程号:" + Thread.currentThread().getName() + "---向 ES 发送数据");
flag = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void sendLog() {
lock.lock();
try {
while (flag != 3) {
condition3.await();
}
System.out.println("线程号:" + Thread.currentThread().getName() + "---向 log 发送数据");
flag = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
Callable 使用
项目中没有见过使用的
Callable 只是一个接口,所以在创建线程时肯定需要包装类,即 FutureTask(实现了Runnable接口)
package com.eagle.demo;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new CallableCal());
new Thread(futureTask, "AAA").start();
int ss=futureTask.get();
System.out.println(ss);
}
}
class CallableCal implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 1024 * 1024;
}
}