Friday, December 31, 2010

Happy New Year 2011

[Image: shadows]

See you around in 2011 – with some luck maybe even in person.

[Picture of the living room wall taken by my trusty Panasonic Lumix DMC-LX5 and heavily processed in Adobe Photoshop Lightroom 3.]

Saturday, December 25, 2010

Gp* Holiday Update

As always, all units are available on Google Code.

DSiWin32 1.60

  • When compiled with D2007 or newer, unit FileCtrl is not included.
  • Call UniqueString before calling CreateProcessW.
  • [Tommi Prami] Added types missing in Delphi 7.
  • Added function DSiDisableStandby that will try to disable standby and hibernate on Windows XP SP 2 and newer.
  • Define TStartupInfoW in Delphi 7 and earlier.
  • DSiAddApplicationToFirewallExceptionList[Advanced|XP] got a new parameter
    TDSiFwResolveConflict (default rcDuplicate) where the caller can specify
    behaviour if the rule with the same name already exists.
    [rcDuplicate = add new rule with the same name, rcOverwrite = remove all rules
    with the same name and then add the new rule, rcSkip = leave existing rules
    intact and don't add the new rule]
  • Implemented DSiFindApplicationInFirewallExceptionList[Advanced|XP].
  • Bug fix in DSiAddApplicationToFirewallExceptionListAdvanced: setting rule.ServiceName to '' caused fwPolicy2.Rules.Add(rule) to raise exception.
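
The UniqueString item above relates to a documented quirk of CreateProcessW: the function may modify the command-line buffer in place, so passing a pointer to a string constant can end in an access violation. Below is a minimal sketch of the pattern, assuming Delphi 2009 or newer (where string is Unicode and CreateProcess maps to CreateProcessW). RunAndWait is a hypothetical helper written for this illustration, not a DSiWin32 function.

```pascal
program UniqueCmdLineDemo;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Hypothetical helper (not part of DSiWin32): starts a process and waits for it.
function RunAndWait(const commandLine: string): boolean;
var
  cmd        : string;
  processInfo: TProcessInformation;
  startupInfo: TStartupInfo;
begin
  cmd := commandLine;
  // CreateProcessW may write into the command-line buffer, so make sure
  // we own a private, writable copy before passing a pointer to it.
  UniqueString(cmd);
  FillChar(startupInfo, SizeOf(startupInfo), 0);
  startupInfo.cb := SizeOf(startupInfo);
  Result := CreateProcess(nil, PChar(cmd), nil, nil, false, 0, nil, nil,
    startupInfo, processInfo);
  if Result then begin
    WaitForSingleObject(processInfo.hProcess, INFINITE);
    CloseHandle(processInfo.hProcess);
    CloseHandle(processInfo.hThread);
  end;
end;

begin
  if RunAndWait('cmd.exe /c echo Hello from the child process') then
    Writeln('Child process finished')
  else
    Writeln('CreateProcess failed: ', SysErrorMessage(GetLastError));
end.
```

Without the UniqueString call, cmd could still point at the literal passed by the caller, which is exactly the situation the API documentation warns about.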

Monday, December 13, 2010

You’ve got mail!

It was only three days ago that I released the new OTL and already I’ve got a gift – and it came from the other side of the world, from a place so far away that Google Maps doesn’t know how to calculate directions to guide me there, from Australia!

[Image: sea mail]

No, I’m not kidding about the Google Maps.

[Image: gmaps]

Inside were books, books and more books. Chris, thanks a lot!

Friday, December 10, 2010

Out with the old … in with the two

OmniThreadLibrary 1.05 was a great success …

[Image]

… but all good things must come to an end. Don’t worry, I’m not ending the project. It’s just that there’s a new player in town. Ladies and gentlemen, in the left corner I give you …

[Image]

… OmniThreadLibrary 2.0! Get it while it’s hot!

Thursday, December 02, 2010

OmniThreadLibrary 2.0 TODO

Only a few things to do before the alpha release:

  • Parallel.Pipeline must support cancellation;
  • Parallel.ForEach would benefit from task-local state (maybe will be implemented after the 2.0 release);
  • Cleanup various small TODOs;
  • Add project files for Delphi XE;
  • Retest everything.

Wednesday, December 01, 2010

OmniThreadLibrary Documentation

Thanks to the great people of DevJET software, something may (finally) happen on the very much neglected documentation front. They have donated a Documentation Insight license to the OmniThreadLibrary and I intend to use it to add documentation here and there while I’m working on the project.

A big Thank You to the DevJET!

Documentation Insight & OmniThreadLibrary

The next part of the puzzle is how to export XmlDoc documentation into some useful form. I know Delphi XE can do html+javascript export but I don’t believe that it’s included in the Professional release which I have (but correct me if I’m wrong). It would be even better if it could be converted into the Wiki format used by Google Code. Can you help me with suggestions?

Tuesday, November 23, 2010

Multistage processes with the OmniThreadLibrary

The OtlParallel unit in OmniThreadLibrary offers some high-level solutions that allow you to easily run some kinds of processes in parallel. Up to now it supported self-contained background calculations (Parallel.Future), independent parallel processes (Parallel.Join) and loop calculations where the background task is stateless (i.e. it only depends on the input – the loop value – and not on calculations done on other inputs – loop values; Parallel.ForEach). But a lot of the time, processes don’t fall into any of those categories.

Mason Wheeler recently suggested adding support for multistage processes. Over the weekend I’ve implemented it in the OmniThreadLibrary and now this feature is ready for you to start testing. Great thanks to Mason for his suggestion and also for work on the implementation and design of newly introduced Parallel.Pipeline.

Saturday, November 20, 2010

ITDevCon 2010

ITDevCon finished almost 24 hours ago but I was simply too tired to write anything about it until now. In short – great event!

There were close to 50 participants and the sessions were in most cases nicely filled up. Three tracks were running in parallel and sometimes it was quite hard to choose the most interesting session. I was quite sad as I was not able to see any of Alexander Alexeev’s sessions on debugging (the first one overlapped with my own session and in the second case seeing Ray Konopka speaking on user interface design trumped all other options). Besides Ray’s session, which was definitely the best at ITDevCon (Ray told me to say so ;) ), I have to mention Paweł’s exposé on DataSnap and Thierry’s multitouch session, which were both very interesting.

Monday, November 15, 2010

Embarcadero tech support should pull its act together

For the past few days I have fought a small war with the Embarcadero registration system. For some unknown reason, Delphi XE installed fine but when I tried to register it, I got back
“No  valid  license information found for Embarcadero(R) Delphi(R) XE.
You  must provide a valid serial number in order to use Embarcadero(R)
Delphi(R) XE. Do you want to run the registration wizard again?”
The funny thing was that my registration count got incremented nevertheless! Of course I only noticed this after five failed installs when the sixth attempt told me that I’ve used my installation bonus and I can install no more Delphis :( Most probably there was some clash between the fresh Delphi XE installation, previous trial version and pre-previous [sorry I can’t tell you about it] version. Of course, I did try to fully remove all remnants of all installations but obviously I missed some important part.
At that point I did the right thing and opened a support case. A guy from Embarcadero support responded soon enough with a note that he bumped my registration count and I can retry. Of course I already knew that retrying won’t help so I asked him how to fully clean up my machine.
Then I waited 24 hours …
The next day I sent a few more mails and finally got a pretty comprehensive clean-up list. The only problem was that it was completely b0rken (as you’ll see in a second).
So I sent a response to that mail, requesting clarification of some items.
No reply.
Then the weekend occurred …
Nothing happened yesterday either. At last I sent another mail urging the support to please respond to my question and bump my registration count again (as I was already up to 10 registrations – only one of them really successful, and that one was on another machine).
Finally I got the reply. Actually, I received a mail stating that my registration count was bumped again. Nothing else. No reply to any of my questions. Nada, zip, zilch.
It looks like Embarcadero has tech support that can only do two things. 1. Bump registration count. 2. Send around bad instructions on how to remove remnants of Delphi from your system.
Embarcadero people, if you really want Delphi to be successful again, you should educate your tech support.
Luckily, I know enough about Delphi to clean my system even without answers to my questions so I was able to finally install XE. Hurrah!
I still have to prove that the cleanup list is really really bad, yes? Well, here it is, directly pasted from the support mail. My comments follow the items, in square brackets.
1. Uninstall the product via the "Embarcadero Rad Studio 2010" entry in "Uninstall a Program" in the Control Panel
[I’m trying to uninstall Delphi XE so “Embarcadero RAD Studio XE” would be a better choice.]
2. Remove the C:\Program Files\Codegear\Rad Studio\8.0 directory
[Actually, the folder is named \Embarcadero\, not \Codegear\.]
3. Remove the C:\Users\Public\Public Documents\Rad Studio directory
[That would trash the Delphi 2010 install. Only the 8.0 subfolder has to be removed.]
4. Remove the c:\ProgramData\CodeGear\Rad Studio\8.0 directory
[Again, it’s \Embarcadero\.]
5. Remove the HKEY_LOCAL_MACHINE\SOFTWARE\CodeGear\BDS\8.0 registry key
[Same here.]
6. Remove the HKEY_CURRENT_USER\SOFTWARE\CodeGear\BDS\8.0 registry key
[And here.]
7. Remove the HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\{A2B58B18-5D04-4006-9713-B6945880746E} registry key
8. Remove the HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\CodeGear RAD Studio 2010 registry key
[Embarcadero RAD Studio XE, not CodeGear RAD Studio 2010.]
9. Remove folders with a GUID name in C:\Documents and Settings\All Users\Application Data\
[In item 3, C:\Users was used. Here, C:\Documents and Settings. On every computer, one of those will not be the correct choice.]
9a. You can find the specific GUID folder names to remove by browsing to the subkeys of HKEY_LOCAL_MACHINE\Software\MimarSinan\InstallAware\Ident.Cache\
10. Remove the following files from Windows\System32:
10a. *120.bpl
10b. *120.jdbg
10c. *120.xml
[In Delphi XE, those files end in 150, not 120. And on a 64-bit system they live in \Windows\SysWOW64.]
10d. bdeadmin.*
10e. cc32*.dll
10f. midas.*

Dear reader, please tell me – am I overreacting? Or should somebody be fired here for not doing his/her work (and I definitely don’t mean the poor tech support guy I was talking to)?

Tuesday, November 09, 2010

Friday, November 05, 2010

Tuesday, November 02, 2010

Tuesday, October 05, 2010

Too many what?

[Image]

Delphi 2007.

Conditional symbols:

xFullDebugMode;xUseReservationList;xSimulateDVBMaster;xNoDongle;NoExcept;NoTrace;xLogDriverCalls;xNoDVBMaster;DEBUG;xDuplicateOutputToSimulator;xLogDelaying;TestSubtitleDelay;CatchUseOfFreedInterfaces;LockDriverCalls;NoDVBNowAdjustment;DetailedTooLateError

16 symbols.

257 characters in total.

Too many? Was Delphi 2007 built in DOS times?

Can somebody please check this with Delphi XE?

Friday, September 03, 2010

Ingenious!

Sometimes you find a code fragment that’s so great you only wish it was your idea …
const
  fm: array[Boolean] of Word = (fmCreate, fmOpenWrite);
fs := TFileStream.Create(fileName, fm[FileExists(fileName)]);

Obvious – in retrospect.
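
For readers who want to try the trick, here is a minimal sketch that wraps it in a small append helper. The names CFileMode and AppendLine are made up for this illustration. One addition that is not in the original fragment: fmOpenWrite neither truncates the file nor moves to its end, so the sketch positions the stream at the end before writing.

```pascal
program FileModeDemo;

{$APPTYPE CONSOLE}

uses
  Classes, SysUtils;

const
  // Indexed by FileExists: False -> create a new file, True -> open the existing one.
  CFileMode: array [Boolean] of Word = (fmCreate, fmOpenWrite);

// Hypothetical helper: appends one line of text and returns the new file size.
function AppendLine(const fileName: string; const line: AnsiString): Int64;
var
  data: AnsiString;
  fs  : TFileStream;
begin
  fs := TFileStream.Create(fileName, CFileMode[FileExists(fileName)]);
  try
    // fmOpenWrite starts at offset 0 of the existing file, so move to the end.
    fs.Position := fs.Size;
    data := line + #13#10;
    fs.WriteBuffer(data[1], Length(data));
    Result := fs.Size;
  finally
    fs.Free;
  end;
end;

var
  fileName: string;
begin
  fileName := 'filemode_demo.log';
  DeleteFile(fileName);
  Writeln(AppendLine(fileName, 'first'));   // the file is created
  Writeln(AppendLine(fileName, 'second'));  // the file is reopened and extended
  DeleteFile(fileName);
end.
```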

Wednesday, August 18, 2010

Friday, July 23, 2010

So terribly sorry …

Writing code ain’t that hard a job.

Writing correct code, well that’s a much harder venture.

And what can I say about writing correct multithreaded code? Only that it’s close to impossible.

That’s exactly why I started writing OmniThreadLibrary – I needed a well-tested framework for multithreaded processing.

But alas! there are bugs in OmniThreadLibrary too. Not many of them, true, but still they are there. Some are squashed soon, others remain hidden for a long time.

I found one such bug a few days ago when I was searching for the reason why some code didn’t parallelize well. In theory the speedup should be close to 8x (on an 8-core machine) but in practice the parallel code was only faster by a factor of 2 to 3.

At the end of a long day I found a bug in the TOmniBlockingCollection that prevented all threads from executing at once. Only two or three threads were really working and they did all the job – but only two to three times faster, of course.

The bug is now fixed in the trunk. Anybody can do a checkout and get perfect (well, maybe not perfect but definitely better-working) code.

Because this was quite an important fix I’ve also incorporated it into the 1.05 version of this unit. You can download it here. If you’re using 1.05 and TOmniBlockingCollection, you surely want to download the update.

I’m really sorry for letting this stupid bug slip through my testing. Won’t happen again. (Or maybe it will. Probably it will. Oh, heck, it surely will. I’ll just try to make such problems very rare. Promise.)

Monday, July 19, 2010

Scheduled OmniThreadLibrary presentations

On July 27th, I’ll be speaking for the Virtual Delphi Users Group. The topic will be “Parallel programming with OmniThreadLibrary”. Be aware that you have to register in advance if you want to participate (all questions will be answered!). The recording will be available some time after the presentation. [At the moment, the VDUG server has some occasional problems so be patient and/or retry later.]

On November 18 and 19, I'll be speaking at ITDevCon 2010. Two of my presentations will focus on OmniThreadLibrary and the third one on memory management with FastMM. All presentations will be given in English.

Monday, July 12, 2010

TDM Rerun #16: Thread Pooling, The Practical Way

As we found out, the system thread pool in Windows 2000 (plus XP and 2003) is woefully inadequate for any serious use. It seems that its designer was only thinking about really trivial usage and expected everyone else to create their own thread pool. Luckily, it was possible to create a fully-fledged pooling layer based on the system thread pool and the application was saved.
- Thread Pooling, The Practical Way, The Delphi Magazine 112, December 2004
The December 2004 issue describes one of my first serious forays into the muddy waters of parallel processing. The article describes a work item pooling mechanism built in Windows (QueueUserWorkItem) and a management wrapper that I built around this API to make its use bearable.
The GpWinThreadPool unit (described in this article) was later replaced with a TThread-based pool of my own design and that unit (GpThreadPool) was superseded by the thread pool built into the OmniThreadLibrary. The use of the code described in this TDM article is not really recommended (except maybe for educational purposes).
Links: article (PDF, 116 KB), source code (ZIP, 2,6 MB).
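
To give a feeling for the raw API the article wraps, here is a minimal sketch that queues a single work item with QueueUserWorkItem and waits for it. This is not code from GpWinThreadPool. The names QueueWorkItem, TWorkData, SquareWorker and SquareInPool are made up for this illustration, and the API is imported explicitly so the sketch does not depend on how a given Delphi version declares it.

```pascal
program WorkItemDemo;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Explicit import under a local name (assumption: kept separate from any
// declaration that may already exist in the Windows unit).
function QueueWorkItem(func: Pointer; context: Pointer; flags: ULONG): BOOL;
  stdcall; external kernel32 name 'QueueUserWorkItem';

type
  PWorkData = ^TWorkData;
  TWorkData = record
    Input : integer;
    Output: integer;
    Done  : THandle; // event signalled when the work item completes
  end;

// Runs on a thread owned by the system thread pool.
function SquareWorker(param: Pointer): DWORD; stdcall;
var
  data: PWorkData;
begin
  data := PWorkData(param);
  data^.Output := data^.Input * data^.Input;
  SetEvent(data^.Done); // the record must not be touched after this point
  Result := 0;
end;

// Hypothetical helper: queues one work item and blocks until it finishes.
function SquareInPool(value: integer): integer;
var
  data: TWorkData;
begin
  data.Input := value;
  data.Output := 0;
  data.Done := CreateEvent(nil, true, false, nil);
  try
    if not QueueWorkItem(@SquareWorker, @data, 0 {WT_EXECUTEDEFAULT}) then
      RaiseLastOSError;
    WaitForSingleObject(data.Done, INFINITE);
    Result := data.Output;
  finally
    CloseHandle(data.Done);
  end;
end;

begin
  Writeln(SquareInPool(12)); // prints 144
end.
```

Even this tiny example needs its own completion signalling and lifetime rules for the context record, which hints at why a management wrapper is useful.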

Wednesday, June 30, 2010

Gp* units now available via SVN

Because YOU asked for it … my units are now available on Google Code.

Tuesday, June 29, 2010

Important DSiWin32 and GpSecurity update

1) Lots of things fixed in DSiWin32 and great thanks to Christian Wimmer for pointing out the problems and suggesting some solutions.

2) By my mistake a very internal GpSecurity containing parts of JWA got included in some downloadable ZIP files. This was a direct violation of the JWA license and I apologize deeply for that. To fix this problem, a new GpSecurity was released which depends on the JWA.

Monday, June 21, 2010

Built for speed

Unlike C and derivatives, Delphi is speedy …

[Image: build]

174.684 lines per second. What’s your compile speed?

Wednesday, June 16, 2010

The Future of Delphi

Future of T, that is. Or, in Delphi syntax, TFuture<T>.
Yesterday I wrote about futures in OmniThreadLibrary 2.0 (supported only in D2009+) and I mentioned that implementing futures in plain D2009+ should be really simple. And it really is – it took me all of 15 minutes to write the supporting library and a simple test case.
The code below is released to public domain. I’m claiming no copyrights – use it as you wish. You don’t even have to attribute it to me. Just don’t use it for evil purposes ;)
unit DelphiFuture;

interface

uses
  Classes;

type
  IFuture<T> = interface
    function Value: T;
  end;

  TFutureDelegate<T> = reference to function: T;

  TFutureThread<T> = class(TThread)
  strict private
    FAction: TFutureDelegate<T>;
    FResult: T;
  public
    constructor Create(action: TFutureDelegate<T>);
    procedure Execute; override;
    property Result: T read FResult;
  end;

  TFuture<T> = class(TInterfacedObject, IFuture<T>)
  strict private
    FResult: T;
    FWorker: TFutureThread<T>;
  public
    constructor Create(action: TFutureDelegate<T>);
    destructor Destroy; override;
    function Value: T;
  end;

implementation

uses
  SysUtils;

{ TFutureThread<T> }

constructor TFutureThread<T>.Create(action: TFutureDelegate<T>);
begin
  inherited Create(false);
  FAction := action;
end;

procedure TFutureThread<T>.Execute;
begin
  FResult := FAction();
end;

{ TFuture<T> }

constructor TFuture<T>.Create(action: TFutureDelegate<T>);
begin
  inherited Create;
  FWorker := TFutureThread<T>.Create(action);
end;

destructor TFuture<T>.Destroy;
begin
  // If Value was never called, the worker still exists; TThread.Destroy
  // waits for the thread to finish before releasing it, so nothing leaks.
  FreeAndNil(FWorker);
  inherited;
end;

function TFuture<T>.Value: T;
begin
  // The first call waits for the worker and caches its result.
  if assigned(FWorker) then begin
    FWorker.WaitFor;
    FResult := FWorker.Result;
    FreeAndNil(FWorker);
  end;
  Result := FResult;
end;

end.
I’ve used my usual test case, calculating the number of primes between 1 and 1.000.000.
implementation

uses
  DelphiFuture;

function IsPrime(i: integer): boolean;
var
  j: integer;
begin
  Result := false;
  if i <= 1 then // 1 is not a prime
    Exit;
  for j := 2 to Round(Sqrt(i)) do
    if (i mod j) = 0 then
      Exit;
  Result := true;
end;

procedure TForm1.btnTestClick(Sender: TObject);
var
  numPrimes: IFuture<integer>;
begin
  numPrimes := TFuture<integer>.Create(function: integer
    var
      iPrime: integer;
    begin
      Result := 0;
      for iPrime := 1 to 1000000 do
        if IsPrime(iPrime) then
          Inc(Result);
    end
  );
  lbLog.Items.Add(Format('%d primes from 1 to 1000000',
    [numPrimes.Value]));
end;

Tuesday, June 15, 2010

OmniThreadLibrary 2.0 sneak preview [2]

Futures in the OTL were not planned – they just happened. In fact, they are so new that you won’t find them in the SVN. (Don’t worry, they’ll be committed soon.)

As a matter of fact, I always believed that futures must be supported by the compiler. That changed a few weeks ago when somebody somewhere (sorry, can’t remember the time and place) asked if they can be implemented in the OmniThreadLibrary. That question made me rethink the whole issue and I found out that not only is it possible to implement them without changing the compiler – the implementation is almost trivial!

In the OTL 2.0 you’ll be able to declare a future …

var
  numPrimes: IOmniFuture<integer>;

… start the evaluation …

  numPrimes := TOmniFuture<integer>.Create(a delegate returning integer);

… and wait on the result.

  numPrimes.Value

As simple as that. Declare the IOmniFuture<T>, create TOmniFuture<T> and retrieve the result by calling Value: T.

As a real-world example, the code below creates a future that calculates the number of primes from 1 to CPrimesHigh and displays this value.

var
  numPrimes: IOmniFuture<integer>;
begin
  numPrimes := TOmniFuture<integer>.Create(function: integer
    var
      i: integer;
    begin
      Result := 0;
      for i := 1 to CPrimesHigh do
        if IsPrime(i) then
          Inc(Result);
    end
  );
  // do something else
  lbLog.Items.Add(Format('%d primes from 1 to %d',
    [numPrimes.Value, CPrimesHigh]));
end;

As a general rule, I would recommend against putting too much code inside the future’s constructor. The following approach is more readable and easier to maintain.

function CountPrimesToHigh(high: integer): integer;
var
  i: integer;
begin
  Result := 0;
  for i := 1 to high do // use the parameter, not the global constant
    if IsPrime(i) then
      Inc(Result);
end;

var
  numPrimes: IOmniFuture<integer>;
begin
  numPrimes := TOmniFuture<integer>.Create(function: integer
    begin
      Result := CountPrimesToHigh(CPrimesHigh);
    end
  );
  // do something else
  lbLog.Items.Add(Format('%d primes from 1 to %d',
    [numPrimes.Value, CPrimesHigh]));
end;

Or you can take another step and create a future factory. That’s especially recommended if you’ll be using futures of the same kind in different places.

// Returns the interface so that the future's lifetime is reference-counted.
function StartCountingPrimesTo(high: integer): IOmniFuture<integer>;
begin
  Result := TOmniFuture<integer>.Create(function: integer
    var
      i: integer;
    begin
      Result := 0;
      for i := 1 to high do
        if IsPrime(i) then
          Inc(Result);
    end
  );
end;

var
  numPrimes: IOmniFuture<integer>;
begin
  numPrimes := StartCountingPrimesTo(CPrimesHigh);
  // do something else
  lbLog.Items.Add(Format('%d primes from 1 to %d',
    [numPrimes.Value, CPrimesHigh]));
end;

Implementation

Believe it or not, the whole implementation fits in 27 lines (not counting empty lines).

type
  TOmniFutureDelegate<T> = reference to function: T;

  IOmniFuture<T> = interface
    function Value: T;
  end; { IOmniFuture<T> }

  TOmniFuture<T> = class(TInterfacedObject, IOmniFuture<T>)
  private
    ofResult: T;
    ofTask  : IOmniTaskControl;
  public
    constructor Create(action: TOmniFutureDelegate<T>);
    function Value: T;
  end; { TOmniFuture<T> }

constructor TOmniFuture<T>.Create(action: TOmniFutureDelegate<T>);
begin
  ofTask := CreateTask(procedure (const task: IOmniTask)
    begin
      ofResult := action();
    end,
    'TOmniFuture action').Run;
end; { TOmniFuture<T>.Create }

function TOmniFuture<T>.Value: T;
begin
  ofTask.Terminate;
  ofTask := nil;
  Result := ofResult;
end; { TOmniFuture<T>.Value }

As you can see, the whole OTL task support is only used to simplify background thread creation. It would be quite simple to implement futures around Delphi’s own TThread. In fact, I think I’ll just go ahead and implement it!

Monday, June 14, 2010

OmniThreadLibrary 2.0 sneak preview [1]

You may have noticed that I’ve been strangely silent for the past two months. The reason for that is OmniThreadLibrary version 2. [And lots of other important work that couldn’t wait. And the OmniThreadLibrary version 2.]

The OTL 2.0 is not yet ready but I’ve decided to pre-announce some features. They are, after all, available to all programmers following the SVN trunk.

While the focus of the OTL 1 was to provide programmers with simple to use multithreading primitives, OTL 2 focuses mostly on the higher-level topics like parallel for and futures.

Caveat: Parallel For and Futures will work only in Delphi 2009 and newer. The implementation of both heavily depends on generics and anonymous methods and those are simply not available in Delphi 2007. Sorry, people. [I’m sad too – I’m still using Delphi 2007 for my day job.]

Parallel For

Parallel.ForEach was introduced in release 1.05 but that version was purely a “technical preview” – a simple “let’s see if this can be done at all” implementation. In the last few months, the Parallel.ForEach backend was completely redesigned, which allowed the frontend (the API) to be vastly improved.

The basic ForEach(from, to: integer) has not changed much. The only difference is that the parameter type of the Execute delegate is now “integer” and not “TOmniValue”.

  Parallel.ForEach(1, testSize).Execute(
    procedure (const elem: integer)
    begin
      if IsPrime(elem) then
        outQueue.Add(elem);
    end);

A trivial example, of course, but it shows the simplicity of Parallel.ForEach. The code passed to the Execute will be executed in parallel on all possible cores. [The outQueue parameter is of type TOmniBlockingCollection which allows Add to be called from multiple threads simultaneously.]

If you have data in a container that supports enumeration (with one limitation – enumerator must be implemented as a class, not as an interface or a record) then you can enumerate over it in parallel.

  var
    nodeList: TList;

  nodeList := TList.Create;
  // populate nodeList here
  Parallel.ForEach<integer>(nodeList).Execute(
    procedure (const elem: integer)
    begin
      if IsPrime(elem) then
        outQueue.Add(elem);
    end);
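
In case the term “enumerator implemented as a class” is unfamiliar: the sketch below shows a container whose GetEnumerator returns a class instance exposing MoveNext and Current, which is what the Delphi for..in loop needs. TRange, TRangeEnumerator and SumRange are hypothetical names made up for this illustration; the sketch uses a plain for..in loop and does not call the OTL at all.

```pascal
program ClassEnumeratorDemo;

{$APPTYPE CONSOLE}

type
  // Enumerator implemented as a class: MoveNext plus a Current property.
  TRangeEnumerator = class
  strict private
    FCurrent: integer;
    FLast   : integer;
  public
    constructor Create(first, last: integer);
    function MoveNext: boolean;
    property Current: integer read FCurrent;
  end;

  // Container; for..in only requires a public GetEnumerator method.
  TRange = class
  strict private
    FFirst: integer;
    FLast : integer;
  public
    constructor Create(first, last: integer);
    function GetEnumerator: TRangeEnumerator;
  end;

constructor TRangeEnumerator.Create(first, last: integer);
begin
  inherited Create;
  FCurrent := first - 1; // MoveNext is called before the first Current
  FLast := last;
end;

function TRangeEnumerator.MoveNext: boolean;
begin
  Result := FCurrent < FLast;
  if Result then
    Inc(FCurrent);
end;

constructor TRange.Create(first, last: integer);
begin
  inherited Create;
  FFirst := first;
  FLast := last;
end;

function TRange.GetEnumerator: TRangeEnumerator;
begin
  Result := TRangeEnumerator.Create(FFirst, FLast);
end;

// Sums all values in the range by enumerating the container.
function SumRange(first, last: integer): integer;
var
  range: TRange;
  value: integer;
begin
  Result := 0;
  range := TRange.Create(first, last);
  try
    for value in range do // the compiler frees the class enumerator
      Inc(Result, value);
  finally
    range.Free;
  end;
end;

begin
  Writeln(SumRange(1, 4)); // prints 10
end.
```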

The new ForEach backend allows parallel loops to be executed asynchronously. In the code sample below, the parallel loop tests numbers for primeness and adds primes to a TOmniBlockingCollection queue. A normal for loop, executing in parallel with the parallel loop, reads numbers from this queue and displays them on the screen.

var
  prime     : TOmniValue;
  primeQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  primeQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000).NoWait
    .OnStop(
      procedure
      begin
        primeQueue.CompleteAdding;
      end)
    .Execute(
      procedure (const value: integer)
      begin
        if IsPrime(value) then begin
          primeQueue.Add(value);
        end;
      end);
  for prime in primeQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

This code depends on a TOmniBlockingCollection feature, namely that the enumerator will block when the queue is empty unless CompleteAdding is called [more info]. That’s why the OnStop delegate must be provided – without it the “normal” for loop would never stop. (It would just wait forever on the next element.)
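
The blocking behaviour can also be observed without Parallel.ForEach. The sketch below is a minimal illustration, assuming that the OtlCommon and OtlCollections units are on the search path and that IOmniBlockingCollection.Take blocks on an empty collection and returns False once the collection is empty and completed. TProducer is a hypothetical helper class written for this example.

```pascal
program BlockingCollectionDemo;

{$APPTYPE CONSOLE}

uses
  Classes, SysUtils,
  OtlCommon,       // TOmniValue
  OtlCollections;  // IOmniBlockingCollection, TOmniBlockingCollection

type
  // Hypothetical producer: adds a few numbers, then completes the collection.
  TProducer = class(TThread)
  strict private
    FQueue: IOmniBlockingCollection;
  protected
    procedure Execute; override;
  public
    constructor Create(const queue: IOmniBlockingCollection);
  end;

constructor TProducer.Create(const queue: IOmniBlockingCollection);
begin
  FQueue := queue; // assigned before the thread starts running
  inherited Create(false);
end;

procedure TProducer.Execute;
var
  i: integer;
begin
  for i := 1 to 5 do begin
    Sleep(100);
    FQueue.Add(i);
  end;
  // Without this call the consumer loop below would wait forever.
  FQueue.CompleteAdding;
end;

var
  producer: TProducer;
  queue   : IOmniBlockingCollection;
  value   : TOmniValue;
begin
  queue := TOmniBlockingCollection.Create;
  producer := TProducer.Create(queue);
  try
    // Assumed behaviour: Take waits for the next element and returns
    // False only when the collection is both empty and completed.
    while queue.Take(value) do
      Writeln(value.AsInteger);
  finally
    producer.Free;
  end;
end.
```

Commenting out the CompleteAdding call should make the consumer loop hang after the fifth number, which is the situation the OnStop delegate prevents.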

While this shows two powerful functions (NoWait and OnStop), it is also kind of complicated and definitely not code I would want to write too many times. That’s why OmniThreadLibrary also provides syntactic sugar in the form of the Into function.

var
  prime     : TOmniValue;
  primeQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  primeQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000).PreserveOrder.NoWait
    .Into(primeQueue)
    .Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        if IsPrime(value) then
          res := value;
      end);
  for prime in primeQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

This code demos a few different enhancements to the ForEach loop. Firstly, you can order the Parallel subsystem to preserve input order by calling the PreserveOrder function. [In truth, this function doesn’t work yet. That’s the part I’m currently working on.]

Secondly, because Into is called, ForEach will automatically call CompleteAdding on the parameter passed to the Into when the loop completes. No need for the ugly OnStop call.

Thirdly, Execute (also because of the Into) takes a delegate with a different signature. Instead of a standard ForEach signature procedure (const value: T) you have to provide it with a procedure (const value: integer; var res: TOmniValue). If the output parameter (res) is set to any value inside this delegate, it will be added to the Into queue and if it is not modified inside the delegate, it will not be added to the Into queue. Basically, the parallel loop body is replaced with the code below and this code calls your own delegate (loopBody).

  result := TOmniValue.Null;
  while (not Stopped) and localQueue.GetNext(value) do begin
    loopBody(value, result);
    if not result.IsEmpty then begin
      oplIntoQueueObj.Add(result);
      result := TOmniValue.Null;
    end;
  end;
  oplIntoQueueObj.CompleteAdding;

The NoWait and Into provide you with a simple way to chain Parallel loops and implement multiple parallel processing stages. [Although this works in the current version, the OtlParallel does nothing to balance the load between all active Parallel loops. I’m not yet completely sure that this will be supported in the 2.0 release.]

var
  dataQueue  : IOmniBlockingCollection;
  prime      : TOmniValue;
  resultQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  dataQueue := TOmniBlockingCollection.Create;
  resultQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000)
    .NoWait.Into(dataQueue).Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        if IsPrime(value) then
          res := value;
      end
    );
  Parallel.ForEach<integer>(dataQueue as IOmniValueEnumerable)
    .NoWait.Into(resultQueue).Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        // Sophie Germain primes
        if IsPrime(2*value + 1) then
          res := value;
      end
    );
  // read from the output of the second stage
  for prime in resultQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

[BTW, there will be a better way to enumerate over TOmniBlockingCollection in the OTL 2.0 release. Passing “dataQueue as IOmniValueEnumerable” to the ForEach is ugly.]

If you want to iterate over something very nonstandard, you can write a “GetNext” delegate:

  Parallel.ForEach<integer>(
    function (var value: integer): boolean
    begin
      value := i;
      Result := (i <= testSize);
      Inc(i);
    end)
  .Execute(
    procedure (const elem: integer)
    begin
      outQueue.Add(elem);
    end);

In case you wonder what the possible iteration sources are, here’s the full list:

  ForEach(const enumerable: IOmniValueEnumerable): IOmniParallelLoop;
  ForEach(const enum: IOmniValueEnumerator): IOmniParallelLoop;
  ForEach(const enumerable: IEnumerable): IOmniParallelLoop;
  ForEach(const enum: IEnumerator): IOmniParallelLoop;
  ForEach(const sourceProvider: TOmniSourceProvider): IOmniParallelLoop;
  ForEach(enumerator: TEnumeratorDelegate): IOmniParallelLoop;
  ForEach(low, high: integer; step: integer = 1): IOmniParallelLoop<integer>;
  ForEach<T>(const enumerable: IOmniValueEnumerable): IOmniParallelLoop<T>;
  ForEach<T>(const enum: IOmniValueEnumerator): IOmniParallelLoop<T>;
  ForEach<T>(const enumerable: IEnumerable): IOmniParallelLoop<T>;
  ForEach<T>(const enum: IEnumerator): IOmniParallelLoop<T>;
  ForEach<T>(const enumerable: TEnumerable<T>): IOmniParallelLoop<T>;
  ForEach<T>(const enum: TEnumerator<T>): IOmniParallelLoop<T>;
  ForEach<T>(enumerator: TEnumeratorDelegate<T>): IOmniParallelLoop<T>;
  ForEach(const enumerable: TObject): IOmniParallelLoop;
  ForEach<T>(const enumerable: TObject): IOmniParallelLoop<T>;

The last two versions are used to iterate over any object that supports class-based enumerators. Sadly, this feature is only available in Delphi 2010 because it uses extended RTTI to access the enumerator and its methods.

Parallel For Implementation

The backend allows for efficient parallel enumeration even when the enumeration source is not threadsafe. You can be assured that the data passed to the ForEach will be accessed from only one thread at a time (although this will not always be the same thread). Only on special occasions, when the backend knows that the source is threadsafe (for example when IOmniValueEnumerator is passed to the ForEach), the data will be accessed from multiple threads at the same time.

I’m planning to write an article on the parallel for implementation but it will have to wait until the PreserveOrder is implemented. At the moment the backend implementation is not final yet.

Wednesday, June 09, 2010

A seriously overdue update

DSiWin32 1.55

  • Implemented DSiHasElapsed64 and DSiElapsedTime64.
  • Implemented DSiLogonAs and DSiVerifyPassword.
  • DSiGetProcAddress made public.

GpHugeFile 6.02

  • Prefetching parameters are now configurable: TGpHugeFileStream.Create and .CreateW got parameters waitObject and numPrefetchBuffers which are passed to ResetEx/RewriteEx.

GpLists 1.44

  • TStringList helper split into TStrings and TStringList helpers

GpStreams 1.30

  • Implemented TGpFileStream class and two SafeCreateGpFileStream functions.
  • Unicode fixes.
  • Disable inlining for Delphi 2007 because of compiler bugs.
  • Added functions AtEnd and BytesLeft, AsAnsiString property and WriteAnsiStr method to the TStream class helper.
  • Implemented TGpFixedMemoryStream.CreateA and fixed TGpFixedMemoryStream.Create.

GpStructuredStorage 2.0b

  • Important bug fix! When the folder was deleted, it was not removed from the folder cache. Because of that, subsequent FolderExists call succeeded instead of failed, which could cause all sorts of weird problems.

GpStuff 1.21

  • Implemented overloads for Increment and Decrement in TGp4AlignedInt and TGp8AlignedInt64.
  • Implemented Add/Subtract methods in TGp4AlignedInt and TGp8AlignedInt64.
  • OpenArrayToVarArray supports vtUnicodeString variant type.

GpSync 1.23

  • Message queue works with Unicode Delphi, backwards compatible.

GpTextStream 1.08

  • Implemented 'lines in a text stream' enumerator EnumLines.
  • Implemented TGpTextStream.EOF.
  • Implemented text stream filter FilterTxt.

All free as usual. Enjoy!

Monday, June 07, 2010

What drives us

Monkeys work harder when they are not rewarded. People do, too.

Daniel H. Pink [wikipedia] collected the evidence about that fact and wrote a (supposedly very good; I haven’t read it yet) book, Drive.

That’s not why I’m writing this post.

People ask me from time to time why I put so much work into providing free code and knowledge to the community.

My usual answer to that is: “Er, that’s hard to explain. I feel the need.” (Yep, that kind of need.)

But that’s also not why I’m writing this post.

Not so long ago, RSA Animate published an 11-minute YouTube video containing a concentrated version of Daniel Pink’s talk based on the Drive book.

Now that’s why I’m writing this post!

This concentrated version of the book is so great that I definitely want to read the whole thing (in fact I already bought the Kindle version).

Even more – at 8:44 it defines me in a few words: “Challenge, mastery and making a contribution.”

Exactly! I need the challenge, I want to master the subject and then I want to make a contribution!

Thanks to Dan Pink and the great people at RSA Animate I learned something about myself.

Sunday, June 06, 2010

Synchronisation in a multithreaded environment

Blaise Pascal #11 is out, containing the third installment of my multithreading series, this time dealing with synchronisation.

Monday, April 19, 2010

ParallelExtensionExtras

I’d just like to point out to all parallel-loving programmers that the Parallel Programming with .NET blog posted a series of 11 articles (more to come) called A Tour of ParallelExtensionExtras. A great read, full of interesting information and some ideas that could find their way into the OTL (which is, by the way, getting full Parallel.For support in these weeks).

Tuesday, March 23, 2010

Books: Garbage Collection

I had my eye on Garbage Collection: Algorithms for Automatic Dynamic Memory Management for quite some time, but as it was quite expensive (and still is) I had little expectation of reading it any time soon. However, an OmniThreadLibrary grant by Rico changed that. To show my gratitude I decided to write a short review of the book and of all other programming-related books I will read in the future.

The “GC” book deals with – who would guess ;) – garbage collection. The topic is covered quite extensively. After the short introduction, three classical approaches are described – reference counting, mark-sweep algorithm, and copying algorithm. For each algorithm, the authors deal with the basics but also with most well-known implementations.

After that, more modern approaches are described – generational, incremental and concurrent GC. There are even chapters on cache-conscious GC (processor level 1/2 cache, that is) and distributed GC.

While most of the book is applicable only to managed and/or interpreted systems, two chapters deal with garbage collectors for C and C++.

The biggest problem of the book is that it’s 14 years old and it shows. For example, we can read thoughts like: “Today, although SIMM memory modules are comparatively cheap to buy and easy to install, programs are increasingly profligate in their consumption of this resource. Microsoft Windows’95, an operating system for a single-user personal computer, needs more than twelve megabytes of RAM to operate optimally.” Yeah, very relevant.

Other than that, I really loved this book. I now know enough about the GC field to have a semi-intelligent conversation on the topic, and I will understand new algorithms and improvements when they appear (or at least I hope so). Plus I now know how big a problem it is to write a GC for an unmanaged environment (Delphi, for example). If there ever is one, and if it performs comparably to the “classic” Delphi compiler, then kudos to the authors!

Friday, March 19, 2010

BlaisePascal #10

The tenth issue (congratulations!) of the Blaise Pascal Magazine is out and inside you can find the second part of my “multithreading” series, dealing with various approaches to thread management in Delphi (TThread, Windows threads, AsyncCalls, OmniThreadLibrary).

Thursday, March 18, 2010

The Delphi Geek has moved to a new place

As Google is phasing out FTP publishing of Blogger blogs, I had to move away from my trustworthy host at 17slon.com. Since yesterday, The Delphi Geek has been hosted directly at Blogger and can be accessed at www.thedelphigeek.com. Thedelphigeek.com will also work, as will thedelphigeek.blogspot.com.

While I was at it I also changed the subscription publishing and moved it to Feedburner. Please update your readers to use either http://feeds.feedburner.com/TheDelphiGeek (posts only) or http://www.thedelphigeek.com/feeds/comments/default (posts and comments) as The Delphi Geek source.

Wednesday, March 17, 2010

Faster CopyRecord required

As we saw yesterday, CopyRecord can be a source of substantial slowdown if records are used extensively. I can see only one way to improve the situation – fix the compiler. It should be able to generate a custom CopyRecord for each record type (or at least for “simple” records, however that simplicity is defined) and that would speed up all record operations immensely.

To push this idea, I’ve created a QC report #83084. If you think this would be a significant improvement to the compiler, make sure to vote on that report.

And while you’re busy voting, I’d just like to state that I also find QC #47559 important (hint, hint).

Tuesday, March 16, 2010

Speed comparison: Variant, TValue, and TOmniValue

When I read TValue is very slow! at the TURBU Tech blog earlier today, I immediately wondered how fast TOmniValue (the basic data-exchange type in the OmniThreadLibrary) is compared to Variant and TValue. What else could I do but write a benchmark?!

I chose to test the performance in a way that is slightly different from Mason’s approach. My test measures not only the store operation but also load and (in some instances) add. Also, the framework is slightly different and decouples the time-management code from the benchmark.

const
  CBenchResult = 100*1000*1000; //100 million

procedure TfrmBenchmark.Benchmark(const benchName: string;
  benchProc: TBenchProc);
var
  benchRes : integer;
  stopwatch: TStopWatch;
begin
  stopwatch := TStopWatch.StartNew;
  benchProc(benchRes);
  stopwatch.Stop;
  Assert(benchRes = CBenchResult);
  lbLog.Items.Add(Format('%s: %d ms',
    [benchName, stopwatch.ElapsedMilliseconds]));
  lbLog.Update;
end;

procedure TfrmBenchmark.btnBenchmarkClick(Sender: TObject);
begin
  Benchmark('Variant', TestVariant);
  Benchmark('TValue', TestTValue);
  Benchmark('TOmniValue', TestTOmniValue);
end;

procedure TfrmBenchmark.TestTOmniValue(var benchRes: integer);
var
  counter: TOmniValue;
  i      : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do
    counter := counter.AsInteger + 1;
  benchRes := counter;
end;

procedure TfrmBenchmark.TestTValue(var benchRes: integer);
var
  counter: TValue;
  i      : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do
    counter := counter.AsInteger + 1;
  benchRes := counter.AsInteger;
end;

procedure TfrmBenchmark.TestVariant(var benchRes: integer);
var
  counter: Variant;
  i      : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do
    counter := counter + 1;
  benchRes := counter;
end;

As you can see, all three tests are fairly similar. They count from 0 to 100.000.000 and the counter is stored in a Variant/TValue/TOmniValue. The Variant test follows the same semantics as if the counter variable were declared as an integer, while the TValue and TOmniValue tests require some help from the programmer to determine how the counter should be interpreted (AsInteger).

The results were interesting. TValue is about 5x slower than the Variant, which is 7x slower than the TOmniValue.

bench

Of course, I was interested in where this speed difference comes from and I looked at the assembler code.

Digging into the assembler

Variant

Unit32.pas.87: counter := counter + 1;
004B1232 8D55F0           lea edx,[ebp-$10]
004B1235 8D45E0           lea eax,[ebp-$20]
004B1238 E817AAF6FF       call @VarCopy
004B123D 8D45D0           lea eax,[ebp-$30]
004B1240 BA01000000       mov edx,$00000001
004B1245 B101             mov cl,$01
004B1247 E8DCF8F6FF       call @VarFromInt
004B124C 8D55D0           lea edx,[ebp-$30]
004B124F 8D45E0           lea eax,[ebp-$20]
004B1252 E8F523F7FF       call @VarAdd
004B1257 8D55E0           lea edx,[ebp-$20]
004B125A 8D45F0           lea eax,[ebp-$10]
004B125D E8F2A9F6FF       call @VarCopy

Very straightforward code. The Variant is copied into a temporary location, the number 1 is converted into a Variant, those two variants are added and the result is stored back into the counter variable. As you can see, Variant calculations are really clumsy. It would be much faster to convert the Variant to an integer, add one and convert the result back. Like this:

procedure TfrmBenchmark.TestVariant2(var benchRes: integer);
var
  counter: Variant;
  i,j    : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do begin
    j := counter;
    counter := j + 1;
  end;
  benchRes := counter;
end;

This modified version generates much faster code.

Unit32.pas.100: j := counter;
004B1355 8D45F0           lea eax,[ebp-$10]
004B1358 E863B2F6FF       call @VarToInteger
004B135D 8BF0             mov esi,eax
Unit32.pas.101: counter := j + 1;
004B135F 8D45F0           lea eax,[ebp-$10]
004B1362 8D5601           lea edx,[esi+$01]
004B1365 B1FC             mov cl,$fc
004B1367 E8BCF7F6FF       call @VarFromInt

Benchmarking proves my theory. The optimized version needed only 1220 ms to complete the test, which made it almost 5x faster than the original Variant code.

TValue

Unit32.pas.76: counter := counter.AsInteger + 1;
004B11A1 8D45E8           lea eax,[ebp-$18]
004B11A4 E86B96FFFF       call TValue.AsInteger
004B11A9 40               inc eax
004B11AA 8D55D0           lea edx,[ebp-$30]
004B11AD E8A695FFFF       call TValue.&op_Implicit
004B11B2 8D55D0           lea edx,[ebp-$30]
004B11B5 8D45E8           lea eax,[ebp-$18]
004B11B8 8B0D4C9F4A00     mov ecx,[$004a9f4c]
004B11BE E8D567F5FF       call @CopyRecord

The TValue code is quite neat. The counter is converted to an integer, one is added, the result is converted into a temporary TValue and this temporary TValue is copied back into the counter. Why then is the TValue version so much slower? We’ll have to look into the implementation to find the answer. Let’s find out first why TOmniValue is so fast.

TOmniValue

Unit32.pas.65: counter := counter.AsInteger + 1;
004B10AA 8D45F3           lea eax,[ebp-$0d]
004B10AD E8FAF3FFFF       call TOmniValue.IsInteger
004B10B2 84C0             test al,al
004B10B4 740E             jz $004b10c4
004B10B6 8B45F3           mov eax,[ebp-$0d]
004B10B9 8945E8           mov [ebp-$18],eax
004B10BC 8B45F7           mov eax,[ebp-$09]
004B10BF 8945EC           mov [ebp-$14],eax
004B10C2 EB32             jmp $004b10f6
004B10C4 8D45F3           lea eax,[ebp-$0d]
004B10C7 E8D8F3FFFF       call TOmniValue.IsEmpty
004B10CC 84C0             test al,al
004B10CE 7410             jz $004b10e0
004B10D0 C745E800000000   mov [ebp-$18],$00000000
004B10D7 C745EC00000000   mov [ebp-$14],$00000000
004B10DE EB16             jmp $004b10f6
004B10E0 B94C114B00       mov ecx,$004b114c
004B10E5 B201             mov dl,$01
004B10E7 A16CD14000       mov eax,[$0040d16c]
004B10EC E82747F6FF       call Exception.Create
004B10F1 E8D247F5FF       call @RaiseExcept
004B10F6 8B45E8           mov eax,[ebp-$18]
004B10F9 8BF0             mov esi,eax
004B10FB 8D55F3           lea edx,[ebp-$0d]
004B10FE 8D4601           lea eax,[esi+$01]
004B1101 E8AEF3FFFF       call TOmniValue.&op_Implicit

Weird stuff, huh? The counter is converted to an integer, then a bunch of funny code is executed and the result is converted back to a TOmniValue. The beginning and the end are easy to understand but what’s going on in-between?

The answer is – inlining. Much of the TOmniValue implementation is marked inline and what we are seeing here is the internal implementation of the AsInteger property.

I’ll return to this later but first let’s check what happens if all these inline modifiers are removed.

Unit32.pas.65: counter := counter.AsInteger + 1;
004B10EF 8D45F3           lea eax,[ebp-$0d]
004B10F2 E865F4FFFF       call TOmniValue.GetAsInteger
004B10F7 40               inc eax
004B10F8 8D55E0           lea edx,[ebp-$20]
004B10FB E8A4F4FFFF       call TOmniValue.&op_Implicit
004B1100 8D55E0           lea edx,[ebp-$20]
004B1103 8D45F3           lea eax,[ebp-$0d]
004B1106 8B0D5CF84A00     mov ecx,[$004af85c]
004B110C E88768F5FF       call @CopyRecord

The generated code is now almost the same as in the TValue case; only the stack offsets differ. It is also much slower: instead of 839 ms the code took 3119 ms to execute and was only twice as fast as the original Variant code (and much slower than the modified Variant code). Inlining AsInteger alone couldn’t make such a big change. It looks like CopyRecord is the culprit for the slowdown. I didn’t verify this by measurement, but if you look at the _CopyRecord implementation in System.pas it is obvious that record copying cannot be very fast.

The Delphi compiler team would do a lot of good if future versions of the compiler generated custom copying code adapted to each record type.

Use the source, Luke!

What’s left for me is to determine the reason for the big speed difference between TValue and TOmniValue. To find it, I had to dig into the implementation of both records. Of the biggest interest to me were the AsInteger getter and Implicit(from: integer) operator.

TOmniValue

TOmniValue lives in OtlCommon.pas. AsInteger getter GetAsInteger just remaps the call to the GetAsInt64 method. Similarly, Implicit maps to SetAsInt64.

type
  ovData: int64;
  ovType: (ovtNull, ovtBoolean, ovtInteger, ovtDouble, ovtExtended, 
           ovtString, ovtObject, ovtInterface, ovtVariant, 
           ovtWideString, ovtPointer);

function TOmniValue.GetAsInt64: int64;
begin
  if IsInteger then
    Result := ovData
  else if IsEmpty then
    Result := 0
  else
    raise Exception.Create('TOmniValue cannot be converted to int64');
end; { TOmniValue.GetAsInt64 }

procedure TOmniValue.SetAsInt64(const value: int64);
begin
  ovData := value;
  ovType := ovtInteger;
end; { TOmniValue.SetAsInt64 }

The code is quite straightforward. Some error checking is done in the getter and the value is just stored away in the setter. Now the assembler code from the first TOmniValue example makes some sense – we were simply looking at the inlined implementation of GetAsInt64. (The Implicit operator was not inlined.)
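To make the idea concrete, here is a minimal, self-contained sketch of the same design: a record with a payload, a type tag, an inlined getter and an Implicit operator. TTinyValue and all of its members are hypothetical names made up for this illustration; this is not the OtlCommon code, just the pattern described above.

```pascal
program TinyValueSketch;
{$APPTYPE CONSOLE}

uses
  SysUtils; // System.SysUtils on newer compilers without unit scope names

type
  TTinyValueType = (tvtNull, tvtInteger, tvtOther);

  // Hypothetical stand-in for TOmniValue: 64-bit payload plus a type tag.
  TTinyValue = record
  private
    FData: int64;
    FType: TTinyValueType;
    function GetAsInt64: int64; inline;
  public
    class operator Implicit(const value: int64): TTinyValue;
    property AsInt64: int64 read GetAsInt64;
  end;

function TTinyValue.GetAsInt64: int64;
begin
  // Same shape as the getter above: check the tag, fall back to 0, else fail.
  if FType = tvtInteger then
    Result := FData
  else if FType = tvtNull then
    Result := 0
  else
    raise Exception.Create('TTinyValue cannot be converted to int64');
end;

class operator TTinyValue.Implicit(const value: int64): TTinyValue;
begin
  // The "setter": store the payload and remember what it is.
  Result.FData := value;
  Result.FType := tvtInteger;
end;

var
  counter: TTinyValue;
  i      : integer;
begin
  counter := 0;
  for i := 1 to 1000 do
    counter := counter.AsInt64 + 1; // getter, add, Implicit back into the record
  Writeln(counter.AsInt64); // prints 1000
end.
```

Because the record holds no managed fields (strings, interfaces), assigning it is a plain memory copy, which is one reason such a type can stay cheap.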

TValue

The TValue record lives in RTTI.pas. AsInteger getter gets remapped to the generic version AsType<Integer> which calls TryAsType<T>. In a slightly less roundabout manner Implicit calls From<Integer>.

function TValue.TryAsType<T>(out AResult: T): Boolean;
var
  val: TValue;
begin
  Result := TryCast(System.TypeInfo(T), val);
  if Result then
    val.Get<T>(AResult);
end;

class function TValue.From<T>(const Value: T): TValue;
begin
  Make(@Value, System.TypeInfo(T), Result);
end;

It’s quite obvious that the TValue internals are not optimized for speed. Everything is mapped to generics and the RTTI system, which is fast, but not fast enough to be used in computationally intensive code.

Conclusion

  1. Don’t use TValue for counting. Heck, don’t even use Variant or TOmniValue for counting – they were not designed for that purpose!
  2. TValue may look slow but in fact it is not. It is able to count from 1 to over three million in one second. That’s not slow. It’s just not as fast as a register-based counter. But that’s OK as you should always remember rule 1.
  3. TValue is incredibly powerful. Just look at its implementation. Therefore, it could afford to be a tad slower than other multi-purpose storage mechanisms.
  4. TOmniValue is very fast, but most of its speed (compared to the Variant) comes from the inlining and the compiler being smart enough not to call CopyRecord in this case.
  5. Delphi compiler should really be improved to generate custom CopyRecord for each record type.
  6. Assembler code tells a lot. Source code tells even more.

P.S.

Using OtlCommon won’t bring in any other parts of the OTL library. It requires the following units to compile: DSiWin32, GpStuff, and GpStringHash. Nothing from those units will be linked in, as the TOmniValue implementation doesn’t depend on them. The simplest way to get them all is to download the latest stable OmniThreadLibrary release.

Monday, March 08, 2010

OmniThreadLibrary 1.05a

OmniThreadLibrary 1.05a has just been released. It is available via
SVN or as a ZIP archive.

This is mostly a bugfix release:

  • Bug fixed: TOmniTaskControl.OnMessage(eventHandler: TOmniTaskMessageEvent) was broken.
  • Bug fixed: TOmniTaskControl.OnMessage/OnTerminate uses event monitor  created in the context of the task controller thread (was using a global event monitor created in the main thread).
  • Implemented TOmniEventMonitorPool, per-thread TOmniEventMonitor  allocator.

Upgrade is recommended for all 1.05 users.

Friday, March 05, 2010

TDM Rerun #15: Many Faces Of An Application

That all sounds easy, but how can we combine the windows (forms-based) aspect of an application with something completely different, for example an SvCom-based service application? The problem here is that the GUI part of an application uses forms while the SvCom service is based on another Application object, based on the SvCom_NTService unit. How can we combine the GUI Application.Initialize (where Application is an object in the Forms unit) with a service Application.Initialize (where Application is an object in the SvCom_NTService unit)? By fully qualifying each object, of course.

- Many Faces Of An Application, The Delphi Magazine 107, July 2004

In the 2004 July issue I described an approach that allows the programmer to put multiple application front-ends inside one .exe file by manually tweaking the project’s .dpr file. This is the technique I’m still using in my programs. For example, most of the services I write can be configured by starting the exe with the /config switch.

Links: article (PDF, 126 KB), source code (ZIP, 1 MB).

Friday, February 26, 2010

On satisfaction

It is a great feeling when an elegant piece of code comes together. Even if it can’t be compiled yet.

Parallel.ForEach(nodeQueue as IOmniValueEnumerable)
  .NumTasks(numTasks)
  .CancelWith(cancelToken)
  .Execute(
    procedure (const elem: TOmniValue)
    var
      childNode: TNode;
      node     : TNode;
    begin
      node := TNode(elem.AsObject);
      if node.Value = value then begin
        nodeResult := node;
        nodeQueue.CompleteAdding;
        cancelToken.Signal;
      end
      else for childNode in node.Children do
        nodeQueue.TryAdd(childNode);
    end);


It is an even better feeling when code that seemed impossible to write starts to work.



And the best one – that happens when the code is working so well that you are not afraid of releasing it to the public.



Well, make this almost the best. Because there’s something even better – when people call back to tell you that they like using the code and it is helping them to do their work faster and better.



The feeling that cannot be surpassed comes when such a happy user says something like: “Thanks for the code, it helps me a lot. There’s an Amazon gift certificate, spend it as you like.”



01



I can only respond with: “Rico, thanks!”. OmniThreadLibrary 1.05 is dedicated to you.

Thursday, February 25, 2010

OmniThreadLibrary 1.05

As there were no error reports related to OmniThreadLibrary 1.05 RC, I released the final 1.05 version just a few moments ago. There are almost no changes between the RC and the final release – one demo was added and the Parallel.Join code was tweaked a little.

You can download OTL 1.05 from the Google Code. Alternatively, you can update SVN trunk (checkout instructions) or checkout the release-1.05 tag.

Support is available on the web discussion forum.

Big rename

Many internal classes and interfaces were renamed. This should not affect most users.

  • TOmniBaseStack –> TOmniBaseBoundedStack
  • TOmniStack –> TOmniBoundedStack
  • TOmniBaseQueue –> TOmniBaseBoundedQueue
  • TOmniQueue –> TOmniBoundedQueue
  • IInterfaceDictionary –> IOmniInterfaceDictionary
  • IInterfaceDictionaryEnumerator –> IOmniInterfaceDictionaryEnumerator
  • TInterfaceDictionaryPair –> TOmniInterfaceDictionaryPair

I’m sorry for that. Some names were badly chosen and some did not follow the OTL naming conventions.

Dynamic lock-free queue

Implemented dynamically allocated, O(1) enqueue and dequeue, threadsafe,  lock-free queue. Class TOmniBaseQueue contains base implementation while TOmniQueue adds observer support. Both classes live in the OtlContainers unit.

Read more about the TOmniQueue: Dynamic lock-free queue – doing it right.

Inverse semaphore

Implemented resource counter with empty state signalling TOmniResourceCount (unit  OtlSync).

Read more: Three steps to the blocking collection: [1] Inverse semaphore.
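A rough sketch of the "inverse semaphore" idea, assuming only what this post says about it: a counter whose event handle becomes signalled when all resources are used. TSketchResourceCount is a hypothetical, simplified class written for this illustration (it offers a non-blocking TryAllocate only); it is not the TOmniResourceCount implementation from OtlSync.

```pascal
program ResourceCountSketch;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, SyncObjs; // Winapi.Windows etc. on newer compilers

type
  // Hypothetical simplified counter; the event is signalled while count = 0.
  TSketchResourceCount = class
  strict private
    FCount: integer;
    FEmpty: THandle;
    FLock : TCriticalSection;
  public
    constructor Create(initialCount: integer);
    destructor Destroy; override;
    function TryAllocate: boolean;
    procedure Release;
    function IsEmpty: boolean;
    property Handle: THandle read FEmpty;
  end;

constructor TSketchResourceCount.Create(initialCount: integer);
begin
  inherited Create;
  FCount := initialCount;
  FLock := TCriticalSection.Create;
  // Manual-reset event, initially signalled only if there is nothing to allocate.
  FEmpty := CreateEvent(nil, true, initialCount = 0, nil);
  Win32Check(FEmpty <> 0);
end;

destructor TSketchResourceCount.Destroy;
begin
  if FEmpty <> 0 then
    CloseHandle(FEmpty);
  FLock.Free;
  inherited;
end;

function TSketchResourceCount.TryAllocate: boolean;
begin
  FLock.Acquire;
  try
    Result := FCount > 0;
    if Result then begin
      Dec(FCount);
      if FCount = 0 then
        Win32Check(SetEvent(FEmpty)); // last resource taken: signal "empty"
    end;
  finally
    FLock.Release;
  end;
end;

procedure TSketchResourceCount.Release;
begin
  FLock.Acquire;
  try
    Inc(FCount);
    Win32Check(ResetEvent(FEmpty)); // at least one resource is free again
  finally
    FLock.Release;
  end;
end;

function TSketchResourceCount.IsEmpty: boolean;
begin
  Result := WaitForSingleObject(FEmpty, 0) = WAIT_OBJECT_0;
end;

var
  rc: TSketchResourceCount;
begin
  rc := TSketchResourceCount.Create(2);
  try
    rc.TryAllocate;
    Writeln(rc.IsEmpty); // FALSE, one resource left
    rc.TryAllocate;
    Writeln(rc.IsEmpty); // TRUE, all resources used
    rc.Release;
    Writeln(rc.IsEmpty); // FALSE again
  finally
    rc.Free;
  end;
end.
```

The useful property is the waitable handle: code can include it in a WaitForMultipleObjects call and wake up exactly when the last resource has been taken.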

Blocking collection

New unit OtlCollections which contains the blocking collection implementation TOmniBlockingCollection.

Read more: Three steps to the blocking collection: [3] Blocking collection

Parallel

New high-level parallelism support (unit OtlParallel). Requires at least Delphi 2009.

Two parallel control structures are supported: for each (with optional aggregator) and join.

The demo for Parallel.ForEach can be found in project 35_ParallelFor. The same code is reprinted near the end of the Three steps to the blocking collection: [3] Blocking collection post.

Parallel.ForEach.Aggregate was described in the Parallel.ForEach.Aggregate post and is demoed in project 36_ParallelAggregate.

At the moment ForEach is fairly limited. It can iterate over a range of numbers or over a collection supporting the IOmniValueEnumerable interface (TOmniBlockingCollection, for example). The second limitation will be removed in the future. The plan is to support any collection that implements IEnumerable.

Parallel.Join is very simple code that executes multiple tasks and waits for their completion. It was designed to execute simple tasks that don’t require communication with the owner. It is demoed in project 37_ParallelJoin.

Environment

Unit OtlCommon contains new interface IOmniEnvironment and function Environment that returns singleton of this type. Environment can be used to query some basic information on system, process and thread. Some information (for example process and thread affinity) can also be modified using the same interface.

IOmniAffinity = interface
  property AsString: string;
  property Count: integer;
  property Mask: DWORD;
end;

IOmniProcessEnvironment = interface
  property Affinity: IOmniAffinity;
  property Memory: TOmniProcessMemoryCounters;
  property PriorityClass: TOmniProcessPriorityClass;
  property Times: TOmniProcessTimes;
end;

IOmniSystemEnvironment = interface
  property Affinity: IOmniAffinity;
end;

IOmniThreadEnvironment = interface
  property Affinity: IOmniAffinity;
  property ID: cardinal;
end;

IOmniEnvironment = interface
  property Process: IOmniProcessEnvironment;
  property System: IOmniSystemEnvironment;
  property Thread: IOmniThreadEnvironment;
end;

Newer demos are using some parts of the Environment interface. For example, in demo 33_BlockingCollection, process affinity is set with

  Environment.Process.Affinity.Count := inpNumCPU.Value; 

while the demo 35_ParallelFor uses the following code fragment to query process affinity

  numTasks := Environment.Process.Affinity.Count; 

Cancellation token

New interface IOmniCancellationToken is used in Parallel.ForEach (see the post Three steps to the blocking collection: [3] Blocking collection for an example) and in IOmniTaskControl.TerminateWhen.

IOmniTaskControl and IOmniTask implement CancellationToken: IOmniCancellationToken  property which can be used by the task and task controller.

IOmniCancellationToken is just a simple wrapper around the Win32 event primitive and is defined in the OtlSync unit.

IOmniCancellationToken = interface
  procedure Clear;
  function IsSignaled: boolean;
  procedure Signal;
  property Handle: THandle;
end; { IOmniCancellationToken }
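Since the token is described as a simple wrapper around a Win32 event, a wrapper of that kind can be sketched in a few lines. The names ISketchCancellationToken and TSketchCancellationToken are hypothetical and the code is my own illustration of the pattern, not the OtlSync implementation.

```pascal
program CancelTokenSketch;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils; // Winapi.Windows, System.SysUtils on newer compilers

type
  // Hypothetical interface mirroring the methods listed above.
  ISketchCancellationToken = interface
    procedure Clear;
    function IsSignaled: boolean;
    procedure Signal;
  end;

  TSketchCancellationToken = class(TInterfacedObject, ISketchCancellationToken)
  strict private
    FEvent: THandle;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Clear;
    function IsSignaled: boolean;
    procedure Signal;
  end;

constructor TSketchCancellationToken.Create;
begin
  inherited Create;
  // Manual-reset event: once signalled it stays signalled until Clear is called.
  FEvent := CreateEvent(nil, true, false, nil);
  Win32Check(FEvent <> 0);
end;

destructor TSketchCancellationToken.Destroy;
begin
  if FEvent <> 0 then
    CloseHandle(FEvent);
  inherited;
end;

procedure TSketchCancellationToken.Clear;
begin
  Win32Check(ResetEvent(FEvent));
end;

function TSketchCancellationToken.IsSignaled: boolean;
begin
  // Zero timeout: just test the event state without blocking.
  Result := WaitForSingleObject(FEvent, 0) = WAIT_OBJECT_0;
end;

procedure TSketchCancellationToken.Signal;
begin
  Win32Check(SetEvent(FEvent));
end;

var
  token: ISketchCancellationToken;
begin
  token := TSketchCancellationToken.Create;
  Writeln(token.IsSignaled); // FALSE
  token.Signal;
  Writeln(token.IsSignaled); // TRUE
  token.Clear;
  Writeln(token.IsSignaled); // FALSE
end.
```

A manual-reset event fits cancellation well because every waiter sees the signal, not just the first one.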

Message dispatcher

IOmniTaskControl now implements message dispatching setter in form OnMessage(msgID, handler). Use it to route specific message IDs to specific functions when global TOmniEventMonitor is not used.

An example from one of my applications:

spmDatabaseConn := CreateTask(
    TSttdbPlaylistDatabaseWorker.Create(),
    'Playlist Monitor Database Connection')
  .SetParameters([serverAddress, serverPort, username, password])
  .SetTimer(15*1000, @TSttdbPlaylistDatabaseWorker.CheckDBVersion)
  .OnMessage(MSG_DB_ERROR, HandleError)
  .OnMessage(MSG_DB_STATUS, HandleDatabaseStatus)
  .OnMessage(MSG_DB_VERSION, HandleDatabaseVersion)
  .Run;

UserData[]

Implemented IOmniTaskControl.UserData[]. The application can store any values in this array. It can be accessed via an integer or string index. This storage area can only be accessed from the task controller side. Access is not thread-safe so you should use it only from one thread or create your own protection mechanism.

Small changes

  • IOmniTask implements Implementor property which points back to the worker instance  (but only if worker is TOmniWorker-based).
  • Refactored and enhanced TOmniValueContainer.
  • TOmniTaskFunction now takes 'const' parameter. 
    TOmniTaskFunction = reference to procedure(const task: IOmniTask).
  • Implemented TOmniValue.IsInteger.

Bugs fixed

  • TOmniEventMonitor.OnTaskUndeliveredMessage was missing 'message' parameter.
  • Set package names and designtime/runtime type in D2009/D2010 packages.

New demos

  • 32_Queue: Stress test for new TOmniBaseQueue and TOmniQueue.
  • 33_BlockingCollection: Stress test for new TOmniBlockingCollection, also demoes  the use of Environment to set process affinity.
  • 34_TreeScan: Parallel tree scan using TOmniBlockingCollection.
  • 35_ParallelFor: Parallel tree scan using Parallel.ForEach (Delphi 2009 and newer).
  • 36_ParallelAggregate: Parallel calculations using Parallel.ForEach.Aggregate  (Delphi 2009 and newer).

Monday, February 22, 2010

Three steps to the blocking collection: [3] Blocking collection

About two months ago I started working on a Delphi clone of the .NET 4 BlockingCollection. The initial release was completed just before the end of 2009 and I started to write a series of articles on TOmniBlockingCollection in early January, but then I got stuck in the dynamic lock-free queue implementation. Instead of writing articles I spent most of my free time working on that code.

Now it is (finally) time to complete the journey. Everything that had to be said about the infrastructure has been said and I only have to show you the internal workings of the blocking collection itself.

[Step 1: Three steps to the blocking collection: [1] Inverse semaphore]

[Step 2: Dynamic lock-free queue – doing it right]

The blocking collection is exposed as an interface that lives in the OtlCollections unit.

IOmniBlockingCollection = interface(IGpTraceable)
  ['{208EFA15-1F8F-4885-A509-B00191145D38}']
  procedure Add(const value: TOmniValue);
  procedure CompleteAdding;
  function GetEnumerator: IOmniValueEnumerator;
  function IsCompleted: boolean;
  function Take(var value: TOmniValue): boolean;
  function TryAdd(const value: TOmniValue): boolean;
  function TryTake(var value: TOmniValue; timeout_ms: cardinal = 0): boolean;
end; { IOmniBlockingCollection }

There’s also a class TOmniBlockingCollection which implements this interface. This class is public and can be used or reused in your code.

The blocking collection works in the following way:

  • Add will add new value to the collection (which is internally implemented as a queue (FIFO, first in, first out)).
  • CompleteAdding tells the collection that all data is in the queue. From now on, calling Add will raise an exception.
  • TryAdd is the same as Add except that it doesn’t raise an exception but returns False if the value can’t be added.
  • IsCompleted returns True after the CompleteAdding has been called.
  • Take reads next value from the collection. If there’s no data in the collection, Take will block until the next value is available. If, however, any other thread calls CompleteAdding while the Take is blocked, Take will unblock and return False.
  • TryTake is the same as Take except that it has a timeout parameter specifying maximum time the call is allowed to wait for the next value.
  • Enumerator calls Take in the MoveNext method and returns that value. Enumerator will therefore block when there is no data in the collection. The usual way to stop the enumerator is to call CompleteAdding which will unblock all pending MoveNext calls and stop enumeration. [For another approach see the example at the end of this article.]
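The rules above can be exercised from a single thread. The following is only a usage sketch: it needs the OmniThreadLibrary units on the search path, it uses nothing but the interface methods listed above, and it assumes the constructor takes the numProducersConsumers argument shown later in this post (0 meaning "no resource counting").

```pascal
program BlockingCollectionSketch;
{$APPTYPE CONSOLE}

uses
  SysUtils,
  OtlCommon,       // TOmniValue
  OtlCollections;  // IOmniBlockingCollection, TOmniBlockingCollection

var
  coll : IOmniBlockingCollection;
  i    : integer;
  value: TOmniValue;
begin
  // Assumption: 0 disables producer/consumer counting (see the constructor below).
  coll := TOmniBlockingCollection.Create(0);

  // Producer side: queue three values, then declare the collection complete.
  for i := 1 to 3 do
    coll.Add(i);
  coll.CompleteAdding;

  // After CompleteAdding, Add would raise and TryAdd returns False.
  if not coll.TryAdd(4) then
    Writeln('collection is completed');

  // Consumer side: Take returns the queued values in FIFO order and then
  // returns False (instead of blocking) because the collection is completed.
  while coll.Take(value) do
    Writeln(value.AsInteger);
end.
```

In real code the producer and the consumer would run in different threads; calling CompleteAdding is what lets the consumer loop end instead of blocking forever.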

The trivial parts

Most of the blocking collection code is fairly trivial.

Add just calls TryAdd and raises an exception if TryAdd fails.

procedure TOmniBlockingCollection.Add(const value: TOmniValue);
begin
  if not TryAdd(value) then
    raise ECollectionCompleted.Create('Adding to completed collection');
end; { TOmniBlockingCollection.Add }

CompleteAdding sets two “completed” flags – one boolean flag and one Windows event. Former is used for speed in non-blocking tests while the latter is used when TryTake has to block.

procedure TOmniBlockingCollection.CompleteAdding;
begin
  if not obcCompleted then begin
    obcCompleted := true;
    Win32Check(SetEvent(obcCompletedSignal));
  end;
end; { TOmniBlockingCollection.CompleteAdding }

Take calls the TryTake with the INFINITE timeout.

function TOmniBlockingCollection.Take(var value: TOmniValue): boolean;
begin
  Result := TryTake(value, INFINITE);
end; { TOmniBlockingCollection.Take }

TryAdd checks if CompleteAdding has been called. If not, the value is stored in the dynamic queue.

There’s a potential problem hiding in the TryAdd – between the time the completed flag is checked and the time the value is enqueued, another thread may call CompleteAdding. Strictly speaking, TryAdd should not succeed in that case. However, I cannot foresee a parallel algorithm where this could cause a problem.

function TOmniBlockingCollection.TryAdd(const value: TOmniValue): boolean;
begin
  // CompleteAdding and TryAdd are not synchronised
  Result := not obcCompleted;
  if Result then
    obcCollection.Enqueue(value);
end; { TOmniBlockingCollection.TryAdd }

Easy peasy.

The not so trivial part

And now for something completely different …

TryTake is a whole different beast. It must:

  • retrieve the data
  • observe IsCompleted
  • block when there’s no data and the collection is not completed
  • observe the timeout limitations

Not so easy.

In addition to the obcCompletedSignal (completed event) and obcCollection (dynamic data queue) it will also use obcObserver (a queue change mechanism used inside the OTL) and obcResourceCount, which is an instance of the TOmniResourceCount (inverse semaphore, introduced in Part 1). All these are created in the constructor:

constructor TOmniBlockingCollection.Create(numProducersConsumers: integer);
begin
  inherited Create;
  if numProducersConsumers > 0 then
    obcResourceCount := TOmniResourceCount.Create(numProducersConsumers);
  obcCollection := TOmniQueue.Create;
  obcCompletedSignal := CreateEvent(nil, true, false, nil);
  obcObserver := CreateContainerWindowsEventObserver;
  obcSingleThreaded := (Environment.Process.Affinity.Count = 1);
  if obcSingleThreaded then
    obcCollection.ContainerSubject.Attach(obcObserver, coiNotifyOnAllInserts);
end; { TOmniBlockingCollection.Create }

TryTake is pretty long so I’ve split it into two parts. Let’s take a look at the non-blocking part first.

First, the code tries to retrieve data from the dynamic queue. If there’s data available, it is returned. End of story.

Otherwise, the completed flag is checked. If CompleteAdding has been called, TryTake returns immediately. It also returns if timeout is 0.

Otherwise, the code prepares for the blocking wait. Resource counter is allocated (reasons for this will be provided later), and observer is attached to the blocking collection. This observer will wake the blocking code when new value is stored in the collection.

[In the code below you can see a small optimization – if the code is running on a single core then the observer is attached in the TOmniBlockingCollection constructor and detached in the destructor. Before this optimization was introduced, Attach and Detach spent much too much time in busy-wait code (on a single-core computer).]

After all that is set, the code waits for the value (see the next code block), observer is detached from the queue and resource counter is released.

function TOmniBlockingCollection.TryTake(var value: TOmniValue;
  timeout_ms: cardinal): boolean;
var
  awaited    : DWORD;
  startTime  : int64;
  waitHandles: array [0..2] of THandle;
begin
  if obcCollection.TryDequeue(value) then
    Result := true
  else if IsCompleted or (timeout_ms = 0) then
    Result := false
  else begin
    if assigned(obcResourceCount) then
      obcResourceCount.Allocate;
    try
      if not obcSingleThreaded then
        obcCollection.ContainerSubject.Attach(obcObserver, coiNotifyOnAllInserts);
      try
        //wait for the value, see the next code block below
      finally
        if not obcSingleThreaded then
          obcCollection.ContainerSubject.Detach(obcObserver, coiNotifyOnAllInserts);
      end;
    finally
      if assigned(obcResourceCount) then
        obcResourceCount.Release;
    end;
  end;
end; { TOmniBlockingCollection.TryTake }

The blocking part starts by storing the current time (the millisecond-accurate TimeGetTime is used) and preparing the wait handles. Then it enters a loop which repeats until CompleteAdding has been called, the timeout has elapsed, or a value has been dequeued. (The timeout is checked by the Elapsed function, which I'm not showing here for the sake of simplicity; see the source.)
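To give a rough idea of what such timeout helpers might look like, here is a minimal sketch. It is not the OTL code: the parameter lists are hypothetical (the real Elapsed and TimeLeft_ms are called without parameters in the code below), and GetTickCount is used as a stand-in time source instead of DSiTimeGetTime64.

```delphi
program TimeoutHelpersSketch;

{$APPTYPE CONSOLE}

uses
  Windows;

// Hypothetical helpers for illustration only; not copied from the OTL.
// GetTickCount wraps around after about 49.7 days, which this sketch ignores.

function Elapsed(startTime: int64; timeout_ms: cardinal): boolean;
begin
  if timeout_ms = INFINITE then
    Result := false // an infinite timeout never elapses
  else
    Result := (int64(GetTickCount) - startTime) >= int64(timeout_ms);
end;

function TimeLeft_ms(startTime: int64; timeout_ms: cardinal): cardinal;
var
  used: int64;
begin
  if timeout_ms = INFINITE then
    Result := INFINITE
  else begin
    used := int64(GetTickCount) - startTime;
    if used >= int64(timeout_ms) then
      Result := 0 // nothing left, the next wait must not block
    else
      Result := cardinal(int64(timeout_ms) - used);
  end;
end;

procedure Demo;
var
  startTime: int64;
begin
  startTime := GetTickCount;
  Sleep(50);
  Writeln('Elapsed (1000 ms timeout): ', Elapsed(startTime, 1000));
  Writeln('Time left in ms: ', TimeLeft_ms(startTime, 1000));
  Writeln('Elapsed (zero timeout): ', Elapsed(startTime, 0));
end;

begin
  Demo;
end.
```

The point of returning the remaining time rather than the original timeout is that the wait may be restarted several times inside the loop, and the total time spent in TryTake should still respect the caller's timeout.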

In the loop, the code tries again to dequeue a value from the dynamic queue and exits the loop if the dequeue succeeds. Otherwise, WaitForMultipleObjects is called. It waits for one of three conditions:

  • Completed event. If this event is signalled, CompleteAdding has been called and TryTake must exit.
  • Observer event. If this event is signalled, a new value was enqueued into the dynamic queue and the code must try to dequeue it.
  • Resource count event. If this event is signalled, all resources are used and the code must exit (more on that later).

startTime := DSiTimeGetTime64;
waitHandles[0] := obcCompletedSignal;
waitHandles[1] := obcObserver.GetEvent;
if assigned(obcResourceCount) then
  waitHandles[2] := obcResourceCount.Handle;
Result := false;
while not (IsCompleted or Elapsed) do begin
  if obcCollection.TryDequeue(value) then begin
    Result := true;
    break; //while
  end;
  awaited := WaitForMultipleObjects(IFF(assigned(obcResourceCount), 3, 2),
               @waitHandles, false, TimeLeft_ms);
  //anything but the observer event (new value enqueued) ends the wait
  if awaited <> WAIT_OBJECT_1 then begin
    //resource count event: all consumers are blocked, nobody can add more data
    if awaited = WAIT_OBJECT_2 then
      CompleteAdding;
    Result := false;
    break; //while
  end;
end;

If a new value was enqueued into the dynamic queue, TryDequeue is called again. It is entirely possible that another thread calls that function first and removes the value, causing TryDequeue to fail and WaitForMultipleObjects to be called again. Such is life in the multithreaded world.
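The way the result of WaitForMultipleObjects maps back to the wait handles can be tried out in isolation. The following is a minimal stand-alone sketch, not part of the OTL: it uses two plain manual-reset events as stand-ins for the completed and observer events and compares against WAIT_OBJECT_0 + 1 directly instead of the WAIT_OBJECT_1 constant used above.

```delphi
program WaitResultSketch;

{$APPTYPE CONSOLE}

uses
  Windows;

procedure Demo;
var
  awaited    : DWORD;
  waitHandles: array [0..1] of THandle;
begin
  // Two manual-reset events standing in for the completed and observer events.
  waitHandles[0] := CreateEvent(nil, true, false, nil);
  waitHandles[1] := CreateEvent(nil, true, false, nil);
  try
    SetEvent(waitHandles[1]); // pretend that a value was just enqueued
    // bWaitAll = false: return as soon as any one handle is signalled
    awaited := WaitForMultipleObjects(2, @waitHandles, false, 100);
    if awaited = WAIT_OBJECT_0 then
      Writeln('completed event signalled: stop taking')
    else if awaited = WAIT_OBJECT_0 + 1 then
      Writeln('observer event signalled: retry the dequeue')
    else if awaited = WAIT_TIMEOUT then
      Writeln('timeout')
    else
      Writeln('wait failed, error ', GetLastError);
  finally
    CloseHandle(waitHandles[0]);
    CloseHandle(waitHandles[1]);
  end;
end;

begin
  Demo;
end.
```

Because only the second event is signalled, the program reports the observer branch. In TryTake this is the only result that keeps the loop running; every other result ends the wait.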

Enumerating the blocking collection

The TOmniBlockingCollection enumerator is slightly more powerful than the usual Delphi enumerator. In addition to the usual methods it contains the function Take, which is required by the Parallel architecture (see Parallel.For and Parallel.ForEach.Aggregate for more information).

  IOmniValueEnumerator = interface ['{F60EBBD8-2F87-4ACD-A014-452F296F4699}']
    function GetCurrent: TOmniValue;
    function MoveNext: boolean;
    function Take(var value: TOmniValue): boolean;
    property Current: TOmniValue read GetCurrent;
  end; { IOmniValueEnumerator }

  TOmniBlockingCollectionEnumerator = class(TInterfacedObject, IOmniValueEnumerator)
    constructor Create(collection: TOmniBlockingCollection);
    function GetCurrent: TOmniValue; inline;
    function MoveNext: boolean; inline;
    function Take(var value: TOmniValue): boolean;
    property Current: TOmniValue read GetCurrent;
  end; { TOmniBlockingCollectionEnumerator }

The implementation is trivial.

constructor TOmniBlockingCollectionEnumerator.Create(collection: TOmniBlockingCollection);
begin
  obceCollection_ref := collection;
end; { TOmniBlockingCollectionEnumerator.Create }

function TOmniBlockingCollectionEnumerator.GetCurrent: TOmniValue;
begin
  Result := obceValue;
end; { TOmniBlockingCollectionEnumerator.GetCurrent }

function TOmniBlockingCollectionEnumerator.MoveNext: boolean;
begin
  Result := obceCollection_ref.Take(obceValue);
end; { TOmniBlockingCollectionEnumerator.MoveNext }

function TOmniBlockingCollectionEnumerator.Take(var value: TOmniValue): boolean;
begin
  Result := MoveNext;
  if Result then
    value := obceValue;
end; { TOmniBlockingCollectionEnumerator.Take }
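The same enumerator shape (GetCurrent, MoveNext and an additional Take) can be tried without the OTL. The sketch below is purely illustrative: TCountdown and TCountdownEnumerator are hypothetical classes that count down from a number instead of reading from a blocking collection. It shows that such an enumerator works both with for..in and with explicit Take calls.

```delphi
program EnumeratorSketch;

{$APPTYPE CONSOLE}

type
  // Hypothetical enumerator; mirrors the MoveNext/Current/Take shape only.
  TCountdownEnumerator = class
  private
    FCurrent: integer;
  public
    constructor Create(startFrom: integer);
    function GetCurrent: integer;
    function MoveNext: boolean;
    function Take(var value: integer): boolean;
    property Current: integer read GetCurrent;
  end;

  // Hypothetical container; stands in for the blocking collection.
  TCountdown = class
  private
    FStartFrom: integer;
  public
    constructor Create(startFrom: integer);
    function GetEnumerator: TCountdownEnumerator;
  end;

constructor TCountdownEnumerator.Create(startFrom: integer);
begin
  inherited Create;
  FCurrent := startFrom + 1; // the first MoveNext steps onto startFrom
end;

function TCountdownEnumerator.GetCurrent: integer;
begin
  Result := FCurrent;
end;

function TCountdownEnumerator.MoveNext: boolean;
begin
  Dec(FCurrent);
  Result := FCurrent > 0;
end;

function TCountdownEnumerator.Take(var value: integer): boolean;
begin
  // Move and fetch in one call, like the Take in the enumerator above.
  Result := MoveNext;
  if Result then
    value := FCurrent;
end;

constructor TCountdown.Create(startFrom: integer);
begin
  inherited Create;
  FStartFrom := startFrom;
end;

function TCountdown.GetEnumerator: TCountdownEnumerator;
begin
  Result := TCountdownEnumerator.Create(FStartFrom);
end;

procedure Demo;
var
  countdown: TCountdown;
  enum     : TCountdownEnumerator;
  i        : integer;
begin
  countdown := TCountdown.Create(3);
  try
    // The compiler calls GetEnumerator, MoveNext and Current for us
    // and frees the class-based enumerator when the loop ends.
    for i in countdown do
      Writeln('for..in: ', i);
    // Explicit enumeration with Take; here we own the enumerator.
    enum := countdown.GetEnumerator;
    try
      while enum.Take(i) do
        Writeln('Take: ', i);
    finally enum.Free; end;
  finally countdown.Free; end;
end;

begin
  Demo;
end.
```

Both loops visit 3, 2 and 1. Take is convenient for a consumer that wants the "is there another value, and if so which one" answer in a single call.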

Example

A not-so-simple how-to on using the blocking collection can be seen in the demo 34_TreeScan. It uses the blocking collection to scan a tree with multiple parallel threads. This demo works in Delphi 2007 and newer.

A better example of using the blocking collection is in the demo 35_ParallelFor. Actually, it uses the same approach as demo 34 to scan the tree, except that the code is implemented as an anonymous method which causes it to be much simpler than the D2007 version. Of course, this demo works only in Delphi 2009 and above.

This is the full parallel scanner from the 35_ParallelFor demo:

function TfrmParallelForDemo.ParaScan(rootNode: TNode; value: integer): TNode;
var
  cancelToken: IOmniCancellationToken;
  nodeQueue  : IOmniBlockingCollection;
  nodeResult : TNode;
  numTasks   : integer;
begin
  nodeResult := nil;
  cancelToken := CreateOmniCancellationToken;
  numTasks := Environment.Process.Affinity.Count;
  nodeQueue := TOmniBlockingCollection.Create(numTasks);
  nodeQueue.Add(rootNode);
  Parallel.ForEach(nodeQueue as IOmniValueEnumerable)
    .NumTasks(numTasks) // must be the same number of tasks as in nodeQueue to ensure stopping
    .CancelWith(cancelToken)
    .Execute(
      procedure (const elem: TOmniValue)
      var
        childNode: TNode;
        node     : TNode;
      begin
        node := TNode(elem.AsObject);
        if node.Value = value then begin
          nodeResult := node;
          nodeQueue.CompleteAdding;
          cancelToken.Signal;
        end
        else for childNode in node.Children do
          nodeQueue.TryAdd(childNode);
      end);
  Result := nodeResult;
end; { TfrmParallelForDemo.ParaScan }

The code first creates a cancellation token which will be used to stop the Parallel.ForEach loop. The number of tasks is set to the number of cores accessible from the process and a blocking collection is created. The resource count for this collection is initialized to the number of tasks (the parameter to TOmniBlockingCollection.Create). The root node of the tree is added to the blocking collection.

Then Parallel.ForEach is called. The IOmniValueEnumerable aspect of the blocking collection is passed to the ForEach. Currently, this is the only way to provide ForEach with data. This interface just tells the ForEach how to generate an enumerator for each worker thread. [At the moment, each worker requires a separate enumerator. This may change in the future.]

  IOmniValueEnumerable = interface ['{50C1C176-C61F-41F5-AA0B-6FD215E5159F}']
    function GetEnumerator: IOmniValueEnumerator;
  end; { IOmniValueEnumerable }

The code also passes the cancellation token to the ForEach loop and starts the parallel execution (the call to Execute). In each parallel task, the following code is executed (this code is copied from the full ParaScan example above):

procedure (const elem: TOmniValue)
var
  childNode: TNode;
  node     : TNode;
begin
  node := TNode(elem.AsObject);
  if node.Value = value then begin
    nodeResult := node;
    nodeQueue.CompleteAdding;
    cancelToken.Signal;
  end
  else for childNode in node.Children do
    nodeQueue.TryAdd(childNode);
end

The code is provided with one element from the blocking collection (ForEach takes care of that). If the Value field is the value we’re searching for, nodeResult is set, the blocking collection is put into the CompleteAdding state (so that enumerators in other tasks will terminate their blocking wait, if any) and the ForEach is cancelled.

Otherwise (not the value we’re looking for), all the children of the current node are added to the blocking collection. TryAdd is used (and its return value ignored) because another thread may call CompleteAdding while the for childNode loop is being executed.

That’s all! There is a blocking collection into which nodes are put (via the for childNode loop) and from which they are removed (via the ForEach infrastructure). If child nodes are not provided fast enough, the blocking collection will block on Take and one or more tasks may sleep for some time until new values appear. Only when the value is found are the blocking collection completed and the ForEach loop cancelled.

This is very similar to the code that was my inspiration for writing the blocking collection:

var targetNode = …;
var bc = new BlockingCollection<Node>(startingNodes);
// since we expect GetConsumingEnumerable to block, limit parallelism to the number of
// procs, avoiding too much thread injection
var parOpts = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(bc.GetConsumingEnumerable(), parOpts, (node,loop) =>
{
    if (node == targetNode)
    {
        Console.WriteLine("hooray!");
        bc.CompleteAdding();
        loop.Stop();
    }
    else
    {
        foreach(var neighbor in node.Neighbors) bc.Add(neighbor);
    }
});

However, this C# code exhibits a small problem. If the value is not to be found in the tree, the code never stops. Why? All tasks eventually block in the Take method (because the complete tree has been scanned) and nobody calls CompleteAdding and loop.Stop. Does the Delphi code contain the very same problem?

Definitely not! That’s exactly why the resource counter was added to the blocking collection!

If the blocking collection is initialized with a number of resources greater than zero, it will create a resource counter in the constructor. One resource is allocated from this counter just before the thread blocks in TryTake and released after that. Each blocking wait in TryTake also waits for this resource counter to become signalled. If all threads try to execute the blocking wait, the resource counter drops to zero, signals itself and unblocks all TryTake calls!

This elegant solution has only one problem: the resource counter must be initialized to the number of threads that will be reading from the blocking collection. That’s why in the code above (ParaScan) the same number is passed to the blocking collection constructor (resource counter initialization) and to the ForEach.NumTasks method (number of parallel threads).
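The "signals itself at zero" behaviour can be illustrated with a small stand-alone sketch. TInverseSemaphore below is a hypothetical, simplified class and not the TOmniResourceCount implementation (which was covered in Part 1); in particular, its Allocate never blocks. It only shows how a counter can expose a handle that becomes signalled once every consumer has allocated its resource.

```delphi
program InverseSemaphoreSketch;

{$APPTYPE CONSOLE}

uses
  Windows, SyncObjs;

type
  // Hypothetical, simplified stand-in for TOmniResourceCount (not the OTL code).
  TInverseSemaphore = class
  private
    FCount : integer;
    FHandle: THandle; // manual-reset event, signalled while FCount = 0
    FLock  : TCriticalSection;
  public
    constructor Create(initialCount: integer);
    destructor  Destroy; override;
    procedure Allocate; // take one resource; signals the event at zero
    procedure Release;  // give one resource back; resets the event
    property Handle: THandle read FHandle;
  end;

constructor TInverseSemaphore.Create(initialCount: integer);
begin
  inherited Create;
  FCount := initialCount;
  FLock := TCriticalSection.Create;
  FHandle := CreateEvent(nil, true, initialCount = 0, nil);
end;

destructor TInverseSemaphore.Destroy;
begin
  CloseHandle(FHandle);
  FLock.Free;
  inherited;
end;

procedure TInverseSemaphore.Allocate;
begin
  FLock.Acquire;
  try
    Dec(FCount);
    if FCount = 0 then
      SetEvent(FHandle); // the last resource is gone: wake all waiters
  finally FLock.Release; end;
end;

procedure TInverseSemaphore.Release;
begin
  FLock.Acquire;
  try
    Inc(FCount);
    if FCount > 0 then
      ResetEvent(FHandle);
  finally FLock.Release; end;
end;

function IsSignalled(handle: THandle): boolean;
begin
  // Zero timeout: just poll the current state of the handle.
  Result := WaitForSingleObject(handle, 0) = WAIT_OBJECT_0;
end;

procedure Demo;
var
  counter: TInverseSemaphore;
begin
  counter := TInverseSemaphore.Create(2); // two consumer threads
  try
    counter.Allocate; // first consumer is about to block
    Writeln('one consumer waiting, signalled: ', IsSignalled(counter.Handle));
    counter.Allocate; // second consumer is about to block
    Writeln('all consumers waiting, signalled: ', IsSignalled(counter.Handle));
    counter.Release;  // one consumer stopped waiting
    Writeln('after release, signalled: ', IsSignalled(counter.Handle));
  finally counter.Free; end;
end;

begin
  Demo;
end.
```

The program should print FALSE, TRUE and FALSE for the three checks. It also shows why the initial count matters: if the counter were created with a larger number than there are consumers, it would never reach zero and the blocked consumers would never be released.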

Download

TOmniBlockingCollection will be available in the OmniThreadLibrary 1.05, which will be released in a few days.

For the impatient there is the OTL 1.05 Release Candidate. The only changes between 1.05 RC and the release will be possible bug fixes.