并发集合介绍和使用

  1. package com.eagle.demo;
  2. import java.util.*;
  3. import java.util.concurrent.CopyOnWriteArrayList;
  4. import java.util.concurrent.CopyOnWriteArraySet;
  /**
   * Demo of list implementations shared between threads.
   *
   * <p>Recommendations for concurrent programs:
   * <ul>
   *   <li>List: prefer {@link CopyOnWriteArrayList}</li>
   *   <li>Set: prefer {@link CopyOnWriteArraySet}</li>
   *   <li>Map: prefer {@link java.util.concurrent.ConcurrentHashMap}</li>
   * </ul>
   */
  public class SafeCollection {

    /**
     * Runs the {@link CopyOnWriteArrayList} variant.
     *
     * @param args unused
     */
    public static void main(String[] args) {
      // Other variants that can be tried here instead:
      // notsafeList(), safeList1(), safeList2()
      safeList3();
    }

    /**
     * {@link ArrayList} variant. ArrayList is meant for single-threaded use;
     * here one instance is handed to the writer threads of createData.
     */
    public static void notsafeList() {
      List<String> shared = new ArrayList<>();
      createData(shared);
    }

    /**
     * {@link Vector} variant. Vector relies on {@code synchronized};
     * heavyweight locking, not recommended.
     */
    public static void safeList1() {
      List<String> shared = new Vector<>();
      createData(shared);
    }

    /**
     * {@link Collections#synchronizedList} variant.
     * Heavyweight locking, not recommended.
     */
    public static void safeList2() {
      List<String> backing = new ArrayList<>();
      List<String> shared = Collections.synchronizedList(backing);
      createData(shared);
    }

    /**
     * {@link CopyOnWriteArrayList} variant.
     * Recommended for highly concurrent use.
     */
    public static void safeList3() {
      List<String> shared = new CopyOnWriteArrayList<>();
      createData(shared);
    }

    /**
     * Starts 20 threads; each one appends the first 8 characters of a random
     * UUID to {@code list} and then prints the whole list.
     *
     * @param list the list shared by all started threads
     */
    private static void createData(List<String> list) {
      int index = 0;
      while (index < 20) {
        Runnable writer = () -> {
          String token = UUID.randomUUID().toString().substring(0, 8);
          list.add(token);
          System.out.println(list);
        };
        Thread worker = new Thread(writer, "----线程" + index + "---");
        worker.start();
        index++;
      }
    }
  }

推荐使用 CopyOnWriteArrayList (使用轻量Lock锁)

  1. public class CopyOnWriteArrayList<E>
  2. implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
  3. private static final long serialVersionUID = 8673264195747942595L;
  4. /** The lock protecting all mutators */
  5. final transient ReentrantLock lock = new ReentrantLock();
  6. /** The array, accessed only via getArray/setArray. */
  7. private transient volatile Object[] array;
  8. /**
  9. * Appends the specified element to the end of this list.
  10. *
  11. * @param e element to be appended to this list
  12. * @return {@code true} (as specified by {@link Collection#add})
  13. */
  14. public boolean add(E e) {
  15. final ReentrantLock lock = this.lock; // local reference to the lock field
  16. lock.lock(); // hold the mutator lock for the whole copy-and-swap below
  17. try {
  18. Object[] elements = getArray(); // current backing array
  19. int len = elements.length;
  20. Object[] newElements = Arrays.copyOf(elements, len + 1); // new array with one extra slot; the old array is not written to
  21. newElements[len] = e; // the appended element takes the last slot
  22. setArray(newElements); // install the copy as the list's array
  23. return true;
  24. } finally {
  25. lock.unlock(); // released on every exit path, including exceptions
  26. }
  27. }
  28. }

从 CopyOnWriteArrayList 的源码(以上摘自 JDK 8)中可以看到,所有修改操作都通过 ReentrantLock 加锁进行控制;在 JDK 9 及之后的版本中,该类内部改为对一个私有锁对象使用 synchronized。
在企业中开发多线程时推荐使用Lock锁。(ReentrantLock 是 Lock的实现类)

实例运用

