3.1 定义

System.Threading.ThreadPool
降低异步操作创建线程的开销。线程池受CLR(Common Language Runtime).NET通用语言运行时管理的。每个CLR都有一个线程池实例。ThreadPool类型拥有一个 QueueUserWorkItem静态方法,该方法接受一个委托,代表用户自定义的一个异步操作。
注意事项1:线程池的用途是执行运行时间短的操作,执行长时间运行的计算密集型操作则会降低性能。尽量不要在线程池中放入长时间运行的操作或者阻塞工作者线程,会导致所有工作者线程变得繁忙,从而无法服务用户操作。
注意事项2:在ASP.NET中使用线程池要小心,当浪费掉所有工作者线程,Web服务器将不能服务新的请求。在ASP.NET中应该使用输入/输出密集型的异步操作(I/O线程)。
注意事项3:线程池中的工作者线程都是后台线程。

3.2 在线程池中调用委托

.NET历史上第一个异步编程模式:异步编程模型APM(Asynchronous Programming Model)
delegate.BeginInvoke(para1,para2,….,AsyncCallback,object@object)这个方法实际是一个IAsyncResult对象,object@object传给了IAsyncResult对象的自定义状态 IAsyncResult.AsyncState。BeginInvoke()方法结束后开始调用Callback回调方法,将带object@object状态对象IAysncResult传给回调方法的参数。EndInvoke()方法结束后,将委托指向的方法返回值给result。

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using System.Threading;
  7. namespace AsynchronousProgrammingModel {
  8. class Program {
  9. private delegate string RunOnThreadPool(out int threadID);
  10. static void Main(string[] args) {
  11. int threadID = 0;
  12. RunOnThreadPool runOnThreadPool = new RunOnThreadPool(Test); //RunOnThreadPool runOnThreadPool = Test
  13. IAsyncResult asyncResult = runOnThreadPool.BeginInvoke(out threadID, Callback, "A delegate asyncchronous call");
  14. string result = runOnThreadPool.EndInvoke(out threadID, asyncResult);
  15. Console.WriteLine($"Thread pool worker thread id:{threadID}");
  16. Console.WriteLine(result);
  17. Thread.Sleep(2000);
  18. Console.ReadLine();
  19. }
  20. static string Test(out int threadID) {
  21. Console.WriteLine("Starting....");
  22. Console.WriteLine($"Is thread pool thread:{Thread.CurrentThread.IsThreadPoolThread}");
  23. Thread.Sleep(TimeSpan.FromSeconds(2));
  24. threadID = Thread.CurrentThread.ManagedThreadId;
  25. return $"Thread pool worker thread id was ;{threadID}";
  26. }
  27. static void Callback(IAsyncResult ar) {
  28. Console.WriteLine("Starting a callback...");
  29. Console.WriteLine($"State passed to a callback:{ar.AsyncState} {ar.IsCompleted}");
  30. Console.WriteLine($"Is thread pool thread:{Thread.CurrentThread.IsThreadPoolThread}");
  31. Console.WriteLine($"Thread pool worker thread id:{Thread.CurrentThread.ManagedThreadId}");
  32. }
  33. }
  34. }

3.3 向线程池中放入异步操作

