Wednesday, June 30, 2010

Gp* units now available via SVN

Because YOU asked for it … my units are now available on Google Code.

Tuesday, June 29, 2010

Important DSiWin32 and GpSecurity update

1) Lots of things fixed in DSiWin32 and great thanks to Christian Wimmer for pointing out the problems and suggesting some solutions.

2) By my mistake a very internal GpSecurity containing parts of JWA got included in some downloadable ZIP files. This was a direct violation of the JWA license and I apologize deeply for that. To fix this problem, a new GpSecurity was released which depends on the JWA.

Monday, June 21, 2010

Built for speed

Unlike C and derivatives, Delphi is speedy …

build

174.684 lines per second. What’s your compile speed?

Wednesday, June 16, 2010

The Future of Delphi

Future of T, that is. Or, in Delphi syntax, TFuture<T>.
Yesterday I wrote about futures in OmniThreadLibrary 2.0 (supported only in D2009+) and I mentioned that implementing futures in plain D2009+ should be really simple. And it really is – it took me all of 15 minutes to write the supporting library and a simple test case.
The code below is released to public domain. I’m claiming no copyrights – use it as you wish. You don’t even have to attribute it to me. Just don’t use it for evil purposes ;)
unit DelphiFuture;

interface

uses
  Classes;

type
  IFuture<T> = interface
    function Value: T;
  end;

  TFutureDelegate<T> = reference to function: T;

  TFutureThread<T> = class(TThread)
  strict private
    FAction: TFutureDelegate<T>;
    FResult: T;
  public
    constructor Create(action: TFutureDelegate<T>);
    procedure Execute; override;
    property Result: T read FResult;
  end;

  TFuture<T> = class(TInterfacedObject, IFuture<T>)
  strict private
    FResult: T;
    FWorker: TFutureThread<T>;
  public
    constructor Create(action: TFutureDelegate<T>);
    function Value: T;
  end;

implementation

uses
  SysUtils;

{ TFutureThread<T> }

constructor TFutureThread<T>.Create(action: TFutureDelegate<T>);
begin
  inherited Create(false);
  FAction := action;
end;

procedure TFutureThread<T>.Execute;
begin
  FResult := FAction();
end;

{ TFuture<T> }

constructor TFuture<T>.Create(action: TFutureDelegate<T>);
begin
  inherited Create;
  FWorker := TFutureThread<T>.Create(action);
end;

function TFuture<T>.Value: T;
begin
  if assigned(FWorker) then begin
    FWorker.WaitFor;
    FResult := FWorker.Result;
    FreeAndNil(FWorker);
  end;
  Result := FResult;
end;

end.
I’ve used my usual test case, calculating number of primes between 1 and 1.000.000.
implementation

uses
  DelphiFuture;

function IsPrime(i: integer): boolean;
var
  j: integer;
begin
  Result := false;
  if i <= 0 then
    Exit;
  for j := 2 to Round(Sqrt(i)) do
    if (i mod j) = 0 then
      Exit;
  Result := true;
end;

procedure TForm1.btnTestClick(Sender: TObject);
var
  numPrimes: IFuture<integer>;
begin
  numPrimes := TFuture<integer>.Create(function: integer
    var
      iPrime: integer;
    begin
      Result := 0;
      for iPrime := 1 to 1000000 do
        if IsPrime(iPrime) then
          Inc(Result);
    end
  );
  lbLog.Items.Add(Format('%d primes from 1 to 1000000',
    [numPrimes.Value]));
end;

Tuesday, June 15, 2010

OmniThreadLibrary 2.0 sneak preview [2]

Futures in the OTL were not planned – they just happened. In fact, they are so new that you won’t find them in the SVN. (Don’t worry, they’ll be committed soon.)

As a matter of fact, I always believed that futures must be supported by the compiler. That changed few weeks ago when somebody somewhere (sorry, can’t remember the time and place) asked if they can be implemented in the OmniThreadLibrary. That question made me rethink the whole issue and I found out that not only it’s possible to implement them without changing the compiler – the implementation is almost trivial!

In the OTL 2.0 you’ll be able to declare a future …

var
numPrimes: IOmniFuture<integer>;

… start the evaluation …

  numPrimes := TOmniFuture<integer>.Create(a delegate returning integer);

… and wait on the result.

  numPrimes.Value

As simple as that. Declare the IOmniFuture<T>, create TOmniFuture<T> and retrieve the result by calling Value: T.

As a real-world example, the code below creates a future that calculates number of primes from 1 to CPrimesHigh and displays this value.

