Saturday, September 20, 2008

Processing Windows messages in OmniThreadLibrary

In all my ravings about the OmniThreadLibrary I’ve never discussed the .MsgWait decorator – and partly because of that it was not even working in the 1.0 release :-(. Time to fix that!

Today I’ll show you how simple is to make a synchronous wrapper around the asynchronous component (using OTL, of course). The code itself is not an OTL test project but a unit I’m using to synchronously download web pages. Where’s the problem at all, you’ll say? Indy is synchronous and so is WinInet. True, but I’m using ICS, which is a completely asynchronous solution.

[For the readers who have no idea what I’m talking about: Synchronous means program calls the code, code executes for some time and then returns with web page. Asynchronous means: program calls the code and the code returns immediately. Somewhere in the background, web page is being retrieved and when it is fully transferred, your program gets notified in some way. Second way is more flexible, allows you to do more things in parallel, doesn’t block the UI but is more complicated to use.]

Sometimes (usually for quick and dirty tests) I need to retrieve a web page without having to think about messages and events and what will be called in which moment. For such times I put together a simple unit which wraps ICS stuff in a thread. It is called GpHttp and will be available on my web in some time. For now, you can download it here. You’ll also need ICS v6 (and it should work with ICS v5 if you remove Overbyte prefix from used units and classes).

Interface

I wanted to keep the interface really simple. The GpHttp unit exports only two functions, one to execute GET and another POST request. Both accept username and password and both return status code (200 if everything went OK), status text and page contents. In case of a socket problem, WinSock error code is returned in statusCode. (HTTP codes are always below 1,000 and WinSock codes are always above 10,000 so there is no possibility for collision.)

function GpHttpGet(const url, username, password: string; 
var statusCode: integer; var statusText, pageContents: string): boolean;

function GpHttpPost(const url, username, password, postData: string;
var statusCode: integer; var statusText, pageContents: string): boolean;

Both functions are simple wrappers around the third (local) function that executes any HTTP request.

function GpHttpGet(const url, username, password: string; 
var statusCode: integer; var statusText, pageContents: string): boolean;
begin
Result := GpHttpRequest(url, username, password, 'GET', '', statusCode,
statusText, pageContents);
end; { GpHttpGet }

function GpHttpPost(const url, username, password, postData: string;
var statusCode: integer; var statusText, pageContents: string): boolean;
begin
Result := GpHttpRequest(url, username, password, 'POST', postData,
statusCode, statusText, pageContents);
end; { GpHttpPost }

The real work starts in the GpHttpRequest method. First it creates a worker object and passes request data to it. The reference to the object (or more correct – to the interface implemented by this object) is stored in the worker variable. We’ll need it at the end to retrieve the status code and page contents.

Next, the task is created and run. Notice the call to .MsgWait which instructs the internal thread loop to process Windows messages (more on that later).

Next, we wait for the task to terminate. We’ll only be waiting up to 30 seconds (a constant in code which could be easily changed into a caller-provided parameter). If task is not finished in 30 seconds, we’ll terminate it and return False. Otherwise, we’ll retrieve status code, text and page contents from the worker object via the Implementor trick. (A better way would be to write a IGpHttpRequest interface supporting StatusCode, StatusText and PageContents properties, but sometimes I’m just too lazy …)

function GpHttpRequest(const url, username, password, request, 
postData: string; var statusCode: integer; var statusText,
pageContents: string): boolean;
var
task : IOmniTaskControl;
worker: IOmniWorker;
begin
worker := TGpHttpRequest.Create(url, username, password, request, postData);
task := CreateTask(worker, 'GpHttpRequest').MsgWait.Run;
Result := task.WaitFor(CGpHttpRequestTimeout_sec * 1000);
if not Result then
task.Terminate
else begin
statusCode := TGpHttpRequest(worker.Implementor).StatusCode;
statusText := TGpHttpRequest(worker.Implementor).StatusText;
pageContents := TGpHttpRequest(worker.Implementor).PageContents;
end;
end; { GpHttpRequest }

Background Task

The real work is done in the background task [as expected, doh, why are you even mentioning that??]. In the Initialize method, to be more specific.

function TGpHttpRequest.Initialize: boolean;
begin
hrHttpClient := THttpCli.Create(nil);
try
hrHttpClient.NoCache := true;
hrHttpClient.RequestVer := '1.1';
hrHttpClient.URL := hrURL;
hrHttpClient.Username := hrUsername;
hrHttpClient.Password := hrPassword;
hrHttpClient.FollowRelocation := true;
if hrUsername <> '' then
hrHttpClient.ServerAuth := httpAuthBasic;
hrHttpClient.SendStream := TStringStream.Create(hrPostData);
hrHttpClient.RcvdStream := TStringStream.Create('');
hrHttpClient.OnRequestDone := HandleRequestDone;
if SameText(hrRequest, 'GET') then
hrHttpClient.GetASync
else if SameText(hrRequest, 'POST') then
hrHttpClient.PostASync
else
raise Exception.CreateFmt(
'TGpHttpRequest.Initialize: Unknown request type %s',
[hrRequest]);
Result := true;
except
on E:ESocketException do begin
hrStatusCode := -1;
hrStatusText := E.Message;
Result := false;
end;
end;
end; { TGpHttpRequest.Initialize }

The code first instantiates a THttpCli object. THttpCli is the ICS’s HTTP client class used to execute HTTP requests asynchronously or synchronously. Here I’m using asynchronous mode because a) THttpCli.GetSync doesn’t have configurable timeout and b) it calls Application.ProcessMessages, which is not a good idea if you want to run your requests from a background thread (which I did).

