7.1 纲要
- 使用Parallel类
- 并行化LINQ查询
- 调整PLINQ查询的参数
- 处理PLINQ查询中的异常
- 管理PLINQ查询中的数据分区
- 为PLINQ查询创建一个自定义的聚合器
发展历史:.NET Framework 库中有一个称为并行库的子集,也叫并行框架扩展(Parallel Framework Extensions,PFX)。并行库随 .NET 4.0 一起发布,包含三大主要部分:
- TPL 任务并行库 Task、Task.ContinueWith、CancellationTokenSource、Task.WhenAll、Task.WhenAny、TaskScheduler同步上下文、异步函数async await等
- 并发集合 ConcurrentDictionary、ConcurrentBag(线程安全的无序集合,用法上近似List,但不保证元素顺序)、ConcurrentQueue、ConcurrentStack、BlockingCollection
- 并行LINQ(PLINQ):结构并行(structured parallelism)
7.2 Parallel
Parallel.ForEach(items,ParallelOptions{CancellationToken,MaxDegreeOfParallelism最大并发数,TaskScheduler调度器},Action(item,state));
Enumerable.Range(int start,int count)从start开始生成count个连续整数,例如Range(1,30)生成1到30。
namespace PLINQExample {
    /// <summary>
    /// Console demo of <c>Parallel.Invoke</c> and <c>Parallel.ForEach</c>
    /// configured through <c>ParallelOptions</c>.
    /// </summary>
    class Program {
        // A single generator shared by every call. Creating a new Random seeded with
        // DateTime.Now.Millisecond on each call lets tasks that start within the same
        // millisecond receive identical seeds and therefore identical delays.
        private static readonly Random s_random = new Random();

        // Random instances are not thread-safe; all access to s_random goes through this gate.
        private static readonly object s_randomGate = new object();

        /// <summary>
        /// Runs three emulated tasks through Parallel.Invoke, then iterates 1..30 with
        /// Parallel.ForEach and requests a break when item 20 is reached.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args) {
            // Execute each of the supplied actions, in parallel where possible.
            Parallel.Invoke(
                () => EmulateProcessing("Task1"),
                () => EmulateProcessing("Task2"),
                () => EmulateProcessing("Task3")
            );

            // Enumerable.Range(start, count) yields `count` consecutive integers beginning
            // at `start`; the second argument is a count, not an upper bound.
            // The token source is disposable, so scope it with a using statement.
            using (var cts = new CancellationTokenSource()) {
                var options = new ParallelOptions {
                    CancellationToken = cts.Token,
                    // Upper limit on concurrent iterations: the number of processors on this machine.
                    MaxDegreeOfParallelism = Environment.ProcessorCount,
                    // The default scheduler (thread-pool based).
                    TaskScheduler = TaskScheduler.Default
                };

                var result = Parallel.ForEach(Enumerable.Range(1, 30), options, (item, state) => {
                    Console.WriteLine(item);
                    if (item == 20) {
                        state.Break();
                        // NOTE: IsStopped reports whether Stop() was called, not Break(),
                        // so this is expected to print False; the break itself is reported
                        // through result.LowestBreakIteration below.
                        Console.WriteLine($"Loop is stopped:{state.IsStopped}");
                    }
                });

                Console.WriteLine("==============");
                Console.WriteLine($"IsCompleted :{result.IsCompleted}");
                Console.WriteLine($"Lowest break iteration: {result.LowestBreakIteration}");
            }
        }

        /// <summary>
        /// Simulates work by sleeping for a random 250-349 ms, then reports the thread used.
        /// </summary>
        /// <param name="taskName">Label printed in the log line.</param>
        /// <returns>The same <paramref name="taskName"/> that was passed in.</returns>
        static string EmulateProcessing(string taskName) {
            int delayMilliseconds;
            lock (s_randomGate) {
                delayMilliseconds = s_random.Next(250, 350);
            }

            Sleep(delayMilliseconds);
            // A space was missing between "a" and "thread" in the concatenated message.
            Console.WriteLine($"{taskName} task was processed on a " +
                $"thread id {CurrentThread.ManagedThreadId}");
            return taskName;
        }
    }
}
7.3 并行化LINQ查询
IEnumerable.AsParallel()并行计算;
ParallelQuery<T>.ForAll(方法名)对并行查询结果中的每个元素并行调用该方法(ForAll是ParallelQuery<T>上的扩展方法,不是IEnumerable的方法)。
namespace ParallelLINQ {
    /// <summary>
    /// Console demo that times the same LINQ query run sequentially, with AsParallel(),
    /// with AsParallel() + ForAll, and with AsParallel().AsSequential().
    /// </summary>
    class Program {
        /// <summary>
        /// Runs the four query variants in order and prints the elapsed milliseconds of each.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args) {
            var sw = new Stopwatch(); // measures the elapsed time of each variant
            sw.Start();

