12.5 并发集合

不变的集合很容易在多个线程中使用,因为它们不能改变。如果希望使用应在多个线程中改变的集合,.NET在名称空间System.Collections.Concurrent中提供了几个线程安全的集合类。线程安全的集合可防止多个线程以相互冲突的方式访问集合。

为了对集合进行线程安全的访问,定义了IProducerConsumerCollection<T>接口。这个接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法尝试给集合添加一项,但如果集合禁止添加项,这个操作就可能失败。为了给出相关信息,TryAdd()方法返回一个布尔值,以说明操作是成功还是失败。TryTake()方法也以这种方式工作,以通知调用者操作是成功还是失败,并在操作成功时返回集合中的项。下面列出了System.Collections.Concurrent名称空间中的类及其功能。

● ConcurrentQueue<T> ——这个集合类用一种免锁定的算法实现,使用在内部合并到一个链表中的32项数组。访问队列元素的方法有Enqueue()、TryDequeue()和TryPeek()。这些方法的命名非常类似于前面Queue<T>类的方法,只是给可能调用失败的方法加上了前缀Try。因为这个类实现了IProducerConsumerCollection<T>接口,所以TryAdd()和TryTake()方法仅调用Enqueue()和TryDequeue()方法。

● ConcurrentStack<T> ——非常类似于ConcurrentQueue<T>类,只是带有另外的元素访问方法。ConcurrentStack<T>类定义了Push()、PushRange()、TryPeek()、TryPop()以及TryPopRange()方法。在内部这个类使用其元素的链表。

● ConcurrentBag<T> ——该类没有定义添加或提取项的任何顺序。这个类使用一个把线程映射到内部使用的数组上的概念,因此尝试减少锁定。访问元素的方法有Add()、TryPeek()和TryTake()。

● ConcurrentDictionary<TKey, TValue> ——这是一个线程安全的键值集合。TryAdd()、TryGetValue()、TryRemove()和TryUpdate()方法以非阻塞的方式访问成员。因为元素基于键和值,所以ConcurrentDictionary<TKey, TValue>没有实现IProducerConsumerCollection<T>。

● BlockingCollection<T> ——这个集合在可以添加或提取元素之前,会阻塞线程并一直等待。BlockingCollection<T>集合提供了一个接口,以使用Add()和Take()方法来添加和删除元素。这些方法会阻塞线程,一直等到任务可以执行为止。Add()方法有一个重载版本,其中可以给该重载版本传递一个CancellationToken令牌。这个令牌允许取消被阻塞的调用。如果不希望线程无限期地等待下去,且不希望从外部取消调用,就可以使用TryAdd()和TryTake()方法,在这些方法中,也可以指定一个超时值,它表示在调用失败之前应阻塞线程和等待的最长时间。

ConcurrentXXX集合是线程安全的,如果某个动作不适用于线程的当前状态,它们就返回false。在继续之前,总是需要确认添加或提取元素是否成功。不能相信集合会完成任务。

BlockingCollection<T>是对实现了IProducerConsumerCollection<T>接口的任意类的修饰器,它默认使用ConcurrentQueue<T>类。还可以给构造函数传递任何其他实现了IProducerConsumer-Collection<T>接口的类,例如,ConcurrentBag<T>和ConcurrentStack<T>。

12.5.1 创建管道

将这些并发集合类用于管道是一种很好的应用。一个任务向一个集合类写入一些内容,同时另一个任务从该集合中读取内容。

下面的示例应用程序演示了BlockingCollection<T>类的用法,使用多个任务形成一个管道。第一个管道如图12-1所示。第一阶段的任务读取文件名,并把它们添加到队列中。在这个任务运行的同时,第二阶段的任务已经开始从队列中读取文件名并加载它们的内容。结果被写入另一个队列。第三阶段可以同时启动,读取并处理第二个队列的内容。结果被写入一个字典。

在这个场景中,只有第三阶段完成,并且内容已被最终处理,在字典中得到了完整的结果时,下一个阶段才会开始。图12-2显示了接下来的步骤。第四阶段从字典中读取内容,转换数据,然后将其写入队列中。第五阶段在项中添加了颜色信息,然后把它们添加到另一个队列中。最后一个阶段显示了信息。第四阶段到第六阶段也可以并发运行。

Info类代表由管道维护的项(代码文件PipelineSample/Info.cs ):

        public class Info
        {
          public string Word { get; set; }
          public int Count { get; set; }
          public string Color { get; set; }
          public override string ToString() => $"{Count} times: {Word}";
        }

