为什么需要TPL

前面提到过,直接通过Thread来操作线程时,会每次都创建一个新线程,但创建线程代价是高昂的,而且每个线程都要占用大量虚拟内存(例如Windows默认1MB)。
为了减少创建和销毁带来的性能损耗,更有效的做法是使用线程池:需要时分配线程,为线程分配异步工作,运行至完成,再为后续异步工作重用线程,而不是在工作结束后销毁再重新创建线程。
但线程池也有一个问题,就是通知线程池去执行一个任务后,无法知道任务何时执行完成,也无法获取任务的执行结果,在.NETFramework4和后续版本中,特增加了TPL任务并行库来解决这些问题

TPL简介

TPL不是每次开始异步工作时都创建一个线程,而是创建一个Task,并告诉任务调度器有异步工作要执行。此时任务调度器可能采取多种策略,但默认是从线程池请求一个工作者线程。线程池会自行判断怎么做最高效。可能在当前任务结束后再运行新任务,或者将新任务的工作者线程调度给特定处理器。线程池还会判断是创建全新线程,还是重用之前已结束运行的现有线程。
通过将异步工作的概念抽象到Task对象中,TPL提供了一个能代表异步工作的对象,还提供了面向对象的API与工作交互。通过提供代表工作单元的对象,TPL使我们能通过编程将小任务合并成大任务,从而建立起一个工作流程,详情稍后讨论。

Task任务与委托的区别

任务是对象,其中封装了以异步方式执行的工作。这听起来有点儿耳熟,委托不也是封装了代码的对象吗?区别在于,委托是同步的,而任务是异步的。
如执行委托(例如一个Action),当前线程的控制点会立即转移到委托的代码;除非委托结束,否则控制不会返回调用者。相反,启动任务,控制几乎立即返回调用者,无论任务要执行多少工作。任务通常在另一个线程上异步执行(本章稍后会讲到,也可只用一个线程来异步执行任务,而且这样还有一些好处)。
简单地说,任务将委托从同步执行模式转变成异步。
既然Task也是异步执行,不是同步的,那我如何知道某个任务执行完成,或者如何获取任务结果呢,我们来从Task类来找答案

Task操作API

