7.1 纲要

  • 使用Parealle类
  • 并行化LINQ查询
  • 调整PLINQ查询的参数
  • 处理PLINQ查询中的异常
  • 管理PLINQ查询中的数据分区
  • 为PLINQ查询创建一个自定义的聚合器

发展历史:.NET Framework库中有个子集叫做并行库,并行框架扩展(Parallel Framework Extensions PFX),并行库随着.NET 4.0 一起发布,包含三大主要部分:

  1. TPL 任务并行库 Task、Task.ContinueWait、CancellationTokenSource、Task.WhenAll、Task.WhenAny、TaskScheduler同步上下文、异步函数async await等
  2. 并发集合 ConcurrentDictionary、ConcurrentBag(相当于List)、ConcurrentQueue、ConcurrentStack、BlockingCollection
  3. 并行LINQ(PLINQ):结构并行(structured parallelism)

7.2 Parallel

Parellel.ForEach(items,ParallelOptions{CancellationToken,MaxDegreeOfParallelism最大并发数,TaskScheduler调度器},Action(item,state));
Enumerable.Range(int a,int b)生成a-b的整数序列

  1. namespace PLINQExample {
  2. class Program {
  3. static void Main(string[] args) {
  4. //尽可能的并行执行提供的每个操作
  5. Parallel.Invoke(
  6. () => EmulateProcessing("Task1"),
  7. () => EmulateProcessing("Task2"),
  8. () => EmulateProcessing("Task3")
  9. );
  10. //Enumerable.Range(int a,int b) 生成[a,b]范围内的整数序列
  11. //foreach (var item in Enumerable.Range(1,20)) {
  12. // Console.WriteLine(item);
  13. //}
  14. var cts = new CancellationTokenSource();
  15. var result = Parallel.ForEach(Enumerable.Range(1, 30), new ParallelOptions {
  16. CancellationToken = cts.Token,
  17. //设置当前最大的并发数
  18. MaxDegreeOfParallelism = Environment.ProcessorCount,//当前计算机处理器的数量
  19. TaskScheduler = TaskScheduler.Default//默认从线程池调度器
  20. },
  21. (item, state) => {
  22. Console.WriteLine(item);
  23. if(item == 20) {
  24. state.Break();
  25. Console.WriteLine($"Loop is stopped:{state.IsStopped}");
  26. }
  27. });
  28. Console.WriteLine("==============");
  29. Console.WriteLine($"IsCompleted :{result.IsCompleted}");
  30. Console.WriteLine($"Lowest break iteration: {result.LowestBreakIteration}");
  31. }
  32. static string EmulateProcessing(string taskName) {
  33. Sleep(new Random(DateTime.Now.Millisecond).Next(250, 350));
  34. Console.WriteLine($"{taskName} task was processed on a" +
  35. $"thread id {CurrentThread.ManagedThreadId}");
  36. return taskName;
  37. }
  38. }
  39. }

7.3 并行化LINQ查询

