当前位置: 首页 > news >正文

星月教你做网站的文档网站需求清单

星月教你做网站的文档,网站需求清单,分析网站建设,建筑工程网格化管理的目的和意义原文#xff1a;http://www.albahari.com/threading/part5.aspx 专题#xff1a;C#中的多线程 1并行编程Permalink 在这一部分#xff0c;我们讨论 Framework 4.0 加入的多线程 API#xff0c;它们可以充分利用多核处理器。 并行 LINQ#xff08;Parallel LINQ#xff09…原文http://www.albahari.com/threading/part5.aspx 专题C#中的多线程 1并行编程Permalink 在这一部分我们讨论 Framework 4.0 加入的多线程 API它们可以充分利用多核处理器。 并行 LINQParallel LINQ或称为 PLINQParallel类任务并行task parallelism构造SpinLock 和 SpinWait这些 API 可以统称为 PFXParallel Framework并行框架。Parallel类与任务并行构造一起被称为 TPLTask Parallel Library任务并行库。 Framework 4.0 也增加了一些更底层的线程构造它们针对传统的多线程。我们之前讲过的 低延迟信号构造 SemaphoreSlim、ManualResetEventSlim、CountdownEvent 以及Barrier取消标记cancellation token以便于协作取消延迟初始化ThreadLocalT在继续阅读前你需要了解第 1 部分 - 第 4 部分中的基本原理特别是锁和线程安全。 并行编程这一部分提供的所有代码都可以在LINQPad中试验。LINQPad 是一个 C# 代码草稿板可以用来测试代码段而无需创建类、项目或解决方案。想要获取这些示例代码可以在 LINQPad 左下方的 Samples 标签页中点击 Download More Samples并且选择 C# 4.0 in a Nutshell: More Chapters。译者注现在应该是 C# 5.0 in a Nutshell 和 C# 6.0 in a Nutshell 了 2为何需要 PFXPermalink 近年来CPU 时钟频率发展陷于停滞制造商已经将重心转移至增加核心数量。这对我们程序员来说是个问题因为标准的单线程代码无法自动利用那些增加的核心来提升程序运行速度。 利用多个核心对大多数服务端应用程序来说很容易每个线程可以独立处理单独的客户端请求但在桌面环境下就不那么容易了因为通常这需要你优化计算密集型代码按如下步骤进行 将工作分解成块。多线程并行处理这些工作块。以线程安全和高效的方式整理结果。尽管你可以使用传统的多线程构造但那比较笨拙尤其是在分解工作和整理结果的步骤。并且为确保线程安全通常的策略是使用锁而它在很多线程同时访问一份数据时会导致大量竞争。 PFX 库就是专门被设计用来为这些场景提供帮助的。 利用多核心或多处理器的编程被称为并行编程parallel programming。它是多线程这个更宽泛概念的子集。 2.1PFX 概念Permalink 有两种分解工作的策略数据并行data parallelism和任务并行task parallelism。 当一系列任务需要处理很多数据时可以让每个线程都执行这一系列相同的任务来处理一部分数据即所有数据的一个子集。这样实现的并行化称为数据并行因为我们是为线程分解了数据。与此相对任务并行是指对任务进行分解换句话说就是让每个线程执行不同的任务。 通常对高度并行的硬件来说数据并行更简单可伸缩性也更好因为它减少或消除了共享数据也就减少了竞争和线程安全问题。并且事实上一般都是数据比任务要多所以数据并行可以增加并发的可能。 数据并行也有利于结构化并行structured parallelism意思是说并行工作单元的启动和完成是在程序中的同一位置。相对的任务并行趋向于非结构化就是说并行工作单元的启动和完成可 能分散在程序各处。结构化并行比较简单并且不易出错也让你可以把工作分解和线程协调甚至包括结果整理这些复杂的任务交给 PFX 库来完成。 2.2PFX 组件Permalink PFX 包含两层功能。上层是由结构化数据并行 APIPLINQ和Parallel类组成。下层包含任务并行的类以及一组额外的构造来帮助你实现并行编程。 PLINQ 提供了最丰富的功能它能够自动化并行的所有步骤包括分解工作、多线程执行、最后把结果整理成一个输出序列。它被称为声明式declarative 的因为你只是声明希望并行化你的工作构造一个 LINQ 查询然后让 Framework 来处理实现细节。相对的另一种方式是指令式imperative的这种方式是需要你显式编写代码来处理工作分解和结果整理。例如使用Parallel类时你必须自己整理结果而如果使用任务并行构造你还必须自己分解工作。  分解工作整理结果PLINQ  Parallel类 -PFX 的任务并行--并发集合和自旋基元可 以帮助你实现低层次的并行编程。这很重要因为 PFX 不仅被设计适用于当今的硬件也适用于未来更多核心的处理器。如果你希望搬运一堆木块并且有 32 个工人最麻烦的是如何让工人们搬运木块时不互相挡道。这与把算法分解运行在 32 个核心上类似如果普通的锁被用于保护公共资源所产生的阻塞可能意味着同时只有一小部分核心真正在工作。并发集合专门针对于高并发访问致力于最小化或消除阻塞。PLINQ 和 Parallel类就依赖于并发集合和自旋基元来实现高效的工作管理。 PFX 与传统的多线程Permalink 传统多线程的场景是即使在单核的机器上使用多线程也有好处而此时并没有真正的并行发生。就像我们之前讨论过的保持用户界面的响应以及同时下载两个网页。 这一部分将要讲到的一些构造有时对于传统多线程也有用。特别是 PLINQ和 Parallel类在你并行执行操作以及等待它们完成结构化并行时有用。这包括非计算密集型任务例如调用 web 服务。任务并行构造适用于将操作运行在线程池线程上以及通过任务延续continuations和父 / 子任务来管理任务的工作流。并发集合有时在你需要线程安全队列、栈或字典时有用。BlockingCollection提供了一个简单的工具来实现生产者 / 消费者结构。 2.3何时使用 PFXPermalink PFX 主要用于并行编程充分利用多核处理器来加速执行计算密集型代码。 充分利用多个核心的挑战在于阿姆达尔定律Amdahl’s law它指出通过并行化产生的最大性能提升取决于有多少必须顺序执行的代码段。例如如果一个算法只有三分之二的执行时间可以并行即使有无数核心也无法获得超过三倍的性能提升。 因此在使用 PFX 前有必要先检查可并行代码中的瓶颈。还需要考虑下你的代码是否有必要是计算密集的优化这里往往是最简单有效的方法。然而这也需要平衡因为一些优化技术会使代码难以并行化。 最容易获益的是“不好意思不并行的问题embarrassingly parallel problems”工作可以很容易地被分解为多个任务每个任务自己可以高效执行结构化并行非常适合这种问题。例如很多图片处理任务、光线跟踪 算法、数学和密码学方面的暴力计算和破解。而相反的例子是实现快速排序算法的优化版本想把它实现得好需要一定思考并且可能需要非结构化并行。 3PLINQPermalink PLINQ 会自动并行化本地的 LINQ 查询。其优势在于使用简单因为将工作分解和结果整理的负担交给了 Framework。 使用 PLINQ 时只要在输入序列上调用AsParallel()然后像平常一样继续 LINQ 查询就可以了。下边的查询计算 3 到 100,000 内的素数这会充分利用目标机器上的所有核心。 // 使用一个简单的未优化算法计算素数。 // // 注意这一部分提供的所有代码都可以在 LINQPad 中试验。IEnumerableint numbers Enumerable.Range (3, 100000-3); var parallelQuery from n in numbers.AsParallel() where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i n % i 0) select n; int[] primes parallelQuery.ToArray(); AsParallel是System.Linq.ParallelEnumerable中的一个扩展方法。它使用ParallelQueryTSource来封装输入就会将你随后调用的 LINQ 查询操作符绑定在ParallelEnumerable中定义的另外一组扩展方法上。它们提供了所有标准查询操作符的并行化实现。本质上它们就是将输入序列进行分区形成工作块并在不同的线程上执行之后再将结果整理成一个输出序列 调用AsSequential()可以拆封ParallelQuery使随后的查询操作符绑定到标准查询操作符来顺序执行。在调用有副作用或非线程安全的方法前有必要这样做。 对于那些接受两个输入序列的查询操作符Join、GroupJoin、Contact、Union、Intersect和Zip来说必须在这两个输入序列上都使用AsParallel()否则将抛出异常。然而不需要为中间过程的查询使用AsParallel因为 PLINQ 的查询操作符会输出另一个ParallelQuery序列。实际上在这个输出序列上再次调用AsParallel会降低效率它会强制对序列进行合并和重新分区。 mySequence.AsParallel() // 使用 ParallelQueryint 封装序列 .Where (n n 100) // 输出另一个 ParallelQueryint .AsParallel() // 不需要会降低效率 .Select (n n * n) 并非所有的查询操作符都可以被有效地并行化。对于那些不能的PLINQ 使用了顺序的实现。如果 PLINQ 认为并行化的开销实际会使查询变慢它也会顺序执行。 PLINQ 仅适用于本地集合它无法在 LINQ to SQL 或 Entity Framework 中使用因为在那些场景中LINQ 会被翻译成 SQL 语句然后在数据库服务器上执行。然而你可以使用 PLINQ 对从数据库查询获得的结果执行进一步的本地查询。 如果 PLINQ 查询抛出异常它会被封装进AggregateException重新抛出其InnerExceptions属性包含真正的异常。详见使用 AggregateException。 为什么 AsParallel 不是默认的Permalink 我们知道AsParallel可以透明的并行化 LINQ 查询那么问题来了“微软为什么不直接并行化标准查询操作符使 PLINQ 成为默认的” 有很多原因使其成为这种选择使用opt-in的方式。首先要使 PLINQ 有用必须要有一定数量的计算密集型任务它们可以被分配到多个工作线程。大多数 LINQ to Objects 的查询执行非常快根本不需要并行化并行化过程中的任务分区、结果整理以及线程协调反而会使程序变慢。 其次 PLINQ 查询的输出默认情况下在元素排序方面不同于 LINQ 查询。PLINQ 将异常封装在AggregateException中能够处理抛出的多个异常。如果查询引用了非线程安全的方法PLINQ 会给出不可靠的结果。最后PLINQ 为了进行微调提供了一些钩子hook。把这些累赘加入标准的 LINQ to Objects 的 API 会增加使用障碍。 3.1并行执行的特征Permalink 与普通的 LINQ 查询一样PLINQ 查询也是延迟估值的。这意味着只有当结果开始被使用时查询才会被触发执行。通常结果是通过一个foreach循环被使用通过转换操作符也会触发例如ToArray还有返回单个元素或值的操作符。 当枚举结果时执行过程与普通的顺序查询略有不同。顺序查询完全由使用方通过“拉”的方式驱动每个元素都在使用方需要时从输入序列中被提取。并行 查询通常使用独立的线程从输入序列中提取元素这可能比使用方的需要稍微提前了一些很像一个给播报员使用的提词机或者 CD 机中的防震缓冲区。然后通过查询链并行处理这些元素将结果保存在一个小缓冲区中以准备在需要的时候提供给使用方。如果使用方在枚举过程中暂停或中 断查询也会暂停或停止这样可以不浪费 CPU 时间或内存。 你可以通过在AsParallel之后调用WithMergeOptions来调整 PLINQ 的缓冲行为。默认值AutoBuffered通常能产生最佳的整体效果NotBuffered禁用缓冲如果你希望尽快看到结果可以使用这个FullyBuffered在呈现给使用方前缓存整个查询的输出OrderBy和Reverse操作符天生以这种方式工作取元素、聚合和转换操作符也是一样。 3.2PLINQ 与排序Permalink 并行化查询操作符的一个副作用是当整理结果时不一定能与它们提交时的顺序保持一致就如同之前图中所示的那样。换句话说就是无法像普通的 LINQ 那样能保证序列的正常顺序。 如果你需要保持序列顺序可以通过在AsParallel后调用AsOrdered()来强制它保证 myCollection.AsParallel().AsOrdered()... 在大量元素的情况下调用AsOrdered会造成一定性能损失因为 PLINQ 必须跟踪每个元素原始位置。 之后你可以通过调用AsUnordered来取消AsOrdered的效果这会引入一个“随机洗牌点random shuffle point”允许查询从这个点开始更高效的执行。因此如果你希望仅为前两个查询操作保持输入序列的顺序可以这样做 inputSequence.AsParallel().AsOrdered() .QueryOperator1() .QueryOperator2() .AsUnordered() // 从这开始顺序无关紧要 .QueryOperator3() // ... AsOrdered不是默认的因为对于大多数查询来说原始的输入顺序无关紧要。换句话说如果AsOrdered是默认的你就不得不为大多数并行查询使用AsUnordered来获得最好的性能这会成为负担。 3.3PLINQ 的限制Permalink 目前PLINQ 在能够并行化的操作上有些实用性限制。这些限制可能会在之后的更新包或 Framework 版本中解决。 下列查询操作符会阻止查询的并行化除非源元素是在它们原始的索引位置 Take、TakeWhile、Skip和SkipWhileSelect、SelectMany和ElementAt这几个操作符的带索引版本大多数查询操作符都会改变元素的索引位置包括可能移除元素的那些操作符例如Where。这意味着如果你希望使用上述操作符就要在查询开始的地方使用。 下列查询操作符可以并行化但会使用代价高昂的分区策略有时可能比顺序执行还慢。 Join、GroupBy、GroupJoin、Distinct、Union、Intersect和ExceptAggregate操作符的带种子seed的重载是不能并行化的PLINQ 提供了专门的重载来解决。 其它所有操作符都是可以并行化的然而使用这些操作符并不能确保你的查询会被并行化。如果 PLINQ 认为进行分区的开销会导致部分查询变慢它也许会顺序执行查询。你可以覆盖这个行为方法是在AsParallel()之后调用如下代码来强制并行化 .WithExecutionMode (ParallelExecutionMode.ForceParallelism) 3.4例并行拼写检查Permalink 假设我们希望实现一个拼写检查程序它在处理大文档时能够通过充分利用所有可用的核心来快速运行。我们把算法设计成一个 LINQ 查询这样就可以很容易的并行化它。 第一步是下载英文单词字典为了能够高效查找将其放在一个HashSet中 if (!File.Exists (WordLookup.txt)) // 包含约 150,000 个单词 new WebClient().DownloadFile ( http://www.albahari.com/ispell/allwords.txt, WordLookup.txt); var wordLookup new HashSetstring ( File.ReadAllLines (WordLookup.txt), StringComparer.InvariantCultureIgnoreCase); 然后使用wordLookup来创建一个测试“文档”该“文档”是个包含了一百万个随机单词的数组。创建完数组后引入两个拼写错误 var random new Random(); string[] wordList wordLookup.ToArray(); string[] wordsToTest Enumerable.Range (0, 1000000) .Select (i wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] woozsh; // 引入两个 wordsToTest [23456] wubsie; // 拼写错误 现在通过对比wordLookup检查wordsToTest来完成这个并行的拼写检查程序。PLINQ 让这变得很简单 var query wordsToTest.AsParallel() .Select ((word, index) new IndexedWord { Wordword, Indexindex }) .Where (iword !wordLookup.Contains (iword.Word)) .OrderBy (iword iword.Index); query.Dump(); // 在 LINQPad 中显示输出 下边是 LINQPad 中的显示的输出 IndexedWord是一个自定义的结构体定义如下 struct IndexedWord { public string Word; public int Index; } 判定器中的wordLookup.Contains方法作为查询的主要部分它使得这个查询值得并行化。 我们可以使用匿名类型来代替IndexedWord结构体从而稍微简化下这个查询。然而这会降低性能因为匿名类型是类因此是引用类型会产生分配堆内存的开销以及之后的垃圾回收。 这个区别对于顺序查询来说没太大关系但对于并行查询来说基于栈的内存分配则相当有利。这是因为基于栈的内存分配是可以高度并行化的因为每个线程有其自己的栈反之基于堆的内存分配会使所有线程竞争同一个堆它是由单一的内存管理器和垃圾回收器管理的。 使用 ThreadLocalTPermalink 来扩展一下我们的例子让创建随机测试单词列表的过程并行化。我们把它作为 LINQ 查询来构造这样事情就简单多了。以下是顺序执行版本 string[] wordsToTest Enumerable.Range (0, 1000000) .Select (i wordList [random.Next (0, wordList.Length)]) .ToArray(); 不幸的是对Random.Next的调用不是线程安全的所以实现并行化不是向查询语句直接插入AsParallel()这么简单。一个可能的解决办法是写个方法对random.Next加锁然而这会限制并发能力。更好的处理办法是使用ThreadLocalRandom为每个线程创建独立的Random对象。然后我们可以使用如下代码来并行化查询 var localRandom new ThreadLocalRandom ( () new Random (Guid.NewGuid().GetHashCode()) ); string[] wordsToTest Enumerable.Range (0, 1000000).AsParallel() .Select (i wordList [localRandom.Value.Next (0, wordList.Length)]) .ToArray(); 在实例化Random对象的工厂方法中我们传递了一个Guid的散列值用来确保如果两个Random对象在很短的时间范围内被创建它们可以生成不同的随机数序列。 何时使用 PLINQPermalink 在你的程序中寻找 LINQ 查询尝试并行化它们貌似是很诱人的。然而这通常没什么用因为绝大多数明显应该使用 LINQ 的地方执行都很快所以并行化并没有什么好处。更好的方法是找到 CPU 密集型工作的瓶颈然后考虑“这能写成 LINQ 查询吗”这样重构的一个好处是 LINQ 通常可以使代码变得更短并且更具可读性。 PLINQ 非常适合于“不好意思不并行的问题embarrassingly parallel problems”。它也能很好的应用于结构化阻塞任务structured blocking tasks例如同时调用多个 web 服务见调用阻塞或 I/O 密集型功能。 对于图像处理来说 PLINQ 是个糟糕的选择因为整理几百万个像素到输出序列将形成瓶颈。更好的方法是把像素直接写入数组或非托管的内存块然后使用Parallel类或任务并行来管理多线程。也可以使用ForAll来绕过结果整理。如果该图像处理算法天生适合 LINQ这么做可能有益。 3.5纯方法Permalink 译者注pure function 译为纯方法是指一个方法 / 函数不能改变任何状态也不能进行任何 I/O 操作它的返回值不能依赖任何可能被改变的状态并且使用相同的输入调用就会产生相同的输出。 因为 PLINQ 会在并行的线程上运行查询因此必须注意不要执行非线程安全的操作。特别需要注意对变量进行写操作有副作用side-effecting是非线程安全的。 // 下列查询将每个元素与其索引相乘。 // 给定一个 0 到 999 的输入序列, 它应该输出元素的平方。 int i 0; var query from n in Enumerable.Range(0,999).AsParallel() select n * i; 可以通过使用锁或Interlocked来确保i的自增是线程安全的但是问题仍然存在i并不能保证对应输入元素的原始索引。并且加上AsOrdered也无法解决这个问题因为AsOrdered仅仅确保输出是按顺序的就像顺序执行的输出顺序一样。但这并不意味着实际的处理过程也是按顺序的。 替代方法是将这个查询重写使用带索引的Select版本。 var query Enumerable.Range(0,999).AsParallel().Select ((n, i) n * i); 为了达到最佳性能任何被查询操作符调用的方法必须是线程安全的不要给字段或属性赋值无副作用纯方法。如果用锁来保证线程安全查询的并行能力将会受到限制。这个限制可以通过锁定的持续时间除以花费在方法上的总时间来计算。 3.6调用阻塞或 I/O 密集型功能Permalink 有时一个查询的长时间运行并不是因为是 CPU 密集型操作而是因为它在等待某些东西例如等待网页下载或是硬件的响应。PLINQ 能够有效地并行化这种类型的查询可以通过在AsParallel后调用WithDegreeOfParallelism来提示这种特征。例如假设我们希望同时 ping 6 个网站。比起使用异步委托或手动让 6 个线程自旋使用 PLINQ 查询可以轻松实现它 from site in new[] { www.albahari.com, www.linqpad.net, www.oreilly.com, www.takeonit.com, stackoverflow.com, www.rebeccarey.com } .AsParallel().WithDegreeOfParallelism(6) let p new Ping().Send (site) select new { site, Result p.Status, Time p.RoundtripTime } WithDegreeOfParallelism强制 PLINQ 同时运行指定数量的任务。在调用阻塞方法例如Ping.Send时有必要这么做否则的话PLINQ 会认为这个查询是 CPU 密集型的并进行相应的任务分配。在双核机器上PLINQ 会默认同时运行 2 个任务对于上述情况来说这显然不是我们希望看到的。 受线程池的影响PLINQ 通常为每个任务分配一个线程。可以通过调用ThreadPool.SetMinThreads来加速初始线程的创建速度。 再给一个例子假设我们要实现一个监控系统希望它不断将来自 4 个安全摄像头的图像合并成一个图像并在闭路电视上显示。使用下边的类来表示一个摄像头 class Camera {public readonly int CameraID; public Camera (int cameraID) { CameraID cameraID; } // 获取来自摄像头的图像: 返回一个字符串来代替图像 public string GetNextFrame() { Thread.Sleep (123); // 模拟获取图像的时间 return Frame from camera CameraID; } } 要获取一个合成图像我们必须分别在 4 个摄像头对象上调用GetNextFrame。假设操作主要是受 I/O 影响的通过并行化我们能将帧率提升 4 倍即使是在单核机器上。PLINQ 使用一小段程序就能实现它 Camera[] cameras Enumerable.Range (0, 4) // 创建 4 个摄像头对象 .Select (i new Camera (i)) .ToArray(); while (true) { string[] data cameras .AsParallel().AsOrdered().WithDegreeOfParallelism (4) .Select (c c.GetNextFrame()).ToArray(); Console.WriteLine (string.Join (, , data)); // 显示数据... } GetNextFrame是一个阻塞方法所以我们使用了WithDegreeOfParallelism来获得期望的并发度。在我们的例子中阻塞是在调用Sleep时发生。而在真实情况下阻塞的发生是因为从摄像头中获取图像是 I/O 密集型操作而不是 CPU 密集型操作。 调用AsOrdered可以确保图像按照一致的顺序显示。因为序列中只有 4 个元素所以它对性能的影响可以忽略不计。 改变并发度Permalink 在一个 PLINQ 查询内仅能够调用WithDegreeOfParallelism一次。如果你需要再次调用它必须在查询中通过再次调用AsParallel()强制进行查询的合并和重新分区 The Quick Brown Fox.AsParallel().WithDegreeOfParallelism (2) .Where (c !char.IsWhiteSpace (c)) .AsParallel().WithDegreeOfParallelism (3) // 强制合并和重新分区 .Select (c char.ToUpper (c)) 3.7取消Permalink 当在foreach循环中使用 PLINQ 查询的结果时取消该查询很简单使用break退出循环就可以了。查询会被自动取消因为枚举器会被隐式销毁。 对于结束一个使用转换、取元素或聚合操作符的查询来说你可以在其它线程使用取消标记来取消它。在AsParallel后调用WithCancellation来添加一个标记并把CancellationTokenSource对象的Token属性作为参数传递。之后另一个线程就可以在这个CancellationTokenSource对象上调用Cancel它会在查询的使用方那边抛出OperationCanceledException异常。 IEnumerableint million Enumerable.Range (3, 1000000); var cancelSource new CancellationTokenSource(); var primeNumberQuery from n in million.AsParallel().WithCancellation (cancelSource.Token) where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i n % i 0) select n; new Thread (() { Thread.Sleep (100); // 在 100 毫秒后 cancelSource.Cancel(); // 取消查询 } ).Start(); try { // 开始运行查询 int[] primes primeNumberQuery.ToArray(); // 永远到不了这里因为其它线程会进行取消操作。 } catch (OperationCanceledException) { Console.WriteLine (Query canceled); } PLINQ 不会直接中止线程因为这么做是危险的。在取消时它会等待所有工作线程处理完当前的元素然后结束查询。这意味着查询调用的任何外部方法都会执行完成。 3.8优化 PLINQPermalink 输出端优化Permalink PLINQ 的一个优点是它能够很容易地将并行化任务的结果整理成一个输出序列。然而有时最终要做的是在输出序列的每个元素上运行一些方法 foreach (int n in parallelQuery) DoSomething (n); 如果是上述情况并且不关心元素的处理顺序那么可以使用 PLINQ 的ForAll方法来提高效率。 ForAll方法在ParallelQuery的每个输出元素上运行一个委托。它直接挂钩hook到 PLINQ 内部绕过整理和枚举结果的步骤。举个栗子 abcdef.AsParallel().Select (c char.ToUpper(c)).ForAll (Console.Write); 整理和枚举结果的开销不是非常大所以当有大量输入元素且处理执行很快的时候才能最大化ForAll优化的收益。 输入端优化Permalink PLINQ 有 3 种分区策略用来分配输入元素到线程 策略元素分配相对性能块分区Chunk partitioning动态平均范围分区Range partitioning静态差 - 极好散列分区Hash partitioning静态差对于那些需要比较元素的查询操作符GroupBy、Join、GroupJoin、Intersect、Except、Union和DistinctPLINQ 总是使用散列分区。散列分区相对低效因为它必须预先计算每个元素的散列值拥有同样散列值的元素会在同一个线程中被处理。如果发现运行太慢唯一的选择是调用AsSequential来禁止并行处理。 对于其它所有查询操作符你可以选择使用范围分区或块分区。默认情况下 如果输入序列可以通过索引访问数组或是IListT的实现PLINQ 选用范围分区。否则PLINQ 选用块分区。概括来讲对于较长的序列且处理每个元素所需的 CPU 时间比较近似时范围分区更快。否则块分区通常更快。 如果想强制使用范围分区 如果查询以Enumerable.Range开始将其替换为ParallelEnumerable.Range。否则在输入序列上调用ToList或ToArray显然你需要考虑在这里产生的性能开销。 ParallelEnumerable.Range并不是对Enumerable.Range(…).AsParallel()的简单封装。它通过激活范围分区改变了查询的性能。 如果想强制使用块分区就通过调用Partitioner.Create在命名空间System.Collection.Concurrent中来封装输入序列例如 int[] numbers { 3, 4, 5, 6, 7, 8, 9 }; var parallelQuery Partitioner.Create (numbers, true).AsParallel() .Where (...) Partitioner.Create的第二个参数表示希望对查询开启负载均衡load-balance这是另一个使用块分区的动机。 块分区的工作方式是定期从输入序列中抓取小块元素来处理。PLINQ 一开始会分配非常小的块一次 1 到 2 个元素然后随着查询的进行增加块的大小这确保小序列能够被有效地并行化而大序列不会导致过多的抓取工作。如果一个工作线程碰巧拿到了一些“容易” 的元素处理很快它最终将拿到更多的块。这个系统使每个线程保持均等的繁忙程度使核心负载均衡。唯一的不利因素是从共享的输入序列中获取元素需要 同步通常使用一个排它锁这会产生一定的开销和竞争。 范围分区会绕过正常的输入端枚举并且为每个工作线程预分配相同数量的元素避免了在输入序列上的竞争。但是如果某些线程拿到了容易的元素并很早就 完成了处理在其它工作线程仍在继续工作的时候它就会是空闲的。我们之前的素数计算的例子在使用范围分区时就性能不高。举个范围分区适用的例子计算 1000 万以内数字的平方和 ParallelEnumerable.Range (1, 10000000).Sum (i Math.Sqrt (i)) ParallelEnumerable.Range返回一个ParallelQueryT因此不需要在之后调用AsParallel。 范围分区不是必须把元素分成相邻的块它也许会选用一种 “条纹式striping”策略。例如有两个工作线程一个工作线程可能会处理奇数位置的元素而另一个工作线程处理偶数位置的元素。TakeWhile操作符几乎一定会触发条纹式策略用来避免处理序列后边不必要的元素。 3.9并行化自定义聚合Permalink PLINQ 可以在无需额外干预的情况下有效地并行化Sum、Average、Min和Max操作符。然而Aggregate操作符对于 PLINQ 来说是个特殊的麻烦。 如果不熟悉Aggregate操作符你可以认为它就是一个Sum、Average、Min和Max的泛化版本换句话说就是一个可以使你通过自定义的聚合算法实现非通常聚合操作的操作符。如下代码展现了Aggregate如何实现Sum操作符的工作 int[] numbers { 1, 2, 3 }; int sum numbers.Aggregate (0, (total, n) total n); // 6 Aggregate的第一个参数是 seed种子初值聚合操作从这里开始。第二个参数是一个用于更新聚合值的表达式该表达式生成一个新的元素。第三个参数是可选的用来表示如何通过聚合值生成最终的结果值。 大多数Aggregate被设计用来解决的问题都能够使用foreach循环轻松解决并且这也是更熟悉的语法。而Aggregate的优点在于对庞大或复杂的聚合操作可以使用 PLINQ 来进行声明式的并行化。 无种子的聚合Permalink 调用Aggregate时可以省略种子值这种情况下第一个元素会被隐式当作种子之后聚合处理会从第二个元素开始进行。下边是一个无种子的例子 int[] numbers { 1, 2, 3 }; int sum numbers.Aggregate ((total, n) total n); // 6 这得到了与之前相同的结果然而实际上却是进行了不同的计算。之前例子计算的是 0123而现在计算的是123。通过乘法运算来代替加法运算能够更好地说明这个不同 int[] numbers { 1, 2, 3 }; int x numbers.Aggregate (0, (prod, n) prod * n); // 0*1*2*3 0 int y numbers.Aggregate ( (prod, n) prod * n); // 1*2*3 6 如同我们马上将要看到的无种子的聚合的优点在于被并行化时不需要使用特殊的重载。然而无种子的聚合存在一个陷阱无种子的聚合方法期望使用的委 托中的计算应满足交换律和结合律。如果用在别的情况下结果要不然是反直觉的普通查询要不然是不确定的PLINQ 并行化查询。例如考虑如下函数 (total, n) total n * n 它既不满足交换律也不满足结合律。例如12*2 ! 21*1。我们来看一下使用它来对数字 2、3、4 计算平方和时会发生什么 int[] numbers { 2, 3, 4 }; int sum numbers.Aggregate ((total, n) total n * n); // 27 本来的计算应该是 2*2 3*3 4*4 // 29 但现在的计算是 2 3*3 4*4 // 27 可以通过多种方法解决这个问题。首先我们可以在序列最前端加入 0 作为第一个元素 int[] numbers { 0, 2, 3, 4 }; 这不仅不优雅而且在并行执行的情况下仍然会产生错误的结果因为 PLINQ 会选择多个元素作为种子这相当于假定了计算满足结合律。为说明这个问题用如下方式表示我们的聚合函数 f(total, n) total n * n LINQ to Objects 会这样计算 f(f(f(0, 2),3),4) PLINQ 可能会这样计算 f(f(0,2),f(3,4)) 结果是 第一个分区 a 0 2*2 ( 4) 第二个分区 b 3 4*4 ( 19) 最终结果 a b*b ( 365) 甚至可能是: b a*a ( 35)有两种好的解决方案第一种是将其转换为有种子的聚合使用 0 作为种子。这种方案带来的复杂度的提升仅仅是使用 PLINQ 时我们需要使用特殊的重载确保查询并行执行马上会看到。 第二种解决方案是重构查询使聚合函数满足交换律和结合律 int sum numbers.Select (n n * n).Aggregate ((total, n) total n); 当然在这种简单的场景下你可以并且应该使用Sum操作符来代替Aggregate int sum numbers.Sum (n n * n); 实际上可以更进一步使用Sum和Average。例如可以使用Average来计算均方根root-mean-square Math.Sqrt (numbers.Average (n n * n)) 甚至是标准差 double mean numbers.Average(); double sdev Math.Sqrt (numbers.Average (n { double dif n - mean; return dif * dif; })); 上述两个方法都是安全、高效并且可完全并行化的。 并行化聚合Permalink 我们刚刚看到了无种子的聚合提供的委托必须满足交换律和结合律。如果违反这个规则PLINQ 会给出错误的结果因为它可能使用输入序列中多个的元素作为种子来同时聚合多个分区。 指定种子的聚合也许看起来像是使用 PLINQ 的安全选择然而不幸的是这样通常会导致顺序执行因为它依赖于单独一个种子。为减缓这个问题PLINQ 提供了另一个Aggregate的重载允许你指定多个种子或者是一个种子工厂方法。对每个线程它执行这个方法来生成一个独立的种子这就形成了一个线程局部的累加器通过它在聚合局部元素。 你必须再提供一个方法来指示如何合并局部累加器至主累加器。最后Aggregate的这个重载还需要一个委托用来对结果进行任意的最终变换有些没必要你可以之后对结果运行一些代码完成同样操作。所以这里有 4 个委托按照它们被传递的顺序 种子工厂seedFactory 返回一个新的局部累加器更新累加器方法updateAccumulatorFunc 聚合元素至局部累加器合并累加器方法combineAccumulatorFunc 合并局部累加器至主累加器结果选择器resultSelector 在结果上应用任意最终变换 在简单的场景中你可以指定一个种子值来代替种子工厂。当种子是你需要改变的引用类型时这种策略行不通因为同一个实例将在线程间共享。 提供一个简单的例子下边的代码对numbers数组中的值进行求和 numbers.AsParallel().Aggregate ( () 0, // 种子工厂 (localTotal, n) localTotal n, // 更新累加器方法 (mainTot, localTot) mainTot localTot, // 合并累加器方法 finalResult finalResult) // 结果选择器 这个例子有些刻意我们可以使用更简单的方式获取相同的结果例如无种子的聚合或者更好的选择是使用Sum操作符。给一个更加实际的例子假设我们要计算字符串中每个英文字母的出现频率。简单的顺序执行方案看起来是这样 string text Let’s suppose this is a really long string; var letterFrequencies new int[26]; foreach (char c in text) { int index char.ToUpper (c) - A; if (index 0 index 26) letterFrequencies [index]; }; 基因序列是一个输入文本可能会非常长的例子它的“字母表”是由字母 a、c、g、t 组成。 为了将它并行化我们可以把foreach替换为Parallel.ForEach在接下来的一节会讲到但这会导致共享数组上的并发问题。对数组的访问加锁可以解决问题但会降低并发的可能性。 Aggregate提供了一个好的解决方案。这种情况下累加器是一个数组就像是之前例子中letterFrequencies数组。使用Aggregate的顺序执行版本如下 int[] result text.Aggregate ( new int[26], // 创建“累加器” (letterFrequencies, c) // 聚合一个字母至累加器 { int index char.ToUpper (c) - A; if (index 0 index 26) letterFrequencies [index]; return letterFrequencies; }); 下面是并行版本它使用 PLINQ 的专门重载 int[] result text.AsParallel().Aggregate ( () new int[26], // 新建局部累加器 (localFrequencies, c) // 聚合至局部累加器 { int index char.ToUpper (c) - A; if (index 0 index 26) localFrequencies [index]; return localFrequencies; }, // 聚合局部累加器至主累加器 (mainFreq, localFreq) mainFreq.Zip (localFreq, (f1, f2) f1 f2).ToArray(), finalResult finalResult // 对结果进行 ); // 最终变换 注意局部累加方法会改动localFrequencies数组。这个优化是非常重要的也是合法的因为localFrequencies是每个线程的局部变量。 4Parallel 类Permalink PFX 通过Parallel类上的三个静态方法提供了结构化并行的基本形式 Parallel.Invoke并行执行一组委托Parallel.ForC#for循环的并行版本Parallel.ForEachC#foreach循环的并行版本三个方法都是在工作完成前会阻塞。类似于PLINQ如果有未处理的异常其它工作线程会在当前迭代完成之后停止异常会被封装在AggregateException中抛给调用方。 4.1Parallel.InvokePermalink Parallel.Invoke并行执行一组Action类型的委托然后等待它们完成。这个方法最简单的版本如下 public static void Invoke (params Action[] actions); 下面是使用Parallel.Invoke来同时下载两个网页 Parallel.Invoke (() new WebClient().DownloadFile (http://www.linqpad.net, lp.html), () new WebClient().DownloadFile (http://www.jaoo.dk, jaoo.html)); 这表面上看起来像是创建了两个Task对象或异步委托并等待它们。但是有个重要的区别Parallel.Invoke在你传递一百万个委托时仍然能高效工作。这是因为它会对大量元素进行分区partition形成多个块再对其分配底层的Task。而不是直接对每一个委托创建独立的Task。 使用Parallel上的所有方法时都需要自行实现整理结果的代码。这意味着你需要注意线程安全。例如下面的代码不是线程安全的 var data new Liststring(); Parallel.Invoke ( () data.Add (new WebClient().DownloadString (http://www.foo.com)), () data.Add (new WebClient().DownloadString (http://www.far.com))); 对添加的过程加锁可以解决问题但是如果你的委托数量更多它们每一个执行的又很快那么锁可能造成瓶颈。更好的解决方案是使用线程安全的集合比如ConcurrentBag就是这里的理想方案。 Parallel.Invoke也有接受ParallelOptions对象的重载 public static void Invoke (ParallelOptions options, params Action[] actions); 通过ParallelOptions你可以添加取消标记、限制最大并发数量和指定自定义任务调度器。如果要执行的委托数量大致上大于核心数那么使用取消标记才有意义在取消时所有未启动的委托都会被抛弃。而所有已经在执行的委托会继续完成。对于如何使用取消标记可以参考取消中的例子。 4.2Parallel.For 和 Parallel.ForEachPermalink Parallel.For和Parallel.ForEach与 C# for和foreach类似但会并行执行而不是顺序执行。下面是它们最简单的方法签名 public static ParallelLoopResult For ( int fromInclusive, int toExclusive, Actionint body) public static ParallelLoopResult ForEachTSource ( IEnumerableTSource source, ActionTSource body) 对于下面的for循环 for (int i 0; i 100; i) Foo (i); 并行版本是这样 Parallel.For (0, 100, i Foo (i)); 或更简洁的 Parallel.For (0, 100, Foo); 而对于下面的foreach循环 foreach (char c in Hello, world) Foo (c); 并行版本是这样 Parallel.ForEach (Hello, world, Foo); 给一个实际点的例子。引入System.Security.Cryptography命名空间然后我们可以像这样并行生成六组密钥对的字符串形式 var keyPairs new string[6]; Parallel.For (0, keyPairs.Length, i keyPairs[i] RSA.Create().ToXmlString (true)); 与Parallel.Invoke同样我们也可以让Parallel.For和Parallel.ForEach执行大量工作项它们也会被分区分配给任务高效执行。 上面的例子也可以使用PLINQ来实现 string[] keyPairs ParallelEnumerable.Range (0, 6) .Select (i RSA.Create().ToXmlString (true)) .ToArray(); 外循环 vs 内循环Permalink Parallel.For和Parallel.ForEach通常更适合用于外循环而不是内循环。这是因为前者会带来更大的分区块就稀释了管理并行的开销。一般没有必要同时并行内外循环。对于下面的例子我们需要 100 个核心才能让内循环的并行有益处 Parallel.For (0, 100, i { Parallel.For (0, 50, j Foo (i, j)); // 对于内循环 }); // 顺序执行更好。 带索引的 Parallel.ForEachPermalink 有时需要获知循环迭代的索引。在顺序的foreach中这很简单 int i 0; foreach (char c in Hello, world) Console.WriteLine (c.ToString() i); 然而在并行环境中让共享变量自增并不是线程安全的。你必须使用下面这个ForEach版本 public static ParallelLoopResult ForEachTSource ( IEnumerableTSource source, ActionTSource,ParallelLoopState,long body) 先忽略ParallelLoopState下一节会讲。现在我们关注的是Action的第三个long类型的参数它代表了循环的索引 Parallel.ForEach (Hello, world, (c, state, i) { Console.WriteLine (c.ToString() i); }); 为了把它用到实际场景中我们来回顾下使用 PLINQ 的拼写检查。下面的代码加载了一个字典并生成了一个用来测试的数组有一百万个测试项 if (!File.Exists (WordLookup.txt)) // 包含约 150,000 个单词 new WebClient().DownloadFile ( http://www.albahari.com/ispell/allwords.txt, WordLookup.txt); var wordLookup new HashSetstring ( File.ReadAllLines (WordLookup.txt), StringComparer.InvariantCultureIgnoreCase); var random new Random(); string[] wordList wordLookup.ToArray(); string[] wordsToTest Enumerable.Range (0, 1000000) .Select (i wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] woozsh; // 引入两个 wordsToTest [23456] wubsie; // 拼写错误 我们可以使用带索引的Parallel.ForEach来对wordsToTest数组进行拼写检查如下 var misspellings new ConcurrentBagTupleint,string(); Parallel.ForEach (wordsToTest, (word, state, i) { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 注意必须使用线程安全的集合来整理结果这一点是相对于使用 PLINQ 的劣势。而优势是我们可以避免使用带索引的Select查询操作符它没有带索引的ForEach高效。 ParallelLoopState提前退出循环Permalink 因为对于并行的For和ForEach循环循环体是一个委托所以就无法使用break语句来提前退出循环。在这里你必须使用ParallelLoopState对象上的Break或Stop public class ParallelLoopState {public void Break(); public void Stop(); public bool IsExceptional { get; } public bool IsStopped { get; } public long? LowestBreakIteration { get; } public bool ShouldExitCurrentIteration { get; } } 获取ParallelLoopState很容易所有版本的For和ForEach都有重载可以接受ActionTSource,ParallelLoopState类型的循环体。所以如果要并行化 foreach (char c in Hello, world) if (c ,) break; else Console.Write (c); 可以使用 Parallel.ForEach (Hello, world, (c, loopState) { if (c ,) loopState.Break(); else Console.Write (c); }); 输出 Hlloe从结果中可以发现循环体会以随机顺序完成。除这点不同以外调用Break会给出与顺序循环至少相同数量的元素在上例中总是以一定顺序至少输出 H、e、l、l、o 这几个字母。而如果改为调用Stop会强制所有线程在当前迭代完成后立即结束。在上例中如果有些线程滞后了调用Stop可能给出 H、e、l、l、o 的子集。当发现已经找到了需要的东西时或是发现出错了不想看结果的情况下Stop比较适用。 Parallel.For和Parallel.ForEach方法都返回一个ParallelLoopResult对象它暴露了IsCompleted和LowestBreakIteration属性。它们可以告知循环是否完成如果没有完成是在哪个迭代中断的。 如果LowestBreakIteration返回null意味着在循环中调用了Stop而不是Break。 如果你的循环体很长可能会希望其它线程能够在执行中途中断循环体来让使用Break或Stop时更快的退出。实现方法是在代码中多个地方查询ShouldExitCurrentIteration属性它会在调用Stop后立即为true或者是在Break后很快为true。 ShouldExitCurrentIteration在请求取消或者循环中有异常抛出时也会为true。 IsExceptional属性可以告知其它线程上是否有异常产生。任何未处理的异常都会导致循环在所有线程完成当前迭代后结束如果想要避免必须在代码中显式处理异常。 使用局部值进行优化Permalink Parallel.For和Parallel.ForEach都提供了拥有TLocal泛型变量的重载。这是为了协助你优化密集迭代的循环中的数据整理工作。最简单的形式如下 public static ParallelLoopResult For TLocal ( int fromInclusive, int toExclusive, Func TLocal localInit, Func int, ParallelLoopState, TLocal, TLocal body, Action TLocal localFinally); 这些方法在实际中很少用到因为它们的目标场景基本都被PLINQ覆盖了好开森因为这些重载真可怕。 本质上问题在于假设我们要计算从 1 到 10,000,000 的平方根的和。并行计算一千万个平方根很容易但是求和是个问题因为必须像这样加锁才能更新和值 object locker new object(); double total 0; Parallel.For (1, 10000000, i { lock (locker) total Math.Sqrt (i); }); 并行化的收益都被获取一千万个锁的开销抵消了还不算导致的阻塞。 然而实际上并不需要一千万个锁。想象一队志愿者捡一大堆垃圾的场景如果大家都共享单独一个垃圾桶那冲突就会使整个过程极端低效。明显的方案是每个人都有自己“局部”的垃圾桶偶尔去一趟主垃圾桶倾倒干净。 For和ForEach的TLocal版本就是这样工作的。志愿者就是内部的工作线程局部值local value就是局部垃圾桶。想要让Parallel以这种方式工作那么必须提供两个额外的委托 如何初始化新的局部值如何将局部的聚合值合并到主值另外循环体委托现在不能返回void而是应该返回局部值新的聚合结果。下面是重构后的例子 object locker new object(); double grandTotal 0; Parallel.For (1, 10000000, () 0.0, // 初始化局部值 (i, state, localTotal) // 循环体委托。注意现在 localTotal Math.Sqrt (i), // 返回新的局部值 localTotal // 把局部值 { lock (locker) grandTotal localTotal; } // 加入主值 ); 我们还是需要锁但是只需要锁定将局部和加入总和的过程。这让处理效率有了极大的提升。 前面说过PLINQ 一般更适合这些场景。我们的例子如果使用 PLINQ 来并行会很简单 ParallelEnumerable.Range(1, 10000000) .Sum (i Math.Sqrt (i)) 注意我们使用了ParallelEnumerable来强制范围分区在这里可以提高性能因为对所有数字的计算都是相等时间的。 更复杂的场景中你可能会用到 LINQ 的Aggregate操作符而不是Sum。如果指定了局部种子工厂那情况就和使用局部值的Parallel.For差不多了。 5任务并行Permalink 任务并行task parallelism是 PFX 中最底层的并行方式。这一层次的类定义在System.Threading.Tasks命名空间中如下所示 类作用Task管理工作单元TaskTResult管理有返回值的工作单元TaskFactory创建任务TaskFactoryTResult创建有相同返回类型的任务和任务延续TaskScheduler管理任务调度TaskCompletionSource手动控制任务的工作流本质上任务是用来管理可并行工作单元的轻量级对象。任务使用 CLR 的线程池来避免启动独立线程的开销它和ThreadPool.QueueUserWorkItem使用的是同一个线程池在 CLR 4.0 中这个线程池被调节过让Task工作的更有效率一般来说。 需要并行执行代码时都可以使用任务。然而它们是为了充分利用多核而调节的事实上Parallel类和PLINQ内部就是基于任务并行构建的。 任务并不只是提供了简单高效的使用线程池的方式。它们还提供了一些强大的功能来管理工作单元包括 调节任务调度在一个任务中启动另一个任务时建立其父子关系实现协作取消等待一组任务而无需使用信号构造附加“延续”任务基于多个前项任务调度延续任务传递异常给父任务、延续任务和任务的使用方任务也实现了局部工作队列local work queues这个优化能够让你高效的创建很多快速执行的子任务而不会带来单一工作队列会导致的竞争开销。 TPL 可以让你使用极小的开销创建几百个甚至几千个任务但如果你要创建上百万个任务那需要把这些任务分成大一些的工作单元才能有效率。Parallel类和 PLINQ 可以自动实现这种工作分解。 Visual Studio 2010 提供了一个新的窗口来监视任务调试 | 窗口 | 并行任务。它和线程窗口类似只是用于任务。并行栈窗口也有一个专门的模式用于任务。 5.1创建与启动任务Permalink 如同我们在第 1 部分线程池的讨论中那样你可以调用Task.Factory.StartNew并给它传递一个Action委托来创建并启动Task Task.Factory.StartNew (() Console.WriteLine (Hello from a task!)); 泛型的版本TaskTResultTask的子类可以让你在任务结束时获得返回的数据 Taskstring task Task.Factory.StartNewstring (() // 开始任务 { using (var wc new System.Net.WebClient()) return wc.DownloadString (http://www.linqpad.net); }); RunSomeOtherMethod(); // 我们可以并行的做其它工作... string result task.Result; // 等待任务结束并获取结果 Task.Factory.StartNew是一步创建并启动任务。你也可以分解它先创建Task实例再调用Start var task new Task (() Console.Write (Hello)); // ... task.Start(); 使用这种方式创建的任务也可以同步运行在当前线程上使用RunSynchronously替代Start。 可以使用Status属性来追踪任务的执行状态。 指定状态对象Permalink 当创建任务实例或调用Task.Factory.StartNew时可以指定一个状态对象state object它会被传递给目标方法。如果你希望直接调用方法而不是 lambda 表达式则可以使用它。 static void Main() { var task Task.Factory.StartNew (Greet, Hello); task.Wait(); // 等待任务结束 } static void Greet (object state) { Console.Write (state); } // 打印 Hello 因为 C# 中有 lambda 表达式我们可以更好的使用状态对象用它来给任务赋予一个有意义的名字。然后就可以使用AsyncState属性来查询这个名字 static void Main() { var task Task.Factory.StartNew (state Greet (Hello), Greeting); Console.WriteLine (task.AsyncState); // 打印 Greeting task.Wait(); } static void Greet (string message) { Console.Write (message); } Visual Studio 会在并行任务窗口显示每个任务的AsyncState属性所以指定有意义的名字可以很大程度的简化调试。 TaskCreationOptionsPermalink 在调用StartNew或实例化Task时可以指定一个TaskCreationOptions枚举来调节线程的执行。TaskCreationOptions是一个按位组合的枚举它有下列可组合的值LongRunning、PreferFairness和AttachedToParent。 LongRunning向调度器建议为任务使用一个独立的线程。这对长时间运行的任务有好处因为它们可能会“霸占”队列强迫短时间任务等待过长的时间后才能被调度。LongRunning对于会阻塞的任务也有好处。 由于任务调度器一般会试图保持刚好足够数量的任务在线程上运行来保持所有 CPU 核心都工作。所以不要超额分配oversubscribing CPU或者说不要使用过多的活动线程以避免由于操作系统被迫进行大量耗时的时间切片和上下文切换导致的性能下降。 PreferFairness让调度器试图确保任务以它们启动的顺序被调度。默认情况下是使用另一种方式因为内部使用了局部工作窃取队列来优化任务调度。这个优化对于非常小的细粒度任务有实际的好处。 AttachedToParent用来创建子任务。 子任务Permalink 当一个任务启动另一个任务时你可以通过指定TaskCreationOptions.AttachedToParent选择性地建立父子关系 Task parent Task.Factory.StartNew (() { Console.WriteLine (I am a parent); Task.Factory.StartNew (() // 分离的任务 { Console.WriteLine (I am detached); }); Task.Factory.StartNew (() // 子任务 { Console.WriteLine (I am a child); }, TaskCreationOptions.AttachedToParent); }); 子任务的特殊之处在于当你等待父任务结束时也同样会等待所有子任务。这对于子任务是一个延续任务时非常有用稍后我们会看到。 5.2等待任务Permalink 有两种方式可以显式等待任务完成 调用Wait方法可选择指定超时时间访问Result属性当使用TaskTResult时也可以同时等待多个任务通过静态方法Task.WaitAll等待所有指定任务完成和Task.WaitAny等待任意一个任务完成。 WaitAll和依次等待每个任务类似但它更高效因为它只需要至多一次上下文切换。并且如果有一个或多个任务抛出未处理的异常WaitAll仍然能够等待所有任务并在之后重新抛出一个AggregateException异常它聚合了所有出错任务的异常功能相当于下面的代码 // 假设 t1、t2 和 t3 是任务 var exceptions new ListException(); try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } if (exceptions.Count 0) throw new AggregateException (exceptions); 调用WaitAny相当于在一个ManualResetEventSlim上等待每个任务结束时都对它发信号。 除了使用超时时间你也可以传递一个取消标记给Wait方法这样可以取消等待。注意这不是取消任务。 5.3异常处理Permalink 当你等待一个任务结束时通过调用Wait方法或访问其Result属性所有未处理的异常都会用一个AggregateException对象封装方便重新抛给调用方。一般就无需在任务代码中处理异常而是这么做 int x 0; Taskint calc Task.Factory.StartNew (() 7 / x); try { Console.WriteLine (calc.Result); } catch (AggregateException aex) { Console.Write (aex.InnerException.Message); // 试图以 0 为除数 } 你仍然需要对独立的任务无父任务并且没有在等待它进行异常处理以免当任务失去作用域被垃圾回收时见以下注释有未处理的异常那会导致程序结束。如果对任务的等待指定了超时时间那也是如此因为所有超时时间过后抛出的异常都是未处理的。 TaskScheduler.UnobservedTaskException静态事件提供了应对未处理的任务异常的最后手段。通过挂接这个事件你就可以拦截这些原本会导致程序结束的异常并且使用自己的逻辑对它们进行处理。 对于有父子关系的任务在父任务上等待也会隐式的等待子任务所有子任务的异常也会传递出来。 TaskCreationOptions atp TaskCreationOptions.AttachedToParent; var parent Task.Factory.StartNew (() { Task.Factory.StartNew (() // 子 { Task.Factory.StartNew (() { throw null; }, atp); // 孙 }, atp); }); // 下面的调用会抛出 NullReferenceException 异常 封装在 // 嵌套的 AggregateExceptions 中 parent.Wait(); 有趣的是如果你在任务抛出异常后检查它的Exception属性这个读取属性的动作会防止因为该异常导致程序结束。基本原则是PFX 的设计者不希望你忽略异常只要采取某种方式接收异常就不会受到结束程序的惩罚。 任务中的未处理异常不会导致程序立即结束它会延迟直到垃圾回收器处理到这个任务并调用它的析构方法时。这个延迟是因为在进行垃圾回收前还无法判断是否会调用Wait或检查Result或Exception属性。它有时也会误导你对错误源头的判断Visual Studio 的调试器如果开启了在首个异常处中断可以帮助进行判断。 马上我们会看到处理异常的另一种策略就是使用任务延续。 5.4取消任务Permalink 启动任务时可以可选的传递一个取消标记cancellation token。它可以让你通过协作取消模式取消任务像之前描述的那样 var cancelSource new CancellationTokenSource(); CancellationToken token cancelSource.Token; Task task Task.Factory.StartNew (() { // 做些事情... token.ThrowIfCancellationRequested(); // 检查取消请求 // 做些事情... }, token); // ... cancelSource.Cancel(); 如果要检测任务取消可以用如下方式捕捉AggregateException并检查它的内部异常 try {task.Wait(); } catch (AggregateException ex) { if (ex.InnerException is OperationCanceledException) Console.Write (Task canceled!); } 如果希望显式的抛出OperationCanceledException异常而不是通过调用ThrowIfCancellationRequested那么必须把取消标记传递给OperationCanceledException的构造方法。如果不这么做这个任务就不会以TaskStatus.Canceled状态结束并且也不会触发使用OnlyOnCanceled条件的任务延续。 如果任务在启动前被取消它就不会被调度而是直接在任务中抛出OperationCanceledException。 因为取消标记也可以被其它 API 识别所以可以在其它构造中无缝使用 var cancelSource new CancellationTokenSource(); CancellationToken token cancelSource.Token; Task task Task.Factory.StartNew (() { // 传递取消标记给 PLINQ 查询 var query someSequence.AsParallel().WithCancellation (token)... // ... enumerate query ... }); 调用cancelSource上的Cancel方法就可以取消该 PLINQ 查询它会在任务中抛出OperationCanceledException异常从而取消该任务。 也可以给Wait或CancelAndWait这类方法传递取消标记它可以让你取消等待操作而不是任务本身。 5.5任务延续Permalink 有时在一个任务完成或失败后马上启动另一个任务会很有用。Task类上的ContinueWith方法正是实现了这种功能 Task task1 Task.Factory.StartNew (() Console.Write (antecedant..)); Task task2 task1.ContinueWith (ant Console.Write (..continuation)); 一旦task1前项antecedent完成、失败或取消task2延续continuation会自动启动。如果task1在运行第二行代码前已经结束那么task2会被立即调度执行。传递给延续的 lambda 表达式的ant参数是对前项任务的引用。 我们的例子演示了最简单的延续它和以下代码功能类似 Task task Task.Factory.StartNew (() { Console.Write (antecedent..); Console.Write (..continuation); }); 但是通过延续的方式可以更加灵活比如先等待task1完成之后再等待task2。如果task1返回数据这样就非常有用。 另一个不明显的差异是默认情况下前项和延续任务可能是在不同的线程上执行。你可以在调用ContinueWith时指定TaskContinuationOptions.ExecuteSynchronously来强制它们在同一个线程执行如果延续是非常细粒度的这样做可以通过减少开销来提升性能。 延续和 TaskTResultPermalink 像普通任务一样延续也可以使用TaskTResult类型并返回数据。下面的例子中我们使用链状任务来计算Math.Sqrt(8*2)并打印结果 Task.Factory.StartNewint (() 8) .ContinueWith (ant ant.Result * 2) .ContinueWith (ant Math.Sqrt (ant.Result)) .ContinueWith (ant Console.WriteLine (ant.Result)); // 4 我们的例子比较简单实际应用中这些 lambda 表达式可能会调用计算密集型的方法。 延续与异常Permalink 延续可以通过前项的Exception属性来获取前项抛出的异常。下面的代码会输出NullReferenceException信息 Task task1 Task.Factory.StartNew (() { throw null; }); Task task2 task1.ContinueWith (ant Console.Write (ant.Exception)); 如果前项抛出了异常但延续没有检查前项的Exception属性并且也没有在等待前项那么异常会被认为是未处理的就会导致程序结束除非使用TaskScheduler.UnobservedTaskException进行了处理。 安全的模式是重新抛出前项的异常。只要延续被Wait等待异常就能够传播并重新抛出给等待方。 Task continuation Task.Factory.StartNew (() { throw null; }) .ContinueWith (ant { if (ant.Exception ! null) throw ant.Exception; // 继续处理... }); continuation.Wait(); // 异常被抛回调用方 另一种处理异常的方法是为异常和正常情况指定不同的延续。需要用到TaskContinuationOptions Task task1 Task.Factory.StartNew (() { throw null; }); Task error task1.ContinueWith (ant Console.Write (ant.Exception), TaskContinuationOptions.OnlyOnFaulted); Task ok task1.ContinueWith (ant Console.Write (Success!), TaskContinuationOptions.NotOnFaulted); 这种模式在结合子任务使用时非常有用我们马上会看到。 下面的扩展方法会“吞掉”任务的未处理异常 public static void IgnoreExceptions (this Task task) { task.ContinueWith (t { var ignore t.Exception; }, TaskContinuationOptions.OnlyOnFaulted); } 可以添加对异常的日志记录来进一步改进它。以下是用法 Task.Factory.StartNew (() { throw null; }).IgnoreExceptions(); 延续与子任务Permalink 延续的一个强大功能是它仅在所有子任务都完成时才会启动。这时所有子任务抛出的异常都会被封送给延续。 接下来的例子中我们启动三个子任务每个都抛出NullReferenceException。然后使用父任务的延续来一次性捕捉这些异常 TaskCreationOptions atp TaskCreationOptions.AttachedToParent; Task.Factory.StartNew (() { Task.Factory.StartNew (() { throw null; }, atp); Task.Factory.StartNew (() { throw null; }, atp); Task.Factory.StartNew (() { throw null; }, atp); }) .ContinueWith (p Console.WriteLine (p.Exception), TaskContinuationOptions.OnlyOnFaulted); 条件延续Permalink 默认情况下延续是被无条件调度的也就是说无论前项是完成、抛出异常还是取消延续都会执行。你可以通过设置TaskContinuationOptions枚举中的标识可组合来改变这种行为。三种控制条件延续的核心标识是 NotOnRanToCompletion 0x10000, NotOnFaulted 0x20000, NotOnCanceled 0x40000, 这些标识是做减法的也就是组合的越多延续越不可能被执行。为了方便使用也提供了以下预先组合好的值 OnlyOnRanToCompletion NotOnFaulted | NotOnCanceled, OnlyOnFaulted NotOnRanToCompletion | NotOnCanceled, OnlyOnCanceled NotOnRanToCompletion | NotOnFaulted 组合所有Not*标识[NotOnRanToCompletion, NotOnFaulted, NotOnCanceled]没有意义这会导致延续始终被取消。 RanToCompletion代表前项成功完成没有被取消也没有未处理的异常。 Faulted代表前项中有未处理的异常抛出。 Canceled代表以下两种情况之一 前项通过其取消标记被取消。换句话说OperationCanceledException在前项中抛出它的CancellationToken属性与启动时传递给前项的标记取消匹配。 前项被隐式的取消因为无法满足指定的延续条件。 特别需要注意的是如果这些标识导致延续无法执行延续并不是被忘记或抛弃而是被取消。这意味着所有延续任务上的延续就会开始运行除非你指定了NotOnCanceled。例如 Task t1 Task.Factory.StartNew (...); Task fault t1.ContinueWith (ant Console.WriteLine (fault), TaskContinuationOptions.OnlyOnFaulted); Task t3 fault.ContinueWith (ant Console.WriteLine (t3)); 像之前说的一样t3始终会被调度即使是t1没有抛出异常也是如此。因为t1成功完成fault任务会被取消而t3上并没有定义任何限制延续的条件所以t3就会被无条件执行。 如果希望仅在fault真正运行的情况下执行t3需要把代码改成 Task t3 fault.ContinueWith (ant Console.WriteLine (t3), TaskContinuationOptions.NotOnCanceled); 此外也可以指定OnlyOnRanToCompletion不同之处就是t3在fault抛出异常的情况下不会执行。 多前项的延续Permalink 延续的另一个有用的功能是它可以在多个前项完成后调度执行。ContinueWhenAll是在多个前项都完成后调度而ContinueWhenAny是在任意一个前项完成后调度。这两个方法都定义在TaskFactory类上 var task1 Task.Factory.StartNew (() Console.Write (X)); var task2 Task.Factory.StartNew (() Console.Write (Y)); var continuation Task.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks Console.WriteLine (Done)); 上面的例子会在打印 “ XY “ 或 “ YX “ 之后打印 “ Done “。Lambda 表达式中的tasks参数可以用来访问完成的任务数组当前项返回数据时可以用到。下面的例子对两个前项返回的数字求和 // 真实场景中 task1 和 task2 可能调用复杂的功能 Taskint task1 Task.Factory.StartNew (() 123); Taskint task2 Task.Factory.StartNew (() 456); Taskint task3 Taskint.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks tasks.Sum (t t.Result)); Console.WriteLine (task3.Result); // 579 在这个例子中我们使用了int类型参数来调用Task.Factory是为了演示获得了一个泛型的任务工厂。这个类型参数不是必须的它可以被编译器推断。 单前项的多个延续Permalink 对一个任务调用一次以上的ContinueWith会创建单前项的多个延续。当该前项完成时所有延续会一起启动除非指定了TaskContinuationOptions.ExecuteSynchronously这会导致延续顺序执行。 下面的代码会等待一秒然后打印 “ XY “ 或者 “ YX “ var t Task.Factory.StartNew (() Thread.Sleep (1000)); t.ContinueWith (ant Console.Write (X)); t.ContinueWith (ant Console.Write (Y)); 5.6任务调度器与 UIPermalink 任务调度器task scheduler为任务分配线程其由抽象类TaskScheduler类代表所有任务都会和一个任务调度器关联。Framework 提供了两种具体实现默认调度器default scheduler是使用 CLR 线程池工作还有同步上下文调度器synchronization context scheduler它主要是为了对于使用 WPF 和 Windows Forms 的场景提供帮助这里的线程模型需要 UI 控件只能在创建它们的线程上访问。例如假设我们需要在后台从一个 web 服务获取数据然后使用它更新一个叫做lblResult的 WPF 标签。这可以分解为两个任务 调用方法从 web 服务获取数据前项任务。使用结果更新lblResult延续任务。如果对延续任务指定了窗口创建时获取的同步上下文调度器那么就可以安全的更新lblResult public partial class MyWindow : Window { TaskScheduler _uiScheduler; // 定义一个字段以便于 // 在类中使用 public MyWindow() { InitializeComponent(); // 从创建窗口的线程获取 UI 调度器 _uiScheduler TaskScheduler.FromCurrentSynchronizationContext(); Task.Factory.StartNewstring (SomeComplexWebService) .ContinueWith (ant lblResult.Content ant.Result, _uiScheduler); } string SomeComplexWebService() { ... } } 也可以实现自己的任务调度器通过继承TaskScheduler但是一般只会在非常特殊的场景下才会这么做。对于自定义调度需要经常使用TaskCompletionSource我们马上会讲到。 5.7TaskFactoryPermalink 当调用Task.Factory时就是通过Task上的静态属性获取了默认的TaskFactory对象。这个任务工厂的作用就是创建任务具体的说有三种任务 普通任务通过StartNew多前项的延续通过ContinueWhenAll和ContinueWhenAny封装了异步编程模型APM的任务通过FromAsync 有趣的是TaskFactory是创建后两种任务的唯一方法。而对于StartNewTaskFactory纯粹是为了方便技术上说是多余的这完全等同于创建Task对象然后调用其Start方法。 创建自己的任务工厂Permalink TaskFactory不是抽象工厂你可以实例化这个类在希望重复使用同样的非默认的TaskCreationOptions值、TaskContinuationOptions值或者TaskScheduler时有用。例如如果希望重复创建长时间运行的子任务我们可以这样创建一个自定义工厂 var factory new TaskFactory ( TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); 然后创建任务就可以仅调用这个工厂上的StartNew Task task1 factory.StartNew (Method1); Task task2 factory.StartNew (Method2); // ... 在调用ContinueWhenAll和ContinueWhenAny时自定义的延续选项会被应用。 5.8TaskCompletionSourcePermalink Task类做了两件事情 它可以调度一个委托到线程池线程上运行。它提供了管理工作项的丰富功能延续、子任务、异常封送等等。有趣的是这两件事可以是分离的可以只利用任务的管理工作项的功能而不让它调度到线程池上运行。TaskCompletionSource类开启了这个模式。 使用TaskCompletionSource时就创建它的实例。它暴露一个Task属性来返回一个任务你可以对其等待或附加延续就和对一般的任务一样。然而这个任务可以通过TaskCompletionSource对象的下列方法进行完全控制 public class TaskCompletionSourceTResult { public void SetResult (TResult result); public void SetException (Exception exception); public void SetCanceled(); public bool TrySetResult (TResult result); public bool TrySetException (Exception exception); public bool TrySetCanceled(); // ... } 如果调用多次SetResult、SetException和SetCanceled会抛出异常而Try*方法会返回false。 TResult对应任务的返回类型所以TaskCompletionSourceint会给你一个Taskint。如果需要不返回结果的任务可以使用object类型来创建TaskCompletionSource并在调用SetResult时传递null。可以把Taskobject转换为Task类型来使用。 下面的代码在等待五秒之后打印 “ 123 “ var source new TaskCompletionSourceint(); new Thread (() { Thread.Sleep (5000); source.SetResult (123); }) .Start(); Taskint task source.Task; // 我们的“奴隶”任务 Console.WriteLine (task.Result); // 123 稍后我们会展示使用如何BlockingCollection来写一个生产者 / 消费者队列。然后会演示使用TaskCompletionSource来改进这个方案它可以使队列中的工作项可以被等待和取消。 6使用 AggregateExceptionPermalink 如前所属PLINQ、Parallel类和Task都会自动封送异常给使用者。为了明白这么做的重要性考虑以下 LINQ 查询它在第一次迭代时会抛出DivideByZeroException try {var query from i in Enumerable.Range (0, 1000000) select 100 / i; // ... } catch (DivideByZeroException) { // ... } 如果我们使用 PLINQ 来并行化查询而假设它并没有进行异常处理那么DivideByZeroException可能会在一个线程中被抛出就会无视catch块从而导致程序结束。 因此异常会被自动捕捉并重新抛给调用方。然而不幸的是情况并不是就像捕捉一个DivideByZeroException那般简单。因为这些类库会利用很多线程很可能有两个或更多的异常被同时抛出。为了确保能够报告所有异常就使用了AggregateException作为容器来封装它们并通过InnerExceptions属性来暴露 try {var query from i in ParallelEnumerable.Range (0, 1000000) select 100 / i; // 对查询进行枚举 // ... } catch (AggregateException aex) { foreach (Exception ex in aex.InnerExceptions) Console.WriteLine (ex.Message); } PLINQ 和Parallel类都会在遇到第一个异常时停止查询或循环执行它使用的方式是不处理之后的元素或循环体。而在本轮循环结束前还有可能抛出更多的异常。第一个异常可以通过AggregateException上的InnerException属性获取。 6.1Flatten 和 HandlePermalink AggregateException类提供了一对方法来简化异常处理Flatten和Handle。 FlattenPermalink AggregateException经常会包含其它的AggregateException。比如在子任务抛出异常时就可能如此。你可以通过调用Flatten来消除任意层级的嵌套以简化处理。这个方法会返回一个新的AggregateException它的InnerExceptions就是展平之后的结果 catch (AggregateException aex) { foreach (Exception ex in aex.Flatten().InnerExceptions) myLogWriter.LogException (ex); } HandlePermalink 有时只需要捕捉特定类型的异常并重新抛出其它类型的异常。AggregateException上的Handle方法提供了一个快捷方案。它接受一个异常判定器来对所有封装的异常进行判定 public void Handle (FuncException, bool predicate) 如果判定器返回true则该异常被认为是“已处理”。对于所有异常都运行判定之后接下来会发生 如果所有异常都“已处理”判定器返回true则不会重新抛出异常。如果有异常被判定为false“未处理”则会生成一个新的AggregateException来封装这些异常并重新抛出。例如下面的代码最后会重新抛出一个AggregateException并且其中仅包含一个NullReferenceException var parent Task.Factory.StartNew (() { // 我们使用 3 个子任务同时抛出 3 个异常 int[] numbers { 0 }; var childFactory new TaskFactory (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); childFactory.StartNew (() 5 / numbers[0]); // 除数为零 childFactory.StartNew (() numbers [1]); // 索引越界 childFactory.StartNew (() { throw null; }); // 空引用 }); try { parent.Wait(); } catch (AggregateException aex) { aex.Flatten().Handle (ex // 注意这里还是需要调用 Flatten { if (ex is DivideByZeroException) { Console.WriteLine (Divide by zero); return true; // 该异常“已处理” } if (ex is IndexOutOfRangeException) { Console.WriteLine (Index out of range); return true; // 该异常“已处理” } return false; // 其它所有异常会被重新抛出 }); } 7并发集合Permalink Framework 4.0 在System.Collections.Concurrent命名空间中提供了一组新的集合。它们都是完全线程安全的 并发集合对应的非并发集合ConcurrentStackTStackTConcurrentQueueTQueueTConcurrentBagT( none )BlockingCollectionT( none )ConcurrentDictionaryTKey,TValueDictionaryTKey,TValue在一般的多线程场景中需要线程安全的集合时可能会用到这些并发集合。但是有些注意事项 并发集合是为了并行编程而调整的。除了高并发场景传统的集合都比它们更高效。线程安全的集合并不能确保使用它的代码也是线程安全的。如果在对并发集合进行枚举的同时有其它线程修改了集合并不会产生异常而是会得到一个新旧内容的混合结果。没有ListT的并发版本。并发的栈、队列和包bag类内部都是使用链表实现的。这使得它们的空间效率不如非并发的Stack和Queue类但是这对于并发访问更好因为链表有助于实现无锁或更少的锁。这是因为向链表中插入一个节点只需要更新两个引用而对于ListT这种结构插入一个元素可能需要移动几千个已存在的元素。换句话说这些集合并不是提供了加锁使用普通集合的快捷办法。为了演示这一点如果我们在单一线程上执行以下代码 var d new ConcurrentDictionaryint,int(); for (int i 0; i 1000000; i) d[i] 123; 它会比下面的代码慢三倍 var d new Dictionaryint,int(); for (int i 0; i 1000000; i) lock (d) d[i] 123; 但是对ConcurrentDictionary读取会更快因为读是无锁的。 并发集合与普通集合的另一个不同之处是它们暴露了一些特殊的方法来进行原子的检查并行动test-and-act的操作例如TryPop。这些方法中的大部分都是由IProducerConsumerCollectionT接口统一的。 7.1IProducerConsumerCollectionTPermalink 生产者 / 消费者集合有两个主要用例 添加一个元素“生产”获取一个元素并移除它“消费”典型的例子是栈和队列。生产者 / 消费者集合在并行编程中非常重要因为它有助于高效的无锁实现。 IProducerConsumerCollectionT接口代表了线程安全的生产者 / 消费者集合。以下类实现了该接口ConcurrentStackT、ConcurrentQueueT和ConcurrentBagT。 IProducerConsumerCollectionT扩展自ICollection并加入了以下方法 void CopyTo (T[] array, int index); T[] ToArray(); bool TryAdd (T item); bool TryTake (out T item); TryAdd和TryTake方法检查是否能进行添加 / 移除操作如果可以就进行添加 / 移除。检查和操作是原子的所以无需像普通集合那样使用锁 int result; lock (myStack) if (myStack.Count 0) result myStack.Pop(); TryTake在集合为空时返回false。TryAdd在三种实现中都总会成功并返回true。而如果你要写自己的不允许重复元素的并发集合就可以在元素已存在时让TryAdd返回false比如自己写并发集set。 TryTake移除的具体元素是在子类中定义的 对于栈TryTake移除最新添加的元素。对于队列TryTake移除最早添加的元素。对于包TryTake移除可以最快移除的元素。这三个具体类基本都是显式实现了TryTake和TryAdd方法也通过更具体的的名字暴露了同样的功能比如TryDequeue和TryPop。 7.2ConcurrentBagTPermalink ConcurrentBagT用来存储一组无序的对象允许重复。它适用于你不关心调用Take或TryTake会返回哪个元素的场景。 ConcurrentBagT相比并发队列和栈的好处是它的Add方法被很多线程同时调用时几乎没有竞争冲突。而对于并发队列和栈并行调用Add会有一些竞争冲突但是比对非并发集合加锁的方式要小得多。并发包的Take方法也非常高效只要每个线程不要拿出比它添加的数量更多的元素。 在并发包的内部每一个线程都有其私有的链表。元素会加入到调用Add的线程对应的私有链表中就消除了竞争冲突。在对包进行枚举时枚举器会遍历所有线程的私有链表返回其中的每一个元素。 调用Take时包会首先检查当前线程的私有链表。如果其中有至少一个元素就可以没有冲突的轻松完成任务大多数情况都是如此。但是如果链表没有元素它就必须从其它线程的私有链表中“偷”一个元素就可能导致竞争冲突。 所以准确的说调用Take会返回当前线程最新添加的元素如果当前线程没有对应的元素就会随机取一个其它线程返回它最新添加的元素。 如果你的并行操作基本都是在添加元素或者每个线程的Add和Take是平衡的那么使用并发包就很理想。我们来看前面的一个例子是使用Parallel.ForEach来实现并行拼写检查 var misspellings new ConcurrentBagTupleint,string(); Parallel.ForEach (wordsToTest, (word, state, i) { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 对于实现生产者 / 消费者队列并发包就不是一个好的选择因为元素是在不同的线程进行添加和移除的。 7.3BlockingCollectionTPermalink 如果在ConcurrentStackT、ConcurrentQueueT和ConcurrentBagT这些生产者 / 消费者集合上调用TryTake时集合为空该方法会返回false。这种场景下有时可能等待一个元素被添加会更有用。 与其重载TryTake方法来实现这个功能如果还要允许取消和超时就可能需要大量成员不如使用 PFX 的设计者已经实现好的BlockingCollectionT类。阻塞集合可以封装任意实现了IProducerConsumerCollectionT接口的对象就可以调用这个封装上面的Take方法它在没有元素时会阻塞。 阻塞集合也可以让你限制集合的大小如果超过限制就阻塞生产者。这样限制了大小的集合被称为有界阻塞集合bounded blocking collection。 使用BlockingCollectionT时 创建其实例可选的指定一个IProducerConsumerCollectionT来封装还有集合的最大大小上界。调用Add或TryAdd来对底层集合添加元素。调用Take或TryTake来移除消费底层集合中的元素。如果调用构造方法的时候没有指定目标集合就会自动使用一个ConcurrentQueueT的实例。进行生成和消费的方法都可以指定取消标记和超时时间。Add和TryAdd在集合有界时可能会阻塞Take和TryTake在集合为空时会阻塞。 另一种消费元素的方式是调用GetConsumingEnumerable。它会返回一个可能无限的序列当有元素时就可以返回它。你可以调用CompleteAdding来强行结束这个序列它也会阻止之后再添加元素。 前面我们写过一个使用 Wait 和 Pulse的生产者 / 消费者队列。这里使用BlockingCollectionT来重构同一个类不考虑异常处理 public class PCQueue : IDisposable { BlockingCollectionAction _taskQ new BlockingCollectionAction(); public PCQueue (int workerCount) { // 为每个消费者创建并启动单独的任务 for (int i 0; i workerCount; i) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public void EnqueueTask (Action action) { _taskQ.Add (action); } void Consume() { // 没有元素时对序列的枚举就会被阻塞 // 而调用 CompleteAdding 可以结束枚举。 foreach (Action action in _taskQ.GetConsumingEnumerable()) action(); // 进行任务 } } 因为没有给BlockingCollection的构造方法传递任何参数所以会自动创建一个并发队列。而如果传递一个ConcurrentStack我们就会得到生产者 / 消费者栈。 BlockingCollection还提供了AddToAny和TakeFromAny这些静态方法它们可以让你对指定的多个阻塞集合进行添加或移除元素。操作会对第一个能够进行操作的集合进行。 利用 TaskCompletionSourcePermalink 我们之前实现的生产者 / 消费者模式还不够灵活因为工作项添加后无法追踪它们。如果能够实现以下功能会更好 能够获知工作项的完成。取消未启动的工作项。优雅的处理工作项抛出的异常。理想的解决方案是让EnqueueTask方法返回一个对象来提供我们上面描述的功能。好消息是这个类已经存在正是Task类。我们需要做的只是通过TaskCompletionSource来操控它 public class PCQueue : IDisposable { class WorkItem { public readonly TaskCompletionSourceobject TaskSource; public readonly Action Action; public readonly CancellationToken? CancelToken; public WorkItem ( TaskCompletionSourceobject taskSource, Action action, CancellationToken? cancelToken) { TaskSource taskSource; Action action; CancelToken cancelToken; } } BlockingCollectionWorkItem _taskQ new BlockingCollectionWorkItem(); public PCQueue (int workerCount) { // 为每个消费者创建并启动单独的任务 for (int i 0; i workerCount; i) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public Task EnqueueTask (Action action) { return EnqueueTask (action, null); } public Task EnqueueTask (Action action, CancellationToken? cancelToken) { var tcs new TaskCompletionSourceobject(); _taskQ.Add (new WorkItem (tcs, action, cancelToken)); return tcs.Task; } void Consume() { foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable()) if (workItem.CancelToken.HasValue workItem.CancelToken.Value.IsCancellationRequested) { workItem.TaskSource.SetCanceled(); } else try { workItem.Action(); workItem.TaskSource.SetResult (null); // 表示完成 } catch (OperationCanceledException ex) { if (ex.CancellationToken workItem.CancelToken) workItem.TaskSource.SetCanceled(); else workItem.TaskSource.SetException (ex); } catch (Exception ex) { workItem.TaskSource.SetException (ex); } } } 在EnqueueTask中我们入队一个工作项它封装了目标委托和任务完成源从而让我们之后可以控制返回给消费者的任务。 在Consume中我们在出队一个工作项后先检查任务是否被取消。如果没有就运行委托然后调用任务完成源上的SetResult来表示任务完成。 下面是如何使用这个类 var pcQ new PCQueue (1); Task task pcQ.EnqueueTask (() Console.WriteLine (Easy!)); // ... 我们现在可以对task等待、附加延续、让延续中的异常传播给父任务等等。换句话说我们获得了任务模型的丰富功能同时也相当于自行实现了一个调度器。 8SpinLock 和 SpinWaitPermalink 在并行编程中短暂的自旋经常比阻塞更好因为它避免了上下文切换和内核模式转换的开销。SpinLock和SpinWait被设计用来在这种场景下提供帮助。它们的主要用途是实现自定义的同步构造。 SpinLock和SpinWait是结构体而不是类这个设计是一种避免间址和垃圾回收的极限优化技术。它意味着你必须小心不能不经意地复制了实例比如不使用ref修饰符把它们传递给另一个方法或者把它们定义成了readonly的字段。这在使用SpinLock时十分重要。 8.1SpinLockPermalink SpinLock结构体可以让你进行锁定而无需上下文切换的开销它的代价是保持一个线程自旋空忙。这种方式适用于高竞争的场景下锁定非常短暂的情况比如从头写一个线程安全的链表。 如果让自旋锁等待的太久最多是几毫秒它会和普通的锁一样出让其时间片导致上下文切换。再被重新调度后它会继续出让就这样不断的“自旋出让”。这比完全使用自旋消耗的 CPU 资源要少得多但是比阻塞要高。 在单核的机器上自旋锁在遇到竞争时会立即开始“自旋出让”。 使用SpinLock和普通的锁差不多除了以下几点 自旋锁是结构体前面有提到。自旋锁不可重入意味着不能在一个线程上连续两次调用同一个SpinLock上的Enter方法。如果违反了这条规则要不然会抛出异常启用所有者追踪owner tracking时要不然会死锁禁用所有者追踪时。在构造自旋锁时可以指定是否启用所有者追踪启用会影响性能。SpinLock可以让你通过IsHeld属性查询锁是否已被获取如果启用了所有者追踪那么使用IsHeldByCurrentThread属性。没有像 C# 的lock语句那样的语法糖来简化SpinLock的使用。另一个不同之处是当调用Enter时你必须遵循提供 lockTaken 参数的健壮模式几乎总是使用try / finally一起实现。 下面是个例子 var spinLock new SpinLock (true); // 启用所有者追踪 bool lockTaken false; try { spinLock.Enter (ref lockTaken); // 做些事情... } finally { if (lockTaken) spinLock.Exit(); } 和普通的锁一样当且仅当Enter方法抛出异常并且锁没有被获取时lockTaken会为false。这种场景非常罕见当调用该线程的Abort或者OutOfMemoryException异常被抛出时但可以让你确定之后是否需要调用Exit。 SpinLock也提供了接受超时时间的TryEnter方法。 由于SpinLock笨拙的值类型语义和缺乏语法支持几乎每次想用它都是受罪在替换掉普通的锁前请三思。 SpinLock在需要写自己的可重用同步构造时最有意义。即便如此自旋锁也不像看上去那么有用。它仍然限制了并发。并且会什么都不做的浪费 CPU 时间。经常更好的选择都是把时间花在一些“投机”的事情上并使用SpinWait来辅助。译者注这里“投机”是指先进行操作并检测抢占如果发现被抢占就重试详见SpinWait 8.2SpinWaitPermalink SpinWait可以帮助实现无锁的代码使用自旋而非阻塞。它实现了安全措施来避免普通自旋可能会造成的资源饥饿和优先级倒置。 使用SpinWait的无锁编程是多线程中最难的它是为了应对没有其它高层构造可以使用的场景。先决条件是理解非阻塞同步。 为什么需要 SpinWaitPermalink 假设我们写了一个纯粹基于一个简单标识的自旋信号系统 bool _proceed; void Test() { // 自旋直到其它线程把 _proceed 设置为 true while (!_proceed) Thread.MemoryBarrier(); // ... } 如果Test运行时_proceed已经为true或者几次循环内就能变为true那么这个实现就会很高效。但是现在假设_proceed在几秒内保持false并且有四个线程同时调用Test。这个自旋就会完全占用一个四核的 CPU这会导致其它线程运行缓慢资源饥饿包括那个会把_proceed设置为true的线程优先级倒置。在单核机器上状况会进一步恶化因为自旋几乎总是导致优先级倒置。虽然现在单核机器已经很少见了可是单核的虚拟机并不少。 SpinWait使用两种方法解决这个问题。首先它会限制消耗 CPU 的自旋在经过一定次数的自旋后就会每次循环都出让其时间片通过调用Thread.Yield 和 Thread.Sleep从而减少资源消耗。其次它会检测是否是在单核机器上运行如果是就会每次循环都出让其时间片。 如何使用 SpinWaitPermalink 有两种方式使用SpinWait。第一种是调用静态方法SpinUntil。这个方法接受一个判定器和一个可选的超时时间 bool _proceed; void Test() { SpinWait.SpinUntil (() { Thread.MemoryBarrier(); return _proceed; }); // ... } 另一种更灵活的方式是创建SpinWait结构体的实例并在循环中调用SpinOnce bool _proceed; void Test() { var spinWait new SpinWait(); while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); } // ... } 前者就是使用后者提供的快捷方式。 SpinWait 如何工作Permalink 在当前的实现中SpinWait会在出让之前进行 10 次消耗 CPU 的迭代。但它并不会在每次迭代后立即返回调用方而是调用Thread.SpinWait来 通过 CLR最终是通过操作系统再自旋一定时间。这个时间最初是几十纳秒每次迭代都会加倍直到 10 次迭代结束。这在一定程度上保证了消耗 CPU 的自旋阶段的总时间的可预测性CLR 和操作系统可以根据情况来调节。一般来说这会在几十微秒的区间很小但是要大于上下文切换的开销。 在单核机器上SpinWait每次迭代都会出让。可以通过NextSpinWillYield属性来检查SpinWait在下一次自旋时会不会出让。 如果SpinWait在自旋出让模式保持了很久大概 20 次就会定期Sleep几微秒来进一步节约资源给其它线程使用。 使用 SpinWait 和 Interlocked.CompareExchange 进行无锁更新Permalink 结合SpinWait和Interlocked.CompareExchange可以原子的更新一个通过自己的值进行计算的字段读 - 改 - 写。例如假设我们要把字段 x 乘 10。非线程安全的简单代码就是 x x * 10; 它不是线程同步的原因就和我们在非阻塞同步中看到的对字段自增不是线程同步的原因一样。 正确的无锁方式如下 使用局部变量获取 x 的一个“快照”。计算新值这里就是将快照乘 10。如果快照还是最新的就将计算后的值写回这一步必须是原子的通过调用Interlocked.CompareExchange实现。如果快照过期了自旋并返回第 1 步。例如 int x;void MultiplyXBy (int factor) { var spinWait new SpinWait(); while (true) { int snapshot1 x; Thread.MemoryBarrier(); int calc snapshot1 * factor; int snapshot2 Interlocked.CompareExchange (ref x, calc, snapshot1); if (snapshot1 snapshot2) return; // 没有被抢占 spinWait.SpinOnce(); } } 我们可以去掉对Thread.MemoryBarrier的调用来略微提高性能。这是因为CompareExchange也会生成内存屏障。最坏的情况就是如果snapshot1在第一次迭代时就读取了一个过期的值那么会多进行一次自旋。 Interlocked.CompareExchange是在字段的当前值与第三个参数相等时使用指定的值来更新字段。它会返回字段的旧值就可以用来与原快照比较检查是否过期。如果值不相等意味着被另一个线程抢占就需要自旋重试。 CompareExchange也有重载可以对于object类型使用。我们可以利用这个重载来实现对所有引用类型的无锁更新方法 static void LockFreeUpdateT (ref T field, Func T, T updateFunction) where T : class { var spinWait new SpinWait(); while (true) { T snapshot1 field; T calc updateFunction (snapshot1); T snapshot2 Interlocked.CompareExchange (ref field, calc, snapshot1); if (snapshot1 snapshot2) return; spinWait.SpinOnce(); } } 下面是如何使用这个方法来写一个无锁的线程安全的事件实际上这是 C# 4.0 的编译器对于事件默认的处理 EventHandler _someDelegate; public event EventHandler SomeEvent { add { LockFreeUpdate (ref _someDelegate, d d value); } remove { LockFreeUpdate (ref _someDelegate, d d - value); } } SpinWait vs SpinLockPermalink 我们也可以通过把对共享的字段的访问放进SpinLock里来解决上面的问题。问题是自旋锁同一时间只允许一个线程进入尽管它通常能够消除上下文切换的开销。而使用SpinWait时我们可以假设没有竞争投机的运行。如果被抢占就重试。花费 CPU 时间做事情也许比在自旋锁中浪费 CPU 时间好 最后考虑下面的类 class Test {ProgressStatus _status new ProgressStatus (0, Starting); class ProgressStatus // 不可变类 { public readonly int PercentComplete; public readonly string StatusMessage; public ProgressStatus (int percentComplete, string statusMessage) { PercentComplete percentComplete; StatusMessage statusMessage; } } } 我们可以使用LockFreeUpdate方法来增加_status的PercentComplete字段的值 LockFreeUpdate (ref _status, s new ProgressStatus (s.PercentComplete 1, s.StatusMessage)); 注意我们基于现有值创建了新的ProgressStatus对象。要感谢LockFreeUpdate方法读取PercentComplete的值、增加它并写回的操作不会被不安全的抢占任何抢占都可以被可靠的检测到触发自旋重试。
http://www.yutouwan.com/news/142605/