ThreadPool出现于.NET 2.0。ThreadPool出现的背景:Thread功能繁多,而且对线程数量没有管控,对于线程的开辟和销毁要消耗大量的资源。每次new一个THread都要重新开辟内存。如果某个线程的创建和销毁的代价比较高,同时这个对象还可以反复使用的,就需要一个池子(容器),保存多个这样的对象,需要用的时候从池子里面获取,用完之后不用销毁,在放到池子里面。这样不但能节省内存资源,提高性能,而且还能管控线程的总数量,防止滥用。这时就需要使用ThreadPool了。
ThreadPool.QueueUserWorkItem(委托WaitCallback,状态值),传给委托回调方法的形参。此线程池方法有点类似于ManualResetEvent()开关门操作。
几种形式:

  • new 一个委托 ThreadPool.QueueUserWorkItem(new WaitCallback(方法名),状态值object);
  • Lambda表达式,直接使用方法 ThreadPool.QueueUserWorkItem(a=>方法名(形参));
  • Lambda表达式,在{}中写方法体ThreadPool.QueueUserWorkItem(a=>{执行语句;},状态值a);执行语句可以调用外部的变量,称之为闭包。 ```csharp using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Threading;

namespace ThreadPoolQueueWorkItem { class Program {

  1. static void Main(string[] args) {
  2. const string lambdaState = "lamAlbert";
  3. ThreadPool.QueueUserWorkItem(new WaitCallback(DoSomethingLong),"first thread");
  4. ThreadPool.QueueUserWorkItem(a => DoSomethingLong("HelloWorld"));
  5. Console.WriteLine("Hello World");
  6. int workerThreads;
  7. int completionPortThreads;
  8. ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
  9. Console.WriteLine($"GetMaxThreads {workerThreads} {completionPortThreads}");
  10. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
  11. ThreadPool.QueueUserWorkItem(state => {
  12. Console.WriteLine($"Operation state :{state}");
  13. Console.WriteLine($"Worker thread id :{Thread.CurrentThread.ManagedThreadId}");
  14. Thread.Sleep(TimeSpan.FromSeconds(2));
  15. },"lambda state");
  16. ThreadPool.QueueUserWorkItem(_ => {
  17. Console.WriteLine($"Operation state :{_} {lambdaState}"); //_的值为lambdaTwo,lambdaState从外部获取变量称之为闭包
  18. }, "lambdaTwo");
  19. Console.ReadLine();
  20. }
  21. static void DoSomethingLong(object name) {
  22. Console.WriteLine($"CurrentThread is {Thread.CurrentThread.ManagedThreadId}");
  23. Console.WriteLine($"CurrentThread name is {Thread.CurrentThread.Name}");
  24. Console.WriteLine(name);
  25. for (int i = 0; i < 10; i++) {
  26. Console.WriteLine(name);
  27. Thread.Sleep(1000);
  28. }
  29. }
  30. }

}

  1. 3.4 线程池与并行度<br />直接创建n个线程会消耗大量资源,而线程池则可以用时间换资源,通过有限的线程创建来实现资源的少量消耗,但相应的会增大异步操作的时间。
  2. ```csharp
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. using System.Threading;
  9. using System.Diagnostics;
  10. namespace ThreadPoolAndParell {
  11. class Program {
  12. static void Main(string[] args) {
  13. const int _numberOfOperation = 500;
  14. var sw = new Stopwatch();
  15. sw.Start();
  16. UseThreads(_numberOfOperation);
  17. sw.Stop();
  18. Console.WriteLine($"{sw.ElapsedMilliseconds} ThreadCreate Time");
  19. sw.Reset();
  20. sw.Start();
  21. UseThreadPool(_numberOfOperation);
  22. sw.Stop();
  23. Console.WriteLine($"{sw.ElapsedMilliseconds} ThreadPool Time");
  24. Console.ReadLine();
  25. }
  26. static void UseThreads(int numberOfOperation) {
  27. using (var countdown = new CountdownEvent(numberOfOperation)) {
  28. Console.WriteLine("Scheduling work by creating threads");
  29. for (int i = 0; i < numberOfOperation; i++) {
  30. new Thread(() => {
  31. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId},");
  32. Thread.Sleep(TimeSpan.FromSeconds(0.1));
  33. countdown.Signal();
  34. }).Start();
  35. }
  36. countdown.Wait();
  37. Console.WriteLine("Run completely");
  38. }
  39. }
  40. static void UseThreadPool(int numberOfOperation) {
  41. using (var countdown = new CountdownEvent(numberOfOperation)) {
  42. Console.WriteLine("Scheduling work by threadpool");
  43. for (int i = 0; i < numberOfOperation; i++) {
  44. ThreadPool.QueueUserWorkItem(_ => {
  45. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId},");
  46. Thread.Sleep(TimeSpan.FromSeconds(0.1));
  47. countdown.Signal();
  48. });
  49. }
  50. countdown.Wait();
  51. Console.WriteLine("Run completely");
  52. }
  53. }
  54. }
  55. }

3.4 实现一个取消选项CancellationToken CancellationTokenSource