var
numPrimes: IOmniFuture<integer>;
begin
numPrimes := TOmniFuture<integer>.Create(function: integer
var
i: integer;
begin
Result := 0;
for i := 1 to CPrimesHigh do
if IsPrime(i) then
Inc(Result);
end
);
// do something else
lbLog.Items.Add(Format('%d primes from 1 to %d',
[numPrimes.Value, CPrimesHigh]));
end;

As a general rule, I would recommend against putting too much of the code inside the future’s constructor. A following approach is more readable and easier to maintain.

function CountPrimesToHigh(high: integer): integer;
var
i: integer;
begin
Result := 0;
for i := 1 to CPrimesHigh do
if IsPrime(i) then
Inc(Result);
end;

var
numPrimes: IOmniFuture<integer>;
begin
numPrimes := TOmniFuture<integer>.Create(function: integer
begin
Result := CountPrimesToHigh(CPrimesHigh);
end
);
// do something else
lbLog.Items.Add(Format('%d primes from 1 to %d',
[numPrimes.Value, CPrimesHigh]));
end;

Or you can take another step and create a future factory. That’s especially recommended if you’ll be using futures of the same kind in different places.

function StartCountingPrimesTo(high: integer): TOmniFuture<integer>;
begin
Result := TOmniFuture<integer>.Create(function: integer
var
i: integer;
begin
Result := 0;
for i := 1 to high do
if IsPrime(i) then
Inc(Result);
end
);
end;

var
numPrimes: IOmniFuture<integer>;
begin
numPrimes := StartCountingPrimesTo(CPrimesHigh);
// do something else
lbLog.Items.Add(Format('%d primes from 1 to %d',
[numPrimes.Value, CPrimesHigh]));
end;

Implementation

Believe it or not, the whole implementation fits in 27 lines (not counting empty lines).

type
TOmniFutureDelegate<T> = reference to function: T;

IOmniFuture<T> = interface
function Value: T;
end; { IOmniFuture<T> }

TOmniFuture<T> = class(TInterfacedObject, IOmniFuture<T>)
private
ofResult: T;
ofTask : IOmniTaskControl;
public
constructor Create(action: TOmniFutureDelegate<T>);
function Value: T;
end; { TOmniFuture<T> }
constructor TOmniFuture<T>.Create(action: TOmniFutureDelegate<T>);
begin
ofTask := CreateTask(procedure (const task: IOmniTask)
begin
ofResult := action();
end,
'TOmniFuture action').Run;
end; { TOmniFuture<T>.Create }

function TOmniFuture<T>.Value: T;
begin
ofTask.Terminate;
ofTask := nil;
Result := ofResult;
end; { TOmniFuture<T>.Value }

As you can see, the whole OTL task support is only used to simplify background thread creation. It would be quite simple to implement futures around Delphi’s own TThread. In fact, I think I’ll just go ahead and implement it!

Monday, June 14, 2010

OmniThreadLibrary 2.0 sneak preview [1]

You may have noticed that I’ve been strangely silent for the past two months. The reason for that is OmniThreadLibrary version 2. [And lots of other important work that couldn’t wait. And the OmniThreadLibrary version 2.]

The OTL 2.0 is not yet ready but I’ve decided to pre-announce some features. They are, after all, available to all programmers following the SVN trunk.

While the focus of the OTL 1 was to provide programmers with simple to use multithreading primitives, OTL 2 focuses mostly on the higher-level topics like parallel for and futures.

Caveat: Parallel For and Futures will work only in Delphi 2009 and newer. The implementation of both heavily depends on generics and anonymous methods and those are simply not available in Delphi 2007. Sorry, people. [I’m sad too – I’m still using Delphi 2007 for my day job.]

Parallel For

Parallel.ForEach was introduced in release 1.05 but that version was purely “technical preview” – a simple “let’s see if this can be done at all” implementation. In the last few months, Parallel.ForEach backend was completely redesigned which allowed the frontend (the API) to be vastly improved.

The basic ForEach(from, to: integer) has not changed much. The only difference is that the parameter type of the Execute delegate is now “integer” and not “TOmniValue”.

  Parallel.ForEach(1, testSize).Execute(
procedure (const elem: integer)
begin
if IsPrime(elem) then
outQueue.Add(elem);
end);

A trivial example, of course, but it shows the simplicity of Parallel.ForEach. The code passed to the Execute will be executed in parallel on all possible cores. [The outQueue parameter is of type TOmniBlockingCollection which allows Add to be called from multiple threads simultaneously.]

If you have data in a container that supports enumeration (with one limitation – enumerator must be implemented as a class, not as an interface or a record) then you can enumerate over it in parallel.

  var
nodeList := TList.Create;
Parallel.ForEach<integer>(nodeList).Execute(
procedure (const elem: integer)
begin
if IsPrime(elem) then
outQueue.Add(elem);
end);