Next the code initializes bunch of THttpCli paramers and calls either GetASync or PostASync. If get or post cause an exception, it is remapped to status code and text and Initialize returns False. That signals the IOmniWorker main loop that it should not proceed with the task execution.

Now we have reached the asynchronous phase. ICS is working in the background and all other threads are just waiting for it to finish. Because ICS’s architecture is message based, somebody must process Windows messages which flow through the ICS itself. That’s why we had to use .MsgWait decorator when preparing the task.

Some time later (possibly before the timeout occurs), THttpCli will fully download the web page and call the HandleRequestDone event handler. Here we’ll just copy relevant data into task’s internal fields and then the code will tell the task (i.e. itself) to terminate.

procedure TGpHttpRequest.HandleRequestDone(sender: TObject; 
rqType: THttpRequest; error: word);
begin
if error <> 0 then begin
hrStatusCode := error;
hrStatusText := 'Socket error';
end
else begin
hrPageContents := TStringStream(hrHttpClient.RcvdStream).DataString;
hrStatusCode := hrHttpClient.StatusCode;
hrStatusText := hrHttpClient.ReasonPhrase;
end;
Task.Terminate;
end; { TGpHttpRequest.HandleRequestDone }

In the cleanup code (which is, same as Initialize, called automatically from the IOmniWorker main loop) we just have to tear down internal objects.

procedure TGpHttpRequest.Cleanup;
begin
if assigned(hrHttpClient) then begin
hrHttpClient.RcvdStream.Free;
hrHttpClient.SendStream.Free;
FreeAndNil(hrHttpClient);
end;
end; { TGpHttpRequest.Cleanup }

And that’s all, folks. It really is that simple.

MsgWait

In case you want to learn more about OTL internals, read ahead …

Let’s take a short look at the .MsgWait implementation. The function itself just sets two internal fields and returns the object itself so that we can chain another method to it.

function TOmniTaskControl.MsgWait(wakeMask: DWORD): IOmniTaskControl;
begin
Options := Options + [tcoMessageWait];
otcExecutor.WakeMask := wakeMask;
Result := Self;
end; { TOmniTaskControl.MsgWait }

The hard work is done in TOmniTaskExecutor.Asy_DispatchMessages. If the tcoMessageWait option is set, the MsgWaitForMultipleObjectsEx will also wait for Windows messages (in addition to everything else it does) because it will receive non-null waitWakeMask. When a message is detected, the code will call ProcessThreadMessages method which simply peeks and dispatches all Windows messages (and Delphi’s internal message dispatch mechanism takes care of all the rest).

if tcoMessageWait in Options then
waitWakeMask := WakeMask
else
waitWakeMask := 0;
//...
awaited := MsgWaitForMultipleObjectsEx(numWaitHandles, waitHandles,
cardinal(timeout_ms), waitWakeMask, flags);
//...
else if awaited = (WAIT_OBJECT_0 + numWaitHandles) then //message
ProcessThreadMessages

procedure TOmniTaskExecutor.ProcessThreadMessages;
var
msg: TMsg;
begin
while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) and (Msg.Message <> WM_QUIT) do begin
TranslateMessage(Msg);
DispatchMessage(Msg);
end;
end; { TOmniTaskControl.ProcessThreadMessages }

The Asy_DispatchMessages is probably the most complicated part of the OTL (once you understand the lock-free structures inside the OtlContainers ;) Once you understand how it works you’ll be fully prepared to write custom thread loops in highly specialized threaded code. But don’t worry, you can use OTL even if you don’t understand the magic hidden inside.

1 comment:

  1. Hi Gabr,

    Heads up on new article on locks

    http://www.ddj.com/hpc-high-performance-computing/210604448

    ReplyDelete