Wednesday, July 09, 2008

OmniThreadLibrary Example #5: Registering additional communication channels

WIPI thought I would be documenting the TOmniTaskEventDispatch component today, but I was working on something interesting yesterday and today and I want to show it to the public. The functionality I'll be talking about is demoed in the tests\8_RegisterComm project, available in the repository and in the today's snapshot. [All new functionality was uploaded, not just this demo, of course.]

I was working on the OtlComm testing and benchmarking code. I wanted to find out if the communication code is working correctly (by stress-testing it) and if the lock-free buffer implementation is faster than the locking one. So I set up two threaded clients (using OTL, of course!) and started to write code that would establish a direct communication channel between them. I wanted to use the standard Comm channel for test control and reporting, but not for running tests.

At first, the task seemed quite simple. I set up a new communication channel of type IOmniTwoWayChannel and created two tasks, based on the TOmniWorker class. Each task received one endpoint of the new communication channel as a parameter.

  FCommChannel := CreateTwoWayChannel(1024);
FClient1 := OmniTaskEventDispatch1.Monitor(
CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))).Run;
FClient2 := OmniTaskEventDispatch1.Monitor(
CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))).Run;

An image is worth more than thousand words. additional communication channel


When I had this infrastructure ready, I started to think about message loop in my tasks. At that moment (yesterday), internal message loop only handled the default communication channel. Writing a whole message loop to support one measly communication channel didn't seem like a right decision. What to do? Extend OTL, of course!


Today's snapshot contains support for additional communication channels. You only have to call RegisterComm and internal message dispatcher will handle everything for you.


The 8_RegisterComm demo will help you understand. There are two buttons on the form, first sends a random number to task 1 and second sends a random number to task 2.


8_RegisterComm


A code for first button's OnClick event should suffice. Code for the second button is almost the same.

procedure TfrmTestOtlComm.btnSendTo1Click(Sender: TObject);
var
value: integer;
begin
value := Random(100);
Log(Format('Sending %d to task 1', [value]));
FClient1.Comm.Send(MSG_FORWARD, value);
end;

The TCommTester class implements message handler for the MSG_FORWARD message. The code in this method firstly notifies the owner that MSG_FORWARD message was received and secondly sends MSG_FORWARDING message to the task-to-task communication channel.

type
TCommTester = class(TOmniWorker)
strict private
ctComm : IOmniCommunicationEndpoint;
ctCommSize: integer;
public
constructor Create(commEndpoint: IOmniCommunicationEndpoint; commBufferSize: integer);
function Initialize: boolean; override;
procedure OMForward(var msg: TOmniMessage); message MSG_FORWARD;
procedure OMForwarding(var msg: TOmniMessage); message MSG_FORWARDING;
end; { TCommTester }

constructor TCommTester.Create(commEndpoint: IOmniCommunicationEndpoint; commBufferSize:
integer);
begin
inherited Create;
ctComm := commEndpoint;
ctCommSize := commBufferSize;
end;

function TCommTester.Initialize: boolean;
begin
Task.RegisterComm(ctComm);
Result := true;
end;

procedure TCommTester.OMForward(var msg: TOmniMessage);
begin
Task.Comm.Send(MSG_NOTIFY_FORWARD, msg.MsgData);
ctComm.Send(MSG_FORWARDING, msg.MsgData);
end;

procedure TCommTester.OMForwarding(var msg: TOmniMessage);
begin
Task.Comm.Send(MSG_NOTIFY_RECEPTION, msg.MsgData);
end;

The MSG_FORWARDING handler (OMForwarding) just notifies the owner that the message was received from another task.


In the screenshot above, you can see how value 0 traveled from task 1 to task 2 and how value 3 traveled from task 2 to task 1.


The really magical part happens in TCommTester.Initialize. The code fragment Task.RegisterComm(ctComm) registers communication endpoint as an additional message source. From that point onwards, messages that arrive on the ctComm channel are treated same way as messages arriving on the Comm channel.


The road to this deceptively simple solution was quite long. I had to do major refactoring in the OtlTask unit. Quite some code was moved from the TOmniTaskControl class to the TOmniTaskExecutor class (which was a record before). I think that the new code works fine (I've run all test cases), but only time will tell ...


If you feel the need to check how additional communication channel support is implemented, check TOmniTaskExecutor. Asy_DispatchMessages and TOmniTaskExecutor.Asy_RegisterComm (OtlTask unit).

No comments:

Post a Comment