- ConcurrentDictionary:线程安全的字典集合实现,读操作无需使用锁,对写操作则需要锁。参数concurrentLevel可以在构造函数中定义锁的数量,预估的线程数量将并发地更新该字典。如果没有必要尽量避免使用Count\IsEmpty\Keys\Values\CopyTo\ToArray,因为一些操作需要获取该字典中的所有锁。
- ConcurrentQueue实现异步处理:该集合使用了CAP(原子的比较和交换Compare and Swap)以及SpinWait来保证线程安全。实现了FIFO集合。Enqueue方法向队列中加元素,TryQueue尝试取出第一个元素,TryPeek试图得到第一个元素但并不从队列中删除该元素。
- ConcurrentStack异步处理顺序:实现了LIFO集合后进先出(Last in first output),可以使用Push和PushRange添加元素,使用TryPop和TryPopRange方法获取元素,TryPeek方法检查元素。
- ConcurrentBag创建一个可扩展的爬虫:支持重复元素的无序集合,添加元素使用Add方法,检查元素使用TryPeek方法,获取元素使用TryTake。
注意事项:尽量避免使用上述集合的Count属性,时间复杂度为O(N)。如果想要检查集合是否为空,请使用IsEmpty属性,时间复杂度为O(1)。因为他们是链表实现的。
BlockingCollection进行异步处理:对IProducerConsumerCollection泛型接口实现的一个高级封装,用于实现管道场景,即当有一些步骤需要使用之前步骤运行的结果时BlockingCollection类支持如下功能:分块、调整内部集合容量、取消集合操作、从多个块集合中获取元素。
6.1 ConcurrentDictionary(细粒度锁fine-grained locking)
使用场景:
多线程访问只读元素:Dictionary或ReadOnlyDictionary集合
对字典需要大量线程的安全读操作:ConcurrentDictionary(大量线程读操作性能会更好)namespace ConcurrentDictionaryExample {
class Program {
const string Item = "Dictionary Item";
const int Iterations = 1000000;
public static string CurrentItem;
static void Main(string[] args) {
var concurrentDictionary = new ConcurrentDictionary<int, string>();
var dictionary = new Dictionary<int, string>();
var sw = new Stopwatch();
sw.Start();
for (int i = 0; i < Iterations; i++) {
lock (dictionary) {
dictionary[i] = Item;
}
}
sw.Stop();
Console.WriteLine($"Writing to dictionary with a lock:{sw.Elapsed}");
sw.Restart();
for (int i = 0; i < Iterations; i++) {
concurrentDictionary[i] = Item;
}
sw.Stop();
Console.WriteLine($"Writing to a concurrent dictionary:{sw.Elapsed}");
sw.Restart();
for (int i = 0; i < Iterations; i++) {
lock (dictionary) {
CurrentItem = dictionary[i];
}
}
sw.Stop();
Console.WriteLine($"Reading from a dictionary:{sw.Elapsed}");
sw.Restart();
for (int i = 0; i < Iterations; i++) {
CurrentItem = concurrentDictionary[i];
}
sw.Stop();
Console.WriteLine($"Reading from a concurrent dictionary:{sw.Elapsed}");
Console.ReadLine();
}
}
}
6.2 使用ConcurrentQueue实现异步处理
并行队列,异步按FIFO顺序取出元素。 ```csharp using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Threading;
namespace ConcurrentQueueExample { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadLine(); }
static async Task RunProgram() {
var taskQueue = new ConcurrentQueue<CustomTask>(); //创建一个并行队列,队列里面存放的对象为CustomTask类型
var cts = new CancellationTokenSource(); //Task取消选项CancellationTokenSource.Token
var taskSource = Task.Run(() => TaskProducer(taskQueue)); //执行TaskProcucer方法,将并行队列传入进去
//Task数组(可要可不要,可以直接Task.Run()执行,开启四个Task,执行出队操作
Task[] processors = new Task[4];
for (int i = 1; i <= 4; i++) {
string processorID = i.ToString();
processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue,$"Processor" +
$" {processorID}",cts.Token));
}
}
//并行队列FIFO,元素入队列,CustomTask Enqueue入队,元素个数20个
static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) {
for (int i = 1; i <= 20; i++) {
await Task.Delay(50);
var workItem = new CustomTask { ID = i };
queue.Enqueue(workItem);
Console.WriteLine($"{workItem.ID} has been enqueued");
}
}
//出队操作,将队列传入进来,以及线程别名name,是否取消标志
//TryDequeue尝试取出首元素,并带out参数(取出的元素)
static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue,string name,CancellationToken token) {
CustomTask workItem;
bool dequeueSuccessful = false;
await GetRandomDelay();
do {
dequeueSuccessful = queue.TryDequeue(out workItem);
if (dequeueSuccessful) {
Console.WriteLine($"元素 {workItem.ID} has been dequeued by {name}");
}
await GetRandomDelay();
}
while (!token.IsCancellationRequested);
}
static Task GetRandomDelay() {
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
}
class CustomTask {
public int ID { get; set; }
}
}
<a name="CxuPT"></a>
# 6.3 改变ConcurrentStack异步处理顺序
并发栈,Push压栈 TryPop出栈 TryPeek查看一下但不删除元素。适用于多任务,早先创建的任务具有较低优先级。
```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace ConcurrentStackExample {
class Program {
// Demonstrates:
// ConcurrentStack<T>.Push();
// ConcurrentStack<T>.TryPeek();
// ConcurrentStack<T>.TryPop();
// ConcurrentStack<T>.Clear();
// ConcurrentStack<T>.IsEmpty;
static async Task Main() {
int items = 10000;
ConcurrentStack<int> stack = new ConcurrentStack<int>();
// Create an action to push items onto the stack
Action pusher = () =>
{
for (int i = 0; i < items; i++) {
stack.Push(i);
}
};
// Run the action once
pusher();
if (stack.TryPeek(out int result)) {
Console.WriteLine($"TryPeek() saw {result} on top of the stack.");
}
else {
Console.WriteLine("Could not peek most recently added number.");
}
// Empty the stack
stack.Clear();
if (stack.IsEmpty) {
Console.WriteLine("Cleared the stack.");
}
// Create an action to push and pop items
Action pushAndPop = () =>
{
Console.WriteLine($"Task started on {Task.CurrentId}");
int item;
for (int i = 0; i < items; i++)
stack.Push(i);
for (int i = 0; i < items; i++)
stack.TryPop(out item);
Console.WriteLine($"Task ended on {Task.CurrentId}");
};
// Spin up five concurrent tasks of the action
var tasks = new Task[5];
for (int i = 0; i < tasks.Length; i++)
tasks[i] = Task.Factory.StartNew(pushAndPop);
// Wait for all the tasks to finish up
await Task.WhenAll(tasks);
if (!stack.IsEmpty) {
Console.WriteLine("Did not take all the items off the stack");
}
}
}
}
6.4 ConcurrentBag 可选泛型集合 ConcurrentStack,默认为并发队列模式
可以替代List实现并发无序集合,用于生产者消费者模式。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;
namespace BlockingCollectionExample {
class Program {
static void Main(string[] args) {
Task t = RunProgram();
t.Wait();
t = RunProgram(new ConcurrentStack<CustomTask>());
t.Wait();
}
static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection= null) {
var taskCollection = new BlockingCollection<CustomTask>();
if (collection != null) {
taskCollection = new BlockingCollection<CustomTask>(collection);
}
var taskSource = Task.Run(() => TaskProducer(taskCollection));
Task[] taskArray = new Task[4];
for (int i = 0; i < 4; i++) {
string processorID = $"Processer{i}";
taskArray[i] = Task.Run(() => TaskProcessor(taskCollection, processorID));
}
await taskSource;
await Task.WhenAll(taskArray);
}
static async Task TaskProducer(BlockingCollection<CustomTask> collection) {
await GetRandomDelay();
for (int i = 0; i < 20; i++) {
await Task.Delay(20);
var workItem = new CustomTask() { Id = i };
collection.Add(workItem);
}
collection.CompleteAdding();//迭代周期结束,停止向集合中填充数据
}
static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name) {
await GetRandomDelay();
// 使用GetConsumingEnumerable方法获取工作项
foreach (CustomTask item in collection.GetConsumingEnumerable()) {
Console.WriteLine($"Task {item.Id} has been processed by {name}");
await GetRandomDelay();
}
}
static Task GetRandomDelay() {
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
private class CustomTask {
public int Id { get; set; }
}
}
}