托管资源:由CLR管理分配和释放的资源,也就是我们直接new出来的对象;
非托管资源:不受CLR控制的资源,也就是不属于.NET本身的功能,往往是通过调用跨平台程序集(如C++)或者操作系统提供的一些接口,比如Windows内核对象、文件操作、数据库连接、socket、Win32API、网络等。一个标准的释放非托管资源的类应该去实现IDisposable接口。

  1. public class MyClass:IDisposable{
  2. public void Dispose(){
  3. }
  4. }
  5. using (var mc = new MyClass()){
  6. }
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using System.Threading;
  7. namespace CancellationTokenAndCancellationTokenSource {
  8. class Program {
  9. static void Main(string[] args) {
  10. using (var tokenSource = new CancellationTokenSource()) {
  11. var token = tokenSource.Token; //这里通常利用CancellationTokenSource来控制CancellationToken
  12. //以避免cancellationtoken构造器传入false,而不执行register方法。
  13. ThreadPool.QueueUserWorkItem(_ => {
  14. AsyncOperation1(token);
  15. // Thread.Sleep(TimeSpan.FromSeconds(2));
  16. });
  17. Thread.Sleep(TimeSpan.FromSeconds(2));
  18. tokenSource.Cancel();
  19. Console.ReadLine();
  20. }
  21. }
  22. static void AsyncOperation1(CancellationToken token) {
  23. Console.WriteLine("Starting the first task");
  24. for (int i = 0; i < 5; i++) {
  25. if (token.IsCancellationRequested) { //轮询方式,查询是否有取消请求
  26. Console.WriteLine("The first task has been canceled.");
  27. return;
  28. }
  29. else
  30. Console.WriteLine(i.ToString());
  31. Thread.Sleep(TimeSpan.FromSeconds(1));
  32. }
  33. Console.WriteLine("The first task has completed successfully");
  34. }
  35. static void AsyncOperation2(CancellationToken token) {
  36. try {
  37. Console.WriteLine("Starting the second task");
  38. for (int i = 0; i < 5; i++) {
  39. token.ThrowIfCancellationRequested();//抛一个线程取消异常,走到异常分支
  40. Thread.Sleep(TimeSpan.FromSeconds(1));
  41. }
  42. Console.WriteLine("The first task has completed successfully");
  43. }
  44. catch (OperationCanceledException) {
  45. Console.WriteLine("The second task has been canceled.");
  46. }
  47. }
  48. static void AsyncOperation3(CancellationToken token) {
  49. bool cancellationFlag = false;
  50. token.Register(() => cancellationFlag = true); //通常我们使用CancellationTokenSourcecancel方法后,执行此回调方法。
  51. Console.WriteLine("Starting the third task");
  52. for (int i = 0; i < 5; i++) {
  53. if (cancellationFlag) {
  54. Console.WriteLine("The third task has been canceled.");
  55. }
  56. Thread.Sleep(TimeSpan.FromSeconds(1));
  57. }
  58. Console.WriteLine("The first task has completed successfully");
  59. }
  60. }
  61. }

3.5 RegisterWaitForSingleObject线程池等待事件处理器

在timeSpan之后将执行回调方法,回调方法两个参数,一个state,为Object state下面为null,另一个布尔值timeout,true超时,false表示收到了信号
ThreadPool.RegisterWaitForSingleObject(WaitHandle waitObject,WaitOrTimerCallback callBack, object state, TimeSpan timeout, bool executeOnlyOnce);
WaitHandle信号量类例如ManualResetEvent, 回调函数在timeout之后执行,object state将状态传回回调函数第一个参数,回调函数第二个参数false表示收到了信号,true表示超时。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

