thread的问题
- 线程(Thread)是用来创建并发(concurrency)的一种低级别工具,它有一些限制,尤其是:
- ·虽然开始线程的时候可以方便的传入数据,但是当Join 的时候,很难从线程获得返回值。
- 可能需要设置一些共享字段。
- 如果操作抛出异常,捕获和传播该异常都很麻烦。
- 无法告诉线程在结束时开始做另外的工作,你必须进行Join 操作(在进程中阻塞当前的线程)
- 很难使用较小的并发(concurrent)来组建大型的并发。
解决:Task类
Task
-
Task是一个相对高级的抽象:它代表了一个并发操作(concurrent)
- 该操作可能由Thread支持,或不由Thread支持
-
Task是可组合的(可使用Continuation把它们串成链)
- Task可以使用线程池来减少启动延迟
- 使用TaskCompletionSource,Tasks 可以利用回调的方式,在等待IO绑定操作时完全避免线程。
开始使用Task:Task.Run
-
Task类在System.Threading.Tasksm命名空间下
-
开始一个Task最简单的办法就是使用Task.Run (.NET 4.5,4.0的时候是Task.Factory.StartNew)这个静态方法:
- 传个一个Action委托即可
-
Task默认使用线程池,也就是后台线程
- 当主线程结束时,你创建的所有Tasks都会结束
-
Task.Run返回一个Task,对象,可以使用它来监视其过程
- ·在Task.Run 之后,我们没有调用Start,因为该方法创建的是“热”任务(hot task(类似创建即使用))
- 可以通过Task 的构造函数创建“冷”任务(cold task),但是很少这样做
- ·在Task.Run 之后,我们没有调用Start,因为该方法创建的是“热”任务(hot task(类似创建即使用))
-
可以通过Task的Status属性来跟踪Taskd的执行状态
-
调用Task的Wait方法进行阻塞直到操作完成
static void Main(string[] args) { Task task = Task.Run(() => { Thread.Sleep(3000); Console.WriteLine("foo"); }); Console.WriteLine(task.IsCompleted);//false task.Wait();//阻塞直到task完成操作 Console.WriteLine(task.IsCompleted);//true }
-
Wait也可以让你指定一个超时时间和一个消息令牌来提前结束等待
-
默认情况下,CLR在线程池中运行Task,这非常适合短时间运行的Compute-Bound类工作。
-
针对长时间运行的任务或者阻塞操作,你可以不采用线程池
static void Main(string[] args) { Task task = Task.Factory.StartNew(() => { Thread.Sleep(3000); Console.WriteLine("foo"); },TaskCreationOptions.LongRunning); }
-
如果同时运行多个 long-running tasks(尤其是其中有处于阻塞状态的),那么性能将会受很大影响,这时有比TaskCreationOptions.LongRunning更好的办法:·
- 如果任务是IO-Bound,TaskCompletionSource和异步函数可以让你用回调(Coninuations) 代替线程来实现并发。
- 如果任务是 Compute-Bound,生产者/消费者队列允许你对任务的并发性进行限流,避免把其他线程和进程饿死。
-
Task有一个泛型子类叫做Task,它允许发出一个返回值。
-
使用Func委托或兼容的Lambda表达式来调用Task.Run就可以得到Task.
-
然后可以通过Result属性来获取返回的结果
- 如果这个task还没有完成操作,访问Result属性会阻塞该线程直到该task完成操作
static void Main(string[] args) { Tasktask = Task.Run(() => { Console.WriteLine("Foo"); Thread.Sleep(3000); return 3; }); int result = task.Result;//如果task没有完成,那么就会阻塞 Console.WriteLine(result); }
static void Main(string[] args) { //获取质数 TaskTask的异常task = Task.Run(() => Enumerable.Range(2, 30000).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); Console.WriteLine("Task runing ……"); Console.WriteLine("The answer is "+task.Result); }
-
与Thread不一样,Task可以很方便的传播异常
-
如果你的task里面抛出一个未处理的异常(故障),那么该异常就会重新被抛出给:
-
调用Wait()的地方
-
访问了Task的Result属性的地方
static void Main(string[] args) { Task task = Task.Run(() => { throw null; }); try { task.Wait(); } catch (AggregateException aex) { if (aex.InnerException is NullReferenceException) { Console.WriteLine("Null"); } else { throw; } } }
-
-
-
CLR将异常包裹在AggregateRxception里,以便在并行编程场景中发挥很好的作用
不想抛出异常,想要知道问题
-
无需抛出异常,通过Task的IsFaulted和IsCanceled属性也可以检查出Task是否发生故障。
- 如果两个属性都返回false那么没有错误发生
- 如果IsCanceled为true那就说明OperationCanceledRxception为该Task抛出了。
- 如果lsFaulted为true,那就说明另一个类型的异常被抛出了,而Exception属性也将指明错误
异常与“自治”的Task
-
自治的,“设置完就不管了”的Task。就是指不通过调用Wait()方法、Result属性或continuation进行会合的任务。
-
针对自治的Task,需要像Thread一样,显式的处理异常,避免发生“悄无声息的故障”。
**未观察到的异常:**自治Task上未处理的异常称为未观察到的异常。
- 可以通过全局的 TaskScheduler.UnobservedTaskException来订阅未观察到的异常。
- 关于什么是“未观察到的异常”,有一些细微的差别:
- ·使用超时进行等待的 Task,如果在超时后发生故障,那么它将会产生一个“未观察到的异常”。
- 在Task 发生故障后,如果访问Task的Exception属性,那么该异常就被认为是“已观察到的”。
- 一个Continuation会对 Task 说:“当你结束的时候,继续再做点其它的事”
- Continuation通常是通过回调的方式实现的
static void Main(string[] args) { Tasktask = Task.Run(() => Enumerable.Range(2, 30000).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); var awaiter = task.GetAwaiter(); awaiter.OnCompleted(() => { int result = awaiter.GetResult(); Console.WriteLine(result); }); Console.ReadLine(); }
- 在task上调用GetAwaiter会返回一个awaiter 对象
- 它的 OnCompleted方法会告诉之前的task:“当你结束/发生故障的时候要执行委托”。
- 可以将Continuation 附加到已经结束的task 上面,此时Continuation将会被安排立即执行。
-
r任何可以暴露下列两个方法和一个属性的对象就是awaiter:
- OnCompleted:是INotifyCompleted的一部分
- GetResult
- IsCompleted的bool属性
如果发生故障
-
如果之前的任务发生故障,那么当Continuation代码调用awaiter.GetResult()的时候,异常会被重新抛出。
-
无需调用GetResult,我们可以直接访问task的 Result属性。
-
但调用GetResult的好处是,如果 task发生故障,那么异常会被直接的抛出,而不是包裹在AggregateException里面,这样的话catch 块就简洁很多了。
- 针对非泛型的Task,GetResult()方法有一个void放回值,它就是用来重新抛出异常的
- 如果同步上下文出现了,那么OnCompleted 会自动捕获它,并将Continuation提交到这个上下文中。这一点在富客户端应用中非常有用,因为它会把Continuation放回到U线程中。
- 如果是编写一个库,则不希望出现上述行为,因为开销较大的UI线程切换应该在程序运行离开库的时候只发生一次,而不是出现在方法调用之间。所以,我们可以便用ConfigureAwait方法来避免这种行为
static void Main(string[] args) { Tasktask = Task.Run(() => Enumerable.Range(2, 30000).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); var awaiter = task.ConfigureAwait(false).GetAwaiter(); awaiter.OnCompleted(() => { int result = awaiter.GetResult(); Console.WriteLine(result); }); Console.ReadLine(); }
- 如果没有同步上下文出现,或者你使用的是ConfigureAwait(false),那么Continuation会运行在先前task的同一个线程上,从而避免不必要的开销
- 另外一种附加Continuation的方式就是调用task 的 ContinueWith方法(例子continueWith)
static void Main(string[] args) { Tasktask = Task.Run(() => Enumerable.Range(2, 30000).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); task.ContinueWith(tasks => { int result = tasks.Result; Console.WriteLine(result); }); Console.ReadLine(); }
- ContinueWith本身返回一个task,它可以用它来附加更多的Continuation。
- 但是,必须直接处理AggregateException
- 如果 task 发生故障,需要写额外的代码来把 Continuation封装(marshal)到UI应用上。
- 在非UI上下文中,若想让Continuation和 task执行在同一个线程上,必须指定TaskContinuationOptions.ExecuteSynchronously,否则它将弹回到线程池。
- ContinueWith 对于并行编程来说非常有用。
-
TaskCompletionSource让你在稍后开始和结束的任意操作中创建Task
- 它会为你提供一个可手动执行的“从属”Task
- 指示操作何时结束或发生故障
- 它会为你提供一个可手动执行的“从属”Task
-
它对IO-Bound类工作比较理想
- 可以获得所有Task的好处(传播值、异常、Continuation等)·不需要在操作时阻塞线程
使用TaskCompletionSource
-
初始化一个实例即可: TaskCompletionSource tcs=new TaskCompletionSource();
-
有一个Task属性可返回一个Task
-
该Task完全由TaskCompletionSource 对象控制
-
对象中的方法只能调用一次,如果多次调用:
- SetXxx会抛出异常
- TryXxx会返回false
static void Main(string[] args) { var tcs = new TaskCompletionSource(); new Thread(() => { Thread.Sleep(5000); tcs.SetResult(42); }) { IsBackground = true }.Start(); Task task = tcs.Task; Console.WriteLine(task.Result); }
static void Main(string[] args) { Tasktask = Run(() => { Thread.Sleep(5000); return 42; }); Console.WriteLine(task.Result); } //调用此方法相当于调用Task.Factory.StastNew. //并使用TaskCreationOptions.LongRunning选项来创建非线程池的线程 static Task Run (Func function) { var tcs = new TaskCompletionSource (); new Thread(() => { try { tcs.SetResult(function()); } catch (Exception ex) { tcs.SetException(ex); } }).Start(); return tcs.Task; }
TaskCompletionSource创建Task,但是并不占用线程(关键)
static void Main(string[] args) { var awaiter = GetAnswerToLife().GetAwaiter(); awaiter.OnCompleted(() => { Console.WriteLine(awaiter.GetResult()); }); Consle.ReadKey(); } static TaskGetAnswerToLife() { var tcs = new TaskCompletionSource (); var timer = new System.Timers.Timer(5000) { AutoReset = false }; timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult(42); }; timer.Start(); return tcs.Task; }
static void Main(string[] args) { Delay(5000).GetAwaiter().OnCompleted(() => Console.WriteLine(42)); //五秒之后,Coninuation开始的时候,才占用线程。 Console.ReadKey(); } //注意;没有非泛型版本的TaskCompletioSource static Task Delay(int Mill) { var tcs = new TaskCompletionSourceTask.Delay
static void Main(string[] args) { //Task.Delay相当于异步版本的Thread.Sleep Task.Delay(5000).GetAwaiter().OnCompleted(() => Console.WriteLine(42)); Task.Delay(5000).ContinueWith(ant => Console.WriteLine(42)); Console.ReadKey(); }同步 vs 异步
- 同步操作会在返回调用者之前完成它的工作
- 异步操作会在返回调用者之后去做它的(大部分)工作
- ·异步的方法更为少见,会启用并发,因为它的工作会与调用者并行执行
- 异步方法通常很快(立即)就会返回到调用者,所以叫非阻塞方法
什么是异步编程
原则:将长时间运行的函数写成异步的
-
传统的做法是将长时间运行的函数写成同步的,然后从新的线程或Task进行调用,从而按需引入并发。
-
上述异步方式的不同之处在于,它是从长时间运行函数的内部启动并发。这有两点好处:
- IO-bound并发可不使用线程来实现。可提高可扩展性和执行效率;
- 富客户端在worker线程会使用更少的代码,简化了线程安全性。
异步编程的两种途径
-
一、编写高效处理大量并发IO的应用程序(典型的:服务器端应用)
-
挑战并不是线程安全(因为共享状态通常是最小化的),而是执行效率、
-
特别的,每个网络请求并不会消耗一个线程
-
-
调用图(Call graph):
-
在富客户端应用里简化线程安全。
-
·如果调用图中任何一个操作是长时间运行的,那么整个call graph 必须运行在worker线程上,以保证UI响应。
-
得到一个跨越多个方法的单一并发操作(粗粒度)
-
需要为call graph中的每个方法考虑线程安全
-
-
-
- 异步的call graph直到需要才开启一个线程,通常较浅(IO-bound操作完全不需要)
- 其他的方法可以在UI线程执行,线程安全简化
- 并发,是一连串小的并发操作,操作之间会弹回UI线程
- 异步的call graph直到需要才开启一个线程,通常较浅(IO-bound操作完全不需要)
建议异步编写:
- IO-bound和Compute-bound操作
- 执行时间操作50毫秒的操作
- Task 非常适合异步编程,因为它们支持Continuation(它对异步非常重要)
- TaskCompletionSource是实现底层IO-bound异步方法的一种标准方式
- 对于Compute-bound方法,Task.Run会初始化绑定线程的并发。
- 把task返回调用者,创建异步方法;
- 异步编程的区别:目标是在调用图较低的位置来这样做
- 富客户端应用中,高级方法可以保留在UI线程和访问控制以及共享状态上,不会出现线程安全问题。
static void Main(string[] args) { DisplayPrimeCounts(); Console.ReadKey(); } static void DisplayPrimeCounts() { //for (int i = 0; i < 10; i++) //{ // Console.WriteLine(GetPrimesCount(i*100000+2,100000)+"ptimes between"+(i*100000)+"and"+((i+1)*100000-1)); //} //Console.WriteLine("Done!"); for (int i = 0; i < 10; i++) { var awaiter = GetPrimesCountAsync(i * 100000 + 2, 100000).GetAwaiter(); awaiter.OnCompleted(() => Console.WriteLine(awaiter.GetResult() + "ptimes between ……")); } Console.WriteLine("Done!"); } static TaskGetPrimesCountAsync(int start,int count) { return Task.Run(()=>ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); } static int GetPrimesCount(int start, int count) { return ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)); }
语言对异步的支持非常重要
.需要对task 的执行序列化
·例如TaskB依赖于TaskA的执行结果。
·为此,必须在continuation内部触发下一次循环
static void Main(string[] args) { DisplayPrimeCountsFrom(0); Console.ReadKey(); } static void DisplayPrimeCountsFrom(int i) { var awaiter = GetPrimesCountAsync(i * 100000 + 2, 100000).GetAwaiter(); awaiter.OnCompleted(() => { Console.WriteLine(awaiter.GetResult() + "ptimes between ……"); if (++i < 10) { DisplayPrimeCountsFrom(i); } else { Console.WriteLine("Done!"); } } ); } static TaskGetPrimesCountAsync(int start, int count) { return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); }
但是不是异步的:
异步:async和await
static async Task Main(string[] args) { await DisplayPrimeCountsAsync(); } async static Task DisplayPrimeCountsAsync() { for (int i = 0; i < 10; i++) { Console.WriteLine(await GetPrimesCountAsync(i * 100000 + 2, 100000) + "ptimes between" + (i * 100000) + "and" + ((i + 1) * 100000 - 1)); } Console.WriteLine("Done!"); } static TaskGetPrimesCountAsync(int start, int count) { return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); }
- 命令式循环结构不要和continuation混合在一起,因为它们依赖于当前本地状态。
- 另外一种实现,函数式写法(Linq查询),它也是响应式编程(Rx)的基础。
- async和await关键字可以让你写出和同步代码一样简洁且结构相同的异步代码
awating
-
await关键字简化了附加continuation的过程
-
代码结构
var result=await expression; statement(s);
-
作用相当于:
var awaiter=expression.GetAwaiter(); awaiter.OnCompleted(()=> { var result=awaiter.GetResult(); statement(s); });
static async Task Main(string[] args) { await DisplayPrimes(); } static async Task DisplayPrimes() { int result = await GetPrimesCountAsync(2, 100000); Console.WriteLine(result); } static TaskGetPrimesCountAsync(int start, int count) { return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); }
async修饰符
-
async修饰符会让编译器把await当作关键字而不是标识符(C#5以前可能会使用await 作为标识符)
-
async修饰符只能用于方法(包括Lambda表达式)
- 该该方法可以返回void、Task、Task
-
async修饰符对方法的签名或public元数据没有影响(和unsafe一样),它只会影响方法内部
- 在接口内使用async是没有意义的
- 使用async来重载非async的方法却是合法的(只要方法签名一致)
-
使用了async修饰符的方法就是“异步函数”
异步方法的执行
- 遇到await表达式,执行(正常情况下)会返回调用者
- 就像iterator里面的yield return
- 在返回前,运行时会附加一个continuation到await的task
- 为保证task结束时,执行会跳回原方法,从停止的地方继续执行
- 如果发生故障,那么异常将会被重新抛出
- 如果一切正常,那么它的返回值就会赋给await表达式
static async Task Main(string[] args) { //DisplayPrimesAsync和DisplayPrimesCount等同 //DisplayPrimesCount是DisplayPrimesAsync编译后的代码 await DisplayPrimesAsync(); DisplayPrimesCount(); Console.ReadKey(); } static void DisplayPrimesCount() { var awaiter = GetPrimesCountAsync(2, 100000).GetAwaiter(); awaiter.OnCompleted(() => { int result = awaiter.GetResult(); Console.WriteLine(result); }); } static async Task DisplayPrimesAsync() { int result = await GetPrimesCountAsync(2, 100000); Console.WriteLine(result); } static TaskGetPrimesCountAsync(int start, int count) { return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); }
可以await什么?
-
await的表达式通常是一个task
-
也可以是满足线路条件的任意对象:
- 有GetAwaiter方法,它返回一个awaiter(实现了INotifyCompletion.OnCompleted接口)
- 返回适当类型的GetResult方法
- 一个bool类型的IsCompleted属性
捕获本地状态
- await表达式的最牛之处就是它几乎可以出现在任何地方。
- 特别的,在异步方法内,await表达式可以替换任何表达式。
- 除了lock表达式和unsafe 上下文
static void Main(string[] args) { DisplayPrimesCount(); Console.ReadKey(); } async static void DisplayPrimesCount() { for (int i = 0; i < 10; i++) { Console.WriteLine(await GetPrimesCountAsync(i*100000+2, 100000)); } } static TaskGetPrimesCountAsync(int start, int count) { return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); }
await之后在那个线程上执行
- 在await表达式后,编译器依赖于continuation(通过awaiter模式)来继续执行
- 如果在富客户端应用的UI线程上,同步上下文湖保证后续是在原线程上执行;
- 否则,就会在task结束的线程上继续执行
UI 上的await
WPF例子:
///本例中,只有 GetPrimesCountAsync中的代码在worker线程上运行 ///Go中的代码会“租用”UI线程上的时间 ///可以说:Go是在消息循环中“伪并发”的执行 ///也就是说:它和U线程处理的其它事件是穿插执行的 ///因为这种伪并发,唯一能发生“抢占”的时刻就是在await 期间。 ///这其实简化了线程安全,防止重新进入即可 ///因为这种伪并发,唯一能发生“抢占”的时刻就是在await 期间。·这其实简化了线程安全,防止重新进入即可 public partial calss MainWindow:Window{ Button _button =new Button{Content = "Go"}; TextBlock _result=new TextBlock(); public MainWindow(){ InitalizeComponet(); var panel=new StackPanel(); panel.Children.Add(_button); panel.Children.Add(_result); Content=panel; _button.Click+=(Sender,args)=>Go(); } async void Go(){ //禁用按钮 _button.IsEnabled=false; for(int i=1;i<5;i++){ _result.Text += await GetPrimesCountAsync(i * 100000 + 2, 100000) + "ptimes between" + (i * 100000) + "and" + ((i + 1) * 100000 - 1)+Environment.NewLine; } //启用按钮 _button.IsEnabled=true; } TaskGetPrimesCountAsync(int start, int count) { return Task.Run(()=>ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); } }
public partial class MainWindow : Window { Button _button = new Button { Content = "Go" }; TextBlock _result = new TextBlock(); public MainWindow() { InitializeComponent(); var panel = new StackPanel(); panel.Children.Add(_button); panel.Children.Add(_result); Content = panel; _button.Click += (Sender, args) => Go(); } async void Go() { //禁用按钮 _button.IsEnabled = false; string[] urls = "www.baidu.com www.cikxk.com www.linqpad.com".Split(); int tolallLength = 0; try { foreach (string item in urls) { var url = new Uri("http://" + item); byte[] data = await new WebClient().DownloadDataTaskAsync(url); _result.Text += "Length of" + url + "is" + data.Length + Environment.NewLine; tolallLength += data.Length; } _result.Text += "Total Length" + tolallLength; } catch (Exception ex) { _result.Text += "Error" + ex.Message; } finally { //启用按钮 _button.IsEnabled = true; } }
-
伪代码:
- 附加到U元素的event handler 通过消息循环执行
- 因为在U线程上await,continuation将发送到同步上下文上,该同步上下文通过消息循环执行来保证整个Go方法伪并发的在U线程上执行。
与粗粒度的并发相比
- 整个同步调用图都在worker线程上、
- 必须在代码中到处使用Dispatcher.BeginInvoke
- 循环本身在worker 线程上
- 引入了 race condition(静态条件)
- 若实现取消和过程报告,会使得线程安全问题更容易发生,在方法中新添加任何的代码也是同样的效果。
public partial calss MainWindow:Window{ Button _button =new Button{Content = "Go"}; TextBlock _result=new TextBlock(); public MainWindow(){ InitalizeComponet(); var panel=new StackPanel(); panel.Children.Add(_button); panel.Children.Add(_result); Content=panel; _button.Click+=(Sender,args)=>{ _button.IsEnabled=false; Task.Run(()=>Go()); } } async void Go(){ for(int i=1;i<5;i++){ int result=GetPrimesCoun(i * 100000+2, 100000); Dispatcher.BeginInvoke(new Action(()=>result.Text += result+ "ptimes between" + (i * 100000) + "and" + ((i + 1) * 100000 - 1)+Environment.NewLine)); } //启用按钮 Dispatcher.BeginInvoke(new Action(()=>_button.IsEnabled=true)); } int GetPrimesCount(int start, int count) { return ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)); } }编写异步函数
-
对于任何异步函数,你可以使用Task代替Void作为返回值,让该方法分忧小的异步(可以进行await)
static async Task Main(string[] args) { await PtintAnswerTolisfo(); Console.ReadKey(); } async static Task PtintAnswerTolisfo() { await Task.Delay(5000); int answer = 21 * 2; Console.WriteLine(answer); }
-
并不需要在方法体中显式的返回Task。编译器会生成一个Task(当方法完成或发生异常时),这使得创建异步的调用链非常方便.
static async Task Main(string[] args) { await Go(); Console.ReadKey(); } async static Task Go() { await PtintAnswerTolisfo(); Console.WriteLine("Done!"); } async static Task PtintAnswerTolisfo() { await Task.Delay(5000); int answer = 21 * 2; Console.WriteLine(answer); }
-
编译器会对返回Task的异步函数进行扩展,使其成为当发送信号或发生故障时使用TaskCompletionSource来创建Task的代码
///PtintAnswerTolisfoy和PtintAnswer大致等同 async static Task PtintAnswerTolisfo() { await Task.Delay(5000); int answer = 21 * 2; Console.WriteLine(answer); } Task PtintAnswer() { var tcs = new TaskCompletionSource
-
因此,当返回Task的异步方法结束的时候,执行就会跳回到对它进行await的地方。(通过continuation)
编写异步函数:富客户端场景下
- 富客户端场景下,执行在此刻会跳回到UI线程(如果目前不在U线程的话)。
- 否则,就在continuation返回的任意线程上继续执行。
- 这意味着,在异步调用图中向上冒泡的时候,不会发生延迟成本,除非是U线程启动的第一次“反弹》。
放回Task
-
如果方法体返回TResult,那么异步方法就可以返回Task。
static async Task Main(string[] args) { int num = await GetAnswerTo(); Console.WriteLine(num); } async static Task
GetAnswerTo() { await Task.Delay(5000); int answer = 21 * 2; return answer; // nethod有返回类型Task ,我们返回int } -
其原理就是给TaskCompletion发送的信号带有值,而不是null。
static async Task Main(string[] args) { await GetAnswerToS(); } async static Task GetAnswerToS() { int num = await GetAnswerTo(); Console.WriteLine(num); } async static Task
GetAnswerTo() { await Task.Delay(5000); int answer = 21 * 2; return answer; } -
与同步编程很相似
//同步版本 static void Main(string[] args) { Go(); Console.ReadKey(); } static void Go() { PrintAnswerTolifo(); Console.WriteLine("Done"); } static void PrintAnswerTolifo() { int answer = GetAnswerTolifo(); Console.WriteLine(answer); } static int GetAnswerTolifo() { Thread.Sleep(5000); int answer = 21 * 2; return answer; }
c#中如何设计异步函数
-
以同步的方式编写方法
-
使用异步调用来代替同步调用,并且进行await
-
除了顶层方法外(UI控件的event handler),把你方法的返回类型升级为Task或Task,这样它们就可以进行await了。
编译器能对异步函数生成Task意味着:
- 大多数情况下,你只需要在初始化IO-bound并发的底层方法里显式的初始化TaskCompletionSource,这种情况很少见。
- 针对初始化compute-bound的并发方法,你可以使用Task.Run来创建Task.
异步调用图执行
static async Task Main(string[] args) { await Go();//主线程 } static async Task Go() { var task = PrintAnswerTolifo(); await task; Console.WriteLine("Done"); } static async Task PrintAnswerTolifo() { var task = GetAnswerTolifo(); int answer = await task; Console.WriteLine(answer); } static async TaskGetAnswerTolifo() { var task = Task.Delay(5000); await task; int answer = 21 * 2; return answer; }
-
整个执行与之前同步例子中调用图执行的顺序一样,因为我们对每个异步函数的调用都进行了await。
-
车调用图中创建了一个没有并行和重叠的连续流。
-
每个await在执行中都创建了一个间隙,在间隙后,程序可以从中断处恢复执行。
并行
-
不使用await 来调用异步函数会导致并行执行的发生。
-
例如:_button.Click +=(sender, args) => Go();
- ·确实也能满足保持UlI响应的并发要求。
-
同样,可以并行两个操作
var task1=PrintAnswerTolifo(); var task2=PrintAnswerTolifo(); await task1; await task2;
异步Lambda表达式
-
匿名方法(包括Lambda表达式),通过使用async也可以变成异步方法。
-
调用一样
static async Task Main(string[] args) { Func
unnamed = async () => { await Task.Delay(1000); Console.WriteLine("Foo"); }; await NameNethod(); await unnamed(); } static async Task NameNethod() { await Task.Delay(5000); Console.WriteLine("Foo"); } -
附加event handler的时候也可以使用异步Lambda表达式
MyButton.click+=async(sender,args)=>{ await Task.Delay(1000); MyButton.Count="Done" } //相当于 MyButton.Click+=ButtonHanfler; async void ButtonHander(object sender,EventArgs args){ await Task.Delay(1000); MyButton.Count="Done" }
-
可返回Task类型
static async Task Main(string[] args) { Func
> unnamed = async () => { await Task.Delay(1000); return 123; }; int answer=await unnamed(); }
发布异常
-
富客户端应用通常依赖于集中的异常处理事件来处理U线程上未捕获的异常。
-
例如WPF 中的Application.DispatcherUnhandledException
-
ASP.NET Core 中定制ExceptionFilterAttribute 也是差不多的效果
-
-
其内部原理就是:通过在它们自己的 try/catch 块来调用UI事件(在ASP.NETcore里就是页面处理方法的管道)
-
顶层的异步方法会使事情更加复杂。
async void Buttonclick (object sender,RoutedEventArgs args){ await Task.Delay (1000) ; throw new Exception ( "will this be ignored?" ); }
-
当点击按钮,event handler运行时,在await后,执行会正常的返回到消息循环;1秒钟之后抛出的异常无法被消息循环中的catch块捕获。
-
为了缓解该问题,AsyncVoidMethodBuilder 会捕获未处理的异常(在返回void的异步方法里),并把它们发布到同步上下文(如果出现的话),以确保全局异常处理事件能够触发。
注意
- 编译器只会吧上述逻辑应用于返回值类型为void的异步方法
- 如果 ButtonClick的返回类型是Task,那么未处理的异常将导致结果Task出错,然后Task无处可去(导致未观察到的异常)
一个细微的差别
-
无论你在await前面还是后边抛出异常,都没有区别。
-
因此,异常会被发布到同步上下文(如果出现的话),而不会发布给调用者。
- 例如:async void Foo() { throw null; await Task.Delay(1000);}
- 如果同步上下文没有出现的话,异常将会子在线程上传播,从而终止应用程序。
不直接将异常抛回调用者的原因
-
为了确保可预测性和一致性。
-
在下例中,不管someCondition是什么值,InvalidOperationException将始终得到和导致Task出错同样的效果
async Task Foo(){ if (someCondition){await Task.Delay (100);} throw new InvalidoperationException ( ) ; }
-
lterator也是一样
//异常绝不会直接返回给调用者,直到序列被遍历后,才会抛出异常。 IEnumerable
Foo() { throw null; yield return 123;}
OperationStarted和OperationCompleted
- 如果存在同步上下文,返回void 的异步函数也会在进入函数时调用其OperationStarted方法,在函数完成时调用其OperationCompleted方法
- 如果为了对返回void的异步方法进行单元测试而编写一个自定义的同步上下文那么重写这两个方法确实很有用。
-
异步函数可以在await之前返回
static async Task Main(string[] args) { //编译器是通过检查awaiter 上的 lsCompleted属性来实现这个优化的。也就是说,无论何时,当你await的时候: Console.WriteLine(await GetWebPage("http://oreilly.com")); } static Dictionary
_cache = new Dictionary (); static async Task GetWebPage(string uri) { string html; if (_cache.TryGetValue(uri,out html)) { return html; } return _cache[uri] = await new WebClient().DownloadStringTaskAsync(uri); } //如果URI在缓存中存在,那么不会有await 发生,执行就会返回给调用者,方法会返向一个己经设置信号的Task,这就是同步完成。 //当await同步完成的Task时,执行不会返回到调用者,也不同通过continuation跳回。它会立即执行到下个语句。 -
如果是同步完成,编译器会释放可短路continuation 的代码,
var awaiter = GetwebPageAsync().GetAwaiter(); if ( awaiter.IsCompleted){ Console.writeLine (awaiter.GctResult()); }else{ awaiter.oncompleted(()=> console.writeLine (awaiter.GetResult ()); }
注意
- 对一个同步返回的异步方法进行await,仍然会引起一个小的开销〈20纳秒左右,2019年的PC)
- 反过来,跳回到线程池,会引入上下文切换开销,可能是1-2毫秒
- 而跳回到UI的消息循环,至少是10倍开销(如果UI繁忙,那时间更长)
-
编写完全没有await的异步方法也是合法的,但是编译器会发出警告:
async Task
Foo() { return "abc"; } -
但这类方法可以用于重载virtual/abstract方法。
-
另外一种可以达到相同结果的方式是:使用Task.FromResult,它会返回一个已经设置好信号的Task。
Task
Foo(){ return Task .FromResult (""abc"); }
-
如果从U线程上调用,那么GetWebPageAsync方法是隐式线程安全的。您可以连续多次调用它(从而启动多个并发下载),并且不需要lock来保护缓存。
-
有一种简单的方法可以实现这一点,而不必求助于lock或信令结构。我们创建一个“futures”" (Task)的缓存,而不是字符串的缓存。注意并没有async:
static Dictionary
> _cache =new DictionarsKstring, Task > () ; Task GetwebPageAsync (string uri){ if (_cache.TryGetvalue (uri,out var downloadTask)){ return downloadTask ; } return _cache [uri] = new webclient().DownloadstringTaskasync (uri) ; }
不使用同步上下文,使用Lock也可以
-
lock的不是下载的过程,lock的是检查缓存的过程(很短暂)
static Dictionary
> _cache =new DictionarsKstring, Task > () ; Task GetwebPageAsync (string uri){ lock(_cache){ if(_cache.TryGetValue(uri,out var downloadTask)){ return downloadTask; } else{ return _cache[uri]=new new webclient().DownloadstringTaskasync (uri) ; } } }
-
ValueTask用于微优化场景,您可能永远不需要编写返回此类型的方法。
-
Task和Task 是引用类型,实例化它们需要基于堆的内存分配和后续的收集。
-
优化的一种极端形式是编写无需分配此类内存的代码;换句话说,这不会实例化任何引用类型,不会给垃圾收集增加负担。
-
为了支持这种模式,C#引入了 ValueTask和ValueTask这两个struct,编译器允许使用它们代替Task和 Task
async ValueTask
Foo(){……} -
如果操作同步完成,则await ValueTask是无分配的。
int answer=await Foo();//(可能是)无分配的
-
如果操作不是同步完成的,ValueTask实际上就会创建一个普通的Task(并将await 转发给它)
-
使用AsTask方法,可以把 ValueTask转化为Task(也包括非泛型版本)
注意
- ValueTask并不常见,它的出现纯粹是为了性能。
- 这意味着它被不恰当的值类型语义所困扰,这可能会导致意外。为避免错误行为,必须避免以下情况:
- 多次await同一个ValueTask
- 操作没结束的时候就调用.GetAwaiter().GetResult()
- 如果你需要进行这些操作,那么先调用AsTask方法,操作它返回的Task。
- 避免上述陷阱最简单的办法就是直接await方法调用:
- await Foo();
- 将ValueTask赋给变量时,就可能引发错误了:
- ValueTask valueTask = Foo();
- 但是:将其立即转化为普通的Task,就可以避免此类错误的发生:-
- Task task = Foo().AsTask();
避免过度的弹回
- 对于在循环中多次调用的方法,通过调用ConfigureAwait方法,就可以避免重复的弹回到U消息循环所带来的开销。
- 这强迫Task 不把continuation弹回给同步上下文。从而将开销削减到接近上下文切换的成本(如果您await的方法同步完成,则开销会小得多):
//这意味着对于方法B和C,我们取消了Ul程序中的简单线程安全模型,即代码在U线程上运行,并且只能在await语句期间被抢占。但是,方法A不受影响,如果在一个UI线程上启动,它将保留在UI线程上。 //这种优化在编写库时特别重要:您不需要简化线程安全性带来的好处,因为您的代码通常不与调用方共享状态,也不访问U控件。 async void A(){……await B();……} async Task B(){ for(int i=0;i<1000;i++){ await C().ConfigureAwait(false); } } async Task C(){……}异步模式
取消(cancellation)
-
使用取消标志来实现对并发进行取消,可以封装一个类
class CancellationToken{ public bool IscancellationRequested { get;private set; } public void cancel (){ IscancellationRequested = true; } public void ThrowIfcancellationRequested () { if(IscancellationRequested){ throw new operationcanceledException ( ) ; } } } //调用使用 async Task Foo (cancellationToken cancellationToken){ for (int i = 0; i < 10; i++){ console.writeLine (i); await Task. Delay (1000); cancellationToken.ThrowIfCancellationRequested (); } } //当调用者想取消的时候,它调用Cancellation Token 上的Cancel 方法。这就会把lsCancellationRequested设置为true,即会导致短时间后Foo会通过OperationCanceledException引发错误。
CancellationToken和CancellationTokenSource
-
先不管线程安全(应该在读写lsCancellationRequested时进行lock),这个模式非常的有效,CLR也提供了一个Cancellation Token类,它的功能和前面的例子类似。
-
但是它缺少一个Cancel方法,Cancel方法在另外一个类上进行暴露:CancellationTokenSource
-
这种分离的设计是出于安全考虑:只能对CancellationToken访问的方法可以检查取消,但是不能实例化取消。
var cancelSource=new CancellationTokenSource(); Task foo=Foo(cancelSource.Token); …… ……(some time later) cancelSource.Cancel();
Delay
-
CLR里大部分的异步方法都支持CancellationToken,包括Delay方法。
async Task Foo (cancellationToken cancellationToken{ for (int i = 0; i < 10; i++){ console.writeLine (i); //这时,task在遇到请求时会立即停止(而不是1秒钟之后才停止) await Task.Delay (1000,cancellationToken) ; } } //这里,我们无需调用ThrowlfCancellationRequested,因为Delay会替我们做。 //取消标记在调用栈中很好地向下传播(就像是因为异常,取消请求在调用栈中向上级联一样)。
同步方法
-
同步方法也支持取消(例如Task的 Wait方法)。这种情况下,取消指令需要异步发出例如,来自另一个Task)
var cancelsource = new CancellationTokenSource ( ) ; Task.Delay ( 500e).Continuewith (ant => cancelSource.Cance1()); ...
其他
-
事实上,您可以在构造 Cancellation TokenSource时指定一个时间间隔,以便在一段时间后启动取消。它对于实现超时非常有用,无论是同步还是异步:
var cancelsource = new CancellationTokensource (5000) ; try { await Foo (cancelsource.Token) ; } catch (operationcanceledException ex) { console.writeLine ("cancelled"); }
-
CancellationToken这个struct提供了一个Register方法,它可以让你注册一个回调委托,这个委托会在取消时触发。它会返回一个对象,这个对象在取消注册时可以被Dispose掉。
-
编译器的异步函数生成的Task在遇到未处理的OperationCanceledException异常时会自动进入取消状态(lsCanceled返向true,lsFaulted返回false)
-
使用Task.Run创建的 Task也是如此。这里是指向构造函数传递(相同的)CancellationToken。
-
在异步场景中,故障Task和取消的Task之间的区别并不重要,因为它们在await时都会抛出一个 OperationCanceledException。但这在高级并行编程场景(特别是条件continuation)中很重要。
-
有时,你希望异步操作在运行的过程中能实时的反馈进度。一个简单的解决办法是向异步方法传入一个Action委托,当进度变化的时候触发方法调用:
Task Foo (Action
onProgressPercentchanged){ return Task.Run ( ( ) =>{ for (int i = 0; i < 1000; i++){ if (i % 10 == 0){ onProgressPercentChanged (i / 10) ; } // Do something compute-bound. . . } }); } Action progress = i => console.writeLine (i + "%"); await Foo (progress) ; //尽管这段代码可以在ConsoleApp种很好的应用,但在富客户端应用中却不理想。因为它是从worker线程报告的进度,可能会导致消费者的线程安全问题。
IPROGRESS和PROGRESS
-
CLR提供了一对类型来解决此问题: IProgress 接口、. Progress类(实现了,上面的接口)
-
它们的目的就是包装一个委托,以便UI程序可以安全的通过同步上下文来报告进度。
-
接口定义如下:
public interface IProgress
{ vpid Report (T value) ; } -
使用IProgress:
Task FoR(IProgress
onProgressPercentchanged){ return Task.Run ( ()=>{ for (int i = 0; i < 1000; i++){ if (i % 10 -= 0){ onProgressPercentchanged.Report (i / 10); } //Do something compute-bound. . . } }); } -
Progress的一个构造函数可以接受Action类型的委托:
//接上 var progress = new Progress
(i => console.writeLine (i +”%"); await Foo (progress); -
Progress还有一个 ProgressChanged事件,您可以订阅它,而不是[或附加的]将Action委托传递给构造函数。
-
在实例化 Progress时,类捕获同步上下文(如果存在)
- 当Foo 调用Report时,委托是通过该上下文调用的。
-
异步方法可以通过将int替换为公开一系列属性的自定义类型来实现更精细的进度报告。
TAP
Task-Based Asynchronous Pattern
- .NET Core暴露了数百个返回Task且可以await的异步方法(主要和I/O相关)。大多数方法都遵循一个模式,叫做基于Task的异步模式(TAP)。这是我们迄今为止所描述的合理形式化。TAP方法执行以下操作:
- 返回一个“热”(运行中的)Task或Task
- 方法名以Async结尾(除了像Task 组合器等情况)
- 会被重载,以便接受CancellationToken(和)IProgress,如果支持相关操作的话。
- 快速返回调用者(只有很小的初始化同步阶段)
- 如果是I/O绑定,那么无需绑定线程
Task组合器
- 异步函数有一个让其保持一致的协议(可以一致的返回Task),这能让其保持良好的结果:可以便用以及编写Task组合器,也就是可以组合Task,但是并不关心Task具体做什么的函数。
- CLR提供了两个Task 组合器:-
- Task.WhenAny
- Task.WhenAll
本节基础代码:
async TaskDelay1() { await Task.Delay(1000); return 1; } async Task Delay2() { await Task.Delay(2000); return 2; } async Task Delay3() { await Task.Delay(3000); return 3; }
WhenAny
-
当一组Task 中任何一个Task 完成时,Task.WhenAny会返回完成的Task.
Task
winningTask = await Task.WhenAny (Delay1(),Delay2 (),Delay3()); console.writeLine ("Done" ); console.writeLine (winningTask.Result);// 1 -
因为Task.WhenAny本身就返回一个Task,我们对它进行 await,就会返回最先完成的Task。
-
上例完全是非阻塞的,包括最后一行(当访问Result属性时,winningTask已完或),但最好还是对winning Task进行await,因为异常无需 AggregateException可装就会重新抛出:console.writeLine (await winningTask) ;
-
一步操作中执行两个await:int answer = await await, Task.whenAny (Delay1(),Delay2(),Delay3 () );
-
如果“没赢”的Task后续发生了错误,那么异常将不会被观察到,除非你后续对它们讲行await(或者香询其Exception国性)
-
WhenAny很适合为不支持超时或取消的操作添加这些功能:
Task
task = someAsyncFunc () ; Task winner = await (Task.whenany (task,Task.Delay (5000) ) ) ; if (winner != task){ throw new TimeoutException (); } string result = await task; //ap result/ re-throw //返结果是Task类型
WhenAll
-
当传给它的所有的Task 都完成后,Task.WhenAll会返回一个Task.
await Task.whenAll ( Delay1() , Delay20) ,Delay3 0) ) ;
WhenAll
-
与之相对,Task.WhenAll直到所有Task完成,它才会完成,即使有错误发生。如果有多个错误,它们的异常会包裹在Task的 AggregateException里
-
await组合的Task,只会抛出第一个异常,想要看到所有的异常,你需要这样做:
Task task1 = Task.Run ( () => { throw null; }); Task task2 = Task.Run ( (=> { throw null; }); Task al = Task.whenAll (task1, task2); try { await all; }catch{ Console.writeLine (all.Exception.InnerExceptions.count);// 2 }
-
对一组Task调用 WhenAll 会返Task,也就是所有Task的组合结果。
-
如果进行await,那么就会得到TResult[]:
Task
task1 =Tsk.Run ( 0) => 1) ; Task task2 = Task.Run (0) =>2) ; int[] results = await Task.whenAll (taskl,task2); // { 1,2 }
实例
async TaskGetTotalsize (string[] uris){ IEnumerable > downloadTasks = uris.select (async uri =>(await new webclient () .DownloadDataTaskisync (uri) ).Length); int[] contentLengths = await Task.whenAll (downloadTasks) ; return contentLengths.sum(): }
自定义的Task组合器
-
可以编写自定义的Task 组合器。最简单的组合器接收一个Task,看下例:
//通过在Task完成时取消Task.Delay 我们可以改进上例的效率(避免了计时器的小开销): async static Task
withTimeout (this Task task,Timespan timeout){ var cancelsource = new cancellationTokensource(); var delay = Task.Delay (timeout,cancelsource.Token); Task winner = await Task.whenAny (taskdelay).configureAwait (false) ; if(winner == task){ cancelsource.cancel(); } else{ throw new TimeoutException(); } return await task.configureAwait (false); //Unwrap result/re-throw }
自定义 Task 组合器:通过CancellationToken放弃Task
static Taskwithcancellation (this Task task,CancellationToken cancelToken){ var tcs = new Taskcompletionsource (); var reg = cancelToken.Register (() => tcs.TrySetcanceled()); task.continuewith (ant =>{ reg.Dispose(); if (ant.IsCanceled){ tcs.Trysetcanceled (); }else if (ant.IsFaulted){ tcs.TrySetException (ant.Exception.InnerException); } else{ tcs.TrysetResult (ant.Result); } }); return tcs.Task; }
- 这个组合器功能类似WhenAll,如果一个Task出错,那么其余的Task也立即出错:
//这里面TaskCompletionSource的任务就是当任意一个Task 出错时,结束工作。所以我们没调用SetResult方法,只调用了它的TrySetCanceled和TrySetException方法。在这里ContinueWith 要比GetAwaiter().OnCompleted更方便,因为我们不访问Task 的 Result,并且此刻不想弹回到U线程。 async TaskwhenAllOrError (params Task []tasks){ var killJoy = new Taskcompletionsource (); foreach (var task in tasks){ task.continuewith (ant =>{ if (ant.Iscanceled){ killJoy.Trysetcanceled(); }else if (antlIsFaulted){ killJoy.TrySetException (ant.Exception.InnerException); } }); } return await await Task.wheneny (killJoy.Task,Task.Whenall (tasks)).configureAwait (false); }