Monday, June 14, 2010

OmniThreadLibrary 2.0 sneak preview [1]

You may have noticed that I’ve been strangely silent for the past two months. The reason for that is OmniThreadLibrary version 2. [And lots of other important work that couldn’t wait. And the OmniThreadLibrary version 2.]

The OTL 2.0 is not yet ready but I’ve decided to pre-announce some features. They are, after all, available to all programmers following the SVN trunk.

While the focus of OTL 1 was to provide programmers with simple-to-use multithreading primitives, OTL 2 focuses mostly on higher-level topics like parallel for and futures.

Caveat: Parallel For and Futures will work only in Delphi 2009 and newer. The implementation of both heavily depends on generics and anonymous methods and those are simply not available in Delphi 2007. Sorry, people. [I’m sad too – I’m still using Delphi 2007 for my day job.]
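Just to give a rough idea of the second one: a future starts its calculation in a background thread and blocks only when you ask for the result. The sketch below assumes the Parallel.Future API from the trunk; CountPrimesTo is a made-up function, used purely for illustration.

var
  numPrimes: IOmniFuture<integer>;
begin
  numPrimes := Parallel.Future<integer>(
    function: integer
    begin
      Result := CountPrimesTo(1000000);  // runs in a background thread
    end);
  // ... do other work while the calculation is running ...
  lbLog.Items.Add(IntToStr(numPrimes.Value));  // Value blocks until the result is ready
end;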

Parallel For

Parallel.ForEach was introduced in release 1.05, but that version was purely a “technical preview”, a simple “let’s see if this can be done at all” implementation. In the last few months, the Parallel.ForEach backend was completely redesigned, which allowed the frontend (the API) to be vastly improved.

The basic ForEach(from, to: integer) has not changed much. The only difference is that the parameter type of the Execute delegate is now “integer” and not “TOmniValue”.

  Parallel.ForEach(1, testSize).Execute(
    procedure (const elem: integer)
    begin
      if IsPrime(elem) then
        outQueue.Add(elem);
    end);

A trivial example, of course, but it shows the simplicity of Parallel.ForEach. The code passed to Execute will be executed in parallel on all available cores. [The outQueue parameter is of type TOmniBlockingCollection, which allows Add to be called from multiple threads simultaneously.]
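For context, here is roughly how the code around that fragment could look. This is only a sketch; testSize, IsPrime and lbLog are the same helpers used in the other examples in this post.

var
  outQueue: IOmniBlockingCollection;
  value   : TOmniValue;
begin
  outQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, testSize).Execute(
    procedure (const elem: integer)
    begin
      if IsPrime(elem) then
        outQueue.Add(elem);          // Add is safe to call from multiple threads
    end);
  // without NoWait, Execute returns only after all iterations have completed
  outQueue.CompleteAdding;           // nothing more will be added
  for value in outQueue do           // the enumerator stops once the queue is drained
    lbLog.Items.Add(IntToStr(value));
end;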

If you have data in a container that supports enumeration (with one limitation: the enumerator must be implemented as a class, not as an interface or a record), then you can enumerate over it in parallel.

var
  nodeList: TList<integer>;  // TList<integer> from Generics.Collections
begin
  nodeList := TList<integer>.Create;
  // ... fill nodeList ...
  Parallel.ForEach<integer>(nodeList).Execute(
    procedure (const elem: integer)
    begin
      if IsPrime(elem) then
        outQueue.Add(elem);
    end);
end;

The new ForEach backend allows parallel loops to be executed asynchronously. In the code sample below, the parallel loop tests numbers for primeness and adds primes to a TOmniBlockingCollection queue. A normal for loop, executing in parallel with the parallel loop, reads numbers from this queue and displays them on the screen.

var
  prime     : TOmniValue;
  primeQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  primeQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000).NoWait
    .OnStop(
      procedure
      begin
        primeQueue.CompleteAdding;
      end)
    .Execute(
      procedure (const value: integer)
      begin
        if IsPrime(value) then begin
          primeQueue.Add(value);
        end;
      end);
  for prime in primeQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

This code depends on a TOmniBlockingCollection feature, namely that the enumerator will block when the queue is empty unless CompleteAdding is called [more info]. That’s why the OnStop delegate must be provided – without it the “normal” for loop would never stop. (It would just wait forever on the next element.)

While this shows two powerful functions (NoWait and OnStop), it is also kind of complicated and definitely not code I would want to write too many times. That’s why OmniThreadLibrary also provides syntactic sugar in the form of the Into function.

var
  prime     : TOmniValue;
  primeQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  primeQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000).PreserveOrder.NoWait
    .Into(primeQueue)
    .Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        if IsPrime(value) then
          res := value;
      end);
  for prime in primeQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

This code demonstrates a few different enhancements to the ForEach loop. Firstly, you can order the Parallel subsystem to preserve the input order by calling the PreserveOrder function. [In truth, this function doesn’t work yet. That’s the part I’m currently working on.]

Secondly, because Into is called, ForEach will automatically call CompleteAdding on the parameter passed to the Into when the loop completes. No need for the ugly OnStop call.

Thirdly, Execute (also because of Into) takes a delegate with a different signature. Instead of the standard ForEach signature, procedure (const value: T), you have to provide a procedure (const value: integer; var res: TOmniValue). If the output parameter (res) is set to any value inside this delegate, it will be added to the Into queue; if it is left unmodified, nothing is added. Basically, the parallel loop body is replaced with the code below, which calls your own delegate (loopBody).

  result := TOmniValue.Null;
  while (not Stopped) and localQueue.GetNext(value) do begin
    loopBody(value, result);
    if not result.IsEmpty then begin
      oplIntoQueueObj.Add(result);
      result := TOmniValue.Null;
    end;
  end;
  oplIntoQueueObj.CompleteAdding;

NoWait and Into provide you with a simple way to chain parallel loops and implement multiple parallel processing stages. [Although this works in the current version, the OtlParallel does nothing to balance the load between all active Parallel loops. I’m not yet completely sure that this will be supported in the 2.0 release.]

var
  dataQueue  : IOmniBlockingCollection;
  prime      : TOmniValue;
  resultQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  dataQueue := TOmniBlockingCollection.Create;
  resultQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000)
    .NoWait.Into(dataQueue).Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        if IsPrime(value) then
          res := value;
      end
    );
  Parallel.ForEach<integer>(dataQueue as IOmniValueEnumerable)
    .NoWait.Into(resultQueue).Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        // Sophie Germain primes
        if IsPrime(2*value + 1) then
          res := value;
      end
    );
  for prime in resultQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

[BTW, there will be a better way to enumerate over TOmniBlockingCollection in the OTL 2.0 release. Passing “dataQueue as IOmniValueEnumerable” to the ForEach is ugly.]

If you want to iterate over something very nonstandard, you can write a “GetNext” delegate:

  // i, testSize and outQueue are assumed to be declared outside this fragment,
  // with i initialized (e.g. to 1) before the loop starts
  Parallel.ForEach<integer>(
    function (var value: integer): boolean
    begin
      value := i;
      Result := (i <= testSize);
      Inc(i);
    end)
    .Execute(
      procedure (const elem: integer)
      begin
        outQueue.Add(elem);
      end);

In case you wonder what the possible iteration sources are, here’s the full list:

  ForEach(const enumerable: IOmniValueEnumerable): IOmniParallelLoop;
  ForEach(const enum: IOmniValueEnumerator): IOmniParallelLoop;
  ForEach(const enumerable: IEnumerable): IOmniParallelLoop;
  ForEach(const enum: IEnumerator): IOmniParallelLoop;
  ForEach(const sourceProvider: TOmniSourceProvider): IOmniParallelLoop;
  ForEach(enumerator: TEnumeratorDelegate): IOmniParallelLoop;
  ForEach(low, high: integer; step: integer = 1): IOmniParallelLoop<integer>;
  ForEach<T>(const enumerable: IOmniValueEnumerable): IOmniParallelLoop<T>;
  ForEach<T>(const enum: IOmniValueEnumerator): IOmniParallelLoop<T>;
  ForEach<T>(const enumerable: IEnumerable): IOmniParallelLoop<T>;
  ForEach<T>(const enum: IEnumerator): IOmniParallelLoop<T>;
  ForEach<T>(const enumerable: TEnumerable<T>): IOmniParallelLoop<T>;
  ForEach<T>(const enum: TEnumerator<T>): IOmniParallelLoop<T>;
  ForEach<T>(enumerator: TEnumeratorDelegate<T>): IOmniParallelLoop<T>;
  ForEach(const enumerable: TObject): IOmniParallelLoop;
  ForEach<T>(const enumerable: TObject): IOmniParallelLoop<T>;

The last two versions are used to iterate over any object that supports class-based enumerators. Sadly, this feature is only available in Delphi 2010 because it uses extended RTTI to access the enumerator and its methods.
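To illustrate, iterating over a TStringList (whose enumerator is implemented as a class) could look something like the sketch below. This is only a sketch: sl and outQueue are assumed to be declared and created by the caller.

  sl := TStringList.Create;
  try
    sl.Add('one'); sl.Add('two'); sl.Add('three');
    // TStringList has a class-based enumerator, so the TObject overload applies
    Parallel.ForEach<string>(sl).Execute(
      procedure (const elem: string)
      begin
        outQueue.Add(elem);   // outQueue: IOmniBlockingCollection, as in the earlier examples
      end);
  finally
    sl.Free;
  end;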

Parallel For Implementation

The backend allows for efficient parallel enumeration even when the enumeration source is not threadsafe. You can be assured that the data passed to ForEach will be accessed from only one thread at a time (although this will not always be the same thread). Only in special cases, when the backend knows that the source is threadsafe (for example, when an IOmniValueEnumerator is passed to ForEach), will the data be accessed from multiple threads at the same time.
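For example, a delegate-based source needs no locking at all. Something along the lines of the sketch below should be safe; reader (a TStreamReader) and outQueue (an IOmniBlockingCollection) are assumed to be created by the caller and are not part of the library.

  Parallel.ForEach<string>(
    function (var line: string): boolean
    begin
      // this delegate is never called from two threads at the same time,
      // so the non-threadsafe reader needs no protection
      Result := not reader.EndOfStream;
      if Result then
        line := reader.ReadLine;
    end)
    .Execute(
      procedure (const line: string)
      begin
        outQueue.Add(line);   // Add, on the other hand, is threadsafe
      end);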

I’m planning to write an article about the parallel for implementation, but it will have to wait until PreserveOrder is implemented. At the moment, the backend implementation is not yet final.

3 comments:

  1. Great Stuff Primoz.

    Thanks for your work.

    I use OTL very often.

  2. Hi, is it possible to use OTL v 1.x with C++ Builder 2007? If so, are there any limitations?

  3. @macma77: Sorry, I have no idea.
