3.1 定义
System.Threading.ThreadPool
降低异步操作创建线程的开销。线程池受CLR(Common Language Runtime).NET通用语言运行时管理的。每个CLR都有一个线程池实例。ThreadPool类型拥有一个 QueueUserWorkItem静态方法,该方法接受一个委托,代表用户自定义的一个异步操作。
注意事项1:线程池的用途是执行运行时间短的操作,执行长时间运行的计算密集型操作则会降低性能。尽量不要在线程池中放入长时间运行的操作或者阻塞工作者线程,会导致所有工作者线程变得繁忙,从而无法服务用户操作。
注意事项2:在ASP.NET中使用线程池要小心,当浪费掉所有工作者线程,Web服务器将不能服务新的请求。在ASP.NET中应该使用输入/输出密集型的异步操作(I/O线程)。
注意事项3:线程池中的工作者线程都是后台线程。
3.2 在线程池中调用委托
.NET历史上第一个异步编程模式:异步编程模型APM(Asynchronous Programming Model)
delegate.BeginInvoke(para1,para2,….,AsyncCallback,object@object)这个方法实际是一个IAsyncResult对象,object@object传给了IAsyncResult对象的自定义状态 IAsyncResult.AsyncState。BeginInvoke()方法结束后开始调用Callback回调方法,将带object@object状态对象IAysncResult传给回调方法的参数。EndInvoke()方法结束后,将委托指向的方法返回值给result。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace AsynchronousProgrammingModel {
class Program {
private delegate string RunOnThreadPool(out int threadID);
static void Main(string[] args) {
int threadID = 0;
RunOnThreadPool runOnThreadPool = new RunOnThreadPool(Test); //RunOnThreadPool runOnThreadPool = Test
IAsyncResult asyncResult = runOnThreadPool.BeginInvoke(out threadID, Callback, "A delegate asyncchronous call");
string result = runOnThreadPool.EndInvoke(out threadID, asyncResult);
Console.WriteLine($"Thread pool worker thread id:{threadID}");
Console.WriteLine(result);
Thread.Sleep(2000);
Console.ReadLine();
}
static string Test(out int threadID) {
Console.WriteLine("Starting....");
Console.WriteLine($"Is thread pool thread:{Thread.CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(2));
threadID = Thread.CurrentThread.ManagedThreadId;
return $"Thread pool worker thread id was ;{threadID}";
}
static void Callback(IAsyncResult ar) {
Console.WriteLine("Starting a callback...");
Console.WriteLine($"State passed to a callback:{ar.AsyncState} {ar.IsCompleted}");
Console.WriteLine($"Is thread pool thread:{Thread.CurrentThread.IsThreadPoolThread}");
Console.WriteLine($"Thread pool worker thread id:{Thread.CurrentThread.ManagedThreadId}");
}
}
}
3.3 向线程池中放入异步操作
ThreadPool出现于.NET 2.0。ThreadPool出现的背景:Thread功能繁多,而且对线程数量没有管控,对于线程的开辟和销毁要消耗大量的资源。每次new一个THread都要重新开辟内存。如果某个线程的创建和销毁的代价比较高,同时这个对象还可以反复使用的,就需要一个池子(容器),保存多个这样的对象,需要用的时候从池子里面获取,用完之后不用销毁,在放到池子里面。这样不但能节省内存资源,提高性能,而且还能管控线程的总数量,防止滥用。这时就需要使用ThreadPool了。
ThreadPool.QueueUserWorkItem(委托WaitCallback,状态值),传给委托回调方法的形参。此线程池方法有点类似于ManualResetEvent()开关门操作。
几种形式:
- new 一个委托 ThreadPool.QueueUserWorkItem(new WaitCallback(方法名),状态值object);
- Lambda表达式,直接使用方法 ThreadPool.QueueUserWorkItem(a=>方法名(形参));
- Lambda表达式,在{}中写方法体ThreadPool.QueueUserWorkItem(a=>{执行语句;},状态值a);执行语句可以调用外部的变量,称之为闭包。 ```csharp using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Threading;
namespace ThreadPoolQueueWorkItem { class Program {
static void Main(string[] args) {
const string lambdaState = "lamAlbert";
ThreadPool.QueueUserWorkItem(new WaitCallback(DoSomethingLong),"first thread");
ThreadPool.QueueUserWorkItem(a => DoSomethingLong("HelloWorld"));
Console.WriteLine("Hello World");
int workerThreads;
int completionPortThreads;
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
Console.WriteLine($"GetMaxThreads {workerThreads} {completionPortThreads}");
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
ThreadPool.QueueUserWorkItem(state => {
Console.WriteLine($"Operation state :{state}");
Console.WriteLine($"Worker thread id :{Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(TimeSpan.FromSeconds(2));
},"lambda state");
ThreadPool.QueueUserWorkItem(_ => {
Console.WriteLine($"Operation state :{_} {lambdaState}"); //_的值为lambdaTwo,lambdaState从外部获取变量称之为闭包
}, "lambdaTwo");
Console.ReadLine();
}
static void DoSomethingLong(object name) {
Console.WriteLine($"CurrentThread is {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"CurrentThread name is {Thread.CurrentThread.Name}");
Console.WriteLine(name);
for (int i = 0; i < 10; i++) {
Console.WriteLine(name);
Thread.Sleep(1000);
}
}
}
}
3.4 线程池与并行度<br />直接创建n个线程会消耗大量资源,而线程池则可以用时间换资源,通过有限的线程创建来实现资源的少量消耗,但相应的会增大异步操作的时间。
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace ThreadPoolAndParell {
class Program {
static void Main(string[] args) {
const int _numberOfOperation = 500;
var sw = new Stopwatch();
sw.Start();
UseThreads(_numberOfOperation);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds} ThreadCreate Time");
sw.Reset();
sw.Start();
UseThreadPool(_numberOfOperation);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds} ThreadPool Time");
Console.ReadLine();
}
static void UseThreads(int numberOfOperation) {
using (var countdown = new CountdownEvent(numberOfOperation)) {
Console.WriteLine("Scheduling work by creating threads");
for (int i = 0; i < numberOfOperation; i++) {
new Thread(() => {
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId},");
Thread.Sleep(TimeSpan.FromSeconds(0.1));
countdown.Signal();
}).Start();
}
countdown.Wait();
Console.WriteLine("Run completely");
}
}
static void UseThreadPool(int numberOfOperation) {
using (var countdown = new CountdownEvent(numberOfOperation)) {
Console.WriteLine("Scheduling work by threadpool");
for (int i = 0; i < numberOfOperation; i++) {
ThreadPool.QueueUserWorkItem(_ => {
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId},");
Thread.Sleep(TimeSpan.FromSeconds(0.1));
countdown.Signal();
});
}
countdown.Wait();
Console.WriteLine("Run completely");
}
}
}
}
3.4 实现一个取消选项CancellationToken CancellationTokenSource
托管资源:由CLR管理分配和释放的资源,也就是我们直接new出来的对象;
非托管资源:不受CLR控制的资源,也就是不属于.NET本身的功能,往往是通过调用跨平台程序集(如C++)或者操作系统提供的一些接口,比如Windows内核对象、文件操作、数据库连接、socket、Win32API、网络等。一个标准的释放非托管资源的类应该去实现IDisposable接口。
public class MyClass:IDisposable{
public void Dispose(){
}
}
using (var mc = new MyClass()){
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace CancellationTokenAndCancellationTokenSource {
class Program {
static void Main(string[] args) {
using (var tokenSource = new CancellationTokenSource()) {
var token = tokenSource.Token; //这里通常利用CancellationTokenSource来控制CancellationToken
//以避免cancellationtoken构造器传入false,而不执行register方法。
ThreadPool.QueueUserWorkItem(_ => {
AsyncOperation1(token);
// Thread.Sleep(TimeSpan.FromSeconds(2));
});
Thread.Sleep(TimeSpan.FromSeconds(2));
tokenSource.Cancel();
Console.ReadLine();
}
}
static void AsyncOperation1(CancellationToken token) {
Console.WriteLine("Starting the first task");
for (int i = 0; i < 5; i++) {
if (token.IsCancellationRequested) { //轮询方式,查询是否有取消请求
Console.WriteLine("The first task has been canceled.");
return;
}
else
Console.WriteLine(i.ToString());
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.WriteLine("The first task has completed successfully");
}
static void AsyncOperation2(CancellationToken token) {
try {
Console.WriteLine("Starting the second task");
for (int i = 0; i < 5; i++) {
token.ThrowIfCancellationRequested();//抛一个线程取消异常,走到异常分支
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.WriteLine("The first task has completed successfully");
}
catch (OperationCanceledException) {
Console.WriteLine("The second task has been canceled.");
}
}
static void AsyncOperation3(CancellationToken token) {
bool cancellationFlag = false;
token.Register(() => cancellationFlag = true); //通常我们使用CancellationTokenSourcecancel方法后,执行此回调方法。
Console.WriteLine("Starting the third task");
for (int i = 0; i < 5; i++) {
if (cancellationFlag) {
Console.WriteLine("The third task has been canceled.");
}
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.WriteLine("The first task has completed successfully");
}
}
}
3.5 RegisterWaitForSingleObject线程池等待事件处理器
在timeSpan之后将执行回调方法,回调方法两个参数,一个state,为Object state下面为null,另一个布尔值timeout,true超时,false表示收到了信号
ThreadPool.RegisterWaitForSingleObject(WaitHandle waitObject,WaitOrTimerCallback callBack, object state, TimeSpan timeout, bool executeOnlyOnce);
WaitHandle信号量类例如ManualResetEvent, 回调函数在timeout之后执行,object state将状态传回回调函数第一个参数,回调函数第二个参数false表示收到了信号,true表示超时。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace ThreadPoolRegisterWaitForSingleObject {
class Program {
static void Main(string[] args) {
RunOperations(TimeSpan.FromSeconds(1000));
// RunOperations(TimeSpan.FromSeconds(10));
Console.ReadLine();
}
static void RunOperations(TimeSpan timeSpan) {
using (var evt = new ManualResetEvent(false)) //无法将ManualResetEventSlim转换为WaitHandle类
using (var cts = new CancellationTokenSource()) {
Console.WriteLine("Registering timeout operation....");
//在timeSpan之后将执行回调方法,回调方法两个参数,一个state,为Object state下面为null,另一个布尔值timeout,true超时,false表示收到了信号
var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state,timeout)=> WorkOperationWait(cts, timeout), null, timeSpan, true);
Console.WriteLine("Starting long running opration....");
ThreadPool.QueueUserWorkItem(_=>WorkerOpration(cts.Token,evt));
Thread.Sleep(timeSpan.Add(TimeSpan.FromMilliseconds(6)));
worker.Unregister(evt);
}
}
static void WorkerOpration(CancellationToken token,ManualResetEvent evt) {
for (int i = 0; i < 5; i++) {
if (token.IsCancellationRequested) {
Console.WriteLine("Cancel token");
return;
}
Thread.Sleep(TimeSpan.FromSeconds(1));
}
evt.Set();//产生一个信号量
}
static void WorkOperationWait(CancellationTokenSource tokenSource,bool isTimeOut) {
if (isTimeOut) {//true表示超时或cancel了
tokenSource.Cancel();
Console.WriteLine("Worker Operation timed out and was canceled");
}
else {//false表示收到信号通知
Console.WriteLine("Worker Operation succeded");
}
}
}
}
3.6 计时器System.Threading.Timer
Timer(callback,state,dueTime,periodTime)
下方程序延时1S启动第一个执行,间隔2S后依次执行。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace ThreadPoolRegisterWaitForSingleObject {
class Program {
static Timer _timer;
static void Main(string[] args) {
Console.WriteLine("Press 'Enter' to stop the timer...");
DateTime start = DateTime.Now;
//延时1S启动,间隔2S再次启动
_timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
//Timer.Change(dueTime,periodTime)改变
Console.ReadLine();
}
static void TimerOperation(DateTime start) {
TimeSpan timeSpan = DateTime.Now - start;
Console.WriteLine($"{timeSpan.Seconds} seconds from {start}."+
$"Timer thread pool thread id:{Thread.CurrentThread.ManagedThreadId}");
}
}
}
3.7 使用BackgroundWorker组件 Winform耗时后台
基于事件的异步模式 Event-based Asynchronous Pattern。历史上第二个用于构造异步程序的方法,现推荐使用TPL方法。BackgroudWorker使用参考
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.ComponentModel;
namespace ThreadPoolRegisterWaitForSingleObject {
class Program {
static void Main(string[] args) {
var bw = new BackgroundWorker();
bw.WorkerReportsProgress = true;//报到进度更新
bw.WorkerSupportsCancellation = true;//支持异步取消操作
bw.DoWork += Bw_DoWork; //执行方法
bw.ProgressChanged += Bw_ProgressChanged;//进度改变事件
bw.RunWorkerCompleted += Bw_RunWorkerCompleted;//进度完成事件
bw.RunWorkerAsync(100);//开始执行后台操作
Thread.Sleep(TimeSpan.FromSeconds(2));
bw.CancelAsync();//触发RunWorkCompleted操作
Console.ReadLine();
}
private static void Bw_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e) {
if (e.Cancelled) {
Console.WriteLine("Canceled");
}
else if (e.Error != null) {
Console.WriteLine(e.Error);
}
else {
Console.WriteLine($"Finally { e.Result}");
}
}
private static void Bw_ProgressChanged(object sender, ProgressChangedEventArgs e) {
Console.WriteLine($"Current UserState {e.UserState}");
Console.WriteLine($"Current Progress {e.ProgressPercentage}");
}
private static void Bw_DoWork(object sender, DoWorkEventArgs e) {
var bw = (BackgroundWorker)sender;
int endNumber = 0;
if(e.Argument != null) {
endNumber = (int)e.Argument;
}
int sum = 0;
for (int i = 0; i < endNumber; i++) {
sum += i;
string userState = "CurrentNumber is " + i.ToString();
bw.ReportProgress(i, userState);//触发ProgressChanged事件 i传给了e.ProgressPercentage userState传给了e.UserState
Thread.Sleep(TimeSpan.FromSeconds(0.1));
//检查是否执行了CancelAsync()方法
if (bw.CancellationPending) {
e.Cancel =true;
break;
}
}
Console.WriteLine($"do work: {sum}");
e.Result = sum;
}
}
}