• ConcurrentDictionary:线程安全的字典集合实现,读操作无需使用锁,对写操作则需要锁。参数concurrentLevel可以在构造函数中定义锁的数量,预估的线程数量将并发地更新该字典。如果没有必要尽量避免使用Count\IsEmpty\Keys\Values\CopyTo\ToArray,因为一些操作需要获取该字典中的所有锁。
  • ConcurrentQueue实现异步处理:该集合使用了CAP(原子的比较和交换Compare and Swap)以及SpinWait来保证线程安全。实现了FIFO集合Enqueue方法向队列中加元素,TryQueue尝试取出第一个元素,TryPeek试图得到第一个元素但并不从队列中删除该元素。
  • ConcurrentStack异步处理顺序:实现了LIFO集合后进先出(Last in first output),可以使用Push和PushRange添加元素,使用TryPop和TryPopRange方法获取元素,TryPeek方法检查元素。
  • ConcurrentBag创建一个可扩展的爬虫:支持重复元素的无序集合,添加元素使用Add方法,检查元素使用TryPeek方法,获取元素使用TryTake。

注意事项:尽量避免使用上述集合的Count属性,时间复杂度为O(N)。如果想要检查集合是否为空,请使用IsEmpty属性,时间复杂度为O(1)。因为他们是链表实现的。

  • BlockingCollection进行异步处理:对IProducerConsumerCollection泛型接口实现的一个高级封装,用于实现管道场景,即当有一些步骤需要使用之前步骤运行的结果时BlockingCollection类支持如下功能:分块、调整内部集合容量、取消集合操作、从多个块集合中获取元素。

    6.1 ConcurrentDictionary(细粒度锁fine-grained locking)

    使用场景:
    多线程访问只读元素:Dictionary或ReadOnlyDictionary集合
    对字典需要大量线程的安全读操作:ConcurrentDictionary(大量线程读操作性能会更好)

    1. namespace ConcurrentDictionaryExample {
    2. class Program {
    3. const string Item = "Dictionary Item";
    4. const int Iterations = 1000000;
    5. public static string CurrentItem;
    6. static void Main(string[] args) {
    7. var concurrentDictionary = new ConcurrentDictionary<int, string>();
    8. var dictionary = new Dictionary<int, string>();
    9. var sw = new Stopwatch();
    10. sw.Start();
    11. for (int i = 0; i < Iterations; i++) {
    12. lock (dictionary) {
    13. dictionary[i] = Item;
    14. }
    15. }
    16. sw.Stop();
    17. Console.WriteLine($"Writing to dictionary with a lock:{sw.Elapsed}");
    18. sw.Restart();
    19. for (int i = 0; i < Iterations; i++) {
    20. concurrentDictionary[i] = Item;
    21. }
    22. sw.Stop();
    23. Console.WriteLine($"Writing to a concurrent dictionary:{sw.Elapsed}");
    24. sw.Restart();
    25. for (int i = 0; i < Iterations; i++) {
    26. lock (dictionary) {
    27. CurrentItem = dictionary[i];
    28. }
    29. }
    30. sw.Stop();
    31. Console.WriteLine($"Reading from a dictionary:{sw.Elapsed}");
    32. sw.Restart();
    33. for (int i = 0; i < Iterations; i++) {
    34. CurrentItem = concurrentDictionary[i];
    35. }
    36. sw.Stop();
    37. Console.WriteLine($"Reading from a concurrent dictionary:{sw.Elapsed}");
    38. Console.ReadLine();
    39. }
    40. }
    41. }

    image.png

    6.2 使用ConcurrentQueue实现异步处理

    并行队列,异步按FIFO顺序取出元素。 ```csharp using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Threading;

namespace ConcurrentQueueExample { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadLine(); }

  1. static async Task RunProgram() {
  2. var taskQueue = new ConcurrentQueue<CustomTask>(); //创建一个并行队列,队列里面存放的对象为CustomTask类型
  3. var cts = new CancellationTokenSource(); //Task取消选项CancellationTokenSource.Token
  4. var taskSource = Task.Run(() => TaskProducer(taskQueue)); //执行TaskProcucer方法,将并行队列传入进去
  5. //Task数组(可要可不要,可以直接Task.Run()执行,开启四个Task,执行出队操作
  6. Task[] processors = new Task[4];
  7. for (int i = 1; i <= 4; i++) {
  8. string processorID = i.ToString();
  9. processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue,$"Processor" +
  10. $" {processorID}",cts.Token));
  11. }
  12. }
  13. //并行队列FIFO,元素入队列,CustomTask Enqueue入队,元素个数20个
  14. static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) {
  15. for (int i = 1; i <= 20; i++) {
  16. await Task.Delay(50);
  17. var workItem = new CustomTask { ID = i };
  18. queue.Enqueue(workItem);
  19. Console.WriteLine($"{workItem.ID} has been enqueued");
  20. }
  21. }
  22. //出队操作,将队列传入进来,以及线程别名name,是否取消标志
  23. //TryDequeue尝试取出首元素,并带out参数(取出的元素)
  24. static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue,string name,CancellationToken token) {
  25. CustomTask workItem;
  26. bool dequeueSuccessful = false;
  27. await GetRandomDelay();
  28. do {
  29. dequeueSuccessful = queue.TryDequeue(out workItem);
  30. if (dequeueSuccessful) {
  31. Console.WriteLine($"元素 {workItem.ID} has been dequeued by {name}");
  32. }
  33. await GetRandomDelay();
  34. }
  35. while (!token.IsCancellationRequested);
  36. }
  37. static Task GetRandomDelay() {
  38. int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
  39. return Task.Delay(delay);
  40. }
  41. }
  42. class CustomTask {
  43. public int ID { get; set; }
  44. }

}

  1. <a name="CxuPT"></a>
  2. # 6.3 改变ConcurrentStack异步处理顺序
  3. 并发栈,Push压栈 TryPop出栈 TryPeek查看一下但不删除元素。适用于多任务,早先创建的任务具有较低优先级。
  4. ```csharp
  5. using System;
  6. using System.Collections.Concurrent;
  7. using System.Threading.Tasks;
  8. namespace ConcurrentStackExample {
  9. class Program {
  10. // Demonstrates:
  11. // ConcurrentStack<T>.Push();
  12. // ConcurrentStack<T>.TryPeek();
  13. // ConcurrentStack<T>.TryPop();
  14. // ConcurrentStack<T>.Clear();
  15. // ConcurrentStack<T>.IsEmpty;
  16. static async Task Main() {
  17. int items = 10000;
  18. ConcurrentStack<int> stack = new ConcurrentStack<int>();
  19. // Create an action to push items onto the stack
  20. Action pusher = () =>
  21. {
  22. for (int i = 0; i < items; i++) {
  23. stack.Push(i);
  24. }
  25. };
  26. // Run the action once
  27. pusher();
  28. if (stack.TryPeek(out int result)) {
  29. Console.WriteLine($"TryPeek() saw {result} on top of the stack.");
  30. }
  31. else {
  32. Console.WriteLine("Could not peek most recently added number.");
  33. }
  34. // Empty the stack
  35. stack.Clear();
  36. if (stack.IsEmpty) {
  37. Console.WriteLine("Cleared the stack.");
  38. }
  39. // Create an action to push and pop items
  40. Action pushAndPop = () =>
  41. {
  42. Console.WriteLine($"Task started on {Task.CurrentId}");
  43. int item;
  44. for (int i = 0; i < items; i++)
  45. stack.Push(i);
  46. for (int i = 0; i < items; i++)
  47. stack.TryPop(out item);
  48. Console.WriteLine($"Task ended on {Task.CurrentId}");
  49. };
  50. // Spin up five concurrent tasks of the action
  51. var tasks = new Task[5];
  52. for (int i = 0; i < tasks.Length; i++)
  53. tasks[i] = Task.Factory.StartNew(pushAndPop);
  54. // Wait for all the tasks to finish up
  55. await Task.WhenAll(tasks);
  56. if (!stack.IsEmpty) {
  57. Console.WriteLine("Did not take all the items off the stack");
  58. }
  59. }
  60. }
  61. }

