Monday, November 30, 2009

OmniThreadLibrary patterns – Task controller needs an owner

Pop quiz. What’s wrong with this code?

CreateTask(MyWorker).Run;

Looks fine, but it doesn’t work. In most cases, running this code fragment causes an immediate access violation.

This is a common problem amongst new OTL users. Heck, even I have fallen into this trap!

The problem here is that CreateTask returns an IOmniTaskControl interface, also known as a task controller. This interface must be stored in some persistent location, or the task controller will be destroyed immediately after Run is called (because its reference count falls to 0).
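The lifetime rule can be seen without OTL at all. The following is a minimal sketch, not OTL code: IFakeControl, TFakeControl and CreateFake are hypothetical stand-ins for the task controller and CreateTask, and the global GOwner plays the role of the FTaskControl field used later in this post.

```pascal
program RefCountDemo;

{$APPTYPE CONSOLE}

type
  // Hypothetical stand-in for a task controller; not an OTL type.
  IFakeControl = interface
    function Run: IFakeControl;
  end;

  TFakeControl = class(TInterfacedObject, IFakeControl)
  public
    destructor Destroy; override;
    function Run: IFakeControl;
  end;

destructor TFakeControl.Destroy;
begin
  Writeln('  controller destroyed');
  inherited;
end;

function TFakeControl.Run: IFakeControl;
begin
  Writeln('  running');
  Result := Self; // fluent style: Run returns the controller itself
end;

// Hypothetical factory, mimics the shape of CreateTask.
function CreateFake: IFakeControl;
begin
  Result := TFakeControl.Create;
end;

var
  GOwner: IFakeControl; // plays the role of the FTaskControl field

procedure StartWithoutOwner;
begin
  // Only compiler-generated temporaries reference the object here.
  CreateFake.Run;
end; // the temporaries are released on exit and the count drops to 0

procedure StartWithOwner;
begin
  GOwner := CreateFake.Run; // the variable keeps one reference alive
end;

begin
  Writeln('Without owner:');
  StartWithoutOwner;
  Writeln('With owner:');
  StartWithOwner;
  Writeln('Owner still holds the controller, releasing it now:');
  GOwner := nil; // last reference released, the destructor runs here
end.
```

In the first call the controller is gone by the time the procedure returns. In the real library a background task may still be using the controller at that point, which is where the access violation comes from.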

A common solution is to just store the interface in some field.

FTaskControl := CreateTask(MyWorker).Run;

When you don’t need the background worker anymore, you should terminate the task and free the task controller.

FTaskControl.Terminate;

FTaskControl := nil;

This works for background workers with a long life span – for example, a background thread that runs for as long as the program itself is running. But what if you are starting a short-term background task? In this case you should monitor it with a TOmniEventMonitor and clean up the task controller reference in the OnTaskTerminated event handler.

FTaskControl := CreateTask(MyWorker).MonitorWith(eventMonitor).Run;

In eventMonitor.OnTaskTerminated:

FTaskControl := nil;

As it turns out, the event monitor keeps the task controller interface stored in its own list, which also keeps the task controller alive. That’s why the following code also works.

CreateTask(MyWorker).MonitorWith(eventMonitor).Run;

Since OTL v1.04 you have another possibility – write a method to free the task controller and pass it to OnTerminated.

FTaskControl := CreateTask(MyWorker).OnTerminated(FreeTaskControl).Run;

procedure FreeTaskControl(const task: IOmniTaskControl);
begin
  FTaskControl := nil;
end;

If you’re using Delphi 2009 or 2010, you can put the cleanup code in an anonymous method.

FTaskControl := CreateTask(MyWorker).OnTerminated(
  procedure(const task: IOmniTaskControl) begin
    FTaskControl := nil;
  end)
  .Run;

OnTerminated does its magic by hooking the task controller into an internal event monitor. Therefore, you can get real tricky and just write a “null” OnTerminated handler.

CreateTask(MyWorker).OnTerminated(DoNothing).Run;

procedure DoNothing(const task: IOmniTaskControl);
begin
end;

As that looks quite ugly, I added the Unobserved method just a few days before version 1.04 was released. This method does essentially the same as the “null” OnTerminated approach, except that the code looks nicer and the programmer’s intentions are more clearly expressed.

CreateTask(MyWorker).Unobserved.Run;

Monday, November 23, 2009

OmniThreadLibrary 1.04

Stable release is out! Get it while it’s still hot!

Click to download!

New since 1.04 alpha:

  • Bugfixes in the thread pool code.
  • Implemented IOmniTaskControl.Unobserved behaviour modifier.
  • D2010 designtime package fixed.
  • D2009 packages and test project group updated (thanks to mghie).

New since 1.03: read full list.

Tuesday, November 17, 2009

OmniThreadLibrary 1.04 now in beta

I’ve released OTL 1.04 beta, which is functionally the same as the alpha release but contains some bug fixes. You can download it from Google Code.

1.04 final will be released on 2009-11-23, i.e. next Monday.

Friday, November 13, 2009

OmniThreadLibrary 1.04 alpha

Not yet beta as I still have to fix a few TODOs …

Get it here.

COMPATIBILITY ISSUES

  • Changed semantics in comm event notifications! When you get the 'new message' event, read all messages from the queue in a loop!
  • Message is passed to the TOmniEventMonitor.OnTaskMessage handler. There's no need to read from Comm queue in the handler.
  • Exceptions in tasks are now visible by default. To hide them, use IOmniTaskControl.SilentExceptions. Test 13_Exceptions was improved to demonstrate this behaviour.

Other changes

  • Works with Delphi 2010.
  • Default communication queue size reduced to 1000 messages.
  • Support for 'wait and send' in IOmniCommunicationEndpoint.SendWait.
  • Communication subsystem implements observer pattern.
  • WideStrings can be sent over the communication channel.
  • New event TOmniEventMonitor.OnTaskUndeliveredMessage is called after the task is terminated for all messages still waiting in the message queue.
  • Implemented automatic event monitor with methods IOmniTaskControl.OnMessage and OnTerminated. Both support 'procedure of object' and 'reference to procedure' parameters.
  • New unit OtlSync contains (old) TOmniCS and IOmniCriticalSection together with (new) OmniMREW - very simple and extremely fast multi-reader-exclusive-writer - and atomic CompareAndSwap functions.
  • New unit OtlHooks contains API that can be used by external libraries to hook into OTL thread creation/destruction process and into exception chain.
  • All known bugs fixed.

New demos

  • 25_WaitableComm: Demo for ReceiveWait and SendWait.
  • 26_MultiEventMonitor: How to run multiple event monitors in parallel.
  • 27_RecursiveTree: Parallel tree processing.
  • 28_Hooks: Demo for the new hook system.
  • 29_ImplicitEventMonitor: Demo for OnMessage and OnTerminated, named method approach.
  • 30_AnonymousEventMonitor: Demo for OnMessage and OnTerminated, anonymous method approach.

A teaser from demo 30

procedure TfrmAnonymousEventMonitorDemo.btnHelloClick(Sender: TObject);
begin
  btnHello.Enabled := false;
  FAnonTask := CreateTask(
    procedure (task: IOmniTask) begin
      task.Comm.Send(0, Format('Hello, world! Reporting from thread %d',
        [GetCurrentThreadID]));
    end,
    'HelloWorld')
  .OnMessage(
    procedure(const task: IOmniTaskControl; const msg: TOmniMessage) begin
      lbLog.ItemIndex := lbLog.Items.Add(Format('%d:[%d/%s] %d|%s',
        [GetCurrentThreadID, task.UniqueID, task.Name, msg.msgID,
        msg.msgData.AsString]));
    end)
  .OnTerminated(
    procedure(const task: IOmniTaskControl) begin
      lbLog.ItemIndex := lbLog.Items.Add(Format('[%d/%s] Terminated',
        [task.UniqueID, task.Name]));
      btnHello.Enabled := true;
      FAnonTask := nil;
    end)
  .Run;
end;

Friday, November 06, 2009

Do we need DelphiOverflow.com?

Today I was interviewed for the greatest Delphi podcast of them all and Jim asked me a question I didn’t know how to answer: “Do you think there should be a Delphi equivalent of StackOverflow.com?” I’m afraid my answer was somewhere along the lines of: “Hmph. Yes. Very good question. Very good. Let’s talk about something else.”

And now I can’t get it out of my head. Should there be a delphioverflow.com? What could we get out of it? I would be the first to admit that the StackOverflow model is the greatest thing since Belgian waffles and that having Delphi questions and answers in such a form would be very useful.

But wait – there already are Delphi questions on StackOverflow! Not as many as C# questions, but still enough that Delphi is seen on the front page and that other users can read about it and see that it is alive and well. Even more – there are enough knowledgeable Delphi programmers on SO that most questions get great answers in less than five minutes.

What other positive result could such a site bring? Maybe Embarcadero people would be more eager to participate and answer questions on their own server? Maybe, but I’m not sure. The Delphi R&D team is very busy and sometimes they can’t even find time to answer newsgroup questions. And I’m pretty sure that – whatever such a change would bring – newsgroups wouldn’t go away.

Let’s take a look from another perspective. What would be the negative consequences? Fewer Delphi questions on StackOverflow. And that’s a Bad Thing because it lowers Delphi’s discoverability. We want to talk about Delphi in public places, not on some secluded server!

Now I know how to answer. No, I don’t think we need DelphiOverflow. We need more Delphi R&D people answering questions on StackOverflow.

(Your comments on the topic are very much welcome, as always!)

Wednesday, November 04, 2009

GpStuff 1.19 & GpLists 1.43

I’ll finish my short overview of changes in various Gp units with the new GpStuff and GpLists.

Let’s deal with the latter first. There were only two changes. Firstly, Slice, Walk and WalkKV enumerators got the step parameter. Now Delphi is really as powerful as Basic!

Secondly, I’ve added method FreeObjects to the TStringList helper. It will walk the string list and free all associated objects – something that is not done automatically in the TStringList destructor. Very useful helper, if I can say so.

procedure TGpStringListHelper.FreeObjects;
var
  iObject: integer;
begin
  for iObject := 0 to Count - 1 do begin
    Objects[iObject].Free;
    Objects[iObject] := nil;
  end;
end; { TGpStringListHelper.FreeObjects }

Changes in GpStuff were more significant.

There are new enumerator factories. EnumStrings allows you to do stuff like this:

for s in EnumStrings(['one', 'two', 'three']) do
// ...

EnumValues will do the same for integer arrays. EnumPairs is similar to EnumStrings but returns (key, value) pairs:

var
kv: TGpStringPair;

for kv in EnumPairs(['1', 'one', '2', 'two']) do
// kv.key = '1', kv.value = 'one'
// kv.key = '2', kv.value = 'two'

There is also EnumList, which enumerates lists of items (where the whole list itself is a string):

for s in EnumList('one,two,"one,two,three"', ',', '"') do
// s = 'one'
// s = 'two'
// s = 'one,two,three'

There were some changes in TGp4AlignedInt internals – now all values are integer, not cardinal (because the underlying Windows implementation works with integers). There is also a new “Compare and Swap” (CAS) function in TGp4AlignedInt and TGp8AlignedInt64 (which was previously called TGp8AlignedInt).
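For readers who have not met compare-and-swap before, here is a minimal sketch of the idea built directly on the Win32 InterlockedCompareExchange function. It is not the TGp4AlignedInt implementation; AtomicAdd is a hypothetical helper, and the Windows unit declaration assumed here is the Delphi 2009/2010 one, which takes the destination as a var parameter.

```pascal
program CasSketch;

{$APPTYPE CONSOLE}

uses
  Windows;

var
  counter: integer = 0;

// Hypothetical helper: atomically adds delta to target with a CAS retry loop.
function AtomicAdd(var target: integer; delta: integer): integer;
var
  oldValue: integer;
begin
  repeat
    oldValue := target;
    Result := oldValue + delta;
    // The new value is stored only if target still equals oldValue.
    // InterlockedCompareExchange returns the value it found in target,
    // so a different value means another thread got there first: retry.
  until InterlockedCompareExchange(target, Result, oldValue) = oldValue;
end;

begin
  AtomicAdd(counter, 5);
  AtomicAdd(counter, -2);
  Writeln(counter); // 3
end.
```

The loop never blocks; under contention it simply recomputes the new value from a fresh read and tries again.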

Finally, there is a new interface and a new class – IGpTraceable and TGpTraceable.

type
  IGpTraceable = interface(IInterface)
    function GetTraceReferences: boolean; stdcall;
    procedure SetTraceReferences(const value: boolean); stdcall;
    function _AddRef: integer; stdcall;
    function _Release: integer; stdcall;
    function GetRefCount: integer; stdcall;
    property TraceReferences: boolean read GetTraceReferences write SetTraceReferences;
  end; { IGpTraceable }

  TGpTraceable = class(TInterfacedObject, IGpTraceable)
  private
    gtTraceRef: boolean;
  public
    destructor Destroy; override;
    function _AddRef: integer; stdcall;
    function _Release: integer; stdcall;
    function GetRefCount: integer; stdcall;
    function GetTraceReferences: boolean; stdcall;
    procedure SetTraceReferences(const value: boolean); stdcall;
    property TraceReferences: boolean read GetTraceReferences write SetTraceReferences;
  end; { TGpTraceable }

The TGpTraceable class helps me debug interface problems. It exposes the GetRefCount function, which returns the reference count, and it can trigger a debugger interrupt on each reference count change if the TraceReferences property is set.

function TGpTraceable._AddRef: integer;
begin
  Result := inherited _AddRef;
  if gtTraceRef then
    asm int 3; end;
end; { TGpTraceable._AddRef }

function TGpTraceable._Release: integer;
begin
  if gtTraceRef then
    asm int 3; end;
  Result := inherited _Release;
end; { TGpTraceable._Release }
---Published under the Creative Commons Attribution 3.0 license

Monday, November 02, 2009

Read prefetch in GpHugeFile

There is only one big change in the latest GpHugeFile – read prefetch. Most people won’t need it at all and others will only need it occasionally, but for some people, sometimes, it will be a life saver.

The prefetch option is only useful when you read a file mostly sequentially from a relatively slow medium. Useless? You never did that before? Did you ever play a video file from a network server or from YouTube? Well, there you are!

Playing video files (especially HD) over a network is not a trivial task. On some occasions (namely, slow networks or high bitrate files) the network speed is only slightly above the minimum required for seamless video playout. Even more – the network speed is not constant because you share it with other users, and at times it may not be high enough to play the video without stuttering.

To solve this problem, video players use prefetch (or read-ahead) – they will read more data than required and use this buffer when the network slows down. Better said – video will always play from this buffer but the buffer size will vary depending on current network speed.

So how’s this typically done? One way is with a background thread that sequentially reads through the file and buffers the data; another is with asynchronous read operations. This very powerful approach is part of the standard Win32 API (ReadFileEx) and is relatively easy to use – you just start the read operation and some time later the system notifies you that the data is available. There are some problems, though, the biggest being the requirement that your reading thread must be in a special alertable wait state for this notification to occur.
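To make the alertable-wait requirement concrete, here is a minimal sketch of one asynchronous read with ReadFileEx. It is not taken from GpHugeFile; ReadDone and the global variables are hypothetical names, and the sketch assumes the Windows unit declares the completion routine parameter as a plain pointer (as the Delphi 2009/2010 unit does).

```pascal
program AlertableReadSketch;

{$APPTYPE CONSOLE}

uses
  Windows;

var
  buf: array [0..65535] of byte;
  done: boolean = false;

// Completion routine. The system runs it in the thread that issued the
// read, but only while that thread is in an alertable wait.
procedure ReadDone(errorCode, bytesTransferred: DWORD;
  overlapped: POverlapped); stdcall;
begin
  Writeln('Error code: ', errorCode, ', bytes read: ', bytesTransferred);
  done := true;
end;

var
  hFile: THandle;
  ov: TOverlapped;

begin
  // ReadFileEx requires a handle opened with FILE_FLAG_OVERLAPPED.
  hFile := CreateFile(PChar(ParamStr(1)), GENERIC_READ, FILE_SHARE_READ, nil,
    OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);
  if hFile = INVALID_HANDLE_VALUE then begin
    Writeln('Cannot open file');
    Exit;
  end;
  try
    FillChar(ov, SizeOf(ov), 0); // file offset 0
    if not ReadFileEx(hFile, @buf, SizeOf(buf), @ov, @ReadDone) then
      Writeln('ReadFileEx failed: ', GetLastError)
    else
      while not done do
        SleepEx(INFINITE, true); // alertable wait; ReadDone is called from here
  finally
    CloseHandle(hFile);
  end;
end.
```

If the SleepEx call were replaced with a plain Sleep, the read would still complete but ReadDone would never be called, which is exactly the inconvenience described above.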

The third option is not to use threads or async file operations yourself, but to pass the hfoPrefetch and hfoBuffered flags to ResetEx. In the same call you can also set the number of prefetched buffers. As for the buffer size – it is also settable with a ResetEx parameter and will be rounded up to the next multiple of the system page size (an async file I/O requirement), or it will be set to 64 KB if you leave the parameter at 0.
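The buffer size rule is easy to work through by hand. The sketch below only illustrates the arithmetic described in the previous paragraph; EffectiveBufferSize is a hypothetical helper and not code from GpHugeF.

```pascal
program BufferSizeSketch;

{$APPTYPE CONSOLE}

uses
  Windows;

// Hypothetical helper: 0 selects the 64 KB default, anything else is
// rounded up to the next multiple of the system page size.
function EffectiveBufferSize(requested: cardinal): cardinal;
var
  si: TSystemInfo;
begin
  if requested = 0 then
    Result := 64 * 1024
  else begin
    GetSystemInfo(si);
    Result := ((requested + si.dwPageSize - 1) div si.dwPageSize) * si.dwPageSize;
  end;
end;

begin
  Writeln(EffectiveBufferSize(0));    // 65536
  Writeln(EffectiveBufferSize(5000)); // 8192 on a system with 4 KB pages
  Writeln(EffectiveBufferSize(8192)); // already a multiple, stays 8192 with 4 KB pages
end.
```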

When you set hfoPrefetch, TGpHugeFile will create a background thread and this thread will issue asynchronous file I/O calls. Prefetched data is stored in a cache which is shared between the worker thread and the owner. Unfortunately for some, this option is only available in Delphi 2007 and newer because the worker object is implemented using OmniThreadLibrary.

You may wonder why the thread is not issuing normal synchronous reads. For two reasons – I didn’t want the thread to be blocked in a read when the owner executes a Seek (file repositioning immediately tells the prefetcher that it should start reading from a different file offset) and I wanted to issue multiple read commands at the same time (namely 2).

Enough talk – if you want to learn more, look at the code. I’ll only give you the simplest possible demo:

program Project12;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  GpHugeF;

var
  hf : TGpHugeFile;
  buf: array [1..65536] of byte;
  bytesRead: cardinal;
  bytesTotal: int64;

begin
  hf := TGpHugeFile.Create(ParamStr(1));
  try
    if hf.ResetEx(1, 0, 0, 0, [hfoBuffered, hfoPrefetch]) <> hfOK then
      Writeln('Fail!')
    else begin
      bytesTotal := 0;
      repeat
        hf.BlockRead(buf, SizeOf(buf), bytesRead);
        Inc(bytesTotal, bytesRead);
      until bytesRead = 0;
      Writeln('Total bytes read: ', bytesTotal);
    end;
  finally FreeAndNil(hf); end;
  Readln;
end.