相关文章:

  • 青海建设信用信息服务网站网站收录查询入口
  • 网站公司简介模板免费下载电商培训基地
  • 如何做后台网站的教程网站添加视频
  • 网站建设中html网页个人导航网站怎么备案
  • 无锡网站制作公司网片加工机器
  • wordpress文章半透明福州seo建站
  • 淘宝搜索热词排名seo标题优化
  • dede网站logo怎么改锦州市网站建设
  • 免费建设论坛网站做招聘网站的需求分析
  • 热门搜索关键词怎样优化网站自然排名
  • 用asp做旅游网站建筑工程发布网站
  • 汕头网站开发定制大理住房和城乡建设局网站
  • dedecms确定网站风格网站建设 中企动力 常州
  • 建设招标项目常挂网站有哪些网站的简介怎么在后台炒做
  • xxx网站策划书线上引流的八种推广方式
  • 自己做网站写文章如何增加网站的权重
  • 网站有效内容的宣传及推广如何做跨境电商怎么做
  • 一家网站建设公司需要什么资质移动商城网站建设 深圳
  • 一个空间怎么做多个网站长沙广告传媒有限公司
  • 手机网站建设在哪儿深圳市绿色建筑信息平台
  • 想给公司做个网站怎么做的提高网站知名度
  • 推广引流图片临沂网站优化哪家好
  • 怎样免费注册自己网站的域名WordPress如何禁止游客访问
  • 搬家网站建设思路邯郸房产
  • 网站备案 换域名云南楚雄旅游必去的景点
  • 网站设计分析案例手机做ppt免费模板
  • 怎么用ps做静态网站网站建设增值服务
  • 做定制校服的网站绿色企业网站模板
  • 建设网站企业网上银行登录入口官方网络网站推广选择乐云seo
  • 福清市建设局网站多少海口网站建设呢