PipelineSample使用如下依赖项和名称空间:

依赖项:

        NETStandard.Library

名称空间:

        System.Collections.Generic
        System.Collections.Concurrent
        System.IO
        System.Linq
        System.Threading.Tasks
        static System.Console

图12-1

看看这个示例应用程序的代码可知,完整的管道是在StartPipeline()方法中管理的。该方法实例化了集合,并把集合传递到管道的各个阶段。第1阶段用ReadFilenamesAsync处理,第2和第3阶段分别由同时运行的LoadContentAsync和ProcessContentAsync处理。但是,只有当前3个阶段完成后,第4个阶段才能启动(代码文件PipelineSample/Program.cs)。

图12-2

        public static async Task StartPipelineAsync()
        {
          var fileNames = new BlockingCollection<string>();
          var lines = new BlockingCollection<string>();
          var words = new ConcurrentDictionary<string, int>();
          var items = new BlockingCollection<Info>();
          var coloredItems = new BlockingCollection<Info>();
          Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames);
          ColoredConsole.WriteLine("started stage 1");
          Task t2 = PipelineStages.LoadContentAsync(fileNames, lines);
          ConsoleHelper.WriteLine("started stage 2");
          Task t3 = PipelineStages.ProcessContentAsync(lines, words);
          await Task.WhenAll(t1, t2, t3);
          ConsoleHelper.WriteLine("stages 1, 2, 3 completed");
          Task t4 = PipelineStages.TransferContentAsync(words, items);
          Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
          Task t6 = PipelineStages.ShowContentAsync(coloredItems);
          ColoredConsole.WriteLine("stages 4, 5, 6 started");
          await Task.WhenAll(t4, t5, t6);
          ColoredConsole.WriteLine("all stages finished");
        }

注意:这个示例应用程序使用了任务以及async和await关键字,第15章将介绍它们。第21章将详细介绍线程、任务和同步。第23章将讨论文件I/O。

本例用ColoredConsole类向控制台写入信息。该类可以方便地改变控制台输出的颜色,并使用同步来避免返回颜色错误的输出(代码文件PipelineSample/ConsoleHelper.cs):

        public static class ColoredConsole
        {
          private static object syncOutput = new object();
          public static void WriteLine(string message)
          {
            lock (syncOutput)
            {
              Console.WriteLine(message);
            }
          }
          public static void WriteLine(string message, string color)
          {
            lock (syncOutput)
            {
              Console.ForegroundColor = (ConsoleColor)Enum.Parse(
                  typeof(ConsoleColor), color);
              Console.WriteLine(message);
              Console.ResetColor();
            }
          }
        }

12.5.2 使用BlockingCollection