场景:从 Kafka 中间件中读取到内容以后,需要先向 MySQL 中保存一份,再向 ES 中保存一份,最后在日志中记录一份。(三个步骤由不同线程并发执行,但必须按固定顺序保存数据)

  1. package com.eagle.demo;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class LockDemo {
  5. public static void main(String[] args) {
  6. SaveData saveData = new SaveData();
  7. new Thread(() -> {
  8. for (int i = 0; i < 10; i++) {
  9. saveData.sendSql();
  10. }
  11. }, "A").start();
  12. new Thread(() -> {
  13. for (int i = 0; i < 10; i++) {
  14. saveData.sendEs();
  15. }
  16. }, "B").start();
  17. new Thread(() -> {
  18. for (int i = 0; i < 10; i++) {
  19. saveData.sendLog();
  20. }
  21. }, "C").start();
  22. }
  23. }
  /**
   * Forces three steps to run in the fixed cycle database -> ES -> log,
   * regardless of which thread calls which step first.
   *
   * <p>A single lock guards {@code flag}, which records the step allowed to run
   * next. Each step has its own {@link Condition}: a caller whose step is not
   * due waits on that condition, and the step that just finished signals the
   * condition of the following step.
   *
   * <p>If the calling thread is interrupted while waiting for its turn, the
   * step is skipped, {@code flag} is left unchanged and the thread's interrupt
   * status is restored so the caller can react to it.
   */
  class SaveData {
    private static final int STEP_SQL = 1;
    private static final int STEP_ES = 2;
    private static final int STEP_LOG = 3;

    /** Step allowed to run next (1 = database, 2 = ES, 3 = log); guarded by lock. */
    private int flag = STEP_SQL;
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled when the database step becomes due. */
    private final Condition condition1 = lock.newCondition();
    /** Signalled when the ES step becomes due. */
    private final Condition condition2 = lock.newCondition();
    /** Signalled when the log step becomes due. */
    private final Condition condition3 = lock.newCondition();

    /** Waits until the database step is due, prints its message, then hands over to ES. */
    public void sendSql() {
      runStep(STEP_SQL, STEP_ES, condition1, condition2, "---向 数据库 发送数据");
    }

    /** Waits until the ES step is due, prints its message, then hands over to log. */
    public void sendEs() {
      runStep(STEP_ES, STEP_LOG, condition2, condition3, "---向 ES 发送数据");
    }

    /** Waits until the log step is due, prints its message, then hands back to the database step. */
    public void sendLog() {
      runStep(STEP_LOG, STEP_SQL, condition3, condition1, "---向 log 发送数据");
    }

    /**
     * Shared body of the three steps.
     *
     * @param expected value {@code flag} must have before this step may run
     * @param next value written to {@code flag} once this step has run
     * @param ownTurn condition to wait on while this step is not due
     * @param nextTurn condition signalled after this step has run
     * @param message suffix printed after the current thread's name
     */
    private void runStep(int expected, int next, Condition ownTurn, Condition nextTurn,
        String message) {
      lock.lock();
      try {
        // Loop rather than "if": await() may return without the flag having changed.
        while (flag != expected) {
          ownTurn.await();
        }
        System.out.println("线程号:" + Thread.currentThread().getName() + message);
        flag = next;
        nextTurn.signal();
      } catch (InterruptedException e) {
        // await() cleared the interrupt status when it threw; put it back
        // instead of swallowing the interruption.
        Thread.currentThread().interrupt();
      } finally {
        lock.unlock();
      }
    }
  }

Callable 使用

项目中没有见过使用的

Callable 只是一个接口,而 Thread 的构造方法只接受 Runnable,所以创建线程时需要用包装类 FutureTask(它实现了 Runnable 接口)把 Callable 包装起来,再通过 FutureTask 的 get() 方法获取返回值。

  1. package com.eagle.demo;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.FutureTask;
  5. public class CallableDemo {
  6. public static void main(String[] args) throws ExecutionException, InterruptedException {
  7. FutureTask<Integer> futureTask = new FutureTask<>(new CallableCal());
  8. new Thread(futureTask, "AAA").start();
  9. int ss=futureTask.get();
  10. System.out.println(ss);
  11. }
  12. }
  /** Callable that always yields the same integer result. */
  class CallableCal implements Callable<Integer> {

    /** Value handed back by {@link #call()}: 1024 * 1024. */
    private static final int RESULT = 1024 * 1024;

    /**
     * @return the fixed result, 1048576
     */
    @Override
    public Integer call() throws Exception {
      return RESULT;
    }
  }