Monday, February 22, 2010

Three steps to the blocking collection: [3] Blocking collection

About two months ago I started working on Delphi clone of .NET 4 BlockingCollection. Initial release was completed just before the end of 2009 and I started to write a series of articles on TOmniBlockingCollection in early January but then I got stuck in the dynamic lock-free queue implementation. Instead of writing articles I spent most of my free time working on that code.

Now it is (finally) time to complete the journey. Everything that had to be said about the infrastructure was told and I only have to show you the internal workings of the blocking collection itself.

[Step 1: Three steps to the blocking collection: [1] Inverse semaphore]

[Step 2: Dynamic lock-free queue – doing it right]

The blocking collecting is exposed as an interface that lives in the OtlCollections unit.

  IOmniBlockingCollection = interface(IGpTraceable) 
procedure Add(const value: TOmniValue);
procedure CompleteAdding;
function GetEnumerator: IOmniValueEnumerator;
function IsCompleted: boolean;
function Take(var value: TOmniValue): boolean;
function TryAdd(const value: TOmniValue): boolean;
function TryTake(var value: TOmniValue; timeout_ms: cardinal = 0): boolean;
end; { IOmniBlockingCollection }

There’s also a class TOmniBlockingCollection which implements this interface. This class is public and can be used or reused in your code.

The blocking collection works in the following way:

  • Add will add new value to the collection (which is internally implemented as a queue (FIFO, first in, first out)).
  • CompleteAdding tells the collection that all data is in the queue. From now on, calling Add will raise an exception.
  • TryAdd is the same as Add except that it doesn’t raise an exception but returns False if the value can’t be added.
  • IsCompleted returns True after the CompleteAdding has been called.
  • Take reads next value from the collection. If there’s no data in the collection, Take will block until the next value is available. If, however, any other thread calls CompleteAdding while the Take is blocked, Take will unblock and return False.
  • TryTake is the same as Take except that it has a timeout parameter specifying maximum time the call is allowed to wait for the next value.
  • Enumerator calls Take in the MoveNext method and returns that value. Enumerator will therefore block when there is no data in the collection. The usual way to stop the enumerator is to call CompleteAdding which will unblock all pending MoveNext calls and stop enumeration. [For another approach see the example at the end of this article.]

The trivial parts

Most of the blocking collection code is fairly trivial.

Add just calls TryAdd and raises an exception if TryAdd fails.

procedure TOmniBlockingCollection.Add(const value: TOmniValue);
if not TryAdd(value) then
raise ECollectionCompleted.Create('Adding to completed collection');
end; { TOmniBlockingCollection.Add }

CompleteAdding sets two “completed” flags – one boolean flag and one Windows event. Former is used for speed in non-blocking tests while the latter is used when TryTake has to block.

procedure TOmniBlockingCollection.CompleteAdding;
if not obcCompleted then begin
obcCompleted := true;
end; { TOmniBlockingCollection.CompleteAdding }

Take calls the TryTake with the INFINITE timeout.

function TOmniBlockingCollection.Take(var value: TOmniValue): boolean;
Result := TryTake(value, INFINITE);
end; { TOmniBlockingCollection.Take }

TryAdd checks if CompleteAdding has been called. If not, the value is stored in the dynamic queue.

There’s a potential problem hiding in the TryAdd – between the time the completed flag is checked and the time the value is enqueued, another thread may call CompleteAdding. Strictly speaking, TryAdd should not succeed in that case. However, I cannot foresee a parallel algorithm where this could cause a problem.

function TOmniBlockingCollection.TryAdd(const value: TOmniValue): boolean;
// CompleteAdding and TryAdd are not synchronised
Result := not obcCompleted;
if Result then
end; { TOmniBlockingCollection.TryAdd }

Easy peasy.

The not so trivial part

And now for something completely different …

TryTake is a whole different beast. It must:

  • retrieve the data
  • observe IsCompleted
  • block when there’s no data and observer is completed
  • observe the timeout limitations

Not so easy.

In addition to the obcCompletedSignal (completed event) and obcCollection (dynamic data queue) it will also use obcObserver (a queue change mechanism used inside the OTL) and obcResourceCount, which is an instance of the TOmniResourceCount (inverse semaphore, introduced in Part 1). All these are created in the constructor:

constructor TOmniBlockingCollection.Create(numProducersConsumers: integer);
inherited Create;
if numProducersConsumers > 0 then
obcResourceCount := TOmniResourceCount.Create(numProducersConsumers);
obcCollection := TOmniQueue.Create;
obcCompletedSignal := CreateEvent(nil, true, false, nil);
obcObserver := CreateContainerWindowsEventObserver;
obcSingleThreaded := (Environment.Process.Affinity.Count = 1);
if obcSingleThreaded then
obcCollection.ContainerSubject.Attach(obcObserver, coiNotifyOnAllInserts);
end; { TOmniBlockingCollection.Create }

TryTake is pretty long so I’ve split it into two parts. Let’s take a look at the non-blocking part first.