静态方法或属性

  1. //获取一个表示已经完成的任务实例
  2. public static Task CompletedTask { get; }
  3. //获取一个延迟任务,任务将在指定时间后完成
  4. public static Task Delay(int millisecondsDelay);
  5. public static Task Delay(int millisecondsDelay, CancellationToken cancellationToken);
  6. public static Task Delay(TimeSpan delay);
  7. public static Task Delay(TimeSpan delay, CancellationToken cancellationToken);
  8. //获取一个已经取消的任务实例
  9. public static Task FromCanceled(CancellationToken cancellationToken);
  10. public static Task<TResult> FromCanceled<TResult>(CancellationToken cancellationToken);
  11. //获取一个抛出指定异常的任务实例
  12. public static Task FromException(Exception exception);
  13. public static Task<TResult> FromException<TResult>(Exception exception);
  14. //获取一个指定返回结果的任务实例
  15. public static Task<TResult> FromResult<TResult>(TResult result);
  16. //获取当前正在执行的任务id,如果当前没有正在执行的任务,将返回null
  17. public static int? CurrentId { get; }
  18. //获取一个任务工厂,用于创建任务实例,也可以直接调用任务的构造函数来创建任务实例
  19. public static TaskFactory Factory { get; }
  20. //获取一个运行指定方法的任务实例,并且开始这个任务的运行
  21. public static Task Run(Action action);
  22. public static Task Run(Action action, CancellationToken cancellationToken);
  23. public static Task Run(Func<Task?> function);
  24. public static Task Run(Func<Task?> function, CancellationToken cancellationToken);
  25. public static Task<TResult> Run<TResult>(Func<TResult> function);
  26. public static Task<TResult> Run<TResult>(Func<TResult> function, CancellationToken cancellationToken);
  27. public static Task<TResult> Run<TResult>(Func<Task<TResult>?> function, CancellationToken cancellationToken);
  28. public static Task<TResult> Run<TResult>(Func<Task<TResult>?> function);
  29. //等待所有任务,直到所有任务完成或者超时
  30. public static bool WaitAll(Task[] tasks, TimeSpan timeout);
  31. public static void WaitAll(Task[] tasks, CancellationToken cancellationToken);
  32. public static bool WaitAll(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken);
  33. public static bool WaitAll(Task[] tasks, int millisecondsTimeout);
  34. public static void WaitAll(params Task[] tasks);
  35. //等待任务,直到有任一任务完成或者超时
  36. public static int WaitAny(params Task[] tasks);
  37. public static int WaitAny(Task[] tasks, int millisecondsTimeout);
  38. public static int WaitAny(Task[] tasks, CancellationToken cancellationToken);
  39. public static int WaitAny(Task[] tasks, TimeSpan timeout);
  40. public static int WaitAny(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken);
  41. //获取一个当指定的所有任务都完成后才能完成的任务
  42. public static Task WhenAll(IEnumerable<Task> tasks);
  43. public static Task WhenAll(params Task[] tasks);
  44. public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks);
  45. public static Task<TResult[]> WhenAll<TResult>(params Task<TResult>[] tasks);
  46. //获取一个当指定的所有任务中任何一个完成后都将完成的任务
  47. public static Task<Task> WhenAny(IEnumerable<Task> tasks);
  48. public static Task<Task> WhenAny(params Task[] tasks);
  49. public static Task<Task<TResult>> WhenAny<TResult>(IEnumerable<Task<TResult>> tasks);
  50. public static Task<Task<TResult>> WhenAny<TResult>(params Task<TResult>[] tasks);

构造函数

  1. public Task(Action action);
  2. public Task(Action action, CancellationToken cancellationToken);
  3. public Task(Action action, TaskCreationOptions creationOptions);
  4. public Task(Action action, CancellationToken cancellationToken, TaskCreationOptions creationOptions);
  5. public Task(Action<object?> action, object? state);
  6. public Task(Action<object?> action, object? state, CancellationToken cancellationToken);
  7. public Task(Action<object?> action, object? state, TaskCreationOptions creationOptions);
  8. public Task(Action<object?> action, object? state, CancellationToken cancellationToken, TaskCreationOptions creationOptions);

从构造函数来看,大体上也是可以分为两组,一个是接受一个没有参数的Action,一个是接收一个参数的Action,并且同时将参数对象传给构造函数。
两组同时都有4个重载,可以接收一些取消令牌,创建属性配置,或者同时接收两者。