namespace ThreadPoolRegisterWaitForSingleObject {
    class Program {
        static void Main(string[] args) {
            RunOperations(TimeSpan.FromSeconds(1000));
           // RunOperations(TimeSpan.FromSeconds(10));
            Console.ReadLine();
        }
        static void RunOperations(TimeSpan timeSpan) {
            using (var evt = new ManualResetEvent(false)) //无法将ManualResetEventSlim转换为WaitHandle类
            using (var cts = new CancellationTokenSource()) {
                Console.WriteLine("Registering timeout operation....");
                //在timeSpan之后将执行回调方法,回调方法两个参数,一个state,为Object state下面为null,另一个布尔值timeout,true超时,false表示收到了信号
                var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state,timeout)=> WorkOperationWait(cts, timeout), null, timeSpan, true);
                Console.WriteLine("Starting long running opration....");
                ThreadPool.QueueUserWorkItem(_=>WorkerOpration(cts.Token,evt));
                Thread.Sleep(timeSpan.Add(TimeSpan.FromMilliseconds(6)));
                worker.Unregister(evt);
            }
        }

        static void WorkerOpration(CancellationToken token,ManualResetEvent evt) {
            for (int i = 0; i < 5; i++) {
                if (token.IsCancellationRequested) {
                    Console.WriteLine("Cancel token");
                    return;
                }
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }
            evt.Set();//产生一个信号量
        }

        static void WorkOperationWait(CancellationTokenSource tokenSource,bool isTimeOut) {
            if (isTimeOut) {//true表示超时或cancel了
                tokenSource.Cancel();
                Console.WriteLine("Worker Operation timed out and was canceled");
            }
            else {//false表示收到信号通知
                Console.WriteLine("Worker Operation succeded");
            }
        }
    }
}

3.6 计时器System.Threading.Timer

Timer(callback,state,dueTime,periodTime)
下方程序延时1S启动第一个执行,间隔2S后依次执行。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

namespace ThreadPoolRegisterWaitForSingleObject {
    class Program {
        static Timer _timer;
        static void Main(string[] args) {
            Console.WriteLine("Press 'Enter' to stop the timer...");
            DateTime start = DateTime.Now;
            //延时1S启动,间隔2S再次启动
            _timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
            //Timer.Change(dueTime,periodTime)改变
            Console.ReadLine();
        }
        static void TimerOperation(DateTime start) {
            TimeSpan timeSpan = DateTime.Now - start;
            Console.WriteLine($"{timeSpan.Seconds} seconds from {start}."+
                $"Timer thread pool thread id:{Thread.CurrentThread.ManagedThreadId}");
        }
    }
}

3.7 使用BackgroundWorker组件 Winform耗时后台

基于事件的异步模式 Event-based Asynchronous Pattern。历史上第二个用于构造异步程序的方法,现推荐使用TPL方法。BackgroudWorker使用参考

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.ComponentModel;

namespace ThreadPoolRegisterWaitForSingleObject {
    class Program {
        static void Main(string[] args) {
            var bw = new BackgroundWorker();
            bw.WorkerReportsProgress = true;//报到进度更新
            bw.WorkerSupportsCancellation = true;//支持异步取消操作

            bw.DoWork += Bw_DoWork; //执行方法
            bw.ProgressChanged += Bw_ProgressChanged;//进度改变事件
            bw.RunWorkerCompleted += Bw_RunWorkerCompleted;//进度完成事件

            bw.RunWorkerAsync(100);//开始执行后台操作

            Thread.Sleep(TimeSpan.FromSeconds(2));
            bw.CancelAsync();//触发RunWorkCompleted操作
            Console.ReadLine();
        }

        private static void Bw_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e) {
            if (e.Cancelled) {
                Console.WriteLine("Canceled");
            }
            else if (e.Error != null) {
                Console.WriteLine(e.Error);
            }
            else {
                Console.WriteLine($"Finally { e.Result}");
            }

        }

        private static void Bw_ProgressChanged(object sender, ProgressChangedEventArgs e) {
            Console.WriteLine($"Current UserState {e.UserState}");
            Console.WriteLine($"Current Progress {e.ProgressPercentage}");
        }

        private static void Bw_DoWork(object sender, DoWorkEventArgs e) {
            var bw = (BackgroundWorker)sender;
            int endNumber = 0;
            if(e.Argument != null) {
                endNumber = (int)e.Argument;
            }
            int sum = 0;
            for (int i = 0; i < endNumber; i++) {
                sum += i;
                string userState = "CurrentNumber is " + i.ToString();
                bw.ReportProgress(i, userState);//触发ProgressChanged事件 i传给了e.ProgressPercentage userState传给了e.UserState
                Thread.Sleep(TimeSpan.FromSeconds(0.1));

                //检查是否执行了CancelAsync()方法
                if (bw.CancellationPending) {
                    e.Cancel =true;
                    break;
                }

            }
            Console.WriteLine($"do work: {sum}");
            e.Result = sum;
        }
    }
}