Monday, October 06, 2008

Erlangenizing the OmniThreadLibrary

Few days ago I “discovered” Erlang. [Such things happen when you read StackOverflow obsessively (sigh).] I started with Wikipedia and proceeded with the Pragmatic Programmer book – but that’s not really important. I wanted to talk about Erlang’s built-in concurrency support.

Some things are very similar to the OTL (if we take into account that Erlang is a functional language and Delphi is not) and some are different but one immediately jumped to my attention. In Erlang, the message recipient doesn’t use numeric code to discover what message it has received; instead of that, whole message is matched to a programmer provided pattern (or patterns). The interesting thing is that usually (by convention) the first element in the message is an atom (a name, a sequence of characters, if you want). That got me thinking … Why do we send integer messages in the OTL, anyway?

The Windows Way

From the very beginnings, OmniThreadLibrary tried to make programmer’s life simple. Well, at least simpler. One of the helping hands it offered was was simplified message processing. In the traditional Windows thread programming you have to wait for various objects to become signaled with WaitForMultipleObjects and then proceed accordingly. In practice that means that thread’s main logic is centralized in one very big method that handles all those events, mutexes and other stuff that can be waited upon.

OTL helps by implementing this logic internally (at least for workers that implement IOmniWorker interface). Instead of using kernel primitives, task owner sends messages to the task’s message queue. Messages are processed somewhere inside the OTL (specifically OtlTaskControl.pas/TOmniTaskExecutor.Asy_DispatchMessages) and are converted into method calls with Delphi’s Dispacth mechanism. That’s the same mechanism that makes sure that Windows messages are “converted” into Delphi methods and it requires that first two bytes of the dispatched message contain message ID. That’s why (until now) the recommended way to send a message to task was:


TAsyncHello = class(TOmniWorker)
strict private
aiMessage: string;
function Initialize: boolean; override;
procedure OMChangeMessage(var msg: TOmniMessage); message MSG_CHANGE_MESSAGE;
procedure OMSendMessage(var msg: TOmniMessage); message MSG_SEND_MESSAGE;

FHelloTask: IOmniTaskControl;

FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Random ' + IntToStr(Random(1234)));

[You can read more about this approach in OmniThreadLibrary Example #4: Bidirectional communication, the OTL way.]

The Erlang Way

This approach simplifies writing threaded code – at least the one that doesn’t depend heavily on shared data structures. But there’s still some room for improvement. For example, do we really have to use numeric messages, which have to be declared in advance. Why couldn’t the task controller just tell the task to execute the OMChangeMessage method?

To cut the long story short – this is now possible. Yesterday I committed a set of OTL modifications that allow you to do this:

TAsyncHello = class(TOmniWorker)
procedure Change(const data: TOmniValue);

FHelloTask: IOmniTaskControl;

FHelloTask.Invoke('Change', 'Random ' + IntToStr(Random(1234)));

Yes, the Change method is published here. This is important.

Simple, huh? There’s a small problem, though – there are no compile-time checks. The code sends a string and compiler can do nothing to verify validity of this string. If the name was mistyped, you’d only notice it during the program execution.

To fix this problem, OTL allows another form of method invocation which uses a method address instead of  the name.

FHelloTask.Invoke(@TAsyncHello.Change, 'Random ' + IntToStr(Random(1234)));

In this case the compiler can check your typing, but still it won’t catch all problems – for example, the following code will compile and then raise exception during the execution.

procedure TfrmTestStringMsgDispatch.btnTestInvalidMsgClick(Sender: TObject);
if cbStringMessages.Checked then
// will fail, FooBar method is not defined
// will fail, can only invoke methods from the task's class

[All new functionality is exposed in new demo 18_StringMsgDispatch.]


To understand how the Invoke is implemented, it’s best to trace one such call. First we see that Invoke  gets converted into a normal message.

procedure TOmniTaskControl.Invoke(const msgMethod: pointer; msgData: TOmniValue);
Comm.Send(TOmniInternalAddressMsg.CreateMessage(msgMethod, msgData));
end; { TOmniTaskControl.Invoke }

class function TOmniInternalAddressMsg.CreateMessage(const msgMethod: pointer; msgData:
TOmniValue): TOmniMessage;
Result := TOmniMessage.Create(COtlReservedMsgID,
TOmniInternalAddressMsg.Create(msgMethod, msgData));
end; { TOmniInternalAddressMsg.CreateMessage }

This message has message ID COtlReservedMsgID (which is equal to $FFFF, so from now on please don’t use this message for you purposes). Message data field contains object which wraps method name and message data that was passed to the Invoke. Similar code is executed when Invoke is called with the method pointer parameter.

OK, so that’s how method name travels from the task controller to the task itself by using standard communication channel. But that is only half of the story … the simpler part!

On the receiving side, TOmniTaskExecutor.Asy_DispatchMessages detects new message and calls DispatchOmniMessage to process it.

if awaited = idxFirstMessage then
gotMsg := task.Comm.Receive(msg)
else begin
gotMsg := (oteCommList[awaited - idxFirstMessage - 1] as
finally oteInternalLock.Release; end;
if gotMsg and assigned(WorkerIntf) then

Now that’s where the things start to get really interesting as we have to use RTTI and even extended RTTI to call the appropriate method. It is also the point where I’ll cut the story short. This article is already very long, maybe even too long, and I have much more to say on the subject. Expect part II to be published in few days. [Of course, if you’re curious you can just look into the code to see how DispatchOmniMessage is implemented!]

Test it!

The newest OmniThreadLibrary code is only available in the repository. No snapshots this time.

I’d still be immensely grateful to anybody that will test the new functionality and provide me with his thoughts on this approach.

No comments:

Post a Comment