            // 1) Plain LINQ: both the projection and the consuming loop run sequentially.
            var query = from t in GetTypes()
                        select EmulateProcessing(t);
            foreach (var item in query) {
                PrintInfo(item);
            }
            sw.Stop();
            Console.WriteLine($"======普通foreach查询时间{sw.ElapsedMilliseconds}=====");

            sw.Restart();
            // 2) AsParallel() (ParallelEnumerable.AsParallel) parallelizes the projection;
            //    the results are still consumed by a sequential foreach.
            var parallelQuery = from t in GetTypes().AsParallel()
                                select EmulateProcessing(t);
            foreach (var item in parallelQuery) {
                PrintInfo(item);
            }
            sw.Stop();
            Console.WriteLine($"=============并行查询耗时{sw.ElapsedMilliseconds}=======");

            sw.Restart();
            // 3) Same parallel projection, but ForAll also invokes PrintInfo for each
            //    element in parallel instead of funnelling results into one loop.
            parallelQuery = from t in GetTypes().AsParallel()
                            select EmulateProcessing(t);
            parallelQuery.ForAll(PrintInfo);
            sw.Stop();
            Console.WriteLine($"=============并行查询且调用耗时{sw.ElapsedMilliseconds}=======");

            sw.Restart();
            // 4) AsSequential() converts the ParallelQuery<T> back to IEnumerable<T>, so the
            //    projection after it runs sequentially, like variant 1.
            query = from t in GetTypes().AsParallel().AsSequential()
                    select EmulateProcessing(t);
            foreach (var item in query) {
                PrintInfo(item);
            }
            sw.Stop();
            Console.WriteLine($"======顺序并行查询查询时间{sw.ElapsedMilliseconds}=====");
        }

        /// <summary>
        /// Sleeps for 150 ms and then logs the type name together with the current managed thread id.
        /// </summary>
        /// <param name="typeName">Name to include in the log line.</param>
        static void PrintInfo(string typeName) {
            Sleep(TimeSpan.FromMilliseconds(150));
            // The original concatenation dropped the space between "thread" and "id".
            Console.WriteLine($"{typeName} type was processed on a thread " +
                $"id {CurrentThread.ManagedThreadId}");
        }

        /// <summary>
        /// Emulates processing of one item: same delay and log line as <see cref="PrintInfo"/>,
        /// but returns the name so it can be used as a query projection.
        /// </summary>
        /// <param name="typeName">Name to process.</param>
        /// <returns>The same <paramref name="typeName"/> that was passed in.</returns>
        static string EmulateProcessing(string typeName) {
            PrintInfo(typeName);
            return typeName;
        }

        /// <summary>
        /// Lazily enumerates the names of exported types, from the assemblies loaded in the
        /// current AppDomain, whose name starts with "Web".
        /// </summary>
        /// <returns>A deferred sequence of matching type names.</returns>
        static IEnumerable<string> GetTypes() {
            return from assembly in AppDomain.CurrentDomain.GetAssemblies()
                   // GetExportedTypes() is not supported on dynamic assemblies; skip them.
                   where !assembly.IsDynamic
                   from type in assembly.GetExportedTypes()
                   // Identifier comparison: ordinal, not culture-sensitive.
                   where type.Name.StartsWith("Web", StringComparison.Ordinal)
                   select type.Name;
        }
    }
}
7.4 调整PLINQ查询的参数
namespace PLINQParameter {
    /// <summary>
    /// Console demo of the PLINQ tuning operators WithDegreeOfParallelism,
    /// WithExecutionMode and WithMergeOptions.
    /// </summary>
    class Program {
        /// <summary>
        /// Builds a parallel query over the type names and prints each one via ForAll.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args) {
            var parallelQuery = from typeName in GetTypes().AsParallel()
                                select typeName;