The new ForEach backend allows parallel loops to be executed asynchronously. In the code sample below, the parallel loop tests numbers for primeness and adds primes to a TOmniBlockingCollection queue. A normal for loop, executing in parallel with the parallel loop, reads numbers from this queue and displays them on the screen.

var
prime : TOmniValue;
primeQueue: IOmniBlockingCollection;
begin
lbLog.Clear;
primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, 1000).NoWait
.OnStop(
procedure
begin
primeQueue.CompleteAdding;
end)
.Execute(
procedure (const value: integer)
begin
if IsPrime(value) then begin
primeQueue.Add(value);
end;
end);
for prime in primeQueue do begin
lbLog.Items.Add(IntToStr(prime));
lbLog.Update;
end;
end;

This code depends on a TOmniBlockingCollection feature, namely that the enumerator will block when the queue is empty unless CompleteAdding is called [more info]. That’s why the OnStop delegate must be provided – without it the “normal” for loop would never stop. (It would just wait forever on the next element.)

While this shows two powerful functions (NoWait and OnStop) it is also kind of complicated and definitely not a code I would want to write too many times. That’s why OmniThreadLibrary also provides a syntactic sugar in a way of the Into function.

var
prime : TOmniValue;
primeQueue: IOmniBlockingCollection;
begin
lbLog.Clear;
primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, 1000).PreserveOrder.NoWait
.Into(primeQueue)
.Execute(
procedure (const value: integer; var res: TOmniValue)
begin
if IsPrime(value) then
res := value;
end);
for prime in primeQueue do begin
lbLog.Items.Add(IntToStr(prime));
lbLog.Update;
end;
end;

This code demoes few different enhacements to the ForEach loop. Firstly, you can order the Parallel subsystem to preserve input order by calling the PreservedOrder function. [In truth, this function doesn’t work yet. That’s the part I’m currently working on.]

Secondly, because Into is called, ForEach will automatically call CompleteAdding on the parameter passed to the Into when the loop completes. No need for the ugly OnStop call.

Thirdly, Execute (also because of the Into) takes a delegate with a different signature. Instead of a standard ForEach signature procedure (const value: T) you have to provide it with a procedure (const value: integer; var res: TOmniValue). If the output parameter (res) is set to any value inside this delegate, it will be added to the Into queue and if it is not modified inside the deletage, it will not be added to the Into queue.  Basically, the parallel loop body is replaced with the code below and this code calls your own delegate (loopBody).

        result := TOmniValue.Null;
while (not Stopped) and localQueue.GetNext(value) do begin
loopBody(value, result);
if not result.IsEmpty then begin
oplIntoQueueObj.Add(result)
result := TOmniValue.Null;
end;
end;
oplIntoQueueObj.CompleteAdding;

The NoWait and Into provide you with a simple way to chain Parallel loops and implement multiple parallel processing stages. [Although this works in the current version, the OtlParallel does nothing to balance the load between all active Parallel loops. I’m not yet completely sure that this will be supported in the 2.0 release.]

var
dataQueue : IOmniBlockingCollection;
prime : TOmniValue;
resultQueue: IOmniBlockingCollection;
begin
lbLog.Clear;
dataQueue := TOmniBlockingCollection.Create;
resultQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, 1000)
.NoWait.Into(dataQueue).Execute(
procedure (const value: integer; var res: TOmniValue)
begin
if IsPrime(value) then
res := value;
end
);
Parallel.ForEach<integer>(dataQueue as IOmniValueEnumerable)
.NoWait.Into(resultQueue).Execute(
procedure (const value: integer; var res: TOmniValue)
begin
// Sophie Germain primes
if IsPrime(2*value + 1) then
res := value;
end
);
for prime in primeQueue do begin
lbLog.Items.Add(IntToStr(prime));
lbLog.Update;
end;
end;

[BTW, there will be a better way to enumerate over TOmniBlockingCollection in the OTL 2.0 release. Passing “dataQueue as IOmniValueEnumerable” to the ForEach is ugly.]

If you want to iterate over something very nonstandard, you can write a “GetNext” delegate:

    Parallel.ForEach<integer>(
function (var value: integer): boolean
begin
value := i;
Result := (i <= testSize);
Inc(i);
end)
.Execute(
procedure (const elem: integer)
begin
outQueue.Add(elem);
end);

In case you wonder what the possible iteration sources are, here’s the full list:

    ForEach(const enumerable: IOmniValueEnumerable): IOmniParallelLoop; 