IEnumerable.AsParallel()并行计算;
IEnumerable.ForAll(方法名)将枚举中的参数并行带入到方法中执行。

  1. namespace ParallelLINQ {
  2. class Program {
  3. static void Main(string[] args) {
  4. var sw = new Stopwatch(); //监测运行时间
  5. sw.Start();
  6. var query = from t in GetTypes() //从枚举中迭代查询
  7. select EmulateProcessing(t); //为了打印出来,我也是醉了
  8. foreach (var item in query) {
  9. PrintInfo(item);
  10. }
  11. sw.Stop();
  12. Console.WriteLine($"======普通foreach查询时间{sw.ElapsedMilliseconds}=====");
  13. sw.Restart();
  14. //GetTypes()方法的并行执行,在不同的线程中运行
  15. var parallelQuery = from t in GetTypes().AsParallel() //启用查询的并行化 System.Collections.IEnumerable.AsParallel
  16. select EmulateProcessing(t);
  17. foreach (var item in parallelQuery) {
  18. PrintInfo(item);
  19. }
  20. sw.Stop();
  21. Console.WriteLine($"=============并行查询耗时{sw.ElapsedMilliseconds}=======");
  22. sw.Restart();
  23. //GetTypes()方法的并行执行,在不同的线程中运行
  24. parallelQuery = from t in GetTypes().AsParallel() //启用查询的并行化 System.Collections.IEnumerable.AsParallel
  25. select EmulateProcessing(t);
  26. parallelQuery.ForAll(PrintInfo); //每个元素并行调用PrintInfo方法
  27. sw.Stop();
  28. Console.WriteLine($"=============并行查询且调用耗时{sw.ElapsedMilliseconds}=======");
  29. //此例子和第一种foreach完全一样
  30. sw.Restart();
  31. query = from t in GetTypes().AsParallel().AsSequential() //从ParallelQuery<TSource>转换为IEnumerable<out T>
  32. select EmulateProcessing(t);
  33. foreach (var item in query) {
  34. PrintInfo(item);
  35. }
  36. sw.Stop();
  37. Console.WriteLine($"======顺序并行查询查询时间{sw.ElapsedMilliseconds}=====");
  38. }
  39. static void PrintInfo(string typeName) {
  40. Sleep(TimeSpan.FromMilliseconds(150));
  41. Console.WriteLine($"{typeName} type was processed on a thread" +
  42. $"id {CurrentThread.ManagedThreadId}");
  43. }
  44. static string EmulateProcessing(string typeName) {
  45. Sleep(TimeSpan.FromMilliseconds(150));
  46. Console.WriteLine($"{typeName} type was processed on a thread" +
  47. $"id {CurrentThread.ManagedThreadId}");
  48. return typeName;
  49. }
  50. static IEnumerable<string> GetTypes() {
  51. //LINQ表达式 从GetAssemblies()程序集中获取公共类型的名字以Web开头,返回type.Name
  52. return from assembly in AppDomain.CurrentDomain.GetAssemblies()
  53. from type in assembly.GetExportedTypes()
  54. where type.Name.StartsWith("Web")
  55. select type.Name; //生成名字枚举
  56. }
  57. }
  58. }

7.4 调整PLINQ查询的参数

namespace PLINQParameter {
    class Program {
        static void Main(string[] args) {
            var parallelQuery = from typeName in GetTypes().AsParallel()
                                select typeName;
            var cts = new CancellationTokenSource();

            try {
                parallelQuery.WithDegreeOfParallelism(Environment.ProcessorCount)
                    .WithExecutionMode(ParallelExecutionMode.ForceParallelism) //强制并行执行
                    .WithMergeOptions(ParallelMergeOptions.Default) //合并选项,默认缓存一定数量的结果
                    .ForAll(Console.WriteLine);
            }
            catch (Exception ex) {
                throw ex;
            }

        }

        static string EmulateProcessing(string typeName) {
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
            return typeName;
        }

        static IEnumerable<string> GetTypes() {
            return from item in AppDomain.CurrentDomain.GetAssemblies()
                   from type in item.GetExportedTypes()
                   where type.Name.Contains("Web")
                   orderby type.Name.Length
                   select type.Name;
        }
    }
}

7.5 PLINQ聚合异常处理

处理集合异常,通过捕捉AggregateException e e.Flatten().Handle(ex=>{ xxx;return true; return false})

namespace PLINQTryCatch {
    class Program {
        static void Main(string[] args) {
            IEnumerable<int> numbers = Enumerable.Range(-5, 10);
            var query = from number in numbers
                        select 100 / number;

            try {
                foreach (var item in query) {
                    Console.WriteLine(item);
                }
            }
            catch (DivideByZeroException) { //
                Console.WriteLine("Divide by zero");            
            }

            Console.WriteLine("================");
            Console.WriteLine("Sequential LINQ query processing");
            Console.WriteLine();

            //聚合异常
            var queryParallel = from number in numbers.AsParallel()
                                select 100 / number;
            try {
                queryParallel.ForAll(Console.WriteLine);
            }
            catch (AggregateException e) {
                e.Flatten().Handle(ex => { 
                    if(ex is DivideByZeroException) {
                        Console.WriteLine("Divide by zero");
                        return true;
                    }
                    return false;
                });
            }

        }
    }
}