Saturday, December 20, 2008

Christmas mystery

We were tracking down quite an interesting problem in the past few days …

We have this service that accepts connections from multiple computers and reports some status back. And it does that via SOAP. And the SOAP implementation is our own, written in-house.

So far so good – we are using this approach in quite some programs deployed in many places. Everything is working well.

Everything, except one installation.

That customer was reporting weird things. Mysterious things. Sometimes the client on one computer would for a moment display the same data as another computer. And then the display would flip back to the correct data. Mysterious.

As expected, the guy who wrote the service pointed his finger at the guy who wrote the SOAP layer (me). And, as expected, I did the reverse. We were each pretty sure that the problem was caused by buggy code written by the other guy. We were both wrong.

Of course we first spent about a working day logging various parts of the service and SOAP server. You know the drill – add logging, compile, connect VPN, upload the source to the customer, start two clients on two machines at the remote site via RDP, wait for the problem, download logs, analyze. Booooring. At the end, we were none the wiser. We only knew that the server always returned correct data to all clients.

Then we switched our attention to the client. Who’d have guessed, the client always received the correct data. Except that it sometimes displayed wrong data. But that wrong data was never received via the SOAP layer. As I said before – mystery.

Of course, once we got to that point we knew that the problem lies inside the client software so we only have to dig in that direction. And, of course, we got the answer. And boy was it a surprising one!

You see, our client is somewhat baroque. Layers upon layers of code that were developed over the years. It’s the fate of all successful applications and this one is quite successful, at least in the vertical market we are working in. It is therefore not very surprising that the client uses temporary files in a somewhat strange arrangement. Instead of creating temporary files with unique names, it creates a temporary folder inside %temp% and stores files there. This folder is guaranteed to be unique on the system as it uses a global counter as part of the folder name. IOW, the first client on the computer stores data in %temp%\data_1, the second in %temp%\data_2. So the client on the first computer stored remote data (returned from the service) in %temp%\data_1\list.txt and the client on the second computer also used %temp%\data_1\list.txt. They were both using the ‘data_1’ subfolder as each was the first (and only) client instance on its machine. A recipe for disaster? Not really, as the %temp% folder is local to the computer.

Except that it was not.

Somebody at the customer’s site got a brilliant idea and configured temp folders for all domain clients to point to the domain controller. Don’t know who and surely don’t know why, but as a result %temp% pointed to the same folder (on the domain controller computer) on all client computers in the domain. So the first client downloaded its data to %temp% (on the domain server), the second client downloaded its data to the same location and then the first client displayed that data – data that had already been overwritten and which belonged to the second client. A typical race condition if I ever saw one.

The solution is, of course, to always use GetTempFileName. But this time we surely (re)learnt it in an interesting way.
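For the record, this is roughly what that looks like in Delphi. It is only a minimal sketch, assuming a Windows target; CreateUniqueTempFile is a name made up for this illustration, while GetTempPath and GetTempFileName are the plain Win32 calls.

```pascal
program UniqueTempFileSketch;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Hypothetical helper: asks Windows for a fresh, uniquely named file in %temp%.
function CreateUniqueTempFile(const prefix: string): string;
var
  tempPath: array [0..MAX_PATH] of char;
  tempFile: array [0..MAX_PATH] of char;
begin
  if GetTempPath(MAX_PATH, tempPath) = 0 then
    RaiseLastOSError;
  // uUnique = 0: Windows picks the name and creates an empty file,
  // so two callers are never handed the same name.
  if GetTempFileName(tempPath, PChar(prefix), 0, tempFile) = 0 then
    RaiseLastOSError;
  Result := tempFile;
end;

var
  first, second: string;
begin
  first := CreateUniqueTempFile('lst');
  second := CreateUniqueTempFile('lst');
  try
    Writeln(first);
    Writeln(second);
    Writeln('Different names: ', first <> second);
  finally
    // the files were created by GetTempFileName, so clean them up
    DeleteFile(first);
    DeleteFile(second);
  end;
end.
```

Only the first three characters of the prefix are used by Windows. Because the file is created as part of the call, two clients sharing one %temp% folder would still end up with different files.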

Thursday, November 27, 2008

Advanced enumerators

The Blaise Pascal magazine #4 is out. Inside you can find the second part of my “enumerators” series (first page).

Monday, November 03, 2008

Background file scanning with OmniThreadLibrary

Yesterday, a reader asked if I could create a demo for the background file searcher, so here it is. To get the demo source, update your SVN copy or download this file.

The app has a simple interface.

demo 23 - initial

Enter the path to be searched and the file mask in the edit field and click Scan.

demo 23 - scanning

The program starts scanning and displays the current folder and the number of found files during the process. If you click the X button during the scan, the scan will be aborted and the program will close.

When background scanning completes, the list of found files is displayed in the listbox.

demo 23 - final

During the scanning, the main thread is fully active. You can move the program around, resize it, minimize, maximize and so on.

Implementation

Let’s take a look at the application in design mode.

demo 23 - design

Besides the components that are visible at runtime, there is also a TOmniEventMonitor component (named OTLMonitor) and a TTimer (tmrDisplayStatus) on the form.

When the user clicks the Scan button, a background task is created.

procedure TfrmBackgroundFileSearchDemo.btnScanClick(Sender: TObject);
begin
  FFileList := TStringList.Create;
  btnScan.Enabled := false;
  tmrDisplayStatus.Enabled := true;
  FScanTask := CreateTask(ScanFolders, 'ScanFolders')
    .MonitorWith(OTLMonitor)
    .SetParameter('FolderMask', inpFolderMask.Text)
    .Run;
end;

ScanFolders is the method that will do the scanning (in a background thread). We’ll return to it later. The task will be monitored with the OTLMonitor component so that we will receive task messages. OTLMonitor will also tell us when the task terminates. The input folder and mask are sent to the task as the FolderMask parameter and the task is started.

The FFileList field is a TStringList that will contain a list of all found files.

Let’s ignore the scanner details for the moment and skip to the end of the scanning process. When the task has completed its job, OTLMonitor.OnTaskTerminated is called.

procedure TfrmBackgroundFileSearchDemo.OTLMonitorTaskTerminated(
  const task: IOmniTaskControl);
begin
  tmrDisplayStatus.Enabled := false;
  outScanning.Text := '';
  outFiles.Text := IntToStr(FFileList.Count);
  lbFiles.Clear;
  lbFiles.Items.AddStrings(FFileList);
  FreeAndNil(FFileList);
  FScanTask := nil;
  btnScan.Enabled := true;
end;

At that point, the number of found files is copied to the outFiles edit field and the complete list is assigned to the listbox. The task reference FScanTask is then cleared, which causes the task object to be destroyed, and the Scan button is re-enabled (it was disabled during the scanning process).

But what if the user closes the program by clicking the X button while the background scanner is active? Simple, we just handle the form’s OnCloseQuery event and tell the task to terminate.

procedure TfrmBackgroundFileSearchDemo.FormCloseQuery(Sender: TObject;
  var CanClose: boolean);
begin
  if assigned(FScanTask) then begin
    FScanTask.Terminate;
    FScanTask := nil;
    CanClose := true;
  end;
end;

The Terminate method will do two things – tell the task to terminate and then wait for its termination. After that, we simply have to clear the task reference and allow the program to terminate.

Scanner

Let’s move to the scanning part now. The ScanFolders method (which is the main task method, the one we passed to the CreateTask) splits the value of the FolderMask parameter into folder and mask parts and passes them to the main worker ScanFolder.

procedure ScanFolders(const task: IOmniTask);
var
  folder: string;
  mask  : string;
begin
  mask := task.ParamByName['FolderMask'];
  folder := ExtractFilePath(mask);
  Delete(mask, 1, Length(folder));
  if folder <> '' then
    folder := IncludeTrailingPathDelimiter(folder);
  ScanFolder(task, folder, mask);
end;

ScanFolder first finds all subfolders of the selected folder and calls itself recursively for each subfolder. That means that we’ll first process deepest folders and then proceed to the top of the folder tree.

Then it sends a message MSG_SCAN_FOLDER to the main thread. As a parameter of this message it sends the name of the folder being processed. There’s nothing magical about this message – it is just an arbitrary numeric constant from range 0 .. 65534 (yes, number 65535 is reserved for internal use).

const
  MSG_SCAN_FOLDER = 1;
  MSG_FOLDER_FILES = 2;

procedure ScanFolder(const task: IOmniTask; const folder, mask: string);
var
  err        : integer;
  folderFiles: TStringList;
  S          : TSearchRec;
begin
  err := FindFirst(folder + '*.*', faDirectory, S);
  if err = 0 then try
    repeat
      if ((S.Attr and faDirectory) <> 0) and (S.Name <> '.') and (S.Name <> '..') then
        ScanFolder(task, folder + S.Name + '\', mask);
      err := FindNext(S);
    until task.Terminated or (err <> 0);
  finally FindClose(S); end;
  task.Comm.Send(MSG_SCAN_FOLDER, folder);
  folderFiles := TStringList.Create;
  try
    err := FindFirst(folder + mask, 0, S);
    if err = 0 then try
      repeat
        folderFiles.Add(folder + S.Name);
        err := FindNext(S);
      until task.Terminated or (err <> 0);
    finally FindClose(S); end;
  finally task.Comm.Send(MSG_FOLDER_FILES, folderFiles); end;
end;

ScanFolder then runs the FindFirst/FindNext/FindClose loop for the second time to search for files in the folder. [BTW, if you want to first scan folders nearer to the root, just swap the two loops and scan for files first and for folders second.] Each file is added to an internal TStringList object which was created just a moment before. When the folder scan is completed, this object is sent to the main thread as the parameter of the MSG_FOLDER_FILES message.

This approach – sending data for one folder at a time – is a compromise between returning the complete set (the full scanned tree), which would not provide good feedback, and returning each file as we detect it, which would unnecessarily put a high load on the system.

Both Find loops test the status of the task.Terminated function and exit immediately if it is True. That allows us to terminate the background task when the user closes the application and the OnCloseQuery handler is called.

Receiving messages

That’s all that has to be done in the background task but we still have to process the messages in the main thread. For that, we write a handler for the OTLMonitor’s OnTaskMessage event.

procedure TfrmBackgroundFileSearchDemo.OTLMonitorTaskMessage(
  const task: IOmniTaskControl);
var
  folderFiles: TStringList;
  msg        : TOmniMessage;
begin
  task.Comm.Receive(msg);
  if msg.MsgID = MSG_SCAN_FOLDER then
    FWaitingMessage := msg.MsgData
  else if msg.MsgID = MSG_FOLDER_FILES then begin
    folderFiles := TStringList(msg.MsgData.AsObject);
    FFileList.AddStrings(folderFiles);
    FreeAndNil(folderFiles);
    FWaitingCount := IntToStr(FFileList.Count);
  end;
end;

If the message is MSG_SCAN_FOLDER we just copy the folder name to a form field (FWaitingMessage). If the message is MSG_FOLDER_FILES, we copy file names from the parameter (which is a TStringList) to the form-level FFileList list and destroy the parameter. We also update a form field (FWaitingCount) holding the number of currently found files.

So why don’t we directly update the two edit fields on the form (one with the current folder and another with the number of found files)? Well, the background task can send many messages in one second (when processing folders with a small number of files) and there’s no point in displaying them all – the user will never see what was displayed anyway. And it would slow down the GUI because Windows controls would be updated hundreds of times per second, which is never a good idea.

Instead of that we just store the strings to be displayed in two form fields and display them from a timer which is triggered three times per second. That will not show all scanned folders and all intermediate file count results, but will still provide the user with sufficient feedback.

procedure TfrmBackgroundFileSearchDemo.tmrDisplayStatusTimer(Sender: TObject);
begin
  if FWaitingMessage <> '' then begin
    outScanning.Text := FWaitingMessage;
    FWaitingMessage := '';
  end;
  if FWaitingCount <> '' then begin
    outFiles.Text := FWaitingCount;
    FWaitingCount := '';
  end;
end;

And that’s all. Fully functional background file scanner that could easily be repackaged into a TComponent. And it’s all yours.

Sunday, November 02, 2008

OmniThreadLibrary 1.01

OmniThreadLibrary 1.01 was released yesterday. It is available via SVN (http://omnithreadlibrary.googlecode.com/svn/tags/release-1.01) or as a ZIP archive.

Changes since version 1.0a:

  • *** Breaking interface change ***
    • IOmniTask.Terminated renamed to IOmniTask.Stopped.
    • New IOmniTask.Terminated that checks whether the task has been requested to terminate. [demo 22]
  • [GJ] Redesigned stack container with better lock contention.
  • [GJ] Totally redesigned queue container, which is no longer based on stack and allows multiple readers.
  • Full D2009 support; D2009 packages, project files and Tests project group.
  • Invoke-by-name and invoke-by-address messaging implemented [http://17slon.com/blogs/gabr/2008/10/erlangenizing-omnithreadlibrary.html, http://17slon.com/blogs/gabr/2008/10/omnithreadlibrary-using-rtti-to-call.html, demos 18 and 19]
  • Implemented CreateTask(reference to function (task: ITaskControl)). [D2009 only, demo 21]
  • Implemented blocking wait (ReceiveWait). [demo 19]
  • Added enumerator to the IOmniTaskGroup interface.
  • Implemented IOmniTaskGroup.RegisterAllWithTask and .UnregisterAllFromTask.
  • Added automatic comm unregistration for IOmniTaskGroup.RegisterAllCommWith.
  • Implemented IOmniTaskGroup.SendToAll.
  • IOmniTaskControl.Terminate now kills the task after the timeout.
  • New/updated tests/demos:
    • 10_Containers
      • 2 -> 2, 1 -> 4 and 4 -> 4 tests for stacks and queues.
      • [1, 2, 4] -> [1, 2, 4] full tests.
      • Writes CSV file with cumulative test results.
    • 17_MsgWait: demo for the .MsgWait decorator.
    • 18_StringMsgDispatch: Invoke demo.
    • 19_StringMsgBenchmark: Invoke benchmark, ReceiveWait demo.
    • 20_QuickSort: Parallel quicksort demo.
    • 21_Anonymous_methods: Anonymous methods demo (D2009 only).
    • 22_TerminationTest: Task termination demo.
  • Message ID $FFFF is now reserved for internal purposes.
  • Better default queue length calculation that takes into account OtlContainers overhead and FastMM4 granulation.
  • Bug fixed: TOmniValue.Null was not really initialized to Null.
  • Bug fixed: Setting timer interval resets timer countdown.
  • Bug fixed: TOmniTaskControl.Schedule always scheduled task to the global thread pool.
  • Current versions of 3rd party units included.

Known problems:

  • Thread pool is not stable under high load.

Thursday, October 30, 2008

When one and one makes one

Some time ago I promised to write an article on the TGpJoinedStream class. The time has come to fulfill the promise.

TGpJoinedStream is a stream wrapper class – a class that wraps one or more existing streams and provides a TStream interface on the outside. My GpStreams unit contains several examples of such wrappers: TGpStreamWindow provides stream access to a part of another stream, TGpScatteredStream provides contiguous access to scattered data (hm, I never wrote an article about that one, did I?), and TGpBufferedStream adds data caching to any stream.

TGpJoinedStream’s role is simple. You pass it a bunch of streams and it provides stream access to concatenated data. IOW, if first stream contains numbers <1, 2, 3> and second stream contains <4, 5>, joined stream would function as if it contains <1, 2, 3, 4, 5>.

The implementation is pretty straightforward and I won’t describe it here. I'll rather show how TGpJoinedStream is used in practice and why I wrote it at all.
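For readers who want a feel for the idea anyway, here is a minimal read-only sketch of a concatenating stream. It is not the real TGpJoinedStream; the class name TSimpleJoinedStream and everything inside it are assumptions made up for this illustration, and it ignores writing, ownership and error handling.

```pascal
program JoinedStreamSketch;
{$APPTYPE CONSOLE}

uses Classes;

type
  // Hypothetical read-only illustration, not the real TGpJoinedStream.
  TSimpleJoinedStream = class(TStream)
  private
    FPosition: Int64;
    FStreams : array of TStream;
  public
    constructor Create(const streams: array of TStream);
    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
    function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override;
  end;

constructor TSimpleJoinedStream.Create(const streams: array of TStream);
var
  i: integer;
begin
  inherited Create;
  SetLength(FStreams, Length(streams));
  for i := 0 to High(streams) do
    FStreams[i] := streams[i]; // streams stay owned by the caller
end;

function TSimpleJoinedStream.Read(var Buffer; Count: Longint): Longint;
var
  chunk : Longint;
  dest  : PAnsiChar;
  i     : integer;
  offset: Int64; // read position relative to the start of stream i
begin
  Result := 0;
  dest := @Buffer;
  offset := FPosition;
  for i := 0 to High(FStreams) do
    if offset >= FStreams[i].Size then
      offset := offset - FStreams[i].Size // position lies beyond this stream
    else begin
      FStreams[i].Position := offset;
      // Read returns fewer bytes than requested when the stream runs out
      chunk := FStreams[i].Read(dest^, Count - Result);
      Inc(dest, chunk);
      Inc(Result, chunk);
      offset := 0; // continue at the start of the next stream
    end;
  Inc(FPosition, Result);
end;

function TSimpleJoinedStream.Write(const Buffer; Count: Longint): Longint;
begin
  raise EStreamError.Create('TSimpleJoinedStream is read-only');
end;

function TSimpleJoinedStream.Seek(const Offset: Int64; Origin: TSeekOrigin): Int64;
var
  i: integer;
begin
  case Origin of
    soBeginning: FPosition := Offset;
    soCurrent  : FPosition := FPosition + Offset;
    soEnd      :
      begin // total size is the sum of all inner stream sizes
        FPosition := Offset;
        for i := 0 to High(FStreams) do
          FPosition := FPosition + FStreams[i].Size;
      end;
  end;
  Result := FPosition;
end;

const
  CFirst : array [0..2] of byte = (1, 2, 3);
  CSecond: array [0..1] of byte = (4, 5);
var
  data  : array [0..4] of byte;
  i     : integer;
  joined: TSimpleJoinedStream;
  s1, s2: TMemoryStream;
begin
  s1 := TMemoryStream.Create;
  s2 := TMemoryStream.Create;
  joined := TSimpleJoinedStream.Create([s1, s2]);
  try
    s1.WriteBuffer(CFirst, SizeOf(CFirst));
    s2.WriteBuffer(CSecond, SizeOf(CSecond));
    Writeln('Read: ', joined.Read(data, SizeOf(data)));
    for i := Low(data) to High(data) do
      Write(data[i], ' '); // the bytes of s1 followed by the bytes of s2
    Writeln;
  finally
    joined.Free;
    s1.Free;
    s2.Free;
  end;
end.
```

Because Size and Position in TStream are implemented on top of Seek, overriding Seek is enough to make both properties work for the joined view.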

We have an MPEG parser class that takes a TStream containing MPEG video data and extracts some information from it. It works fine when you need to extract information from a video file, but not so well if you want to use it inside a DirectX filter. From the viewpoint of a DirectX source or sink filter, video is not a stream but a bunch of buffers, which we must process one by one. The trouble is that buffers can almost never be fully processed because MPEG sequences don’t contain data length.

In MPEG, each sequence starts with the byte signature 00 00 01, followed by a byte that tells you what kind of data you’re processing, followed by some data. Unless you want to parse each and every sequence and you know exactly how all well-formed and malformed sequences in existence are built, you only know that you reached the end of one sequence when you reach another 00 00 01 signature. That’s why the last sequence in the buffer can never be processed (unless this is the last buffer of them all) until we find 00 00 01 in the next buffer. Uff. I hope somebody understands this at all.

In short, we are processing data buffers. A variable-length part of each buffer will stay unprocessed and will have to be prepended to the next buffer before it is passed through the MPEG parser. And so on, until the end of data.
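To make the "unprocessed tail" idea concrete, here is a small sketch that finds where the last 00 00 01 signature in a buffer starts. FindLastStartCode and the sample buffer are made up for this illustration; the real parser decides what is left over through the stream position, not through a helper like this.

```pascal
program StartCodeSketch;
{$APPTYPE CONSOLE}

// Hypothetical helper: returns the index of the last 00 00 01 signature
// in the buffer, or -1 if the buffer contains none.
function FindLastStartCode(const data: array of byte): integer;
var
  i: integer;
begin
  Result := -1;
  for i := High(data) - 2 downto 0 do
    if (data[i] = 0) and (data[i + 1] = 0) and (data[i + 2] = 1) then begin
      Result := i;
      Exit;
    end;
end;

const
  // made-up sample data with signatures at offsets 0 and 6
  CBuffer: array [0..9] of byte =
    ($00, $00, $01, $B3, $11, $22, $00, $00, $01, $00);
var
  leftoverStart: integer;
begin
  leftoverStart := FindLastStartCode(CBuffer);
  Writeln('Last signature at offset ', leftoverStart);
  // everything from that offset on cannot be parsed until the next buffer arrives
  Writeln('Bytes to carry over: ', Length(CBuffer) - leftoverStart);
end.
```

With the sample data, the sequence starting at offset 0 is complete (it ends where the next signature begins), while the four bytes from offset 6 onwards have to wait for the next buffer.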

And that’s where TGpJoinedStream comes to help. The unprocessed part is stored in a memory stream FLeftovers, which is empty at the beginning. Buffer parser concatenates FLeftovers with the current data and passes the result through the stream parser. When that one returns, .Position in the combined stream indicates the first unprocessed byte. The unprocessed data is then copied into the FLeftovers stream so it can be used next time the buffer parser is called.

That’s how it looks in code (slightly simplified real code; I just removed some details that are not interesting for our story).

procedure TGpMPEGSequentialParser.ParseNext(buffer: pointer;
  bufferSize: integer);
var
  combinedStream: TGpJoinedStream;
  mpegParser    : TGpMPEGParser;
  newLeftovers  : TMemoryStream;
  strBuffer     : TGpFixedMemoryStream;
begin
  strBuffer := TGpFixedMemoryStream.Create(buffer^, bufferSize);
  try
    newLeftovers := TMemoryStream.Create;
    try
      combinedStream := TGpJoinedStream.Create([FLeftovers, strBuffer]);
      try
        mpegParser := TGpMPEGParser.Create;
        try
          mpegParser.MPEGStream := combinedStream;

          mpegParser.Parse(HandleMpegSequence);

          if combinedStream.Position < combinedStream.Size then
            newLeftovers.CopyFrom(combinedStream,
              combinedStream.Size - combinedStream.Position);
        finally FreeAndNil(mpegParser); end;
      finally FreeAndNil(combinedStream); end;
    finally
      FreeAndNil(FLeftovers);
      FLeftovers := newLeftovers;
    end;
  finally FreeAndNil(strBuffer); end;
end;


Hope you like it!

Tuesday, October 21, 2008

Internet, as predicted in 1946

“You know the logics setup. You got a logic in your house. It looks like a vision receiver used to, only it's got keys instead of dials and you punch the keys for what you wanna get. It's hooked in to the tank, which has the Carson Circuit all fixed up with relays. Say you punch "Station SNAFU" on your logic. Relays in the tank take over an' whatever vision-program SNAFU is telecastin' comes on your logic's screen. Or you punch "Sally Hancock's Phone" an' the screen blinks an' sputters an' you're hooked up with the logic in her house an' if somebody answers you got a vision-phone connection. But besides that, if you punch for the weather forecast or who won today's race at Hialeah or who was mistress of the White House durin' Garfield's administration or what is PDQ and R sellin' for today, that comes on the screen too. The relays in the tank do it. The tank is a big buildin' full of all the facts in creation an' all the recorded telecasts that ever was made—an' it's hooked in with all the other tanks all over the country—an' everything you wanna know or see or hear, you punch for it an' you get it. Very convenient. Also it does math for you, an' keeps books, an' acts as consultin' chemist, physicist, astronomer, an' tea-leaf reader, with a "Advice to the Lovelorn" thrown in. The only thing it won't do is tell you exactly what your wife meant when she said, "Oh, you think so, do you?" in that peculiar kinda voice. Logics don't work good on women. Only on things that make sense.”

- Murray Leinster, A Logic Named Joe

For Sci-Fi fans: Baen Free Library.

Tuesday, October 14, 2008

TDM Rerun #11: Shared Pools

Sometimes we run into problems because of a Windows feature, and most of the time this is a very good feature, that a file mapping (an essential part of my shared memory implementation) cannot exist on its own. A file mapping, like mutexes, events, and other Windows primitives, must have an owner. If all processes associated with a given file mapping die, the file mapping will be destroyed. Because in my shared memory implementation this file mapping is backed with a page file, its contents won’t be preserved on an accessible part of the disk.

- Shared Pools, The Delphi Magazine 95, July 2003

Shared pool architecture

The shared pool was one of my more baroque creations. In fact, it was so complicated that it was never used in a deployed application. Basically, the article described an architecture to implement a pool of shared memory objects, which multiple Writers could use to send data to one Reader (typically sitting in another process). The system also handled cleanup when a Reader task died and other management details.

You really should not be playing with this code. There are better solutions.

Links: article (PDF, 193 KB), source code (ZIP, 1.9 MB)

Tuesday, October 07, 2008

OmniThreadLibrary: Using RTTI to call task methods

Yesterday I wrote an article on by-name and by-address invocations in the new OmniThreadLibrary (development version) but I didn’t finish the description of the OTL internal magic that makes those new calls work. Let’s fix that …

I ended the story right at the point where TOmniTaskExecutor.Asy_DispatchMessages calls DispatchOmniMessage.

Most of the magic happens inside this method, so it’s only fair to display it in its full glory.

procedure TOmniTaskExecutor.DispatchOmniMessage(msg: TOmniMessage);
var
  methodAddr     : pointer;
  methodInfoObj  : TObject;
  methodInfo     : TOmniInvokeInfo absolute methodInfoObj;
  methodName     : string;
  methodSignature: TOmniInvokeType;
  msgData        : TOmniValue;
  obj            : TObject;
begin
  if msg.MsgID = COtlReservedMsgID then begin
    Assert(assigned(WorkerIntf));
    GetMethodNameFromInternalMessage(msg, methodName, msgData);
    if methodName = '' then
      raise Exception.Create('TOmniTaskExecutor.DispatchOmniMessage: Method name not set');
    if not assigned(oteMethodHash) then
      oteMethodHash := TGpStringObjectHash.Create(17, true); //usually there won't be many methods
    if not oteMethodHash.Find(methodName, methodInfoObj) then begin
      GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
      methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
      oteMethodHash.Add(methodName, methodInfo);
    end;
    case methodInfo.Signature of
      itSelf:
        TOmniInvokeSignature_Self(methodInfo.Address)(WorkerIntf.Implementor);
      itSelfAndOmniValue:
        TOmniInvokeSignature_Self_OmniValue(methodInfo.Address)(WorkerIntf.Implementor, msgData);
      itSelfAndObject:
        begin
          obj := msgData.AsObject;
          TOmniInvokeSignature_Self_Object(methodInfo.Address)(WorkerIntf.Implementor, obj);
        end
      else
        RaiseInvalidSignature(methodName);
    end; //case methodSignature
  end
  else
    WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchMessage }

Now let’s dissect it. The code first checks if the message ID contains the magic value. If not, the message is dispatched as before by using Delphi’s Dispatch.

  if msg.MsgID = COtlReservedMsgID then begin
    //...
  end
  else
    WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchMessage }

Not interesting. Let’s take a look at the other path – when msg.MsgID is an internal message ID.

In this case, DispatchOmniMessage extracts the method name from the message. If the caller passed a method name to Invoke, then the job is simple – the name only has to be extracted from the TOmniInternalStringMsg object. If, on the other hand, a method pointer was used, the code unpacks it from the TOmniInternalAddressMsg object and calls MethodName to convert it to a (who would guess?) method name.

procedure TOmniTaskExecutor.GetMethodNameFromInternalMessage(const msg: TOmniMessage; var
  msgName: string; var msgData: TOmniValue);
var
  internalType: TOmniInternalMessageType;
  method      : pointer;
begin
  internalType := TOmniInternalMessage.InternalType(msg);
  case internalType of
    imtStringMsg:
      TOmniInternalStringMsg.UnpackMessage(msg, msgName, msgData);
    imtAddressMsg:
      begin
        TOmniInternalAddressMsg.UnpackMessage(msg, method, msgData);
        msgName := WorkerIntf.Implementor.MethodName(method);
        if msgName = '' then
          raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
            'Cannot find method name for method %p', [method]);
      end
    else
      raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
        'Internal message type %s is not supported',
        [GetEnumName(TypeInfo(TOmniInternalMessageType), Ord(internalType))]);
  end; //case internalType
end; { TOmniTaskExecutor.GetMethodNameFromInternalMessage }

Next, the code looks into an internal hash table and tries to fetch information on that method name. If there’s no such information (when some method is called for the first time), GetMethodAddrAndSignature is called to convert the name to the method’s address and signature. Then this information is stored in the hash table so it will be immediately ready next time.

  if not oteMethodHash.Find(methodName, methodInfoObj) then begin
    GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
    methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
    oteMethodHash.Add(methodName, methodInfo);
  end;

Method signature is then used to select the type of call that will be performed. Only three signatures are supported: (Self: TObject), (Self: TObject; const msgData: TOmniValue) and (Self: TObject; var obj: TObject). Demo 18 demonstrates the use of all three. What I find especially convenient is that the third option allows you to put any TObject descendant in the parameter list. In other words, you can do:

type
  TAsyncHello = class(TOmniWorker)
  published
    procedure TheAnswer(var sl: TStringList);
  end;

procedure TfrmTestStringMsgDispatch.btnSendObjectClick(Sender: TObject);
var
  sl: TStringList;
begin
  sl := TStringList.Create;
  sl.Text := '42';
  FHelloTask.Invoke(@TAsyncHello.TheAnswer, sl);
end;

procedure TAsyncHello.TheAnswer(var sl: TStringList);
begin
  FreeAndNil(sl);
end;

Convenient, huh?

RTTI

The above description of the DispatchOmniMessage looks very much like the famous Sydney Harris cartoon. In step one a method name is retrieved from the message. In step three the method is invoked using the right signature and method pointer. In step two, well …

Sydney Harris

Let’s be more explicit, then. We have a method name and we want to find the address for that method and some information about its parameters. Sounds like a job for RTTI, yes? Well, basic RTTI (the one that you enable with {$M+} or {$TYPEINFO ON} or simply by declaring a published property) only handles published properties, not methods. For methods, we need to use extended RTTI, which is enabled with {$METHODINFO ON}. I won’t describe it here as David Glassborow and Hallvard Vassbotn already did the job better than I could. If you’re interested in extended RTTI, I suggest that your research starts at Hallvard’s David Glassborow on extended RTTI article.

Great thanks for publishing all that info, guys, it helped a lot!

GetMethodAddrAndSignature first uses Delphi’s ObjAuto unit to extract the method information header. Then it uses TObject’s MethodAddress to convert the method name into an address. Of course, it doesn’t call it on TObject itself, as TObject has no idea where the methods belonging to the task object are stored – it must call MethodAddress on your task object directly.

  methodInfoHeader := ObjAuto.GetMethodInfo(WorkerIntf.Implementor, methodName);
  methodAddress := WorkerIntf.Implementor.MethodAddress(methodName);
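If you have never played with MethodAddress and MethodName, this stand-alone sketch shows both lookups outside of OTL. TDemoWorker and TParameterlessMethod are names invented for the illustration, and the call goes through a TMethod record rather than through the procedural-type casts that OTL uses, so treat it as a simplified relative of the real code.

```pascal
program MethodLookupSketch;
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  {$M+} // make sure published methods end up in the method table
  TDemoWorker = class
  published
    procedure Hello;
  end;
  {$M-}

  TParameterlessMethod = procedure of object;

procedure TDemoWorker.Hello;
begin
  Writeln('Hello from ', ClassName);
end;

var
  method: TMethod;
  worker: TDemoWorker;
begin
  worker := TDemoWorker.Create;
  try
    // name -> code address (nil if there is no such published method)
    method.Code := worker.MethodAddress('Hello');
    method.Data := worker; // becomes the hidden Self parameter
    if not assigned(method.Code) then
      raise Exception.Create('Method not found');
    TParameterlessMethod(method)();
    // code address -> name, the reverse lookup
    Writeln(worker.MethodName(method.Code));
  finally
    worker.Free;
  end;
end.
```

Both lookups only see published methods, which is why the worker methods in the OTL examples have to be published.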

After that, some sanity checks are run, which I won’t reprint here.

Then the method signature is checked. First we want to ensure that we’re dealing with a procedure, not a function.

  if assigned(methodInfoHeader.ReturnInfo.ReturnType) then
    raise Exception.CreateFmt('TOmniTaskExecutor.DispatchMessage: ' +
      'Method %s.%s must not return result',
      [WorkerIntf.Implementor.ClassName, methodName]);

At the end, a messy fragment of code walks over the parameter description list for this method and checks that the first parameter is a class reference (because this method is a part of a class, it has a hidden Self parameter at the beginning) and that the second parameter, if present at all, is either const data: TOmniValue or var obj: TObject (or a descendant of TObject). The detected signature is then returned in the methodSignature parameter.

  // only limited subset of method signatures is allowed:
  // (Self), (Self, const TOmniValue), (Self, var TObject)
  headerEnd := cardinal(methodInfoHeader) + methodInfoHeader^.Len;
  params := PParamInfo(cardinal(methodInfoHeader) + SizeOf(methodInfoHeader^)
    - CShortLen + SizeOf(TReturnInfo) + Length(methodInfoHeader^.Name));
  paramNum := 0;
  methodSignature := itUnknown;
  // Loop over the parameters
  while cardinal(params) < headerEnd do begin
    Inc(paramNum);
    paramType := params.ParamType^;
    if paramNum = 1 then
      if (params^.Flags <> []) or (paramType^.Kind <> tkClass) then
        RaiseInvalidSignature(methodName)
      else
        methodSignature := itSelf
    else if paramNum = 2 then
      //code says 'const' but GetMethodInfo says 'pfVar' :(
      if (params^.Flags * [pfConst, pfVar] <> []) and (paramType^.Kind = tkRecord) and
         (SameText(paramType^.Name, 'TOmniValue'))
      then
        methodSignature := itSelfAndOmniValue
      else if (params^.Flags = [pfVar]) and (paramType^.Kind = tkClass) then
        methodSignature := itSelfAndObject
      else
        RaiseInvalidSignature(methodName)
    else
      RaiseInvalidSignature(methodName);
    params := params.NextParam;
  end;

It looks messy and it is messy, but it does the job. If you have problems understanding this code, I’d recommend stepping over it with the debugger.

Benchmarks

In test 19 I implemented some benchmarking code to find out how much slower this approach is than the standard Comm.Send(msg, data). It turned out that it is not much slower, thanks to the built-in caching.

[Image: benchmark results]

Integer message dispatching is about twice as fast as string/pointer dispatching. That’s a completely acceptable speed for something that is executed only a few (thousand) times in the program’s lifetime. If your program model is based on sending millions and millions of “do computation” messages to the worker thread then it is doomed from the beginning anyway.

That would be enough for today. Hope you liked it, and stay well until next time. Bye!

Monday, October 06, 2008

Erlangenizing the OmniThreadLibrary

A few days ago I “discovered” Erlang. [Such things happen when you read StackOverflow obsessively (sigh).] I started with Wikipedia and proceeded with the Pragmatic Programmer book – but that’s not really important. I wanted to talk about Erlang’s built-in concurrency support.

Some things are very similar to the OTL (if we take into account that Erlang is a functional language and Delphi is not) and some are different, but one immediately caught my attention. In Erlang, the message recipient doesn’t use a numeric code to discover what message it has received; instead, the whole message is matched against a programmer-provided pattern (or patterns). The interesting thing is that usually (by convention) the first element in the message is an atom (a name, a sequence of characters, if you want). That got me thinking … Why do we send integer messages in the OTL, anyway?

The Windows Way

From the very beginning, OmniThreadLibrary tried to make the programmer’s life simple. Well, at least simpler. One of the helping hands it offered was simplified message processing. In traditional Windows thread programming you have to wait for various objects to become signaled with WaitForMultipleObjects and then proceed accordingly. In practice that means that the thread’s main logic is centralized in one very big method that handles all those events, mutexes and other stuff that can be waited upon.

OTL helps by implementing this logic internally (at least for workers that implement the IOmniWorker interface). Instead of using kernel primitives, the task owner sends messages to the task’s message queue. Messages are processed somewhere inside the OTL (specifically OtlTaskControl.pas/TOmniTaskExecutor.Asy_DispatchMessages) and are converted into method calls with Delphi’s Dispatch mechanism. That’s the same mechanism that makes sure that Windows messages are “converted” into Delphi methods and it requires that the first two bytes of the dispatched message contain the message ID. That’s why (until now) the recommended way to send a message to a task was:

const
MSG_CHANGE_MESSAGE = 1;
MSG_SEND_MESSAGE = 2;

type
TAsyncHello = class(TOmniWorker)
strict private
aiMessage: string;
public
function Initialize: boolean; override;
procedure OMChangeMessage(var msg: TOmniMessage); message MSG_CHANGE_MESSAGE;
procedure OMSendMessage(var msg: TOmniMessage); message MSG_SEND_MESSAGE;
end;

FHelloTask: IOmniTaskControl;

FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Random ' + IntToStr(Random(1234)));

[You can read more about this approach in OmniThreadLibrary Example #4: Bidirectional communication, the OTL way.]
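The Dispatch mechanism mentioned above can be tried without any threading at all. The following is a minimal sketch that uses only TObject.Dispatch from the RTL; TDemoMessage and TDemoWorker are hypothetical stand-ins (the real OTL types are TOmniMessage and TOmniWorker), and the only hard requirement is that the record starts with a two-byte message ID.

```pascal
program DispatchDemo;
{$APPTYPE CONSOLE}

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_UNHANDLED      = 99; // no method is declared for this ID

type
  // Hypothetical message record; Dispatch reads the ID from the first two bytes.
  TDemoMessage = record
    MsgID  : word;
    MsgData: string;
  end;

  TDemoWorker = class
  public
    procedure OMChangeMessage(var msg: TDemoMessage); message MSG_CHANGE_MESSAGE;
    procedure DefaultHandler(var msg); override;
  end;

procedure TDemoWorker.OMChangeMessage(var msg: TDemoMessage);
begin
  Writeln('Change message received: ', msg.MsgData);
end;

// Called by Dispatch when no method is bound to the message ID.
procedure TDemoWorker.DefaultHandler(var msg);
begin
  Writeln('No handler for message ', TDemoMessage(msg).MsgID);
end;

var
  msg   : TDemoMessage;
  worker: TDemoWorker;
begin
  worker := TDemoWorker.Create;
  try
    msg.MsgID := MSG_CHANGE_MESSAGE;
    msg.MsgData := 'Random 42';
    worker.Dispatch(msg); // routed to OMChangeMessage
    msg.MsgID := MSG_UNHANDLED;
    worker.Dispatch(msg); // routed to DefaultHandler
  finally worker.Free; end;
end.
```

The sender and the receiver are coupled only through the numeric constant, which is exactly why those constants have to be declared in advance.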

The Erlang Way

This approach simplifies writing threaded code – at least the kind that doesn’t depend heavily on shared data structures. But there’s still some room for improvement. For example, do we really have to use numeric messages, which have to be declared in advance? Why couldn’t the task controller just tell the task to execute the OMChangeMessage method?

To cut a long story short – this is now possible. Yesterday I committed a set of OTL modifications that allow you to do this:

type
TAsyncHello = class(TOmniWorker)
published
procedure Change(const data: TOmniValue);
end;

FHelloTask: IOmniTaskControl;

FHelloTask.Invoke('Change', 'Random ' + IntToStr(Random(1234)));

Yes, the Change method is published here. This is important.

Simple, huh? There’s a small problem, though – there are no compile-time checks. The code sends a string and the compiler can do nothing to verify the validity of this string. If the name was mistyped, you’d only notice it at run time.

To fix this problem, OTL allows another form of method invocation which uses a method address instead of the name.

FHelloTask.Invoke(@TAsyncHello.Change, 'Random ' + IntToStr(Random(1234)));

In this case the compiler can check your typing, but it still won’t catch all problems – for example, the following code will compile and then raise an exception at run time.

procedure TfrmTestStringMsgDispatch.btnTestInvalidMsgClick(Sender: TObject);
begin
if cbStringMessages.Checked then
// will fail, FooBar method is not defined
FHelloTask.Invoke('FooBar')
else
// will fail, can only invoke methods from the task's class
FHelloTask.Invoke(@Self.btnTestInvalidMsg);
end;

[All new functionality is exposed in new demo 18_StringMsgDispatch.]
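Why does the method have to be published? A plausible reason is that published methods are recorded in the class RTTI, so they can be looked up by name (and back) at run time. The sketch below shows that lookup with nothing but TObject.MethodAddress and TObject.MethodName. It is not the OTL implementation; TDemoWorker, TInvokableMethod and InvokeByName are hypothetical names, and the string parameter stands in for TOmniValue.

```pascal
program InvokeByNameDemo;
{$APPTYPE CONSOLE}

type
  // Hypothetical signature shared by all invokable methods in this sketch.
  TInvokableMethod = procedure(const data: string) of object;

  {$M+} // make sure published methods get RTTI
  TDemoWorker = class
  published
    procedure Change(const data: string);
  end;
  {$M-}

procedure TDemoWorker.Change(const data: string);
begin
  Writeln('Change called with: ', data);
end;

// Looks up a published method by name and calls it.
// Returns False if the target has no published method with that name.
function InvokeByName(target: TObject; const methodName, data: string): boolean;
var
  method: TMethod;
begin
  method.Code := target.MethodAddress(methodName);
  Result := assigned(method.Code);
  if Result then begin
    method.Data := target; // the instance that becomes Self in the call
    TInvokableMethod(method)(data);
  end;
end;

var
  worker: TDemoWorker;
begin
  worker := TDemoWorker.Create;
  try
    InvokeByName(worker, 'Change', 'Random 42');
    if not InvokeByName(worker, 'FooBar', '') then
      Writeln('FooBar is not a published method of ', worker.ClassName);
    // the reverse lookup, from code address to name
    Writeln('Name of @TDemoWorker.Change: ', worker.MethodName(@TDemoWorker.Change));
  finally worker.Free; end;
end.
```

Note that nothing here verifies the parameter list of the method that was found. Casting the result to TInvokableMethod is a promise made by the programmer, which is one more thing the compiler cannot check.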

Implementation

To understand how Invoke is implemented, it’s best to trace one such call. First we see that Invoke gets converted into a normal message.

procedure TOmniTaskControl.Invoke(const msgMethod: pointer; msgData: TOmniValue);
begin
Comm.Send(TOmniInternalAddressMsg.CreateMessage(msgMethod, msgData));
end; { TOmniTaskControl.Invoke }

class function TOmniInternalAddressMsg.CreateMessage(const msgMethod: pointer; msgData:
TOmniValue): TOmniMessage;
begin
Result := TOmniMessage.Create(COtlReservedMsgID,
TOmniInternalAddressMsg.Create(msgMethod, msgData));
end; { TOmniInternalAddressMsg.CreateMessage }

This message has the message ID COtlReservedMsgID (which is equal to $FFFF, so from now on please don’t use this message ID for your own purposes). The message data field contains an object which wraps the method address and the message data that was passed to Invoke. Similar code is executed when Invoke is called with the method name parameter.

OK, so that’s how the method name (or address) travels from the task controller to the task itself by using the standard communication channel. But that is only half of the story … the simpler part!

On the receiving side, TOmniTaskExecutor.Asy_DispatchMessages detects new message and calls DispatchOmniMessage to process it.

if awaited = idxFirstMessage then
gotMsg := task.Comm.Receive(msg)
else begin
oteInternalLock.Acquire;
try
gotMsg := (oteCommList[awaited - idxFirstMessage - 1] as
IOmniCommunicationEndpoint).Receive(msg);
finally oteInternalLock.Release; end;
end;
if gotMsg and assigned(WorkerIntf) then
DispatchOmniMessage(msg);

Now that’s where things start to get really interesting, as we have to use RTTI and even extended RTTI to call the appropriate method. It is also the point where I’ll cut the story short. This article is already very long, maybe even too long, and I have much more to say on the subject. Expect part II to be published in a few days. [Of course, if you’re curious you can just look into the code to see how DispatchOmniMessage is implemented!]

Test it!

The newest OmniThreadLibrary code is only available in the repository. No snapshots this time.

I’d still be immensely grateful to anybody who tests the new functionality and shares their thoughts on this approach.

Friday, October 03, 2008

Bulk update

Delphi 2009 was released so now it’s time to publish updates to my various units … In this article I’m just publishing the short changelog; you can expect more details on some enhancements (TGpJoinedStream, 4- and 8-aligned integers, Unicode support in GpStructuredStorage) in the near future.

DSiWin32 1.41

  • Compatible with Delphi 2009.
  • Created DSiInterlocked*64 family of functions by copying the code from http://qc.borland.com/wc/qcmain.aspx?d=6212. Functions were written by Will DeWitt Jr and are included with permission.
  • New functions DSiCopyFileAnimated, DSiConnectToNetworkResource, DSiIsHtmlFormatOnClipboard, DSiGetHtmlFormatFromClipboard, DSiCopyHtmlFormatToClipboard, DSiYield.
  • Added constants FILE_LIST_DIRECTORY, FILE_SHARE_FULL, FILE_ACTION_ADDED, FILE_ACTION_REMOVED, FILE_ACTION_MODIFIED, FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME.
  • Forced {$T-} as the code doesn't compile in {$T+} state.
  • Bug fixed: It was not possible to use DSiTimeGetTime64 in parallel from multiple threads

GpHugeFile 5.05a

  • Optimization: Under some circumstances, lots of unnecessary SetFilePointer calls were made.

GpLists 1.41

  • Works with Delphi 2009.

GpStreams 1.25

  • Added TGpJoinedStream class.
  • Span-storing class can now be modified via TGpScatteredStream.SpanClass.
  • TGpScatteredStream's AddSpan and AddSpanOS now return span offset in the span list.
  • Added bunch of BE_ overloads to the TGpStreamEnhancer class.
  • Small optimization in KeepStreamPositionWrapper destructor.

GpStructuredStorage 2.0

  • Works with Delphi 2009.
  • Added Unicode support to the underlying storage. File and folder names are stored in UTF-16, as are attribute names and values. API is still based on Delphi's 'string' type - meaning that values are converted to Unicode and back on the fly using the current locale.
  • Existing structured storage files will be upgraded automatically if they are not open for readonly access. Applications, compiled with GpStructuredStorage 1.x will not be able to read new/upgraded files. Version is incremented to 2.0.0.0 when 1.x file is opened unless it is opened in readonly mode. Newly created storages have version 2.0.0.0.

GpStuff 1.13

  • Implemented 4-aligned integer, TGp4AlignedInt.
  • Implemented 8-aligned integer, TGp8AlignedInt.
  • Added function OpenArrayToVarArray, written by Thomas Schubbauer.
  • ReverseWord/ReverseDWord rewritten in assembler (by GJ).
  • Declared MaxInt64 constant.

GpSync 1.21

  • Added optional external message counter to the message queue.

GpTextFile 4.01

  • Added TGpTextFile.Write(ws: WideString) and TGpTextFile.Writeln(s: string) overloads.
  • TGpTextFile.Write[ln] string parameters made 'const'.
  • Bug fixed [found by AKi]: TGpTextFile.Write was not working when using CP_UTF8 codepage.

GpTextStream 1.06

  • Works with Delphi 2009.
  • Exported StringToWideString, WideStringToString, and GetDefaultAnsiCodepage.

GpVersion 2.02

  • Added another CreateVersion overload.
  • Extended IVersion interface with IsHigherThan and IsLowerThan.

Saturday, September 20, 2008

Processing Windows messages in OmniThreadLibrary

In all my ravings about the OmniThreadLibrary I’ve never discussed the .MsgWait decorator – and partly because of that it was not even working in the 1.0 release :-(. Time to fix that!

Today I’ll show you how simple it is to make a synchronous wrapper around an asynchronous component (using OTL, of course). The code itself is not an OTL test project but a unit I’m using to synchronously download web pages. Where’s the problem at all, you’ll say? Indy is synchronous and so is WinInet. True, but I’m using ICS, which is a completely asynchronous solution.

[For the readers who have no idea what I’m talking about: Synchronous means that the program calls the code, the code executes for some time and then returns with the web page. Asynchronous means that the program calls the code and the code returns immediately. Somewhere in the background, the web page is being retrieved and when it is fully transferred, your program gets notified in some way. The second way is more flexible, allows you to do more things in parallel and doesn’t block the UI, but is more complicated to use.]

Sometimes (usually for quick and dirty tests) I need to retrieve a web page without having to think about messages and events and what will be called at which moment. For such times I put together a simple unit which wraps ICS stuff in a thread. It is called GpHttp and will be available on my website in some time. For now, you can download it here. You’ll also need ICS v6 (and it should work with ICS v5 if you remove the Overbyte prefix from used units and classes).

Interface

I wanted to keep the interface really simple. The GpHttp unit exports only two functions, one to execute GET and another POST request. Both accept username and password and both return status code (200 if everything went OK), status text and page contents. In case of a socket problem, WinSock error code is returned in statusCode. (HTTP codes are always below 1,000 and WinSock codes are always above 10,000 so there is no possibility for collision.)

function GpHttpGet(const url, username, password: string; 
var statusCode: integer; var statusText, pageContents: string): boolean;

function GpHttpPost(const url, username, password, postData: string;
var statusCode: integer; var statusText, pageContents: string): boolean;

Both functions are simple wrappers around the third (local) function that executes any HTTP request.

function GpHttpGet(const url, username, password: string; 
var statusCode: integer; var statusText, pageContents: string): boolean;
begin
Result := GpHttpRequest(url, username, password, 'GET', '', statusCode,
statusText, pageContents);
end; { GpHttpGet }

function GpHttpPost(const url, username, password, postData: string;
var statusCode: integer; var statusText, pageContents: string): boolean;
begin
Result := GpHttpRequest(url, username, password, 'POST', postData,
statusCode, statusText, pageContents);
end; { GpHttpPost }
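Calling the wrapper could look roughly like the untested sketch below. It assumes that the GpHttp unit from this article (together with OTL and ICS) is on the search path; the URL is a placeholder, and the way the status code is interpreted simply follows the description above (HTTP codes below 1,000, WinSock codes above 10,000).

```pascal
program GpHttpDemo;
{$APPTYPE CONSOLE}

uses
  GpHttp; // the unit described in this article

var
  pageContents: string;
  statusCode  : integer;
  statusText  : string;
begin
  // placeholder URL; empty username and password mean no authentication
  if not GpHttpGet('http://www.example.com/', '', '', statusCode, statusText,
           pageContents)
  then
    Writeln('No answer before the timeout')
  else if statusCode = 200 then
    Writeln('OK, ', Length(pageContents), ' characters received')
  else if statusCode >= 10000 then
    Writeln('WinSock error ', statusCode)
  else
    Writeln('Request failed: ', statusCode, ' ', statusText);
end.
```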

The real work starts in the GpHttpRequest function. First it creates a worker object and passes the request data to it. The reference to the object (or, more correctly, to the interface implemented by this object) is stored in the worker variable. We’ll need it at the end to retrieve the status code and page contents.

Next, the task is created and run. Notice the call to .MsgWait which instructs the internal thread loop to process Windows messages (more on that later).

Next, we wait for the task to terminate. We’ll only be waiting up to 30 seconds (a constant in code which could be easily changed into a caller-provided parameter). If the task is not finished in 30 seconds, we’ll terminate it and return False. Otherwise, we’ll retrieve the status code, text and page contents from the worker object via the Implementor trick. (A better way would be to write an IGpHttpRequest interface supporting StatusCode, StatusText and PageContents properties, but sometimes I’m just too lazy …)

function GpHttpRequest(const url, username, password, request, 
postData: string; var statusCode: integer; var statusText,
pageContents: string): boolean;
var
task : IOmniTaskControl;
worker: IOmniWorker;
begin
worker := TGpHttpRequest.Create(url, username, password, request, postData);
task := CreateTask(worker, 'GpHttpRequest').MsgWait.Run;
Result := task.WaitFor(CGpHttpRequestTimeout_sec * 1000);
if not Result then
task.Terminate
else begin
statusCode := TGpHttpRequest(worker.Implementor).StatusCode;
statusText := TGpHttpRequest(worker.Implementor).StatusText;
pageContents := TGpHttpRequest(worker.Implementor).PageContents;
end;
end; { GpHttpRequest }
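For the curious, the “better way” mentioned above might be sketched like this. Everything here is hypothetical: the GUID is made up, and TFakeHttpRequest is a stand-in based on TInterfacedObject so that the example compiles without OTL. In the real unit the worker class would implement the interface in addition to what it already inherits.

```pascal
program InterfaceAccessDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Hypothetical interface exposing the request results.
  IGpHttpRequest = interface ['{6F1C2B7A-3D4E-4F5A-9B8C-0A1B2C3D4E5F}']
    function GetPageContents: string;
    function GetStatusCode: integer;
    function GetStatusText: string;
    property PageContents: string read GetPageContents;
    property StatusCode: integer read GetStatusCode;
    property StatusText: string read GetStatusText;
  end;

  // Stand-in for the real worker; it only returns canned values.
  TFakeHttpRequest = class(TInterfacedObject, IGpHttpRequest)
  protected
    function GetPageContents: string;
    function GetStatusCode: integer;
    function GetStatusText: string;
  end;

function TFakeHttpRequest.GetPageContents: string;
begin
  Result := '<html></html>';
end;

function TFakeHttpRequest.GetStatusCode: integer;
begin
  Result := 200;
end;

function TFakeHttpRequest.GetStatusText: string;
begin
  Result := 'OK';
end;

var
  request: IGpHttpRequest;
  worker : IInterface; // plays the role of the IOmniWorker variable
begin
  worker := TFakeHttpRequest.Create;
  // ask for the second interface instead of casting to the class
  if Supports(worker, IGpHttpRequest, request) then
    Writeln(request.StatusCode, ' ', request.StatusText, ', ',
      Length(request.PageContents), ' characters');
end.
```

The gain over the Implementor cast is that the caller never has to know the implementing class, at the price of one more interface declaration.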

Background Task

The real work is done in the background task [as expected, doh, why are you even mentioning that??]. In the Initialize method, to be more specific.

function TGpHttpRequest.Initialize: boolean;
begin
hrHttpClient := THttpCli.Create(nil);
try
hrHttpClient.NoCache := true;
hrHttpClient.RequestVer := '1.1';
hrHttpClient.URL := hrURL;
hrHttpClient.Username := hrUsername;
hrHttpClient.Password := hrPassword;
hrHttpClient.FollowRelocation := true;
if hrUsername <> '' then
hrHttpClient.ServerAuth := httpAuthBasic;
hrHttpClient.SendStream := TStringStream.Create(hrPostData);
hrHttpClient.RcvdStream := TStringStream.Create('');
hrHttpClient.OnRequestDone := HandleRequestDone;
if SameText(hrRequest, 'GET') then
hrHttpClient.GetASync
else if SameText(hrRequest, 'POST') then
hrHttpClient.PostASync
else
raise Exception.CreateFmt(
'TGpHttpRequest.Initialize: Unknown request type %s',
[hrRequest]);
Result := true;
except
on E:ESocketException do begin
hrStatusCode := -1;
hrStatusText := E.Message;
Result := false;
end;
end;
end; { TGpHttpRequest.Initialize }

The code first instantiates a THttpCli object. THttpCli is the ICS’s HTTP client class used to execute HTTP requests asynchronously or synchronously. Here I’m using asynchronous mode because a) THttpCli.GetSync doesn’t have configurable timeout and b) it calls Application.ProcessMessages, which is not a good idea if you want to run your requests from a background thread (which I did).

Next the code initializes a bunch of THttpCli parameters and calls either GetASync or PostASync. If the get or post causes an exception, it is remapped to a status code and text and Initialize returns False. That signals the IOmniWorker main loop that it should not proceed with the task execution.

Now we have reached the asynchronous phase. ICS is working in the background and all other threads are just waiting for it to finish. Because ICS’s architecture is message based, somebody must process Windows messages which flow through the ICS itself. That’s why we had to use .MsgWait decorator when preparing the task.

Some time later (possibly before the timeout occurs), THttpCli will fully download the web page and call the HandleRequestDone event handler. Here we’ll just copy relevant data into task’s internal fields and then the code will tell the task (i.e. itself) to terminate.

procedure TGpHttpRequest.HandleRequestDone(sender: TObject; 
rqType: THttpRequest; error: word);
begin
if error <> 0 then begin
hrStatusCode := error;
hrStatusText := 'Socket error';
end
else begin
hrPageContents := TStringStream(hrHttpClient.RcvdStream).DataString;
hrStatusCode := hrHttpClient.StatusCode;
hrStatusText := hrHttpClient.ReasonPhrase;
end;
Task.Terminate;
end; { TGpHttpRequest.HandleRequestDone }

In the cleanup code (which is, same as Initialize, called automatically from the IOmniWorker main loop) we just have to tear down internal objects.

procedure TGpHttpRequest.Cleanup;
begin
if assigned(hrHttpClient) then begin
hrHttpClient.RcvdStream.Free;
hrHttpClient.SendStream.Free;
FreeAndNil(hrHttpClient);
end;
end; { TGpHttpRequest.Cleanup }

And that’s all, folks. It really is that simple.

MsgWait

In case you want to learn more about OTL internals, read ahead …

Let’s take a short look at the .MsgWait implementation. The function itself just sets two internal fields and returns the object itself so that we can chain another method to it.

function TOmniTaskControl.MsgWait(wakeMask: DWORD): IOmniTaskControl;
begin
Options := Options + [tcoMessageWait];
otcExecutor.WakeMask := wakeMask;
Result := Self;
end; { TOmniTaskControl.MsgWait }

The hard work is done in TOmniTaskExecutor.Asy_DispatchMessages. If the tcoMessageWait option is set, MsgWaitForMultipleObjectsEx will also wait for Windows messages (in addition to everything else it does) because it will receive a non-zero waitWakeMask. When a message is detected, the code will call the ProcessThreadMessages method, which simply peeks and dispatches all Windows messages (and Delphi’s internal message dispatch mechanism takes care of all the rest).

if tcoMessageWait in Options then
waitWakeMask := WakeMask
else
waitWakeMask := 0;
//...
awaited := MsgWaitForMultipleObjectsEx(numWaitHandles, waitHandles,
cardinal(timeout_ms), waitWakeMask, flags);
//...
else if awaited = (WAIT_OBJECT_0 + numWaitHandles) then //message
ProcessThreadMessages

procedure TOmniTaskExecutor.ProcessThreadMessages;
var
msg: TMsg;
begin
while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) and (Msg.Message <> WM_QUIT) do begin
TranslateMessage(Msg);
DispatchMessage(Msg);
end;
end; { TOmniTaskExecutor.ProcessThreadMessages }

Asy_DispatchMessages is probably the most complicated part of the OTL (once you understand the lock-free structures inside OtlContainers, that is ;). Once you understand how it works you’ll be fully prepared to write custom thread loops in highly specialized threaded code. But don’t worry, you can use OTL even if you don’t understand the magic hidden inside.
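As a taste of what such a custom loop involves, here is a minimal single-threaded sketch that waits on one event and on the message queue at the same time. It is not taken from the OTL; stopEvent and the use of WM_USER as a “please stop” message are assumptions made for the demo.

```pascal
program MsgWaitLoopDemo;
{$APPTYPE CONSOLE}

uses
  Windows, Messages;

var
  awaited  : DWORD;
  msg      : TMsg;
  stopEvent: THandle;
begin
  stopEvent := CreateEvent(nil, true, false, nil); // manual-reset
  try
    // touch the queue so that it exists, then post a message to ourselves
    PeekMessage(msg, 0, 0, 0, PM_NOREMOVE);
    PostThreadMessage(GetCurrentThreadId, WM_USER, 0, 0);
    repeat
      // one handle, so WAIT_OBJECT_0 + 1 means "a message has arrived"
      awaited := MsgWaitForMultipleObjectsEx(1, stopEvent, 5000, QS_ALLINPUT, 0);
      if awaited = WAIT_OBJECT_0 + 1 then
        while PeekMessage(msg, 0, 0, 0, PM_REMOVE) do
          if (msg.hwnd = 0) and (msg.message = WM_USER) then begin
            // thread messages have no window, so they are handled right here
            Writeln('Got WM_USER, signalling the stop event');
            SetEvent(stopEvent);
          end
          else begin
            TranslateMessage(msg);
            DispatchMessage(msg);
          end;
    until awaited <> WAIT_OBJECT_0 + 1; // event signalled, timeout or error
  finally CloseHandle(stopEvent); end;
end.
```

Passing 0 instead of QS_ALLINPUT would turn the call into a plain wait on the event, which mirrors the role of the wake mask in the OTL code above.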

Saturday, September 06, 2008

Interfacing with external programs

Say you have a program. A popular program (at least in some circles) that other people want to write add-ons for. Your program does some job and in some processing steps you want to be able to execute some external code and then proceed according to the result returned by that code. Similar to the way CGI code is executed when an HTTP server processes an HTTP request.

The other day I was trying to enumerate all possible ways to do that. I found the following solutions:

  • Running external program. Data can be passed in program’s standard input and read from its standard output – just like when CGI program is launched from the HTTP server. Simple, stable (it is simple to protect against malfunctioning add-ons), but quite slow.
  • Third-party DLL. Relatively simple, very fast, but can seriously destabilize the whole product. Complicated to upgrade the DLL (must shut down main application to upgrade add-on).
  • [D]COM[+]. Not my bag of Swedish … sorry, wrong movie. Definitely not the way I’d like to pursue. Unstable. Leads to problems that nobody seems to be able to troubleshoot.
  • Windows messages. Messy. Plus the main program runs as a service while add-on maybe wouldn’t.
  • TCP. Implement add-on as a text-processing TCP/IP service (another HTTP server, if we continue the CGI analogy). Interesting idea, but not very simple to implement. Fast when both are running on the same machine. Flexible – each can be shut down and upgraded independently; processing can be distributed over several computers. Complicated to configure when multiple add-ons are installed (each must be configured to a different port). Firewall and antivirus software may cause problems.
  • Drop folder. Main app drops a file into some folder and waits for the result file. Clumsy, possibly faster than the external program solution (add-on can be always running), simple to implement and very stable.
  • Message queues (as in the MSMQ). Interesting but possibly too complicated for most customers to install and manage.
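To show how little code the drop folder variant needs on the main application side, here is a rough sketch. All file names and extensions are invented for the example, and the add-on is assumed to write its answer under a temporary name and rename it to `<name>.result` when done.

```pascal
program DropFolderDemo;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, Classes;

// Drops a request file into the folder and polls for the answer.
// Returns False if the request cannot be dropped or no answer arrives in time.
function DropAndWait(const folder, requestName, request: string;
  timeout_ms: cardinal; var response: string): boolean;
var
  lines     : TStringList;
  resultFile: string;
  startTime : cardinal;
  tempFile  : string;
begin
  Result := false;
  resultFile := folder + requestName + '.result';
  tempFile := folder + requestName + '.tmp';
  lines := TStringList.Create;
  try
    lines.Text := request;
    lines.SaveToFile(tempFile);
    // rename when complete so the add-on never sees a half-written request;
    // this fails if an old request with the same name is still waiting
    if not RenameFile(tempFile, folder + requestName + '.request') then
      Exit;
    startTime := GetTickCount;
    while (GetTickCount - startTime) < timeout_ms do begin
      if FileExists(resultFile) then begin
        lines.LoadFromFile(resultFile);
        response := lines.Text;
        SysUtils.DeleteFile(resultFile);
        Result := true;
        Exit;
      end;
      Sleep(50); // polling interval
    end;
  finally lines.Free; end;
end;

var
  response: string;
begin
  // the folder of the executable serves as the drop folder in this demo
  if DropAndWait(ExtractFilePath(ParamStr(0)), 'job1', 'ping', 2000, response) then
    Writeln('Add-on answered: ', response)
  else
    Writeln('No answer from the add-on');
end.
```

Without an add-on watching the folder the demo simply times out after two seconds and leaves the request file behind, which also hints at the cleanup work a real implementation would need.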

And now to the main point of my ramblings. What did I miss? Are there more possibilities? If you have any idea how to approach my problem from a different direction, leave me a comment. Don’t mention SOAP, BTW, it is implicitly included in the “add-on as a TCP server” solution.

And thanks!

Wednesday, September 03, 2008

OmniThreadLibrary patterns - How to (not) create a task

From the first public release two months ago (or is it closer to three already?), OmniThreadLibrary has been constantly changing. Only with the 1.0 release has it reached a somewhat stable state. That flux was good as it was only through the development of the OTL that I discovered how to do some things properly (maybe I was not so wrong in Why is software third time lucky?, after all). But it was also bad as some introductory topics that I wrote don't accurately reflect the current state anymore. Yes, I know, I should start writing good tutorials. Not fun :( I like writing blog articles more. Today I'll focus on task creation.

OTL offers three different ways to create a task (and a fourth one will be introduced when Delphi 2009 is available). Those three ways are mapped to three overloads of the CreateTask method (OtlTaskControl.pas).

  function CreateTask(worker: TOmniTaskProcedure; 
const taskName: string = ''): IOmniTaskControl; overload;
function CreateTask(worker: TOmniTaskMethod;
const taskName: string = ''): IOmniTaskControl; overload;
function CreateTask(const worker: IOmniWorker;
const taskName: string = ''): IOmniTaskControl; overload;

The simplest possible way (demoed in test application 2_TwoWayHello; see OmniThreadLibrary Example #3: Bidirectional communication) is to pass a name of a global procedure to the CreateTask. This global procedure must consume one parameter of type IOmniTask (it was described in the OmniThreadLibrary internals - OtlTask article, which is still mostly valid).

CreateTask(RunHelloWorld, 'HelloWorld').Run;

procedure RunHelloWorld(const task: IOmniTask);
begin
end;

A variation on the theme is passing a name of a method to the CreateTask. This approach is used in the test application 1_HelloWorld. The interesting point to make here is that you can declare this method in the same class from which the CreateTask is called. That way you can access the class fields and methods from the threaded code. Just keep in mind that you'll be doing this from another thread so make sure you know what you're doing!

procedure TfrmTestHelloWorld.RunHelloWorld(const task: IOmniTask);
begin
end;

For all except the simplest tasks, you'll use the third approach, because it will give you access to the true OTL power (internal wait loop and message dispatching). To use it, you have to create a worker object, which implements the IOmniWorker interface. An example from the 5_TwoWayHello_without_loop demo:

procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
var
worker: IOmniWorker;
begin
worker := TAsyncHello.Create;
FHelloTask :=
OmniEventMonitor1.Monitor(CreateTask(worker, 'Hello')).
SetTimer(1000, MSG_SEND_MESSAGE).
SetParameter('Delay', 1000).
SetParameter('Message', 'Hello').
Run;
end;

As you're exposing this worker via interface, Delphi will manage its lifetime. When the task is terminated, worker instance will be destroyed too.

Because of the same reason, there's no need to create the worker class in advance. From the demo 7_InitTest:

task := OmniEventMonitor1.Monitor(CreateTask(TInitTest.Create(success), 'InitTest'))
.SetPriority(tpIdle).Run;

There's a catch, though. Because CreateTask expects an interface, you must never pass to it a variable or field that is declared as an object. This will not work:

procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
var
worker: TAsyncHello;
begin
worker := TAsyncHello.Create;
FHelloTask :=
OmniEventMonitor1.Monitor(CreateTask(worker, 'Hello')).
SetTimer(1000, MSG_SEND_MESSAGE).
SetParameter('Delay', 1000).
SetParameter('Message', 'Hello').
Run;
end;
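The article doesn't spell out how exactly this fails. A likely culprit is reference counting: the object variable is not counted, so the worker is destroyed as soon as the last interface reference goes away, and the variable is left pointing at freed memory. The sketch below demonstrates the effect with plain RTL classes; StartTask, StopTask and heldWorker are hypothetical stand-ins for what a task does with its worker.

```pascal
program RefCountDemo;
{$APPTYPE CONSOLE}

type
  TDemoWorker = class(TInterfacedObject)
  public
    destructor Destroy; override;
  end;

var
  heldWorker: IInterface; // plays the role of the task holding its worker

destructor TDemoWorker.Destroy;
begin
  Writeln('Worker destroyed');
  inherited;
end;

// Stand-in for CreateTask: keeps an interface reference to the worker.
procedure StartTask(const worker: IInterface);
begin
  heldWorker := worker;
end;

// Stand-in for task termination: drops the reference.
procedure StopTask;
begin
  heldWorker := nil;
end;

procedure Test;
var
  worker: TDemoWorker; // object reference, invisible to reference counting
begin
  worker := TDemoWorker.Create;
  StartTask(worker);
  StopTask;
  // From here on 'worker' must not be trusted; the object is destroyed
  // when the last interface reference is released, which happens at the
  // latest when Test returns, although nobody ever called worker.Free.
  Writeln('Leaving Test');
end;

begin
  Test;
  Writeln('Done');
end.
```

Declaring the variable as an interface, as in the working example above, keeps the variable and the reference count in agreement.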

If you need to store some information inside the worker during its execution and access it from the owner, you can use the Implementor property, which will "convert" the IOmniWorker interface back to the implementing object (which you must then cast to your worker class). From the demo 6_TwoWayHello_with_object_worker:

procedure TfrmTestTwoWayHello.actStopHelloExecute(Sender: TObject);
begin
FHelloTask.Terminate;
FHelloTask := nil;
lbLog.ItemIndex := lbLog.Items.Add(Format('%d Hello World''s sent',
[TAsyncHello(FWorker.Implementor).Count]));
end;

BTW, don't just set task reference (FHelloTask in the example above) to nil when you want to terminate the task. You have to call Terminate or use some other way to notify the task that it must terminate.

BTW2, if you use a parameterless constructor inside the CreateTask, you have to put empty parentheses after the .Create or Delphi will not compile your code. I'm not really sure why. The following segment is from test 14_TerminateWhen:

Log(Format('Task started: %d',
[CreateTask(TMyWorker.Create())
.TerminateWhen(FTerminate).WithCounter(FCounter)
.MonitorWith(OmniTED).Run.UniqueID]));

That completes today's tour.

In the introduction I mentioned the fourth CreateTask overload that will be introduced when Delphi 2009 is ready. OmniThreadLibrary will support anonymous methods so you'll be able to do something like this:

CreateTask(procedure begin MessageBeep(MB_ICONEXCLAMATION); end);

Tuesday, September 02, 2008

OmniThreadLibrary 1.0a released

This is basically a bugfix release. There was a problem with the .MsgWait decorator which I fixed in the repository three days ago. As I want to write about this function soon, I thought it would be nice to make a release first ...

Other changes:

  • TGp4AlignedInt from GpStuff.pas is supposedly D2006 compatible (says a reader) so it has been enabled on that platform. As a result, it is now possible to compile and use OTL in D2006 (said the same source).
  • SpinLock.pas has been updated.
  • Test 6 has been changed to show why one would want to use IOmniWorker.Implementor function. I'll write more about that very soon.
  • I've included parts of the FastMM4 package into the repository to simplify debugging. FastMM4 was created by Pierre le Riche and is not covered by the OmniThreadLibrary license. It is released under a dual license and you can use it under either the MPL 1.1 or the LGPL 2.1. More details in the included readme file and on the FastMM4 home page.

Monday, September 01, 2008

Introduction to enumerators

The Blaise Pascal magazine #3 is out. Lots of content there, including my Introduction to enumerators article (first page).

Wednesday, August 27, 2008

OmniThreadLibrary 1.0, day 1

First 24 hours totally exceeded my expectations.

[Image: day 1 downloads]

Now start using it! :D

Tuesday, August 26, 2008

OmniThreadLibrary 1.0 released

After 164 repository commits and two months of development, OmniThreadLibrary 1.0 was released today.

There were few changes since the 1.0 beta, mostly regarding the TOmniValue record, which can now seamlessly hold Double, Extended and Boolean values.

OmniThreadLibrary 1.0 is available via SVN (http://omnithreadlibrary.googlecode.com/svn/tags/release-1.0) or as a ZIP archive.

If you like OmniThreadLibrary, spread the word around. After all, not every Delphi developer is reading my blog (yet ;). Feel free to include any parts of the marketing blurb (below).

What is OmniThreadLibrary?

OmniThreadLibrary is a simple-to-use threading library for Delphi. Its main "selling" points (besides the price, of course ;) are power, simplicity, and openness. With just a few lines of code, you can set up multiple threads, send messages between them, process Windows messages and more. OmniThreadLibrary doesn't limit you in any way - if it is not powerful enough for you, you can ignore any part of its "smartness" and replace it with your own code.

OmniThreadLibrary is an open source project. It lives in the Google Code and is licensed under the BSD license.

At the moment, OmniThreadLibrary supports Delphi 2007 on the Win32 platform. It will support Delphi 2009 as soon as it's available. Currently, there are no plans to support older Delphi compilers and .NET.

Where can I get more information?

Home page: http://otl.17slon.com/

Web discussion forum: http://otl.17slon.com/forum/

Downloads: http://code.google.com/p/omnithreadlibrary/downloads/list

Issue tracker: http://code.google.com/p/omnithreadlibrary/issues/list

SVN checkout instructions: http://code.google.com/p/omnithreadlibrary/source/checkout

Author's blog: http://thedelphigeek.com

Author's home page: http://gp.17slon.com

Friday, August 22, 2008

OmniThreadLibrary 1.0 beta

Hello, my dear readers. Two weeks I spent there

[Image: Ugljan]

but sadly I'm back now. :'(

My bad, your win - today I made the last scheduled modification to the OTL sources and published version 1.0 beta. [snapshot, repository]

The underlying message format is no longer based on Variants. TOmniValue is now a smart record with some implicit operators. In some cases that will cause more typing (using .AsInteger and similar) but in other cases (when sending interfaces and objects) TOmniValue is now much simpler to use. Even better, it is slightly smaller and slightly faster.
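If “smart record with implicit operators” sounds abstract, the following stripped-down sketch shows the idea. TMiniValue is a hypothetical stand-in and not the real TOmniValue; it holds only integers and strings, but it illustrates both sides of the trade-off: values go in without any ceremony, while reading them back requires an explicit .AsInteger or .AsString.

```pascal
program SmartRecordDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Hypothetical, heavily simplified stand-in for TOmniValue.
  TMiniValue = record
  private
    mvInteger : int64;
    mvIsString: boolean;
    mvString  : string;
  public
    class operator Implicit(const a: integer): TMiniValue;
    class operator Implicit(const a: string): TMiniValue;
    function AsInteger: int64;
    function AsString: string;
  end;

class operator TMiniValue.Implicit(const a: integer): TMiniValue;
begin
  Result.mvIsString := false;
  Result.mvInteger := a;
  Result.mvString := '';
end;

class operator TMiniValue.Implicit(const a: string): TMiniValue;
begin
  Result.mvIsString := true;
  Result.mvInteger := 0;
  Result.mvString := a;
end;

function TMiniValue.AsInteger: int64;
begin
  if mvIsString then
    Result := StrToInt64(mvString) // raises EConvertError for non-numeric text
  else
    Result := mvInteger;
end;

function TMiniValue.AsString: string;
begin
  if mvIsString then
    Result := mvString
  else
    Result := IntToStr(mvInteger);
end;

// Stand-in for a Send method that accepts any supported value.
procedure Send(const value: TMiniValue);
begin
  Writeln('Sending: ', value.AsString);
end;

begin
  Send(42);                                 // Implicit(integer) is applied
  Send('Random ' + IntToStr(Random(1234))); // Implicit(string) is applied
end.
```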

There were also some changes in the TOmniEventMonitor component, which now uses a hash to access monitored tasks/pools. The old version was implemented with TInterfaceList and that could get quite slow when working with a large number of tasks.

[BTW, I checked the code with Tiburon and it's working without a glitch.]

I've also started to work on the support infrastructure, namely the forum. Home page, FAQ and documentation will also appear - all in due time.

Friday, August 01, 2008

The Delphi Geek goes to standby mode

and so does OmniThreadLibrary. It's bicycle and swimming time.

Comments will stay in limbo as I won't be moderating them. Feel free to post them, though - I'll process everything as soon as I come back.

Sunday, July 27, 2008

OmniThreadLibrary alpha

OmniThreadLibrary is slowly progressing to the alpha stage. Most of the interfaces have been defined and most of the functionality that I want to see in the 1.0 release is present and tested. I have not yet decided what to do with the messaging (current Variant-based solution can be rough on the edges sometimes) but rest of the stuff is mostly completed.

Most important changes since the last release:

  • First of all, big thanks to fellow Slovenian programmer Lee_Nover (the guy who wrote the SpinLock unit, BTW) who did a major cleanup job of the OTL code:
    • Added const prefix to parameters in many places (mostly when interfaces were passed - I tend to miss that). OTL should be slightly faster because of that.
    • Removed CreateTask(TOmniWorker) as CreateTask(IOmniWorker) is good enough for all occasions.
    • Cleaned the tests\* tree and fixed tests to compile with new Otl* units.
    • Removed IOmniTaskControl.FreeOnTerminate as it's no longer needed.
  • Created project group containing all test projects. To make it work, all projects in the tests tree were renamed.
  • *** IMPORTANT *** OtlTaskEvents unit was renamed to OtlEventMonitor and TOtlTaskEventDispatcher component was renamed to TOmniEventMonitor. Recompile the package!
  • *** IMPORTANT *** If you're working with a snapshot, you should delete the OmniThreadLibrary folder before unpacking new version as many files were renamed since the last snapshot!
  • Task UniqueID is now int64 (was integer). Somehow I wasn't happy with the fact that the unique ID would roll over after merely 4.3 billion tasks ...
  • Thread priority can be controlled with IOmniTaskControl.SetPriority.
  • Exceptions are trapped and mapped into EXIT_EXCEPTION predefined exit code (OtlCommon.pas). Test in 13_Exceptions.
  • IOmniTaskControl.WithLock provides a simple way to share one lock (critical section, spinlock) between task owner and task worker. Lock can be accessed via the Lock property, implemented in IOmniTaskControl and IOmniTask interfaces. See the 12_Lock test for the example.
  • Eye candy - instead of calling omniTaskEventDispatch.Monitor(task) you can now use task.MonitorWith(omniTaskEventDispatch). The old way is still available. An example from the (new) 8_RegisterComm test:
procedure TfrmTestOtlComm.FormCreate(Sender: TObject);
begin
  FCommChannel := CreateTwoWayChannel(1024);
  FClient1 := CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))
    .MonitorWith(OmniTED)
    .Run;
  FClient2 := CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))
    .MonitorWith(OmniTED)
    .Run;
end;
  • IOmniTaskControl.TerminateWhen enables you to add an additional termination event to the task. One event can be shared between multiple tasks, allowing you to stop them all at once. Example in the 14_TerminateWhen test.
  • Basic support for task groups has been added. At the moment you can start/stop multiple tasks and not much more. To add a task to a group, use group.Add(task) or task.Join(group) syntax. Demo in 15_TaskGroup.
IOmniTaskGroup = interface ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
  function Remove(taskControl: IOmniTaskControl): IOmniTaskGroup;
  function Add(taskControl: IOmniTaskControl): IOmniTaskGroup;
  function RunAll: IOmniTaskGroup;
  function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
  function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
end; { IOmniTaskGroup }
  • Added support for task chaining. When one task completes, the next task in the chain is started. An excerpt from the 16_ChainTo demo:
procedure TfrmTestTaskGroup.btnStartTasksClick(Sender: TObject);
var
  task1: IOmniTaskControl;
  task2: IOmniTaskControl;
  task3: IOmniTaskControl;
  taskA: IOmniTaskControl;
begin
  lbLog.Clear;
  task3 := CreateTask(BgTask, '3').MonitorWith(OmniTED);
  task2 := CreateTask(BgTask, '2').MonitorWith(OmniTED).ChainTo(task3);
  task1 := CreateTask(BgTask, '1').MonitorWith(OmniTED).ChainTo(task2);
  taskA := CreateTask(BgTask, 'A').MonitorWith(OmniTED).ChainTo(
    CreateTask(BgTask, 'B').MonitorWith(OmniTED).ChainTo(
    CreateTask(BgTask, 'C').MonitorWith(OmniTED)));
  task1.Run; taskA.Run;
end;
  • A thread pool has been implemented. Demo in 11_ThreadPool. I'll write a longer article on using it, but for now the demo should do.

As usual, code is available in the repository and as a snapshot.

OmniThreadLibrary internals - OtlContainers

WiP

The time has come to bring the OtlContainers subthread to an end. In previous parts (1, 2, 3, 4) I obsessively described the underlying lock-free structures but haven't said a word about the higher-level classes that are actually used for message transfer inside the OmniThreadLibrary.

Let's take a quick look at all OtlContainers classes. TOmniBaseContainer is the abstract parent of the whole container hierarchy. Its basic function is to provide memory allocation and lock-free algorithms for all descendants. I already described most of the code in previous installments, and the rest is trivial.

  TOmniBaseContainer = class abstract(TInterfacedObject)
  strict protected
    obcBuffer      : pointer;
    obcElementSize : integer;
    obcNumElements : integer;
    obcPublicChain : TOmniHeadAndSpin;
    obcRecycleChain: TOmniHeadAndSpin;
    class function InvertOrder(chainHead: POmniLinkedData): POmniLinkedData; static;
    class function PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
    class procedure PushLink(const link: POmniLinkedData; var chain: TOmniHeadAndSpin); static;
    class function UnlinkAll(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
  public
    destructor Destroy; override;
    procedure Empty; virtual;
    procedure Initialize(numElements, elementSize: integer); virtual;
    function IsEmpty: boolean; virtual;
    function IsFull: boolean; virtual;
    property ElementSize: integer read obcElementSize;
    property NumElements: integer read obcNumElements;
  end; { TOmniBaseContainer }

Then there is the base stack implementation, which only adds Push and Pop to the base container. We've seen those two already.

  TOmniBaseStack = class(TOmniBaseContainer)
  public
    function Pop(var value): boolean; virtual;
    function Push(const value): boolean; virtual;
  end; { TOmniBaseStack }

The queue is only slightly more complex. It provides Enqueue and Dequeue, which we already know, and overrides Empty and IsEmpty to take the dequeued messages chain into account.

  TOmniBaseQueue = class(TOmniBaseContainer)
  strict protected
    obqDequeuedMessages: TOmniHeadAndSpin;
  public
    constructor Create;
    function Dequeue(var value): boolean; virtual;
    procedure Empty; override;
    function Enqueue(const value): boolean; virtual;
    function IsEmpty: boolean; override;
  end; { TOmniBaseQueue }

Now we come to the interesting part. Basic stack and queue operations are also described by the interfaces IOmniStack and IOmniQueue - just in case you'd like to use them in your programs. OTL itself uses the class representation of the queue directly.

  IOmniStack = interface ['{F4C57327-18A0-44D6-B95D-2D51A0EF32B4}']
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer);
    function Pop(var value): boolean;
    function Push(const value): boolean;
    function IsEmpty: boolean;
    function IsFull: boolean;
  end; { IOmniStack }

  IOmniQueue = interface ['{AE6454A2-CDB4-43EE-9F1B-5A7307593EE9}']
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer);
    function Enqueue(const value): boolean;
    function Dequeue(var value): boolean;
    function IsEmpty: boolean;
    function IsFull: boolean;
  end; { IOmniQueue }
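
If you do want to use the queue through its interface, the following is a minimal sketch of how that might look. It assumes that IOmniQueue, TOmniQueue and TOmniContainerOptions are all declared in the OtlContainers unit and that TOmniQueue has the three-parameter constructor quoted later in this post; the program name and the element count of 16 are made up for the illustration.

```pascal
program QueueSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  OtlContainers; // assumption: declares IOmniQueue, TOmniQueue, TOmniContainerOptions

var
  queue: IOmniQueue;
  i    : integer;
  item : integer;

begin
  // Assumption: 16 elements of SizeOf(integer) bytes each, with neither
  // monitoring nor notification support enabled (empty options set).
  queue := TOmniQueue.Create(16, SizeOf(integer), []);

  for i := 1 to 3 do begin
    item := i; // Enqueue takes an untyped const parameter, so pass a variable
    if not queue.Enqueue(item) then
      Writeln('Enqueue failed for ', item);
  end;

  // Dequeue returns False when there is nothing left to read.
  while queue.Dequeue(item) do
    Writeln('Dequeued ', item);
end.
```

Holding the object only through an interface variable keeps reference counting simple, as TOmniBaseContainer descends from TInterfacedObject.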

The actual implementation of the higher-level structures is exposed by the classes TOmniStack and TOmniQueue. Besides implementing IOmniStack and IOmniQueue (respectively), they both implement IOmniNotifySupport and IOmniMonitorSupport.

  TOmniStack = class(TOmniBaseStack, IOmniStack, IOmniNotifySupport, IOmniMonitorSupport)
  TOmniQueue = class(TOmniBaseQueue, IOmniQueue, IOmniNotifySupport, IOmniMonitorSupport)

IOmniMonitorSupport describes a container with monitoring support (it notifies the attached monitor whenever a new message is sent). IOmniNotifySupport describes a container that notifies interested parties (by setting an event) that a new message has been sent. Both are used in the OTL - notification support is needed to implement IOmniCommunicationEndpoint.NewMessageEvent in the OtlComm unit and monitoring support is needed to support IOmniTaskControl.MonitorWith in the OtlTaskControl unit.

  IOmniMonitorSupport = interface ['{6D5F1191-9E4A-4DD5-99D8-694C95B0DE90}']
    function GetMonitor: IOmniMonitorParams;
  //
    procedure Notify;
    procedure RemoveMonitor;
    procedure SetMonitor(monitor: IOmniMonitorParams);
    property Monitor: IOmniMonitorParams read GetMonitor;
  end; { IOmniMonitorSupport }

  IOmniNotifySupport = interface ['{E5FFC739-669A-4931-B0DC-C5005A94A08B}']
    function GetNewDataEvent: THandle;
  //
    procedure Signal;
    property NewDataEvent: THandle read GetNewDataEvent;
  end; { IOmniNotifySupport }

The following excerpt from the TOmniQueue code demonstrates the implementation of those interfaces. TOmniStack is implemented in a similar manner.

The constructor takes an options parameter with which you can enable monitoring and/or notification support.

type
  TOmniContainerOption = (coEnableMonitor, coEnableNotify);
  TOmniContainerOptions = set of TOmniContainerOption;

constructor TOmniQueue.Create(numElements, elementSize: integer;
  options: TOmniContainerOptions);
begin
  inherited Create;
  Initialize(numElements, elementSize);
  orbOptions := options;
  if coEnableMonitor in Options then
    orbMonitorSupport := TOmniMonitorSupport.Create;
  if coEnableNotify in Options then
    orbNotifySupport := TOmniNotifySupport.Create;
end; { TOmniQueue.Create }

function TOmniQueue.Dequeue(var value): boolean;
begin
  Result := inherited Dequeue(value);
  if Result then
    if coEnableNotify in Options then
      orbNotifySupport.Signal;
end; { TOmniQueue.Dequeue }

function TOmniQueue.Enqueue(const value): boolean;
begin
  Result := inherited Enqueue(value);
  if Result then begin
    if coEnableNotify in Options then
      orbNotifySupport.Signal;
    if coEnableMonitor in Options then
      orbMonitorSupport.Notify;
  end;
end; { TOmniQueue.Enqueue }

Enqueue calls the inherited enqueueing code and, if that succeeds, signals the notification event (if enabled) and notifies the monitor (if enabled). Dequeue is similar, except that it only signals the notification event, so that the reader rechecks the input queue for any remaining messages.
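
From the reader's side, the notification support might be used roughly as in the sketch below. It is only an illustration under a few assumptions: that the types above are declared in OtlContainers, that a TOmniQueue created with coEnableNotify can be queried for IOmniNotifySupport, and that NewDataEvent is an ordinary Windows event handle that can be passed to WaitForSingleObject. The program name and the one-second timeout are made up. For brevity everything runs in one thread; in real use the writer and the reader would live in different threads.

```pascal
program NotifySketch;

{$APPTYPE CONSOLE}

uses
  Windows,
  SysUtils,
  OtlContainers; // assumption: declares TOmniQueue, IOmniQueue, IOmniNotifySupport

var
  queue : IOmniQueue;
  notify: IOmniNotifySupport;
  value : integer;

begin
  // Enable only the notification support; no monitor is attached.
  queue := TOmniQueue.Create(16, SizeOf(integer), [coEnableNotify]);
  notify := queue as IOmniNotifySupport;

  // Writer side: a successful Enqueue signals the event.
  value := 42;
  if not queue.Enqueue(value) then
    Writeln('Enqueue failed');

  // Reader side: wait for the event, then drain the queue completely,
  // because one signal may stand for more than one queued message.
  if WaitForSingleObject(notify.NewDataEvent, 1000) = WAIT_OBJECT_0 then
    while queue.Dequeue(value) do
      Writeln('Received ', value)
  else
    Writeln('No data within one second');
end.
```

Draining until Dequeue returns False means the reader does not depend on getting exactly one signal per message.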

Stop. Enough words have been said about OtlContainers. Next time, I'll discuss something completely different.