            // The previous try/catch only did `throw ex;`, which handles nothing and resets
            // the exception's stack trace. Letting exceptions propagate directly keeps the
            // original trace. The unused CancellationTokenSource local was removed as well.
            parallelQuery
                .WithDegreeOfParallelism(Environment.ProcessorCount)
                .WithExecutionMode(ParallelExecutionMode.ForceParallelism) // always run in parallel
                .WithMergeOptions(ParallelMergeOptions.Default)            // library-default result merging
                .ForAll(Console.WriteLine);
        }

        /// <summary>
        /// Emulates processing of one item by sleeping for 200 ms.
        /// Not called by <see cref="Main"/> in this version of the sample.
        /// </summary>
        /// <param name="typeName">Name to process.</param>
        /// <returns>The same <paramref name="typeName"/> that was passed in.</returns>
        static string EmulateProcessing(string typeName) {
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
            return typeName;
        }

        /// <summary>
        /// Lazily enumerates the names of exported types, from the assemblies loaded in the
        /// current AppDomain, whose name contains "Web", ordered by name length.
        /// </summary>
        /// <returns>A deferred sequence of matching type names.</returns>
        static IEnumerable<string> GetTypes() {
            return from item in AppDomain.CurrentDomain.GetAssemblies()
                   // GetExportedTypes() is not supported on dynamic assemblies; skip them.
                   where !item.IsDynamic
                   from type in item.GetExportedTypes()
                   where type.Name.Contains("Web")
                   // NOTE(review): Main consumes this through AsParallel()/ForAll without
                   // AsOrdered(), so this ordering is presumably not reflected in the output.
                   orderby type.Name.Length
                   select type.Name;
        }
    }
}
7.5 PLINQ聚合异常处理
处理聚合异常:PLINQ查询中抛出的异常会被包装成AggregateException。捕获AggregateException e后调用 e.Flatten().Handle(ex => { ... }):对已处理的异常返回true,对未处理的异常返回false(未处理的异常会被重新抛出)。
namespace PLINQTryCatch {
    /// <summary>
    /// Console demo contrasting exception handling in a sequential LINQ query
    /// (plain DivideByZeroException) with a PLINQ query (AggregateException).
    /// </summary>
    class Program {
        /// <summary>
        /// Divides 100 by each integer in -5..4, first sequentially and then in parallel,
        /// printing the quotients and reporting the division by zero in each case.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args) {
            // Ten integers starting at -5, i.e. -5..4, so the sequence contains 0.
            IEnumerable<int> numbers = Enumerable.Range(-5, 10);

            RunSequential(numbers);

            Console.WriteLine("================");
            Console.WriteLine("Sequential LINQ query processing");
            Console.WriteLine();

            RunParallel(numbers);
        }

        // Sequential pass: the query is deferred, so the exception is thrown during
        // enumeration and is caught directly as DivideByZeroException.
        private static void RunSequential(IEnumerable<int> numbers) {
            var quotients = numbers.Select(value => 100 / value);
            try {
                foreach (var quotient in quotients) {
                    Console.WriteLine(quotient);
                }
            }
            catch (DivideByZeroException) {
                Console.WriteLine("Divide by zero");
            }
        }

        // Parallel pass: failures surface as an AggregateException, which is flattened and
        // inspected; only DivideByZeroException is marked as handled.
        private static void RunParallel(IEnumerable<int> numbers) {
            var quotients = numbers.AsParallel().Select(value => 100 / value);
            try {
                quotients.ForAll(Console.WriteLine);
            }
            catch (AggregateException aggregate) {
                aggregate.Flatten().Handle(inner => {
                    if (!(inner is DivideByZeroException)) {
                        // Not handled here; Handle() rethrows whatever is left unhandled.
                        return false;
                    }

                    Console.WriteLine("Divide by zero");
                    return true;
                });
            }
        }
    }
}