First, the code tries to retrieve data from the dynamic queue. If there’s data available, it is returned. End of story.

Otherwise, the completed flag is checked. If CompleteAdding has been called, TryTake returns immediately. It also returns if timeout is 0.

Otherwise, the code prepares for the blocking wait. Resource counter is allocated (reasons for this will be provided later), and observer is attached to the blocking collection. This observer will wake the blocking code when new value is stored in the collection.

[In the code below you can see a small optimization – if the code is running on a single core then the observer is attached in the TOmniBlockingCollection constructor and detached in the destructor. Before this optimization was introduced, Attach and Detach spent much too much time in busy-wait code (on a single-core computer).]

After all that is set, the code waits for the value (see the next code block), observer is detached from the queue and resource counter is released.

function TOmniBlockingCollection.TryTake(var value: TOmniValue;
timeout_ms: cardinal): boolean;
awaited : DWORD;
startTime : int64;
waitHandles: array [0..2] of THandle;
if obcCollection.TryDequeue(value) then
Result := true
else if IsCompleted or (timeout_ms = 0) then
Result := false
else begin
if assigned(obcResourceCount) then
if not obcSingleThreaded then
obcCollection.ContainerSubject.Attach(obcObserver, coiNotifyOnAllInserts);
//wait for the value, see the next code block below
if not obcSingleThreaded then
obcCollection.ContainerSubject.Detach(obcObserver, coiNotifyOnAllInserts);
if assigned(obcResourceCount) then
end; { TOmniBlockingCollection.TryTake }

Blocking part starts by storing the current time (millisecond-accurate TimeGetTime is used) and preparing wait handles. Then it enters the loop which repeats until the CompleteAdding has been called or timeout has elapsed (the Elapsed function which I’m not showing here for the sake of simplicty; see the source) or a value was dequeued.

In the loop, the code tries again to dequeue a value from the dynamic queue and exits the loop if dequeue succeeds. Otherwise, a WaitForMultipleObjects is called. This wait waits for one of three conditions:

  • Completed event. If this event is signalled, CompleteAdding has been called and TryTake must exit.
  • Observer event. If this event is signalled, new value was enqueued into the dynamic queue and code must try to dequeue this value.
  • Resource count event. If this event is signalled, all resources are used and the code must exit (more on that later).
        startTime := DSiTimeGetTime64;
waitHandles[0] := obcCompletedSignal;
waitHandles[1] := obcObserver.GetEvent;
if assigned(obcResourceCount) then
waitHandles[2] := obcResourceCount.Handle;
Result := false;
while not (IsCompleted or Elapsed) do begin
if obcCollection.TryDequeue(value) then begin
Result := true;
break; //while
awaited := WaitForMultipleObjects(IFF(assigned(obcResourceCount), 3, 2),
@waitHandles, false, TimeLeft_ms);
if awaited <> WAIT_OBJECT_1 then begin
if awaited = WAIT_OBJECT_2 then
Result := false;
break; //while

If new value was enqueued into the dynamic queue, TryDequeue is called again. It is entirely possible that another thread calls that function first and removes the value causing TryDequeue to fail and WaitForMultipleObjects to be called again. Such is life in the multithreaded world.

Enumerating the blocking collection

TOmniBlockingCollection enumerator is slightly more powerful than the usual Delphi enumerator. In addition to the usual methods it contains function Take which is required by the Parallel architecture (see Parallel.For and Parallel.ForEach.Aggregate for more information).

  IOmniValueEnumerator = interface ['{F60EBBD8-2F87-4ACD-A014-452F296F4699}']
