
Saturday, September 20, 2008

Processing Windows messages in OmniThreadLibrary

In all my ravings about the OmniThreadLibrary I’ve never discussed the .MsgWait decorator – and partly because of that it was not even working in the 1.0 release :-(. Time to fix that!

Today I’ll show you how simple it is to make a synchronous wrapper around an asynchronous component (using OTL, of course). The code itself is not an OTL test project but a unit I’m using to synchronously download web pages. Where’s the problem at all, you’ll say? Indy is synchronous and so is WinInet. True, but I’m using ICS, which is a completely asynchronous solution.

[For readers who have no idea what I’m talking about: Synchronous means the program calls the code, the code executes for some time and then returns with the web page. Asynchronous means the program calls the code and the code returns immediately. Somewhere in the background the web page is being retrieved, and when it is fully transferred, your program gets notified in some way. The second way is more flexible, lets you do more things in parallel and doesn’t block the UI, but it is more complicated to use.]

Sometimes (usually for quick and dirty tests) I need to retrieve a web page without having to think about messages and events and what will be called at which moment. For such times I put together a simple unit which wraps the ICS stuff in a thread. It is called GpHttp and will be available on my web site in some time. For now, you can download it here. You’ll also need ICS v6 (and it should work with ICS v5 if you remove the Overbyte prefix from used units and classes).

Interface

I wanted to keep the interface really simple. The GpHttp unit exports only two functions, one to execute a GET request and another to execute a POST request. Both accept a username and password and both return the status code (200 if everything went OK), status text and page contents. In case of a socket problem, the WinSock error code is returned in statusCode. (HTTP codes are always below 1,000 and WinSock codes are always above 10,000 so there is no possibility for collision.)

function GpHttpGet(const url, username, password: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;

function GpHttpPost(const url, username, password, postData: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;
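A minimal sketch of how a caller might use GpHttpGet and tell the different outcomes apart. DescribeResult is a hypothetical helper, not part of GpHttp, and the URL is a placeholder. It assumes that a False result means the 30-second timeout expired and that a negative status code means the request could not even be started (both follow from the code shown later in this post).

```pascal
program GpHttpGetDemo;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  GpHttp; // the unit described in this post; needs OTL and ICS on the search path

// Hypothetical helper: turns the outcome of a request into a readable line.
function DescribeResult(completed: boolean; statusCode: integer;
  const statusText: string): string;
begin
  if not completed then
    Result := 'Timed out'
  else if statusCode < 0 then
    Result := 'Request could not be started: ' + statusText
  else if statusCode > 10000 then
    Result := Format('WinSock error %d', [statusCode])
  else
    Result := Format('HTTP %d %s', [statusCode, statusText]);
end;

var
  completed   : boolean;
  pageContents: string;
  statusCode  : integer;
  statusText  : string;
begin
  statusCode := 0;
  // Empty username and password: no authentication is requested.
  completed := GpHttpGet('http://www.example.com/', '', '',
    statusCode, statusText, pageContents);
  Writeln(DescribeResult(completed, statusCode, statusText));
  if completed and (statusCode = 200) then
    Writeln(Length(pageContents), ' characters received');
end.
```

Because the two code ranges cannot collide, a single integer comparison is enough to separate HTTP status codes from WinSock errors.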

Both functions are simple wrappers around the third (local) function that executes any HTTP request.

function GpHttpGet(const url, username, password: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;
begin
  Result := GpHttpRequest(url, username, password, 'GET', '', statusCode,
    statusText, pageContents);
end; { GpHttpGet }

function GpHttpPost(const url, username, password, postData: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;
begin
  Result := GpHttpRequest(url, username, password, 'POST', postData,
    statusCode, statusText, pageContents);
end; { GpHttpPost }

The real work starts in the GpHttpRequest function. First it creates a worker object and passes request data to it. The reference to the object (or more correctly – to the interface implemented by this object) is stored in the worker variable. We’ll need it at the end to retrieve the status code and page contents.

Next, the task is created and run. Notice the call to .MsgWait which instructs the internal thread loop to process Windows messages (more on that later).

Next, we wait for the task to terminate. We’ll only be waiting up to 30 seconds (a constant in code which could be easily changed into a caller-provided parameter). If the task is not finished in 30 seconds, we’ll terminate it and return False. Otherwise, we’ll retrieve the status code, text and page contents from the worker object via the Implementor trick. (A better way would be to write an IGpHttpRequest interface supporting StatusCode, StatusText and PageContents properties, but sometimes I’m just too lazy …)

function GpHttpRequest(const url, username, password, request,
  postData: string; var statusCode: integer; var statusText,
  pageContents: string): boolean;
var
  task  : IOmniTaskControl;
  worker: IOmniWorker;
begin
  worker := TGpHttpRequest.Create(url, username, password, request, postData);
  // .MsgWait makes the task's internal loop process Windows messages
  task := CreateTask(worker, 'GpHttpRequest').MsgWait.Run;
  Result := task.WaitFor(CGpHttpRequestTimeout_sec * 1000);
  if not Result then
    task.Terminate // timed out; output parameters are left untouched
  else begin
    // cast the interface back to the implementing object to read the results
    statusCode := TGpHttpRequest(worker.Implementor).StatusCode;
    statusText := TGpHttpRequest(worker.Implementor).StatusText;
    pageContents := TGpHttpRequest(worker.Implementor).PageContents;
  end;
end; { GpHttpRequest }
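The "better way" mentioned above could look roughly like the following sketch. It is not part of GpHttp: the GUID is an arbitrary placeholder, and TFakeRequest is a stand-in class so that the example compiles without OTL. In the real unit the worker class would implement this interface next to IOmniWorker, and the caller would query the worker reference instead of casting Implementor.

```pascal
program ResultInterfaceSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Hypothetical result interface; the GUID is an arbitrary placeholder.
  IGpHttpRequest = interface
    ['{6F0C2A41-3B7E-4D58-9A1C-5E2D7B8F0A13}']
    function GetPageContents: string;
    function GetStatusCode: integer;
    function GetStatusText: string;
    property PageContents: string read GetPageContents;
    property StatusCode: integer read GetStatusCode;
    property StatusText: string read GetStatusText;
  end;

  // Stand-in for the real worker, returning canned values.
  TFakeRequest = class(TInterfacedObject, IGpHttpRequest)
  protected
    function GetPageContents: string;
    function GetStatusCode: integer;
    function GetStatusText: string;
  end;

function TFakeRequest.GetPageContents: string;
begin
  Result := '<html></html>';
end;

function TFakeRequest.GetStatusCode: integer;
begin
  Result := 200;
end;

function TFakeRequest.GetStatusText: string;
begin
  Result := 'OK';
end;

var
  request: IGpHttpRequest;
  worker : IInterface; // plays the role of the IOmniWorker reference
begin
  worker := TFakeRequest.Create;
  // Supports queries the object for the second interface; no class cast needed.
  if Supports(worker, IGpHttpRequest, request) then
    Writeln(request.StatusCode, ' ', request.StatusText);
end.
```

The gain over the Implementor cast is that the caller no longer needs to know the worker's class, only the interface it promises to support.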

Background Task

The real work is done in the background task [as expected, doh, why are you even mentioning that??]. In the Initialize method, to be more specific.

function TGpHttpRequest.Initialize: boolean;
begin
  hrHttpClient := THttpCli.Create(nil);
  try
    hrHttpClient.NoCache := true;
    hrHttpClient.RequestVer := '1.1';
    hrHttpClient.URL := hrURL;
    hrHttpClient.Username := hrUsername;
    hrHttpClient.Password := hrPassword;
    hrHttpClient.FollowRelocation := true;
    if hrUsername <> '' then
      hrHttpClient.ServerAuth := httpAuthBasic;
    // both streams are freed in Cleanup
    hrHttpClient.SendStream := TStringStream.Create(hrPostData);
    hrHttpClient.RcvdStream := TStringStream.Create('');
    hrHttpClient.OnRequestDone := HandleRequestDone;
    // start the request; both calls return immediately
    if SameText(hrRequest, 'GET') then
      hrHttpClient.GetASync
    else if SameText(hrRequest, 'POST') then
      hrHttpClient.PostASync
    else
      raise Exception.CreateFmt(
        'TGpHttpRequest.Initialize: Unknown request type %s',
        [hrRequest]);
    Result := true;
  except
    on E:ESocketException do begin
      // remap the exception to status code and text
      hrStatusCode := -1;
      hrStatusText := E.Message;
      Result := false;
    end;
  end;
end; { TGpHttpRequest.Initialize }

The code first instantiates a THttpCli object. THttpCli is the ICS’s HTTP client class used to execute HTTP requests asynchronously or synchronously. Here I’m using asynchronous mode because a) THttpCli.GetSync doesn’t have configurable timeout and b) it calls Application.ProcessMessages, which is not a good idea if you want to run your requests from a background thread (which I did).

Next the code initializes a bunch of THttpCli parameters and calls either GetASync or PostASync. If the get or post causes a socket exception, it is remapped to a status code and text and Initialize returns False. That signals the IOmniWorker main loop that it should not proceed with the task execution.

Now we have reached the asynchronous phase. ICS is working in the background and all other threads are just waiting for it to finish. Because ICS’s architecture is message based, somebody must process the Windows messages which flow through ICS itself. That’s why we had to use the .MsgWait decorator when preparing the task.

Some time later (possibly before the timeout occurs), THttpCli will fully download the web page and call the HandleRequestDone event handler. Here we’ll just copy relevant data into task’s internal fields and then the code will tell the task (i.e. itself) to terminate.

procedure TGpHttpRequest.HandleRequestDone(sender: TObject;
  rqType: THttpRequest; error: word);
begin
  if error <> 0 then begin
    hrStatusCode := error;
    hrStatusText := 'Socket error';
  end
  else begin
    hrPageContents := TStringStream(hrHttpClient.RcvdStream).DataString;
    hrStatusCode := hrHttpClient.StatusCode;
    hrStatusText := hrHttpClient.ReasonPhrase;
  end;
  Task.Terminate;
end; { TGpHttpRequest.HandleRequestDone }

In the cleanup code (which is, same as Initialize, called automatically from the IOmniWorker main loop) we just have to tear down internal objects.

procedure TGpHttpRequest.Cleanup;
begin
  if assigned(hrHttpClient) then begin
    hrHttpClient.RcvdStream.Free;
    hrHttpClient.SendStream.Free;
    FreeAndNil(hrHttpClient);
  end;
end; { TGpHttpRequest.Cleanup }

And that’s all, folks. It really is that simple.

MsgWait

In case you want to learn more about OTL internals, read ahead …

Let’s take a short look at the .MsgWait implementation. The function itself just sets two internal fields and returns the object itself so that we can chain another method to it.

function TOmniTaskControl.MsgWait(wakeMask: DWORD): IOmniTaskControl;
begin
  Options := Options + [tcoMessageWait];
  otcExecutor.WakeMask := wakeMask;
  Result := Self;
end; { TOmniTaskControl.MsgWait }

The hard work is done in TOmniTaskExecutor.Asy_DispatchMessages. If the tcoMessageWait option is set, MsgWaitForMultipleObjectsEx will also wait for Windows messages (in addition to everything else it does) because it will receive a non-zero waitWakeMask. When a message is detected, the code will call the ProcessThreadMessages method, which simply peeks and dispatches all Windows messages (and Delphi’s internal message dispatch mechanism takes care of all the rest).

if tcoMessageWait in Options then
  waitWakeMask := WakeMask
else
  waitWakeMask := 0;
//...
awaited := MsgWaitForMultipleObjectsEx(numWaitHandles, waitHandles,
  cardinal(timeout_ms), waitWakeMask, flags);
//...
else if awaited = (WAIT_OBJECT_0 + numWaitHandles) then //message
  ProcessThreadMessages

procedure TOmniTaskExecutor.ProcessThreadMessages;
var
  msg: TMsg;
begin
  while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) and (Msg.Message <> WM_QUIT) do begin
    TranslateMessage(Msg);
    DispatchMessage(Msg);
  end;
end; { TOmniTaskExecutor.ProcessThreadMessages }

Asy_DispatchMessages is probably the most complicated part of the OTL (once you understand the lock-free structures inside OtlContainers, that is ;). Once you understand how it works, you’ll be fully prepared to write custom thread loops in highly specialized threaded code. But don’t worry, you can use OTL even if you don’t understand the magic hidden inside.
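For readers who want to experiment with the same idea outside OTL, here is a stripped-down sketch of a wait loop that watches one event handle and the thread's message queue at the same time. It is not OTL code: the stop event, the use of WM_USER as a "request done" notification and the 5-second timeout are all assumptions made for the example.

```pascal
program MsgWaitSketch;

{$APPTYPE CONSOLE}

uses
  Windows, Messages;

var
  awaited  : DWORD;
  msg      : TMsg;
  stopEvent: THandle;
begin
  stopEvent := CreateEvent(nil, true, false, nil); // manual reset, not signalled
  try
    // Touching the queue forces Windows to create it for this thread.
    PeekMessage(msg, 0, 0, 0, PM_NOREMOVE);
    // Simulate an asynchronous component posting a notification to this thread.
    PostThreadMessage(GetCurrentThreadId, WM_USER, 0, 0);
    repeat
      // Wake up when the event is signalled OR when any message arrives.
      awaited := MsgWaitForMultipleObjectsEx(1, stopEvent, 5000, QS_ALLINPUT, 0);
      if awaited = WAIT_OBJECT_0 + 1 then begin // index 1 = message queue
        while PeekMessage(msg, 0, 0, 0, PM_REMOVE) do begin
          if msg.message = WM_USER then begin
            Writeln('Notification received');
            SetEvent(stopEvent); // ask the loop to stop
          end
          else begin
            TranslateMessage(msg);
            DispatchMessage(msg);
          end;
        end;
      end;
    until awaited <> WAIT_OBJECT_0 + 1; // event signalled, timeout or error
    if awaited = WAIT_OBJECT_0 then
      Writeln('Stop event signalled, leaving the loop');
  finally
    CloseHandle(stopEvent);
  end;
end.
```

With one handle in the array, a return value of WAIT_OBJECT_0 + 1 means "a message is waiting", which mirrors the WAIT_OBJECT_0 + numWaitHandles test in the OTL fragment above.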

Saturday, September 06, 2008

Interfacing with external programs

Say you have a program. A popular program (at least in some circles) that other people want to write add-ons for. Your program does some job and in some processing steps you want to be able to execute some external code and then proceed according to the result returned by that code. Similar to the way CGI code is executed when an HTTP server processes an HTTP request.

The other day I was trying to enumerate all possible ways to do that. I found the following solutions:

  • Running an external program. Data can be passed in the program’s standard input and read from its standard output – just like when a CGI program is launched from the HTTP server. Simple, stable (it is simple to protect against malfunctioning add-ons), but quite slow.
  • Third-party DLL. Relatively simple, very fast, but can seriously destabilize the whole product. Complicated to upgrade the DLL (must shut down main application to upgrade add-on).
  • [D]COM[+]. Not my bag of Swedish … sorry, wrong movie. Definitely not the way I’d like to pursue. Unstable. Leads to problems that nobody seems to be able to troubleshoot.
  • Windows messages. Messy. Plus the main program runs as a service while add-on maybe wouldn’t.
  • TCP. Implement the add-on as a text-processing TCP/IP service (another HTTP server, if we continue the CGI analogy). Interesting idea, but not very simple to implement. Fast when both are running on the same machine. Flexible – each can be shut down and upgraded independently; processing can be distributed over several computers. Complicated to configure when multiple add-ons are installed (each must be configured to a different port). Firewall and antivirus software may cause problems.
  • Drop folder. Main app drops a file into some folder and waits for the result file. Clumsy, possibly faster than the external program solution (add-on can be always running), simple to implement and very stable.
  • Message queues (as in the MSMQ). Interesting but possibly too complicated for most customers to install and manage.
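To make one of these options more concrete, here is a rough sketch of the main-application side of the drop folder approach. Everything in it is hypothetical: the function name, the file names request.txt and result.txt, the folder path and the 50 ms polling interval were all invented for the illustration. The folder must already exist, and the add-on is assumed to write its answer under a temporary name and rename it when done, so the main app never reads a half-written file.

```pascal
program DropFolderSketch;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, Classes;

// Writes the request into the drop folder, then polls for the result file.
// Returns false if no result file appears within timeout_ms.
function ExchangeViaDropFolder(const folder, requestText: string;
  timeout_ms: cardinal; var resultText: string): boolean;
var
  lines      : TStringList;
  requestFile: string;
  resultFile : string;
  startTime  : cardinal;
begin
  Result := false;
  requestFile := IncludeTrailingPathDelimiter(folder) + 'request.txt';
  resultFile := IncludeTrailingPathDelimiter(folder) + 'result.txt';
  lines := TStringList.Create;
  try
    lines.Text := requestText;
    lines.SaveToFile(requestFile);
    startTime := GetTickCount;
    while not FileExists(resultFile) do begin
      if (GetTickCount - startTime) >= timeout_ms then
        Exit; // the finally block still runs
      Sleep(50);
    end;
    lines.LoadFromFile(resultFile);
    resultText := lines.Text;
    DeleteFile(resultFile); // make room for the next exchange
    Result := true;
  finally
    lines.Free;
  end;
end;

var
  answer: string;
begin
  // The folder path is a placeholder and must exist before the call.
  if ExchangeViaDropFolder('C:\Temp\AddOnDrop', 'hello', 5000, answer) then
    Writeln('Add-on answered: ', answer)
  else
    Writeln('Add-on did not answer in time');
end.
```

The polling loop is what makes this solution clumsy; a directory change notification would avoid it at the cost of more code.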

And now to the main point of my ramblings. What did I miss? Are there more possibilities? If you have any idea how to approach my problem from a different direction, leave me a comment. Don’t mention SOAP, BTW, it is implicitly included in the “add-on as a TCP server” solution.

And thanks!

Wednesday, September 03, 2008

OmniThreadLibrary patterns - How to (not) create a task

From the first public release two months ago (or is it closer to three already?), OmniThreadLibrary has been constantly changing. Only with the 1.0 release has it reached a somewhat stable state. That flux was good as it was only through the development of the OTL that I discovered how to do some things properly (maybe I was not so wrong in Why is software third time lucky?, after all). But it was also bad as some introductory topics that I wrote don't accurately reflect the current state anymore. Yes, I know, I should start writing good tutorials. Not fun :( I like writing blog articles more. Today I'll focus on task creation.

OTL offers three different ways to create a task (and a fourth one will be introduced when Delphi 2009 is available). Those three ways are mapped to three overloads of the CreateTask method (OtlTaskControl.pas).

function CreateTask(worker: TOmniTaskProcedure;
  const taskName: string = ''): IOmniTaskControl; overload;
function CreateTask(worker: TOmniTaskMethod;
  const taskName: string = ''): IOmniTaskControl; overload;
function CreateTask(const worker: IOmniWorker;
  const taskName: string = ''): IOmniTaskControl; overload;

The simplest possible way (demoed in test application 2_TwoWayHello; see OmniThreadLibrary Example #3: Bidirectional communication) is to pass the name of a global procedure to CreateTask. This global procedure must accept one parameter of type IOmniTask (it was described in the OmniThreadLibrary internals - OtlTask article, which is still mostly valid).

CreateTask(RunHelloWorld, 'HelloWorld').Run;

procedure RunHelloWorld(const task: IOmniTask);
begin
end;

A variation on the theme is passing the name of a method to CreateTask. This approach is used in the test application 1_HelloWorld. The interesting point to make here is that you can declare this method in the same class from which CreateTask is called. That way you can access the class fields and methods from the threaded code. Just keep in mind that you'll be doing this from another thread so make sure you know what you're doing!

procedure TfrmTestHelloWorld.RunHelloWorld(const task: IOmniTask);
begin
end;

For all except the simplest tasks, you'll use the third approach, because it will give you access to the true OTL power (internal wait loop and message dispatching). To use it, you have to create a worker object, which implements the IOmniWorker interface. An example from the 5_TwoWayHello_without_loop demo:

procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
var
  worker: IOmniWorker;
begin
  worker := TAsyncHello.Create;
  FHelloTask :=
    OmniEventMonitor1.Monitor(CreateTask(worker, 'Hello')).
    SetTimer(1000, MSG_SEND_MESSAGE).
    SetParameter('Delay', 1000).
    SetParameter('Message', 'Hello').
    Run;
end;

As you're exposing this worker via an interface, Delphi will manage its lifetime. When the task is terminated, the worker instance will be destroyed too.

For the same reason, there's no need to create the worker object in advance. From the demo 7_InitTest:

task := OmniEventMonitor1.Monitor(CreateTask(TInitTest.Create(success), 'InitTest'))
  .SetPriority(tpIdle).Run;

There's a catch, though. Because CreateTask expects an interface, you must never pass to it a variable or field that is declared as an object. This will not work:

procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
var
  worker: TAsyncHello; // wrong: declared as an object, not as IOmniWorker
begin
  worker := TAsyncHello.Create;
  FHelloTask :=
    OmniEventMonitor1.Monitor(CreateTask(worker, 'Hello')).
    SetTimer(1000, MSG_SEND_MESSAGE).
    SetParameter('Delay', 1000).
    SetParameter('Message', 'Hello').
    Run;
end;

If you need to store some information inside the worker during its execution and access it from the owner, you can use the Implementor property, which will "convert" the IOmniWorker interface back to the implementing object (which you must then cast to your worker class). From the demo 6_TwoWayHello_with_object_worker:

procedure TfrmTestTwoWayHello.actStopHelloExecute(Sender: TObject);
begin
  FHelloTask.Terminate;
  FHelloTask := nil;
  lbLog.ItemIndex := lbLog.Items.Add(Format('%d Hello World''s sent',
    [TAsyncHello(FWorker.Implementor).Count]));
end;

BTW, don't just set the task reference (FHelloTask in the example above) to nil when you want to terminate the task. You have to call Terminate or use some other way to notify the task that it must terminate.

BTW2, if you use a parameterless constructor inside the CreateTask call, you have to put empty parentheses after the .Create or Delphi will not compile your code. I'm not really sure why. The following segment is from test 14_TerminateWhen:

Log(Format('Task started: %d',
  [CreateTask(TMyWorker.Create())
    .TerminateWhen(FTerminate).WithCounter(FCounter)
    .MonitorWith(OmniTED).Run.UniqueID]));

That completes today's tour.

In the introduction I mentioned the fourth CreateTask overload that will be introduced when Delphi 2009 is ready. OmniThreadLibrary will support anonymous methods so you'll be able to do something like this:

CreateTask(procedure begin MessageBeep(MB_ICONEXCLAMATION); end);

Tuesday, September 02, 2008

OmniThreadLibrary 1.0a released

This is basically a bugfix release. There was a problem with the .MsgWait decorator which I fixed in the repository three days ago. As I want to write about this function soon, I thought it would be nice to make a release first ...

Other changes:

  • TGp4AlignedInt from GpStuff.pas is supposedly D2006 compatible (says a reader) so it has been enabled on that platform. As a result, it is now possible to compile and use OTL in D2006 (said the same source).
  • SpinLock.pas has been updated.
  • Test 6 has been changed to show why one would want to use the IOmniWorker.Implementor function. I'll write more about that very soon.
  • I've included parts of the FastMM4 package into the repository to simplify debugging. FastMM4 was created by Pierre le Riche and is not covered by the OmniThreadLibrary license. It is released under a dual license and you can use it under either the MPL 1.1 or the LGPL 2.1. More details in the included readme file and on the FastMM4 home page.

Monday, September 01, 2008

Introduction to enumerators

The Blaise Pascal magazine #3 is out. Lots of content there, including my Introduction to enumerators article (first page).