常用实例方法或属性

  1. //获取任务id
  2. public int Id { get; }
  3. //获取构造函数中传入的参数对象,如果没有传入则为null
  4. public object? AsyncState { get; }
  5. //获取任务执行过程中抛出的异常,如果任务还没有执行完成或者执行完成但没有抛出异常时,则为null
  6. public AggregateException? Exception { get; }
  7. //获取任务是否已经完成标记,当任务正常执行完成RanToCompletion,被取消Canceled或者异常终止Faulted时都返回true
  8. public bool IsCompleted { get; }
  9. //获取任务是否已经成功完成,只有任务正常执行完成才返回true
  10. public bool IsCompletedSuccessfully { get; }
  11. //获取任务是否已经被中止
  12. public bool IsCanceled { get; }
  13. //获取任务的当前状态
  14. public TaskStatus Status { get; }
  15. //给当前任务指定一个延续任务,将在当前任务完成后开始调用
  16. public Task ContinueWith(Action<Task, object?> continuationAction, object? state);
  17. public Task ContinueWith(Action<Task, object?> continuationAction, object? state, CancellationToken cancellationToken);
  18. public Task ContinueWith(Action<Task, object?> continuationAction, object? state, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
  19. public Task ContinueWith(Action<Task, object?> continuationAction, object? state, TaskContinuationOptions continuationOptions);
  20. public Task ContinueWith(Action<Task, object?> continuationAction, object? state, TaskScheduler scheduler);
  21. public Task ContinueWith(Action<Task> continuationAction);
  22. public Task ContinueWith(Action<Task> continuationAction, CancellationToken cancellationToken);
  23. public Task ContinueWith(Action<Task> continuationAction, TaskContinuationOptions continuationOptions);
  24. public Task ContinueWith(Action<Task> continuationAction, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
  25. public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, CancellationToken cancellationToken);
  26. public Task ContinueWith(Action<Task> continuationAction, TaskScheduler scheduler);
  27. public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
  28. public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, TaskContinuationOptions continuationOptions);
  29. public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, TaskScheduler scheduler);
  30. public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, TaskScheduler scheduler);
  31. public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, CancellationToken cancellationToken);
  32. public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction);
  33. public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, TaskContinuationOptions continuationOptions);
  34. public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state);
  35. public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
  36. //获取一个用于await当前任务的awatier
  37. public TaskAwaiter GetAwaiter();
  38. //同步运行当前任务
  39. public void RunSynchronously();
  40. public void RunSynchronously(TaskScheduler scheduler);
  41. //开始运行当前任务
  42. public void Start();
  43. public void Start(TaskScheduler scheduler);
  44. //等待当前任务执行完成
  45. public void Wait(CancellationToken cancellationToken);
  46. public bool Wait(int millisecondsTimeout);
  47. public void Wait();
  48. public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken);
  49. public bool Wait(TimeSpan timeout);

Task示例

我们使用Task来实现一下之前的那些示例,代码如下:

示例代码

  1. using System;
  2. using System.Threading;
  3. using System.Threading.Tasks;
  4. namespace ThreadDemo1Start
  5. {
  6. class Program
  7. {
  8. const int Count = 100;
  9. static void Main(string[] args)
  10. {
  11. Task[] allTasks = new Task[10];
  12. for (var i = 0; i < 10; i++)
  13. {
  14. //使用Task来运行所有的
  15. var task = new Task((c) => { DoWork(c); }, i.ToString());
  16. task.Start();
  17. allTasks[i] = task;
  18. }
  19. //此处的代码不会被上面的任务阻塞,是会同时运行的
  20. //在控制台中输出-号
  21. DoWork("-");
  22. //使用任务时,是可以等待任务完成的,不像线程池那样,无法知道任务什么时候完成
  23. Task.WaitAll(allTasks);
  24. //全部执行完毕
  25. Console.WriteLine("全部执行完毕");
  26. }
  27. static void DoWork(object? obj)
  28. {
  29. string str = $"{Thread.CurrentThread.ManagedThreadId}*{obj} ";
  30. for (var i = 0; i < Count; i++)
  31. {
  32. Console.Write(str);
  33. }
  34. }
  35. }
  36. }

这里从线程等待任务执行完成时,不再像线程池那样,只能估计一个时间,而是可以直接使用Task的WaitAll方法来等待所有任务的执行完成。

示例执行结果

image.png
这个执行结果和之前直接使用线程池的效果差不多,可以看到线程7被用来输出3,4,线程4被用来输出2,6,8。这些线程都是会被重用的。
但我们代码的焦点只关注我们需要执行的任务,而不需要关心任务是如何被调度的,具体的调度优化由.net来执行,我们只需要关注业务需要执行的任务,并且调用start来通知调度器开始调度异步执行即可。

下期预告

从上面的执行结果可以看出,虽然我们是按顺序输出1-10,但由于是异步执行的,实际上是输出的顺序是随机的,但实际我们业务中,可能会要求一些任务是有相关性的,比如要先输出1完成后才能输出2,那我们该如何来构建这样的任务链条,通过将一些小任务构建成任务链条,来组装成一个大的任务,敬请期待。