function GetCurrent: TOmniValue;
function MoveNext: boolean;
function Take(var value: TOmniValue): boolean;
property Current: TOmniValue read GetCurrent;
end; { IOmniValueEnumerator }
  TOmniBlockingCollectionEnumerator = class(TInterfacedObject,
constructor Create(collection: TOmniBlockingCollection);
function GetCurrent: TOmniValue; inline;
function MoveNext: boolean; inline;
function Take(var value: TOmniValue): boolean;
property Current: TOmniValue read GetCurrent;
end; { TOmniBlockingCollectionEnumerator }

The implementation is trivial.

constructor TOmniBlockingCollectionEnumerator.Create(collection: TOmniBlockingCollection);
obceCollection_ref := collection;
end; { TOmniBlockingCollectionEnumerator.Create }

function TOmniBlockingCollectionEnumerator.GetCurrent: TOmniValue;
Result := obceValue;
end; { TOmniBlockingCollectionEnumerator.GetCurrent }

function TOmniBlockingCollectionEnumerator.MoveNext: boolean;
Result := obceCollection_ref.Take(obceValue);
end; { TOmniBlockingCollectionEnumerator.MoveNext }

function TOmniBlockingCollectionEnumerator.Take(var value: TOmniValue): boolean;
Result := MoveNext;
if Result then
value := obceValue;
end; { TOmniBlockingCollectionEnumerator.Take }


A not-so-simple how to on using the blocking collection can be seen in the demo 34_TreeScan. It uses the blocking collection to scan a tree with multiple parallel threads. This demo works in Delphi 2007 and newer.

A better example of using the blocking collection is in the demo 35_ParallelFor. Actually, it uses the same approach as demo 34 to scan the tree, except that the code is implemented as an anonymous method which causes it to be much simpler than the D2007 version. Of course, this demo works only in Delphi 2009 and above.

This is the full parallel scanner from the 35_ParallelFor demo:

function TfrmParallelForDemo.ParaScan(rootNode: TNode; value: integer): TNode;
cancelToken: IOmniCancellationToken;
nodeQueue : IOmniBlockingCollection;
nodeResult : TNode;
numTasks : integer;
nodeResult := nil;
cancelToken := CreateOmniCancellationToken;
numTasks := Environment.Process.Affinity.Count;
nodeQueue := TOmniBlockingCollection.Create(numTasks);
Parallel.ForEach(nodeQueue as IOmniValueEnumerable)
.NumTasks(numTasks) // must be same number of task as in
nodeQueue to ensure stopping

procedure (const elem: TOmniValue)
childNode: TNode;
node : TNode;
node := TNode(elem.AsObject);
if node.Value = value then begin
nodeResult := node;
else for childNode in node.Children do
Result := nodeResult;
end; { TfrmParallelForDemo.ParaScan }

The code first creates a cancellation token which will be used to stop the Parallel.ForEach loop. Number of tasks is set to number of cores accessible from the process and a blocking collection is created. Resource count for this collection is initialized to the number of tasks (parameter to the TOmniBlockingCollection.Create). The root node of the tree is added to the blocking collection.

Then the Parallel.ForEach is called. The IOmniValueEnumerable aspect of the blocking collection is passed to the ForEach. Currently, this is the only way to provide ForEach with data. This interface just tells the ForEach how to generate enumerator for each worker thread. [At the moment, each worker requires a separate enumerator. This may change in the future.]

  IOmniValueEnumerable = interface ['{50C1C176-C61F-41F5-AA0B-6FD215E5159F}']
function GetEnumerator: IOmniValueEnumerator;
end; { IOmniValueEnumerable }

The code also passes cancellation token to the ForEach loop and starts the parallel execution (call to Execute). In each parallel task, the following code is executed (this code is copied from the full ParaScan example above):

      procedure (const elem: TOmniValue)
childNode: TNode;
node : TNode;
node := TNode(elem.AsObject);
if node.Value = value then begin
nodeResult := node;
else for childNode in node.Children do

The code is provided with one element from the blocking collection (ForEach takes care of that). If the Value field is the value we’re searching for, nodeResult is set, blocking collection is put into CompleteAdding state (so that enumerators in other tasks will terminate blocking wait (if any)) and ForEach is cancelled.

Otherwise (not the value we’re looking for), all the children of the current node are added to the blocking collection. TryAdd is used (and its return value ignored) because another thread may call CompleteAdding while the for childNode loop is being executed.

That’s all! There is a blocking collection into which nodes are put (via the for childNode loop) and from which they are removed (via the ForEach infrastructure). If child nodes are not provided fast enough, blocking collection will block on Take and one or more tasks may sleep for some time until new values appear. Only when the value is found, the blocking collection and ForEach loop are completed/cancelled.

This is very similar to the code that was my inspiration for writing the blocking collection:

var targetNode = …;
var bc = new BlockingCollection<Node>(startingNodes);
// since we expect GetConsumingEnumerable to block, limit parallelism to the number of
// procs, avoiding too much thread injection
var parOpts = new ParallelOptions() { MaxDegreeOfParallelism = Enivronment.ProcessorCount };
Parallel.ForEach(bc.GetConsumingEnumerable(), parOpts, (node,loop) =>
    if (node == targetNode)
        foreach(var neighbor in node.Neighbors) bc.Add(neighbor);

However, this C# code exhibits a small problem. If the value is not to be found in the tree, the code never stops. Why? All tasks eventually block in the Take method (because complete tree has been scanned) and nobody calls CompleteAdding and loop.Stop. Does the Delphi code contains the very same problem?

Definitely not! That’s exactly why the resource counter was added to the blocking collection!

If the blocking collection is initialized with number of resources greater then zero, it will allocate a resource counter in the constructor. This resource counter is allocated just before the thread blocks in TryTake and released after that. Each blocking wait in TryTake waits for this resource counter to become signalled. If all threads try to execute blocking wait, this resource counter drops to zero, signals itself and unblocks all TryTake calls!

This elegant solution has only one problem – resource counter must be initialized to the number of threads that will be reading from the blocking collection. That’s why in the code above (ParaScan) same number is passed to the blocking collection constructor (resource counter initialization) and to the ForEach.NumTasks method (number of parallel threads).


TOmniBlockingCollection will be available in the OmniThreadLibrary 1.05, which will be released in few days.

For the impatient there is OTL 1.05 Release Candidate. The only code that will change between 1.05 RC and release are possible bug fixes.

No comments:

Post a Comment