ForEach(const enum: IOmniValueEnumerator): IOmniParallelLoop;
ForEach(const enumerable: IEnumerable): IOmniParallelLoop;
ForEach(const enum: IEnumerator): IOmniParallelLoop;
ForEach(const sourceProvider: TOmniSourceProvider): IOmniParallelLoop;
ForEach(enumerator: TEnumeratorDelegate): IOmniParallelLoop;
ForEach(low, high: integer; step: integer = 1): IOmniParallelLoop<integer>;
ForEach<T>(const enumerable: IOmniValueEnumerable): IOmniParallelLoop<T>;
ForEach<T>(const enum: IOmniValueEnumerator): IOmniParallelLoop<T>;
ForEach<T>(const enumerable: IEnumerable): IOmniParallelLoop<T>;
ForEach<T>(const enum: IEnumerator): IOmniParallelLoop<T>;
ForEach<T>(const enumerable: TEnumerable<T>): IOmniParallelLoop<T>;
ForEach<T>(const enum: TEnumerator<T>): IOmniParallelLoop<T>;
ForEach<T>(enumerator: TEnumeratorDelegate<T>): IOmniParallelLoop<T>;
ForEach(const enumerable: TObject): IOmniParallelLoop;
ForEach<T>(const enumerable: TObject): IOmniParallelLoop<T>;

The last two versions are used to iterate over any object that supports class-based enumerators. Sadly, this feature is only available in Delphi 2010 because it uses extended RTTI to access the enumerator and its methods.

Parallel For Implementation

The backend allows for efficient parallel enumeration even when the enumeration source is not threadsafe. You can be assured that the data passed to the ForEach will be accessed only from one thread at the same time (although this will not always be the same thread). Only in special occasions, when backend knows that the source is threadsafe (for example when IOmniValueEnumerator is passed to the ForEach), the data will be accessed from multiple threads at the same time.

I’m planning to write an article of the parallel for implementation but it will have to wait until the PreserveOrder is implemented. At the moment backend implementation is not fixed yet.

Wednesday, June 09, 2010

A seriously overdue update

DSiWin32 1.55

  • Implemented DSiHasElapsed64 and DSiElapsedTime64.
  • Implemented DSiLogonAs and DSiVerifyPassword.
  • DSiGetProcAddress made public.

GpHugeFile 6.02

  • Prefetching parameters are now configurable -  TGpHugeFileStream.Create and .CreateW got parameters waitObject and numPrefetchBuffers which are passed to ResetEx/RewriteEx.

GpLists 1.44

  • TStringList helper split into TStrings and TStringList helpers

GpStreams 1.30

  • Implemented TGpFileStream class and two SafeCreateGpFileStream functions.
  • Unicode fixes.
  • Disable inlining for Delphi 2007 because of compiler bugs.
  • Added functions AtEnd and BytesLeft, AsAnsiString property and WriteAnsiStr method to the TStream class helper.
  • Implemented TGpFixedMemoryStream.CreateA and fixed TGpFixedMemoryStream.Create.

GpStructuredStorage 2.0b

  • Important bug fix! When the folder was deleted, it was not removed from the folder cache. Because of that, subsequent FolderExists call succeeded instead of failed, which could cause all sorts of weird problems.

GpStuff 1.21

  • Implemented overloads for Increment and Decrement in TGp4AlignedInt and TGp8AlignedInt64.
  • Implemented Add/Subtract methods in TGp4AlignedInt and TGp8AlignedInt64.
  • OpenArrayToVarArray supports vtUnicodeString variant type.

GpSync 1.23

  • Message queue works with Unicode Delphi, backwards compatible.

GpTextStream 1.08

  • Implemented 'lines in a text stream' enumerator EnumLines.
  • Implemented TGpTextStream.EOF.
  • Implemented text stream filter FilterTxt.

All free as usual. Enjoy!

Monday, June 07, 2010

What drives us

Monkeys work harder when they are not rewarded. People do, too.

Daniel H. Pink [wikipedia] collected the evidence about that fact and wrote (supposedly very good, didn’t read it yet) book Drive.

That’s not why I’m writing this post.

People are asking me from time to time why do I put so much work into providing free code and knowledge to the community.

My usual answer to that is: “Er, that’s hard to explain. I feel the need.” (Yep, that kind  of need.)

But that’s also not why I’m writing this post.

Not so much ago, RSA Animate published an 11-minute YouTube video containing a concentrated version of Daniel Pink’s talk based on the Drive book.

Now that’s why I’m writing this post!

This concentrated version of the book is so great that I definitely want to read the whole thing (in fact I already bought the Kindle version).

Even more – at 8:44 it defines me in few words: “Challenge, mastery and making a contribution.”

Exactly! I need the challenge, I want to master the subject and then I want to make a contribution!

Thanks to Dan Pink and the great people at RSA Animate I learned something about myself.

Sunday, June 06, 2010

Synchronisation in a multithreaded environment

Blaise Pascal #11 is out containing the third installment of my multithreading series, this time dealing with the synchronisation.