并发集合介绍和使用
package com.eagle.demo;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Side-by-side demo of list implementations under concurrent writes.
 *
 * <p>Recommendations for concurrent programs:
 * <ul>
 *   <li>List: {@link CopyOnWriteArrayList}</li>
 *   <li>Set: {@link CopyOnWriteArraySet}</li>
 *   <li>Map: {@link java.util.concurrent.ConcurrentHashMap}</li>
 * </ul>
 */
public class SafeCollection {

    /**
     * Runs the recommended variant. Swap the call for {@code notsafeList()},
     * {@code safeList1()} or {@code safeList2()} to compare the other lists.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        safeList3();
    }

    /**
     * Feeds a plain {@link ArrayList} to the multi-threaded writer.
     * {@code ArrayList} is meant for single-threaded use only.
     */
    public static void notsafeList() {
        List<String> target = new ArrayList<>();
        createData(target);
    }

    /**
     * Feeds a {@link Vector} to the multi-threaded writer.
     * {@code Vector} relies on {@code synchronized}: a heavyweight lock, not recommended.
     */
    public static void safeList1() {
        List<String> target = new Vector<>();
        createData(target);
    }

    /**
     * Feeds a {@link Collections#synchronizedList(List)} wrapper to the multi-threaded writer.
     * Also a heavyweight lock, not recommended.
     */
    public static void safeList2() {
        List<String> backing = new ArrayList<>();
        List<String> target = Collections.synchronizedList(backing);
        createData(target);
    }

    /**
     * Feeds a {@link CopyOnWriteArrayList} to the multi-threaded writer.
     * Recommended for highly concurrent use.
     */
    public static void safeList3() {
        List<String> target = new CopyOnWriteArrayList<>();
        createData(target);
    }

    /**
     * Starts 20 threads; each appends the first 8 characters of a random UUID
     * to {@code list} and then prints the whole list.
     *
     * @param list the list shared by all worker threads
     */
    private static void createData(List<String> list) {
        // The task does not depend on the loop index, so one instance serves every thread.
        Runnable appendAndPrint = () -> {
            String token = UUID.randomUUID().toString().substring(0, 8);
            list.add(token);
            System.out.println(list);
        };

        int index = 0;
        while (index < 20) {
            Thread worker = new Thread(appendAndPrint, "----线程" + index + "---");
            worker.start();
            index++;
        }
    }
}
推荐使用 CopyOnWriteArrayList (使用轻量Lock锁)
/**
 * Excerpt showing how {@code add} is guarded: the mutator takes a
 * {@link ReentrantLock}, builds an enlarged copy of the backing array,
 * and installs the copy in place of the old array.
 *
 * <p>Only the fields and {@code add} are shown; {@code getArray} and
 * {@code setArray} are referenced but not included in this excerpt.
 */
public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        // Hold the lock for the whole copy-and-replace sequence below.
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // Work on a copy that is one slot longer; the current array is not modified.
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            // Install the enlarged copy as the list's array.
            setArray(newElements);
            return true;
        } finally {
            // Always release, even if the copy above throws.
            lock.unlock();
        }
    }
}
从 CopyOnWriteArrayList 源码中可以看到,add 等写操作使用 ReentrantLock 加锁:先复制出一个新数组并在其上修改,再用新数组替换原数组。
在企业中开发多线程时推荐使用Lock锁。(ReentrantLock 是 Lock的实现类)
实例运用
场景:从 Kafka 中间件中读取到内容以后,需要先向 MySQL 中保存一份,再向 ES 中保存一份,最后在日志中记录一份。(使用多线程并发执行,并保证按固定顺序保存数据)
package com.eagle.demo;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demo: three threads (A, B, C) each run one stage ten times, and a shared
 * {@link SaveData} forces the stages to execute in the fixed order
 * database -> ES -> log.
 */
public class LockDemo {

    /**
     * Starts the three stage threads against one shared {@link SaveData}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SaveData saveData = new SaveData();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                saveData.sendSql();
            }
        }, "A").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                saveData.sendEs();
            }
        }, "B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                saveData.sendLog();
            }
        }, "C").start();
    }
}

/**
 * Coordinates three stages so that they run strictly in the order
 * database, ES, log, then wrap around to database again.
 *
 * <p>{@code flag} is only read and written while {@code lock} is held.
 */
class SaveData {

    /** Stage markers stored in {@link #flag}. */
    private static final int STAGE_SQL = 1;
    private static final int STAGE_ES = 2;
    private static final int STAGE_LOG = 3;

    /** The stage allowed to run next: 1 = database, 2 = ES, 3 = log. */
    private int flag = STAGE_SQL;

    private final ReentrantLock lock = new ReentrantLock();

    /** One condition per stage, so a finished stage wakes only its successor. */
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();
    private final Condition condition3 = lock.newCondition();

    /** Waits for the database turn, prints the database message, then hands over to ES. */
    public void sendSql() {
        runStage(STAGE_SQL, STAGE_ES, condition1, condition2, "---向 数据库 发送数据");
    }

    /** Waits for the ES turn, prints the ES message, then hands over to log. */
    public void sendEs() {
        runStage(STAGE_ES, STAGE_LOG, condition2, condition3, "---向 ES 发送数据");
    }

    /** Waits for the log turn, prints the log message, then hands back to database. */
    public void sendLog() {
        runStage(STAGE_LOG, STAGE_SQL, condition3, condition1, "---向 log 发送数据");
    }

    /**
     * Shared body of the three stages: block until {@code flag == expected},
     * print the stage message, advance {@code flag} and signal the next stage.
     *
     * <p>If the calling thread is interrupted while waiting, the stage is skipped
     * (nothing is printed and {@code flag} is not advanced) and the thread's
     * interrupt status is restored so the caller can observe it.
     *
     * @param expected   value of {@code flag} that lets this stage run
     * @param next       value written to {@code flag} once this stage is done
     * @param waitOn     condition this stage waits on
     * @param wakeNext   condition signalled for the following stage
     * @param suffix     text printed after the thread name
     */
    private void runStage(int expected, int next, Condition waitOn, Condition wakeNext, String suffix) {
        lock.lock();
        try {
            // Loop rather than a single check: await() may return spuriously.
            while (flag != expected) {
                waitOn.await();
            }
            System.out.println("线程号:" + Thread.currentThread().getName() + suffix);
            flag = next;
            wakeNext.signal();
        } catch (InterruptedException e) {
            // await() clears the interrupt status when it throws; set it again
            // instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Thread " + Thread.currentThread().getName()
                    + " interrupted while waiting for stage " + expected);
        } finally {
            lock.unlock();
        }
    }
}
Callable 使用
项目中没有见过使用的
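Callable 只是一个接口,而 Thread 的构造方法只接受 Runnable,所以在创建线程时需要用 FutureTask 进行包装(FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 继承了 Runnable),之后通过 futureTask.get() 获取返回值。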
package com.eagle.demo;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Demo: run a {@link Callable} on a plain thread by wrapping it in a
 * {@link FutureTask}, then read the result back on the main thread.
 */
public class CallableDemo {

    /**
     * Starts thread "AAA" to run {@link CallableCal} and prints its result.
     *
     * @param args unused
     * @throws ExecutionException   if the task itself threw
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> job = new CallableCal();
        FutureTask<Integer> pending = new FutureTask<>(job);

        Thread worker = new Thread(pending, "AAA");
        worker.start();

        // get() waits for the task to complete before returning its value.
        int outcome = pending.get();
        System.out.println(outcome);
    }
}

/** Task that yields the product 1024 * 1024. */
class CallableCal implements Callable<Integer> {

    /**
     * @return 1024 multiplied by 1024
     */
    @Override
    public Integer call() throws Exception {
        int factor = 1024;
        return factor * factor;
    }
}
