Friday, January 08, 2010

Parallel.For

Just thinking out loud:

function TfrmParallelForDemo.ParaScan(rootNode: TNode; value: integer): TNode;
var
  nodeResult: TNode;
  nodeQueue : TOmniBlockingCollection;
begin
  nodeResult := nil;
  nodeQueue := TOmniBlockingCollection.Create;
  try
    nodeQueue.Add(rootNode);
    Parallel.ForEach(nodeQueue.GetEnumerator).Timeout(10*1000).Execute(
      procedure (const elem: TOmniValue)
      var
        node : TNode;
        iNode: integer;
      begin
        node := TNode(elem.AsPointer);
        if node.Value = value then begin
          nodeResult := node;
          nodeQueue.CompleteAdding;
        end
        else for iNode := 0 to node.NumChild - 1 do
          nodeQueue.TryAdd(node.Child[iNode]);
      end);
  finally FreeAndNil(nodeQueue); end;
  Result := nodeResult;
end; { TfrmParallelForDemo.ParaScan }

I can make it compile (just did) and I think I can make it work.

Useful? Simple enough? What do you think?
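The same idea — a pool of workers pulling nodes from a shared queue, pushing children back in, and signalling when the target is found — can be sketched in Python. Everything below (the `Node` class, `para_scan`, the worker count, the 0.1 s idle timeout used as a crude "no more work" heuristic) is a hypothetical stand-in, not the OTL implementation:

```python
import queue
import threading

class Node:
    def __init__(self, value, children=None):
        self.value = value
        self.children = children or []

def para_scan(root, value, num_workers=4):
    """Parallel breadth-first search: workers pull nodes from a shared
    queue and push children back until someone finds the target."""
    work = queue.Queue()
    work.put(root)
    found = []                      # result slot shared by the workers
    done = threading.Event()        # set when the target has been found

    def worker():
        while not done.is_set():
            try:
                # The idle timeout is a crude termination heuristic;
                # detecting exhaustion properly is exactly what the
                # inverse semaphore below is for.
                node = work.get(timeout=0.1)
            except queue.Empty:
                return
            if node.value == value:
                found.append(node)
                done.set()          # tell the other workers to stop
            else:
                for child in node.children:
                    work.put(child)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return found[0] if found else None
```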

Thursday, January 07, 2010

Three steps to the blocking collection: [1] Inverse semaphore

In What’s new for the coordination data structures in Beta 2?, Joshua Phillips published the following algorithm, demonstrating a use of BlockingCollection from Parallel Extensions Beta 2.

var targetNode = …;
var bc = new BlockingCollection<Node>(startingNodes);
// since we expect GetConsumingEnumerable to block, limit parallelism to the number of
// procs, avoiding too much thread injection
var parOpts = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(bc.GetConsumingEnumerable(), parOpts, (node, loop) =>
{
    if (node == targetNode)
    {
        Console.WriteLine("hooray!");
        bc.CompleteAdding();
        loop.Stop();
    }
    else
    {
        foreach (var neighbor in node.Neighbors) bc.Add(neighbor);
    }
});

Even if you’re not familiar with C# and Parallel Extensions, this code is fairly simple to read. It implements a parallel search in a tree and writes “hooray” when a node is found.

When I saw this code I thought to myself: “Hmmm, this BlockingCollection really looks neat. Maybe I can add it to the OmniThreadLibrary.” And as I had some free time (shockingly, I know) I started coding. Soon I noticed a problem in that search algorithm. While it looks like a nice piece of code, it exhibits a problem that makes it mostly unusable in real world applications. Can you spot it? [I should add that the authors are aware of the problem and they decided to ignore it while writing this code fragment for the sake of simplicity.]

To solve it, I needed an inverse semaphore. This is an interesting synchronisation tool which is sadly not implemented in the Win32 API. It differs from an ordinary semaphore in one important way – an ordinary semaphore is signalled while its count is greater than zero, while an inverse semaphore is signalled when the count equals zero. You have no idea what I’m talking about? Here’s a more elaborate description …

[BTW, I googled for an authoritative definition of the inverse semaphore but couldn’t find one so this is my own approximation of the concept.]

A semaphore is a counting synchronisation object that starts at some value (typically greater than 0). This value typically represents the number of available resources (concurrent connections etc.). To allocate a resource, one waits on the semaphore. If the semaphore count is > 0, the semaphore is signalled, the wait will succeed and the semaphore count gets decremented by 1. [Of course, all of this executes atomically.] If the semaphore count is 0, the semaphore is not signalled and the wait will block until the timeout or until another thread releases the semaphore, which increments the semaphore’s count and puts it into the signalled state. [‘Nuff said. If you want to read more about semaphores, I recommend The Little Book of Semaphores, a free textbook on all things semaphorical.]

An inverse semaphore, on the other hand, gets signalled when the count drops to 0. This allows another thread to execute a blocking wait, which will succeed only when the semaphore’s count is 0. Why is that good, you’ll ask? Because it simplifies resource exhaustion detection. If you wait on an inverse semaphore and the semaphore becomes signalled, you know that the resource is fully used. And why is that good, you’ll ask? Well, you’ll have to wait until Part 3 to learn the answer.
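The concept can be sketched in a few lines of Python (a hypothetical class built on `threading.Condition`, not the OTL API): a counter where allocators block while it is zero and, conversely, other threads can block until it *reaches* zero – the "signalled" state of the inverse semaphore.

```python
import threading

class InverseSemaphore:
    """Signalled when the count is zero -- the opposite of a normal
    semaphore, which is signalled while the count is above zero."""
    def __init__(self, initial_count):
        self._count = initial_count
        self._cond = threading.Condition()

    def allocate(self):
        """Take one resource; block while the count is zero."""
        with self._cond:
            while self._count == 0:
                self._cond.wait()
            self._count -= 1
            self._cond.notify_all()   # the count may have just hit zero

    def release(self):
        """Return one resource; may unblock allocate() and wait_zero() callers."""
        with self._cond:
            self._count += 1
            self._cond.notify_all()

    def wait_zero(self, timeout=None):
        """Block until the count drops to zero (the 'signalled' state)."""
        with self._cond:
            return self._cond.wait_for(lambda: self._count == 0, timeout)
```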

OTL’s inverse semaphore lives in the OtlSync unit and is called TOmniResourceCount. It also implements IOmniResourceCount in case you want to use it through the interface.

IOmniResourceCount = interface ['{F5281539-1DA4-45E9-8565-4BEA689A23AD}']
  function GetHandle: THandle;
  //
  function Allocate: cardinal;
  function Release: cardinal;
  function TryAllocate(var resourceCount: cardinal; timeout_ms: cardinal = 0): boolean;
  property Handle: THandle read GetHandle;
end; { IOmniResourceCount }

The resource count (let’s call it that from now on) starts at some value, passed to the constructor. Allocate will block if this count is 0 (until the count becomes greater than 0); otherwise it will decrement the count. The new value of the counter is returned as the function result. [Keep in mind that this number may already be invalid by the time the function returns if other threads are using the same resource count.]

Release increments the count and unblocks waiting Allocates. The new resource count (potentially already invalid by the time the caller sees it) is returned as the result.

Then there is TryAllocate – a safer version of Allocate taking a timeout parameter (which may be set to INFINITE) and returning success/fail status as a function result.

Finally, there is a Handle property exposing a handle which is signalled when resource count is 0 and unsignalled otherwise.

TOmniResourceCount = class(TInterfacedObject, IOmniResourceCount)
strict private
  orcAvailable   : TDSiEventHandle;
  orcHandle      : TDSiEventHandle;
  orcLock        : TOmniCS;
  orcNumResources: TGp4AlignedInt;
protected
  function GetHandle: THandle;
public
  constructor Create(initialCount: cardinal);
  destructor Destroy; override;
  function Allocate: cardinal; inline;
  function Release: cardinal;
  function TryAllocate(var resourceCount: cardinal; timeout_ms: cardinal = 0): boolean;
  property Handle: THandle read GetHandle;
end; { TOmniResourceCount }

Internally, orcNumResources is used to manage the resource count, orcLock provides internal locking (I never said that my inverse semaphore is lock free), orcHandle is the externally visible event that gets signalled when the resource count drops to zero, and orcAvailable is an internal event which is signalled when the resource count is above zero (just like in a standard semaphore).
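This two-event scheme (a "zero" event for outside observers plus an "available" event for blocked allocators, both flipped under a lock) can be mimicked in Python with a pair of `threading.Event` objects. This is a rough structural sketch, not the actual OTL code; the class name, method names and the non-blocking `try_allocate` are all made up for illustration:

```python
import threading

class TwoEventCount:
    """Keeps a count plus two events: 'zero' is set when the count is 0
    (the externally visible handle) and 'available' is set while it is > 0."""
    def __init__(self, initial_count):
        self._lock = threading.Lock()
        self._count = initial_count
        self.zero = threading.Event()       # analogue of orcHandle
        self.available = threading.Event()  # analogue of orcAvailable
        (self.zero if initial_count == 0 else self.available).set()

    def release(self):
        with self._lock:                    # atomic 'change, test and signal'
            self._count += 1
            if self._count == 1:            # 0 -> 1 transition
                self.zero.clear()
                self.available.set()
            return self._count

    def try_allocate(self):
        """Non-blocking allocate: returns the new count, or None if empty."""
        with self._lock:
            if self._count == 0:
                return None
            self._count -= 1
            if self._count == 0:            # 1 -> 0 transition
                self.zero.set()
                self.available.clear()
            return self._count
```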

Some parts are really really (really!) simple.

constructor TOmniResourceCount.Create(initialCount: cardinal);
begin
  inherited Create;
  orcHandle := CreateEvent(nil, true, (initialCount = 0), nil);
  orcAvailable := CreateEvent(nil, true, (initialCount <> 0), nil);
  orcNumResources.Value := initialCount;
end; { TOmniResourceCount.Create }

destructor TOmniResourceCount.Destroy;
begin
  DSiCloseHandleAndNull(orcHandle);
  DSiCloseHandleAndNull(orcAvailable);
  inherited;
end; { TOmniResourceCount.Destroy }

function TOmniResourceCount.GetHandle: THandle;
begin
  Result := orcHandle;
end; { TOmniResourceCount.GetHandle }

function TOmniResourceCount.Allocate: cardinal;
begin
  TryAllocate(Result, INFINITE);
end; { TOmniResourceCount.Allocate }

Release is only slightly more complicated as it has to provide atomic ‘change, test and signal’ operation.

function TOmniResourceCount.Release: cardinal;
begin
  orcLock.Acquire;
  try
    Result := cardinal(orcNumResources.Increment);
    if Result = 1 then begin
      ResetEvent(orcHandle);
      SetEvent(orcAvailable);
    end;
  finally orcLock.Release; end;
end; { TOmniResourceCount.Release }

Now TryAllocate, that’s the problematic one. Let’s take a look at how it would be defined if there were no timeout parameter.

function TOmniResourceCount.TryAllocate(var resourceCount: cardinal): boolean;
begin
  Result := false;
  orcLock.Acquire;
  repeat
    if orcNumResources.Value = 0 then begin
      orcLock.Release;
      if WaitForSingleObject(orcAvailable, INFINITE) <> WAIT_OBJECT_0 then
        Exit;
      orcLock.Acquire;
    end;
    if orcNumResources.Value > 0 then begin
      resourceCount := cardinal(orcNumResources.Decrement);
      if resourceCount = 0 then begin
        SetEvent(orcHandle);
        ResetEvent(orcAvailable);
      end;
      Result := true;
      break; //repeat
    end;
  until false;
  orcLock.Release;
end; { TOmniResourceCount.TryAllocate }

The code first acquires the internal lock. If there are no free resources, it releases the lock (so that a Release in another thread can execute), waits for the available event to become signalled (which happens when Release is called) and re-acquires the internal lock. Then it checks the resource count again (the resource might have been allocated by another thread between the WaitForSingleObject and the Acquire) and decrements it if a resource is available. When the resource count drops to zero, the events are set/reset appropriately.

In reality, TryAllocate doesn’t loop infinitely (well, it does if you pass it the INFINITE timeout) and calculates an appropriate timeout to pass to WaitForSingleObject on each iteration. Check the source if you want to learn how that is done.

That’s about all that can be written about TOmniResourceCount. Next time, I’ll tackle something much more interesting – a microlocking, O(1) insert/remove (well, most of the time ;) ), dynamically allocated queue.