为什么需要TPL
前面提到过,直接通过Thread来操作线程时,会每次都创建一个新线程,但创建线程代价是高昂的,而且每个线程都要占用大量虚拟内存(例如Windows默认1MB)。
为了减少创建和销毁带来的性能损耗,更有效的做法是使用线程池:需要时分配线程,为线程分配异步工作,运行至完成,再为后续异步工作重用线程,而不是在工作结束后销毁再重新创建线程。
但线程池也有一个问题,就是通知线程池去执行一个任务后,无法知道任务何时执行完成,也无法获取任务的执行结果,在.NETFramework4和后续版本中,特增加了TPL任务并行库来解决这些问题
TPL简介
TPL不是每次开始异步工作时都创建一个线程,而是创建一个Task,并告诉任务调度器有异步工作要执行。此时任务调度器可能采取多种策略,但默认是从线程池请求一个工作者线程。线程池会自行判断怎么做最高效。可能在当前任务结束后再运行新任务,或者将新任务的工作者线程调度给特定处理器。线程池还会判断是创建全新线程,还是重用之前已结束运行的现有线程。
通过将异步工作的概念抽象到Task对象中,TPL提供了一个能代表异步工作的对象,还提供了面向对象的API与工作交互。通过提供代表工作单元的对象,TPL使我们能通过编程将小任务合并成大任务,从而建立起一个工作流程,详情稍后讨论。
Task任务与委托的区别
任务是对象,其中封装了以异步方式执行的工作。这听起来有点儿耳熟,委托不也是封装了代码的对象吗?区别在于,委托是同步的,而任务是异步的。
如执行委托(例如一个Action),当前线程的控制点会立即转移到委托的代码;除非委托结束,否则控制不会返回调用者。相反,启动任务,控制几乎立即返回调用者,无论任务要执行多少工作。任务通常在另一个线程上异步执行(本章稍后会讲到,也可只用一个线程来异步执行任务,而且这样还有一些好处)。
简单地说,任务将委托从同步执行模式转变成异步。
既然Task也是异步执行,不是同步的,那我如何知道某个任务执行完成,或者如何获取任务结果呢,我们来从Task类来找答案
Task操作API
静态方法或属性
//获取一个表示已经完成的任务实例
public static Task CompletedTask { get; }
//获取一个延迟任务,任务将在指定时间后完成
public static Task Delay(int millisecondsDelay);
public static Task Delay(int millisecondsDelay, CancellationToken cancellationToken);
public static Task Delay(TimeSpan delay);
public static Task Delay(TimeSpan delay, CancellationToken cancellationToken);
//获取一个已经取消的任务实例
public static Task FromCanceled(CancellationToken cancellationToken);
public static Task<TResult> FromCanceled<TResult>(CancellationToken cancellationToken);
//获取一个抛出指定异常的任务实例
public static Task FromException(Exception exception);
public static Task<TResult> FromException<TResult>(Exception exception);
//获取一个指定返回结果的任务实例
public static Task<TResult> FromResult<TResult>(TResult result);
//获取当前正在执行的任务id,如果当前没有正在执行的任务,将返回null
public static int? CurrentId { get; }
//获取一个任务工厂,用于创建任务实例,也可以直接调用任务的构造函数来创建任务实例
public static TaskFactory Factory { get; }
//获取一个运行指定方法的任务实例,并且开始这个任务的运行
public static Task Run(Action action);
public static Task Run(Action action, CancellationToken cancellationToken);
public static Task Run(Func<Task?> function);
public static Task Run(Func<Task?> function, CancellationToken cancellationToken);
public static Task<TResult> Run<TResult>(Func<TResult> function);
public static Task<TResult> Run<TResult>(Func<TResult> function, CancellationToken cancellationToken);
public static Task<TResult> Run<TResult>(Func<Task<TResult>?> function, CancellationToken cancellationToken);
public static Task<TResult> Run<TResult>(Func<Task<TResult>?> function);
//等待所有任务,直到所有任务完成或者超时
public static bool WaitAll(Task[] tasks, TimeSpan timeout);
public static void WaitAll(Task[] tasks, CancellationToken cancellationToken);
public static bool WaitAll(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken);
public static bool WaitAll(Task[] tasks, int millisecondsTimeout);
public static void WaitAll(params Task[] tasks);
//等待任务,直到有任一任务完成或者超时
public static int WaitAny(params Task[] tasks);
public static int WaitAny(Task[] tasks, int millisecondsTimeout);
public static int WaitAny(Task[] tasks, CancellationToken cancellationToken);
public static int WaitAny(Task[] tasks, TimeSpan timeout);
public static int WaitAny(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken);
//获取一个当指定的所有任务都完成后才能完成的任务
public static Task WhenAll(IEnumerable<Task> tasks);
public static Task WhenAll(params Task[] tasks);
public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks);
public static Task<TResult[]> WhenAll<TResult>(params Task<TResult>[] tasks);
//获取一个当指定的所有任务中任何一个完成后都将完成的任务
public static Task<Task> WhenAny(IEnumerable<Task> tasks);
public static Task<Task> WhenAny(params Task[] tasks);
public static Task<Task<TResult>> WhenAny<TResult>(IEnumerable<Task<TResult>> tasks);
public static Task<Task<TResult>> WhenAny<TResult>(params Task<TResult>[] tasks);
构造函数
public Task(Action action);
public Task(Action action, CancellationToken cancellationToken);
public Task(Action action, TaskCreationOptions creationOptions);
public Task(Action action, CancellationToken cancellationToken, TaskCreationOptions creationOptions);
public Task(Action<object?> action, object? state);
public Task(Action<object?> action, object? state, CancellationToken cancellationToken);
public Task(Action<object?> action, object? state, TaskCreationOptions creationOptions);
public Task(Action<object?> action, object? state, CancellationToken cancellationToken, TaskCreationOptions creationOptions);
从构造函数来看,大体上也是可以分为两组,一个是接受一个没有参数的Action,一个是接收一个参数的Action
常用实例方法或属性
//获取任务id
public int Id { get; }
//获取构造函数中传入的参数对象,如果没有传入则为null
public object? AsyncState { get; }
//获取任务执行过程中抛出的异常,如果任务还没有执行完成或者执行完成但没有抛出异常时,则为null
public AggregateException? Exception { get; }
//获取任务是否已经完成标记,当任务正常执行完成RanToCompletion,被取消Canceled或者异常终止Faulted时都返回true
public bool IsCompleted { get; }
//获取任务是否已经成功完成,只有任务正常执行完成才返回true
public bool IsCompletedSuccessfully { get; }
//获取任务是否已经被中止
public bool IsCanceled { get; }
//获取任务的当前状态
public TaskStatus Status { get; }
//给当前任务指定一个延续任务,将在当前任务完成后开始调用
public Task ContinueWith(Action<Task, object?> continuationAction, object? state);
public Task ContinueWith(Action<Task, object?> continuationAction, object? state, CancellationToken cancellationToken);
public Task ContinueWith(Action<Task, object?> continuationAction, object? state, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
public Task ContinueWith(Action<Task, object?> continuationAction, object? state, TaskContinuationOptions continuationOptions);
public Task ContinueWith(Action<Task, object?> continuationAction, object? state, TaskScheduler scheduler);
public Task ContinueWith(Action<Task> continuationAction);
public Task ContinueWith(Action<Task> continuationAction, CancellationToken cancellationToken);
public Task ContinueWith(Action<Task> continuationAction, TaskContinuationOptions continuationOptions);
public Task ContinueWith(Action<Task> continuationAction, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, CancellationToken cancellationToken);
public Task ContinueWith(Action<Task> continuationAction, TaskScheduler scheduler);
public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, TaskContinuationOptions continuationOptions);
public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, TaskScheduler scheduler);
public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, TaskScheduler scheduler);
public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, CancellationToken cancellationToken);
public Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction);
public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, TaskContinuationOptions continuationOptions);
public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state);
public Task<TResult> ContinueWith<TResult>(Func<Task, object?, TResult> continuationFunction, object? state, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
//获取一个用于await当前任务的awatier
public TaskAwaiter GetAwaiter();
//同步运行当前任务
public void RunSynchronously();
public void RunSynchronously(TaskScheduler scheduler);
//开始运行当前任务
public void Start();
public void Start(TaskScheduler scheduler);
//等待当前任务执行完成
public void Wait(CancellationToken cancellationToken);
public bool Wait(int millisecondsTimeout);
public void Wait();
public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken);
public bool Wait(TimeSpan timeout);
Task示例
示例代码
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ThreadDemo1Start
{
class Program
{
const int Count = 100;
static void Main(string[] args)
{
Task[] allTasks = new Task[10];
for (var i = 0; i < 10; i++)
{
//使用Task来运行所有的
var task = new Task((c) => { DoWork(c); }, i.ToString());
task.Start();
allTasks[i] = task;
}
//此处的代码不会被上面的任务阻塞,是会同时运行的
//在控制台中输出-号
DoWork("-");
//使用任务时,是可以等待任务完成的,不像线程池那样,无法知道任务什么时候完成
Task.WaitAll(allTasks);
//全部执行完毕
Console.WriteLine("全部执行完毕");
}
static void DoWork(object? obj)
{
string str = $"{Thread.CurrentThread.ManagedThreadId}*{obj} ";
for (var i = 0; i < Count; i++)
{
Console.Write(str);
}
}
}
}
这里从线程等待任务执行完成时,不再像线程池那样,只能估计一个时间,而是可以直接使用Task的WaitAll方法来等待所有任务的执行完成。
示例执行结果
这个执行结果和之前直接使用线程池的效果差不多,可以看到线程7被用来输出3,4,线程4被用来输出2,6,8。这些线程都是会被重用的。
但我们代码的焦点只关注我们需要执行的任务,而不需要关心任务是如何被调度的,具体的调度优化由.net来执行,我们只需要关注业务需要执行的任务,并且调用start来通知调度器开始调度异步执行即可。
下期预告
从上面的执行结果可以看出,虽然我们是按顺序输出1-10,但由于是异步执行的,实际上是输出的顺序是随机的,但实际我们业务中,可能会要求一些任务是有相关性的,比如要先输出1完成后才能输出2,那我们该如何来构建这样的任务链条,通过将一些小任务构建成任务链条,来组装成一个大的任务,敬请期待。