vlambda博客
学习文章列表

.NET(C#):线程安全集合的阻塞BlockingCollection的使用

目录

  • 1. 限制最大容量:BoundedCapacity

  • 2. 禁止加入:CompleteAdding和IsCompleted

  • 3. 枚举:GetConsumingEnumerable和BlockingCollection本身

  • 4. GetConsumingEnumerable和CompleteAdding

 

 

返回目录

1. 限制最大容量:BoundedCapacity

BoundedCapacity属性和CompleteAdding方法,它们都可以从某种方式上限制元素被加入到集合中。但BoundedCapacity是用来限制集合的最大容量,当容量已满后,后续的添加操作会被阻塞,一旦有元素被移除,那么阻塞的添加操作会成功执行。

 

比如下面代码,试图将1-50加入到BlockingCollection,此时默认内部是ConcurrentBag,当然你可以指定任意IProducerConsumerCollection。我们把BoundedCapacity设置成2。

 

var bcollec =newBlockingCollection<int>(2);//试图添加1-50Task.Run(() => { Parallel.For(1, 51, i => { bcollec.Add(i); Console.WriteLine("加入:"+ i); }); }); Thread.Sleep(1000);Console.WriteLine("调用一次Take");bcollec.Take(); Thread.Sleep(Timeout.Infinite);


 

可能的输出:

加入:37

加入:13

调用一次Take

加入:25

 

只有最多两个可以加入,然后调用Take后,下一个元素才会被加入。(注意此时Parallel.For中会有多个线程处于阻塞状态,因为无法加入数据)。

 

 

 

返回目录

2. 禁止加入:CompleteAdding和IsCompleted

 

CompleteAdding方法则是直接不允许任何元素被加入集合,即使是当前元素的数量小于BoundedCapacity属性。

 

代码:

var bcollec =newBlockingCollection<int>(5);//试图添加1-50Task.Run(() => { Parallel.For(1, 51, i => { Console.WriteLine("准备加入:"+ i); bcollec.Add(i); Console.WriteLine("== 成功加入:"+ i); Thread.Sleep(1000); }); }); //等待一小段时间后马上调用CompleteAddingThread.Sleep(500);Console.WriteLine("调用CompleteAdding");bcollec.CompleteAdding(); Thread.Sleep(Timeout.Infinite);


 

上述代码可能的输出:

准备加入:1

准备加入:13

准备加入:25

准备加入:37

== 成功加入:13

== 成功加入:1

== 成功加入:37

== 成功加入:25

调用CompleteAdding

准备加入:2

准备加入:38

准备加入:26

准备加入:14

 

可以看到,虽然BlockingCollection的BoundedCapacity为5,但是由于提前调用了CompleteAdding,即使当前集合只有4个元素,也不会再同意新的加入操作了。

 

那么CompleteAdding有什么用?当使用了CompleteAdding方法后且集合内没有元素的时候,另一个属性IsCompleted此时会为True,这个属性可以用来判断是否当前集合内的所有元素都被处理完,而BlockingCollection背后的IProducerConsumerCollection恰恰常用来处理此类生产者-消费者问题的。

 

下面我们首先在多个线程中试图往BlockingCollection中加入元素,然后中途调用CompleteAdding,接着通过IsCompleted属性逐个处理被成功加入的元素。

 

如下代码:

var bcollec =newBlockingCollection<int>();//试图添加1-50Task.Run(() => { Parallel.For(1, 51, i => { bcollec.Add(i); Console.WriteLine("成功加入:"+ i); Thread.Sleep(1000); }); }); //等待一小段时间后马上调用CompleteAddingThread.Sleep(700);Console.WriteLine("调用CompleteAdding");bcollec.CompleteAdding(); Console.WriteLine("容器元素数量:"+ bcollec.Count); while (!bcollec.IsCompleted){ var res = bcollec.Take(); Console.WriteLine("取出:"+ res);} Console.WriteLine("操作完成"); Thread.Sleep(Timeout.Infinite);


 

可能的输出:

成功加入:37

成功加入:25

成功加入:13

成功加入:1

调用CompleteAdding

容器元素数量:4

取出:1

取出:37

取出:25

取出:13

操作完成

 

 

返回目录

3. 枚举:GetConsumingEnumerable和BlockingCollection本身

BlockingCollection有两种枚举方法,首先BlockingCollection本身继承自IEnumerable<T>,所以它自己就可以被foreach枚举,首先BlockingCollection包装了一个线程安全集合,那么它自己也是线程安全的,而当多个线程在同时修改或访问线程安全容器时,BlockingCollection自己作为IEnumerable会返回一个一定时间内的集合片段,也就是只会枚举在那个时间点上内部集合的元素。

 

看下面代码:

var bcollec =newBlockingCollection<int>();//试图添加1-10Task.Run(() =>{ var forOpt =newParallelOptions() { //防止在某些硬件上并发数太多 MaxDegreeOfParallelism =2 };  Parallel.For(1, 11, forOpt, i => { bcollec.Add(i); Console.WriteLine("成功加入:"+ i); Thread.Sleep(500); });}); Thread.Sleep(700);//开始枚举Task.Run(() =>{ foreach (var i in bcollec) Console.WriteLine("输出:"+ i);}); Thread.Sleep(Timeout.Infinite);


 

我们边加入元素边进行枚举(直接在BlockingCollection上foreach),可能的输出:

成功加入:1

成功加入:6

成功加入:2

成功加入:7

输出:1

输出:6

输出:2

输出:7

成功加入:8

成功加入:3

成功加入:4

成功加入:9

成功加入:5

成功加入:10

 

可以看到,BlockingCollection本身的迭代器只能反映出一时的容器内容。

 

而BlockingCollection还有一个GetConsumingEnumerable方法,同样返回一个IEnumerable<T>,这个可枚举的集合背后的迭代器不同于BlockingCollection本身的迭代器,它可以返回最新的加入的元素,如果当前时间段没有元素被加入,它会阻塞然后等新加进来的元素。

 

我们把上面的使用BlockingCollection本身枚举代码中的枚举Task改成这样:

//开始枚举Task.Run(() =>{ foreach (var i in bcollec.GetConsumingEnumerable()) Console.WriteLine("输出:"+ i); Console.WriteLine("完成枚举");});


 

可能的输出:

成功加入:6

成功加入:1

成功加入:2

成功加入:7

输出:6

输出:1

输出:2

输出:7

成功加入:3

成功加入:8

输出:3

输出:8

成功加入:4

成功加入:9

输出:4

输出:9

成功加入:10

成功加入:5

输出:10

输出:5

 

这个迭代器很给力,一直处于等待和执行的状态,只要有新的元素被加入,它会找机会去执行foreach的内容,然后再阻塞去等新的元素。

 

而且在输出中,代码里的“完成枚举”字符串一直没有被输出。此时它还在卖力地等……因为它不确定什么时候才不会有新元素被加入。

 

 

 

返回目录

4. GetConsumingEnumerable和CompleteAdding

 

好,此时你应该想到了上面学的CompleteAdding方法,它可以禁止新的元素被加入到BlockingCollection的内部线程安全集合中,所以使用这个方法可以通知GetConsumingEnumerable的迭代器您老不用再等了,后面不会有元素被加进来了。

 

如下代码:

 

抱歉,这几段代码都不短,而且都类似。但我仍然把完整代码贴出来,虽然这使文章比较冗长,但是我觉得这样读者浏览或者复制时从上到下一目了然,总比看到诸如“请把前面xxx个代码做如下修改:把xxx行改成xxx,在xxx行加入这段代码……”好吧。

 

var bcollec =newBlockingCollection<int>();//试图添加1-10Task.Run(() =>{ var forOpt =newParallelOptions() { //防止在某些硬件上并发数太多 MaxDegreeOfParallelism =2 };  Parallel.For(1, 11, forOpt, i => { Console.WriteLine("等待加入:"+ i); bcollec.Add(i); Console.WriteLine("成功加入:"+ i); Thread.Sleep(500); });}); Thread.Sleep(600);//开始枚举Task.Run(() =>{ foreach (var i in bcollec.GetConsumingEnumerable()) Console.WriteLine("输出:"+ i); Console.WriteLine("完成枚举");}); Thread.Sleep(300); bcollec.CompleteAdding();Console.WriteLine("=== 调用CompleteAdding"); Thread.Sleep(Timeout.Infinite); 


可能的输出:

等待加入:1

等待加入:6

成功加入:1

成功加入:6

等待加入:2

成功加入:2

等待加入:7

成功加入:7

输出:1

输出:6

输出:2

输出:7

=== 调用CompleteAdding

完成枚举

等待加入:3

等待加入:8

 

可以看到,等CompleteAdding,“枚举完成”马上被输出!