- ConcurrentDictionary:线程安全的字典集合实现,读操作无需使用锁,对写操作则需要锁。参数concurrentLevel可以在构造函数中定义锁的数量,预估的线程数量将并发地更新该字典。如果没有必要尽量避免使用Count\IsEmpty\Keys\Values\CopyTo\ToArray,因为一些操作需要获取该字典中的所有锁。
- ConcurrentQueue实现异步处理:该集合使用了CAP(原子的比较和交换Compare and Swap)以及SpinWait来保证线程安全。实现了FIFO集合。Enqueue方法向队列中加元素,TryQueue尝试取出第一个元素,TryPeek试图得到第一个元素但并不从队列中删除该元素。
- ConcurrentStack异步处理顺序:实现了LIFO集合后进先出(Last in first output),可以使用Push和PushRange添加元素,使用TryPop和TryPopRange方法获取元素,TryPeek方法检查元素。
- ConcurrentBag创建一个可扩展的爬虫:支持重复元素的无序集合,添加元素使用Add方法,检查元素使用TryPeek方法,获取元素使用TryTake。
注意事项:尽量避免使用上述集合的Count属性,时间复杂度为O(N)。如果想要检查集合是否为空,请使用IsEmpty属性,时间复杂度为O(1)。因为他们是链表实现的。
BlockingCollection进行异步处理:对IProducerConsumerCollection泛型接口实现的一个高级封装,用于实现管道场景,即当有一些步骤需要使用之前步骤运行的结果时BlockingCollection类支持如下功能:分块、调整内部集合容量、取消集合操作、从多个块集合中获取元素。
6.1 ConcurrentDictionary(细粒度锁fine-grained locking)
使用场景:
多线程访问只读元素:Dictionary或ReadOnlyDictionary集合
对字典需要大量线程的安全读操作:ConcurrentDictionary(大量线程读操作性能会更好)namespace ConcurrentDictionaryExample {class Program {const string Item = "Dictionary Item";const int Iterations = 1000000;public static string CurrentItem;static void Main(string[] args) {var concurrentDictionary = new ConcurrentDictionary<int, string>();var dictionary = new Dictionary<int, string>();var sw = new Stopwatch();sw.Start();for (int i = 0; i < Iterations; i++) {lock (dictionary) {dictionary[i] = Item;}}sw.Stop();Console.WriteLine($"Writing to dictionary with a lock:{sw.Elapsed}");sw.Restart();for (int i = 0; i < Iterations; i++) {concurrentDictionary[i] = Item;}sw.Stop();Console.WriteLine($"Writing to a concurrent dictionary:{sw.Elapsed}");sw.Restart();for (int i = 0; i < Iterations; i++) {lock (dictionary) {CurrentItem = dictionary[i];}}sw.Stop();Console.WriteLine($"Reading from a dictionary:{sw.Elapsed}");sw.Restart();for (int i = 0; i < Iterations; i++) {CurrentItem = concurrentDictionary[i];}sw.Stop();Console.WriteLine($"Reading from a concurrent dictionary:{sw.Elapsed}");Console.ReadLine();}}}
6.2 使用ConcurrentQueue实现异步处理
并行队列,异步按FIFO顺序取出元素。 ```csharp using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Threading;
namespace ConcurrentQueueExample { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadLine(); }
static async Task RunProgram() {var taskQueue = new ConcurrentQueue<CustomTask>(); //创建一个并行队列,队列里面存放的对象为CustomTask类型var cts = new CancellationTokenSource(); //Task取消选项CancellationTokenSource.Tokenvar taskSource = Task.Run(() => TaskProducer(taskQueue)); //执行TaskProcucer方法,将并行队列传入进去//Task数组(可要可不要,可以直接Task.Run()执行,开启四个Task,执行出队操作Task[] processors = new Task[4];for (int i = 1; i <= 4; i++) {string processorID = i.ToString();processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue,$"Processor" +$" {processorID}",cts.Token));}}//并行队列FIFO,元素入队列,CustomTask Enqueue入队,元素个数20个static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) {for (int i = 1; i <= 20; i++) {await Task.Delay(50);var workItem = new CustomTask { ID = i };queue.Enqueue(workItem);Console.WriteLine($"{workItem.ID} has been enqueued");}}//出队操作,将队列传入进来,以及线程别名name,是否取消标志//TryDequeue尝试取出首元素,并带out参数(取出的元素)static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue,string name,CancellationToken token) {CustomTask workItem;bool dequeueSuccessful = false;await GetRandomDelay();do {dequeueSuccessful = queue.TryDequeue(out workItem);if (dequeueSuccessful) {Console.WriteLine($"元素 {workItem.ID} has been dequeued by {name}");}await GetRandomDelay();}while (!token.IsCancellationRequested);}static Task GetRandomDelay() {int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);return Task.Delay(delay);}}class CustomTask {public int ID { get; set; }}
}
<a name="CxuPT"></a># 6.3 改变ConcurrentStack异步处理顺序并发栈,Push压栈 TryPop出栈 TryPeek查看一下但不删除元素。适用于多任务,早先创建的任务具有较低优先级。```csharpusing System;using System.Collections.Concurrent;using System.Threading.Tasks;namespace ConcurrentStackExample {class Program {// Demonstrates:// ConcurrentStack<T>.Push();// ConcurrentStack<T>.TryPeek();// ConcurrentStack<T>.TryPop();// ConcurrentStack<T>.Clear();// ConcurrentStack<T>.IsEmpty;static async Task Main() {int items = 10000;ConcurrentStack<int> stack = new ConcurrentStack<int>();// Create an action to push items onto the stackAction pusher = () =>{for (int i = 0; i < items; i++) {stack.Push(i);}};// Run the action oncepusher();if (stack.TryPeek(out int result)) {Console.WriteLine($"TryPeek() saw {result} on top of the stack.");}else {Console.WriteLine("Could not peek most recently added number.");}// Empty the stackstack.Clear();if (stack.IsEmpty) {Console.WriteLine("Cleared the stack.");}// Create an action to push and pop itemsAction pushAndPop = () =>{Console.WriteLine($"Task started on {Task.CurrentId}");int item;for (int i = 0; i < items; i++)stack.Push(i);for (int i = 0; i < items; i++)stack.TryPop(out item);Console.WriteLine($"Task ended on {Task.CurrentId}");};// Spin up five concurrent tasks of the actionvar tasks = new Task[5];for (int i = 0; i < tasks.Length; i++)tasks[i] = Task.Factory.StartNew(pushAndPop);// Wait for all the tasks to finish upawait Task.WhenAll(tasks);if (!stack.IsEmpty) {Console.WriteLine("Did not take all the items off the stack");}}}}
6.4 ConcurrentBag 可选泛型集合 ConcurrentStack,默认为并发队列模式
可以替代List实现并发无序集合,用于生产者消费者模式。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;
namespace BlockingCollectionExample {
class Program {
static void Main(string[] args) {
Task t = RunProgram();
t.Wait();
t = RunProgram(new ConcurrentStack<CustomTask>());
t.Wait();
}
static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection= null) {
var taskCollection = new BlockingCollection<CustomTask>();
if (collection != null) {
taskCollection = new BlockingCollection<CustomTask>(collection);
}
var taskSource = Task.Run(() => TaskProducer(taskCollection));
Task[] taskArray = new Task[4];
for (int i = 0; i < 4; i++) {
string processorID = $"Processer{i}";
taskArray[i] = Task.Run(() => TaskProcessor(taskCollection, processorID));
}
await taskSource;
await Task.WhenAll(taskArray);
}
static async Task TaskProducer(BlockingCollection<CustomTask> collection) {
await GetRandomDelay();
for (int i = 0; i < 20; i++) {
await Task.Delay(20);
var workItem = new CustomTask() { Id = i };
collection.Add(workItem);
}
collection.CompleteAdding();//迭代周期结束,停止向集合中填充数据
}
static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name) {
await GetRandomDelay();
// 使用GetConsumingEnumerable方法获取工作项
foreach (CustomTask item in collection.GetConsumingEnumerable()) {
Console.WriteLine($"Task {item.Id} has been processed by {name}");
await GetRandomDelay();
}
}
static Task GetRandomDelay() {
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
private class CustomTask {
public int Id { get; set; }
}
}
}