6.4 ConcurrentBag 可选泛型集合 ConcurrentStack,默认为并发队列模式

可以替代List实现并发无序集合,用于生产者消费者模式。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;

namespace BlockingCollectionExample {
    class Program {
        static void Main(string[] args) {
            Task t = RunProgram();
            t.Wait();

            t = RunProgram(new ConcurrentStack<CustomTask>());
            t.Wait();

        }

        static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection= null) {
            var taskCollection = new BlockingCollection<CustomTask>();
            if (collection != null) {
                taskCollection = new BlockingCollection<CustomTask>(collection);
            }

            var taskSource = Task.Run(() => TaskProducer(taskCollection));
            Task[] taskArray = new Task[4];
            for (int i = 0; i < 4; i++) {
                string processorID = $"Processer{i}";
                taskArray[i] = Task.Run(() => TaskProcessor(taskCollection, processorID));
            }

            await taskSource;
            await Task.WhenAll(taskArray);

        }

        static async Task TaskProducer(BlockingCollection<CustomTask> collection) {
            await GetRandomDelay();
            for (int i = 0; i < 20; i++) {
                await Task.Delay(20);
                var workItem = new CustomTask() { Id = i };
                collection.Add(workItem);
            }
            collection.CompleteAdding();//迭代周期结束,停止向集合中填充数据
        }

        static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name) {
            await GetRandomDelay();       
            // 使用GetConsumingEnumerable方法获取工作项
            foreach (CustomTask item in collection.GetConsumingEnumerable()) {
                Console.WriteLine($"Task {item.Id} has been processed by {name}");
                await GetRandomDelay();
            }
        }


        static Task GetRandomDelay() {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }

        private class CustomTask {
            public int Id { get; set; }
        }
    }
}