- C#高级编程(第10版) C# 6 & .NET Core 1.0 (.NET开发经典名著)
- (美)Christian Nagel
- 2790字
- 2025-02-18 01:50:02
12.5 并发集合
不变的集合很容易在多个线程中使用,因为它们不能改变。如果希望使用应在多个线程中改变的集合,.NET在名称空间System.Collections.Concurrent中提供了几个线程安全的集合类。线程安全的集合可防止多个线程以相互冲突的方式访问集合。
为了对集合进行线程安全的访问,定义了IProducerConsumerCollection<T>接口。这个接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法尝试给集合添加一项,但如果集合禁止添加项,这个操作就可能失败。为了给出相关信息,TryAdd()方法返回一个布尔值,以说明操作是成功还是失败。TryTake()方法也以这种方式工作,以通知调用者操作是成功还是失败,并在操作成功时返回集合中的项。下面列出了System.Collections.Concurrent名称空间中的类及其功能。
● ConcurrentQueue<T> ——这个集合类用一种免锁定的算法实现,使用在内部合并到一个链表中的32项数组。访问队列元素的方法有Enqueue()、TryDequeue()和TryPeek()。这些方法的命名非常类似于前面Queue<T>类的方法,只是给可能调用失败的方法加上了前缀Try。因为这个类实现了IProducerConsumerCollection<T>接口,所以TryAdd()和TryTake()方法仅调用Enqueue()和TryDequeue()方法。
● ConcurrentStack<T> ——非常类似于ConcurrentQueue<T>类,只是带有另外的元素访问方法。ConcurrentStack<T>类定义了Push()、PushRange()、TryPeek()、TryPop()以及TryPopRange()方法。在内部这个类使用其元素的链表。
● ConcurrentBag<T> ——该类没有定义添加或提取项的任何顺序。这个类使用一个把线程映射到内部使用的数组上的概念,因此尝试减少锁定。访问元素的方法有Add()、TryPeek()和TryTake()。
● ConcurrentDictionary<TKey, TValue> ——这是一个线程安全的键值集合。TryAdd()、TryGetValue()、TryRemove()和TryUpdate()方法以非阻塞的方式访问成员。因为元素基于键和值,所以ConcurrentDictionary<TKey, TValue>没有实现IProducerConsumerCollection<T>。
● BlockingCollection<T> ——这个集合在可以添加或提取元素之前,会阻塞线程并一直等待。BlockingCollection<T>集合提供了一个接口,以使用Add()和Take()方法来添加和删除元素。这些方法会阻塞线程,一直等到任务可以执行为止。Add()方法有一个重载版本,其中可以给该重载版本传递一个CancellationToken令牌。这个令牌允许取消被阻塞的调用。如果不希望线程无限期地等待下去,且不希望从外部取消调用,就可以使用TryAdd()和TryTake()方法,在这些方法中,也可以指定一个超时值,它表示在调用失败之前应阻塞线程和等待的最长时间。
ConcurrentXXX集合是线程安全的,如果某个动作不适用于线程的当前状态,它们就返回false。在继续之前,总是需要确认添加或提取元素是否成功。不能相信集合会完成任务。
BlockingCollection<T>是对实现了IProducerConsumerCollection<T>接口的任意类的修饰器,它默认使用ConcurrentQueue<T>类。还可以给构造函数传递任何其他实现了IProducerConsumer-Collection<T>接口的类,例如,ConcurrentBag<T>和ConcurrentStack<T>。
12.5.1 创建管道
将这些并发集合类用于管道是一种很好的应用。一个任务向一个集合类写入一些内容,同时另一个任务从该集合中读取内容。
下面的示例应用程序演示了BlockingCollection<T>类的用法,使用多个任务形成一个管道。第一个管道如图12-1所示。第一阶段的任务读取文件名,并把它们添加到队列中。在这个任务运行的同时,第二阶段的任务已经开始从队列中读取文件名并加载它们的内容。结果被写入另一个队列。第三阶段可以同时启动,读取并处理第二个队列的内容。结果被写入一个字典。
在这个场景中,只有第三阶段完成,并且内容已被最终处理,在字典中得到了完整的结果时,下一个阶段才会开始。图12-2显示了接下来的步骤。第四阶段从字典中读取内容,转换数据,然后将其写入队列中。第五阶段在项中添加了颜色信息,然后把它们添加到另一个队列中。最后一个阶段显示了信息。第四阶段到第六阶段也可以并发运行。
Info类代表由管道维护的项(代码文件PipelineSample/Info.cs ):
public class Info { public string Word { get; set; } public int Count { get; set; } public string Color { get; set; } public override string ToString() => $"{Count} times: {Word}"; }
PipelineSample使用如下依赖项和名称空间:
依赖项:
NETStandard.Library
名称空间:
System.Collections.Generic System.Collections.Concurrent System.IO System.Linq System.Threading.Tasks static System.Console