现在介绍管道的第一阶段。ReadFilenamesAsync接收BlockingCollection<T>为参数,在其中写入输出。该方法的实现使用枚举器来迭代指定目录及其子目录中的C#文件。这些文件的文件名用Add方法添加到BlockingCollection<T>中。完成添加文件名的操作后,调用CompleteAdding方法,以通知所有读取器不应再等待集合中的任何额外项(代码文件PipelineSample/PipelineStages.cs):

        public static class PipelineStages
        {
          public static Task ReadFilenamesAsync(string path,
              BlockingCollection<string> output)
          {
            return Task.Factory.StartNew(() =>
            {
              foreach (string filename in Directory.EnumerateFiles(path, "*.cs",
                  SearchOption.AllDirectories))
              {
                output.Add(filename);
                ColoredConsole.WriteLine($"stage 1: added {filename}");
            }
            output.CompleteAdding();
          }, TaskCreationOptions.LongRunning);
        }
        //. . .

注意:如果在写入器添加项的同时,读取器从BlockingCollection<T>中读取,那么调用CompleteAdding方法是很重要的。否则,读取器会在foreach循环中等待更多的项被添加。

下一个阶段是读取文件并将其内容添加到另一个集合中,这由LoadContentAsync方法完成。该方法使用了输入集合传递的文件名,打开文件,然后把文件中的所有行添加到输出集合中。在foreach循环中,用输入阻塞集合调用GetConsumingEnumerable,以迭代各项。直接使用input变量而不调用GetConsumingEnumerable是可以的,但是这只会迭代当前状态的集合,而不会迭代以后添加的项。

        public static async Task LoadContentAsync(BlockingCollection<string> input,
            BlockingCollection<string> output)
        {
          foreach (var filename in input.GetConsumingEnumerable() )
          {
            using (FileStream stream = File.OpenRead(filename))
            {
              var reader = new StreamReader(stream);
              string line = null;
              while ((line = await reader.ReadLineAsync()) ! = null)
              {
                output.Add(line);
                ColoredConsole.WriteLine($"stage 2: added {line}");
              }
            }
          }
          output.CompleteAdding();
        }

注意:如果在填充集合的同时,使用读取器读取集合,则需要使用GetConsumingEnumerable方法获取阻塞集合的枚举器,而不是直接迭代集合。

12.5.3 使用ConcurrentDictionary

ProcessContentAsync方法实现了第三阶段。这个方法获取输入集合中的行,然后拆分它们,将各个词筛选到输出字典中。AddOrUpdate是ConcurrentDictionary类型的一个方法。如果键没有添加到字典中,第二个参数就定义应该设置的值。如果键已存在于字典中,updateValueFactory参数就定义值的改变方式。在这种情况下,现有的值只是递增1:

        public static Task ProcessContentAsync(BlockingCollection<string> input,
                ConcurrentDictionary<string, int> output)
        {
          return Task.Factory.StartNew(() =>
          {
            foreach (var line in input.GetConsumingEnumerable() )
            {
              string[] words = line.Split(' ', '; ', '\t', '{', '}', '(', ')', ':',
                  ', ', '"');
              foreach (var word in words.Where(w => ! string.IsNullOrEmpty(w)))
              {
                output.AddOrUpdate(key: word, addValue: 1,
                  updateValueFactory: (s, i) = > ++i) ;
                ColoredConsole.WriteLine($"stage 3: added {word}");
              }
            }
          }, TaskCreationOptions.LongRunning);
        }

运行前3个阶段的应用程序,得到的输出如下所示,各个阶段的操作交织在一起:

        stage 3: added DisplayBits
        stage 3: added bits2
        stage 3: added Write
        stage 3: added =
        stage 3: added bits1.Or
        stage 2: added          DisplayBits(bits2);
        stage 2: added          Write(" and ");
        stage 2: added          DisplayBits(bits1);
        stage 2: added          WriteLine();
        stage 2: added          DisplayBits(bits2);

12.5.4 完成管道

在完成前3个阶段后,接下来的3个阶段也可以并行运行。TransferContentAsync从字典中获取数据,将其转换为Info类型,然后放到输出BlockingCollection<T>中(代码文件PipelineSample/PipelineStages.cs):

        public static Task TransferContentAsync(
            ConcurrentDictionary<string, int> input,
            BlockingCollection<Info> output)
        {
          return Task.Factory.StartNew(() =>
          {
            foreach (var word in input.Keys)
            {
              int value;
              if (input.TryGetValue(word, out value))
              {
                var info = new Info { Word = word, Count = value };
                output.Add(info);
                ColoredConsole.WriteLine($"stage 4: added {info}");
              }
            }
            output.CompleteAdding();
          }, TaskCreationOptions.LongRunning);
        }

管道阶段AddColorAsync根据Count属性的值设置Info类型的Color属性:

        public static Task AddColorAsync(BlockingCollection<Info> input,
            BlockingCollection<Info> output)
        {
          return Task.Factory.StartNew(() =>
          {
            foreach (var item in input.GetConsumingEnumerable())
            {
              if (item.Count > 40)
              {
                item.Color = "Red";
              }
              else if (item.Count > 20)
              {
                item.Color = "Yellow";
              }
              else
              {
                item.Color = "Green";
              }
              output.Add(item);
              ColoredConsole.WriteLine($"stage 5: added color {item.Color} to {item}");
            }
            output.CompleteAdding();
          }, TaskCreationOptions.LongRunning);
        }

最后一个阶段用指定的颜色在控制台中输出结果:

        public static Task ShowContentAsync(BlockingCollection<Info> input)
        {
          return Task.Factory.StartNew(() =>
          {
            foreach (var item in input.GetConsumingEnumerable())
            {
              ColoredConsole.WriteLine($"stage 6: {item}", item.Color);
            }
          }, TaskCreationOptions.LongRunning);
        }

运行应用程序,得到的结果如下所示,它是彩色的。

        stage 6: 20 times: static
        stage 6: 3 times: Count
        stage 6: 2 times: t2
        stage 6: 1 times: bits2[sectionD]
        stage 6: 3 times: set
        stage 6: 2 times: Console.ReadLine
        stage 6: 3 times: started
        stage 6: 1 times: builder.Remove
        stage 6: 1 times: reader
        stage 6: 2 times: bit4
        stage 6: 1 times: ForegroundColor
        stage 6: 1 times: all
        all stages finished