图12-1
看看这个示例应用程序的代码可知,完整的管道是在StartPipeline()方法中管理的。该方法实例化了集合,并把集合传递到管道的各个阶段。第1阶段用ReadFilenamesAsync处理,第2和第3阶段分别由同时运行的LoadContentAsync和ProcessContentAsync处理。但是,只有当前3个阶段完成后,第4个阶段才能启动(代码文件PipelineSample/Program.cs)。

图12-2
public static async Task StartPipelineAsync() { var fileNames = new BlockingCollection<string>(); var lines = new BlockingCollection<string>(); var words = new ConcurrentDictionary<string, int>(); var items = new BlockingCollection<Info>(); var coloredItems = new BlockingCollection<Info>(); Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames); ColoredConsole.WriteLine("started stage 1"); Task t2 = PipelineStages.LoadContentAsync(fileNames, lines); ConsoleHelper.WriteLine("started stage 2"); Task t3 = PipelineStages.ProcessContentAsync(lines, words); await Task.WhenAll(t1, t2, t3); ConsoleHelper.WriteLine("stages 1, 2, 3 completed"); Task t4 = PipelineStages.TransferContentAsync(words, items); Task t5 = PipelineStages.AddColorAsync(items, coloredItems); Task t6 = PipelineStages.ShowContentAsync(coloredItems); ColoredConsole.WriteLine("stages 4, 5, 6 started"); await Task.WhenAll(t4, t5, t6); ColoredConsole.WriteLine("all stages finished"); }
注意:这个示例应用程序使用了任务以及async和await关键字,第15章将介绍它们。第21章将详细介绍线程、任务和同步。第23章将讨论文件I/O。
本例用ColoredConsole类向控制台写入信息。该类可以方便地改变控制台输出的颜色,并使用同步来避免返回颜色错误的输出(代码文件PipelineSample/ConsoleHelper.cs):
public static class ColoredConsole { private static object syncOutput = new object(); public static void WriteLine(string message) { lock (syncOutput) { Console.WriteLine(message); } } public static void WriteLine(string message, string color) { lock (syncOutput) { Console.ForegroundColor = (ConsoleColor)Enum.Parse( typeof(ConsoleColor), color); Console.WriteLine(message); Console.ResetColor(); } } }
12.5.2 使用BlockingCollection
现在介绍管道的第一阶段。ReadFilenamesAsync接收BlockingCollection<T>为参数,在其中写入输出。该方法的实现使用枚举器来迭代指定目录及其子目录中的C#文件。这些文件的文件名用Add方法添加到BlockingCollection<T>中。完成添加文件名的操作后,调用CompleteAdding方法,以通知所有读取器不应再等待集合中的任何额外项(代码文件PipelineSample/PipelineStages.cs):
public static class PipelineStages { public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output) { return Task.Factory.StartNew(() => { foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories)) { output.Add(filename); ColoredConsole.WriteLine($"stage 1: added {filename}"); } output.CompleteAdding(); }, TaskCreationOptions.LongRunning); } //. . .
注意:如果在写入器添加项的同时,读取器从BlockingCollection<T>中读取,那么调用CompleteAdding方法是很重要的。否则,读取器会在foreach循环中等待更多的项被添加。
下一个阶段是读取文件并将其内容添加到另一个集合中,这由LoadContentAsync方法完成。该方法使用了输入集合传递的文件名,打开文件,然后把文件中的所有行添加到输出集合中。在foreach循环中,用输入阻塞集合调用GetConsumingEnumerable,以迭代各项。直接使用input变量而不调用GetConsumingEnumerable是可以的,但是这只会迭代当前状态的集合,而不会迭代以后添加的项。
public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output) { foreach (var filename in input.GetConsumingEnumerable() ) { using (FileStream stream = File.OpenRead(filename)) { var reader = new StreamReader(stream); string line = null; while ((line = await reader.ReadLineAsync()) ! = null) { output.Add(line); ColoredConsole.WriteLine($"stage 2: added {line}"); } } } output.CompleteAdding(); }
注意:如果在填充集合的同时,使用读取器读取集合,则需要使用GetConsumingEnumerable方法获取阻塞集合的枚举器,而不是直接迭代集合。
12.5.3 使用ConcurrentDictionary
ProcessContentAsync方法实现了第三阶段。这个方法获取输入集合中的行,然后拆分它们,将各个词筛选到输出字典中。AddOrUpdate是ConcurrentDictionary类型的一个方法。如果键没有添加到字典中,第二个参数就定义应该设置的值。如果键已存在于字典中,updateValueFactory参数就定义值的改变方式。在这种情况下,现有的值只是递增1:
public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output) { return Task.Factory.StartNew(() => { foreach (var line in input.GetConsumingEnumerable() ) { string[] words = line.Split(' ', '; ', '\t', '{', '}', '(', ')', ':', ', ', '"'); foreach (var word in words.Where(w => ! string.IsNullOrEmpty(w))) { output.AddOrUpdate(key: word, addValue: 1, updateValueFactory: (s, i) = > ++i) ; ColoredConsole.WriteLine($"stage 3: added {word}"); } } }, TaskCreationOptions.LongRunning); }
运行前3个阶段的应用程序,得到的输出如下所示,各个阶段的操作交织在一起:
stage 3: added DisplayBits stage 3: added bits2 stage 3: added Write stage 3: added = stage 3: added bits1.Or stage 2: added DisplayBits(bits2); stage 2: added Write(" and "); stage 2: added DisplayBits(bits1); stage 2: added WriteLine(); stage 2: added DisplayBits(bits2);
12.5.4 完成管道
在完成前3个阶段后,接下来的3个阶段也可以并行运行。TransferContentAsync从字典中获取数据,将其转换为Info类型,然后放到输出BlockingCollection<T>中(代码文件PipelineSample/PipelineStages.cs):
public static Task TransferContentAsync( ConcurrentDictionary<string, int> input, BlockingCollection<Info> output) { return Task.Factory.StartNew(() => { foreach (var word in input.Keys) { int value; if (input.TryGetValue(word, out value)) { var info = new Info { Word = word, Count = value }; output.Add(info); ColoredConsole.WriteLine($"stage 4: added {info}"); } } output.CompleteAdding(); }, TaskCreationOptions.LongRunning); }
管道阶段AddColorAsync根据Count属性的值设置Info类型的Color属性:
public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output) { return Task.Factory.StartNew(() => { foreach (var item in input.GetConsumingEnumerable()) { if (item.Count > 40) { item.Color = "Red"; } else if (item.Count > 20) { item.Color = "Yellow"; } else { item.Color = "Green"; } output.Add(item); ColoredConsole.WriteLine($"stage 5: added color {item.Color} to {item}"); } output.CompleteAdding(); }, TaskCreationOptions.LongRunning); }
最后一个阶段用指定的颜色在控制台中输出结果:
public static Task ShowContentAsync(BlockingCollection<Info> input) { return Task.Factory.StartNew(() => { foreach (var item in input.GetConsumingEnumerable()) { ColoredConsole.WriteLine($"stage 6: {item}", item.Color); } }, TaskCreationOptions.LongRunning); }
运行应用程序,得到的结果如下所示,它是彩色的。
stage 6: 20 times: static stage 6: 3 times: Count stage 6: 2 times: t2 stage 6: 1 times: bits2[sectionD] stage 6: 3 times: set stage 6: 2 times: Console.ReadLine stage 6: 3 times: started stage 6: 1 times: builder.Remove stage 6: 1 times: reader stage 6: 2 times: bit4 stage 6: 1 times: ForegroundColor stage 6: 1 times: all all stages finished