Monday, January 21, 2013

Sending Event Handlers Across Thread Boundary

Recently, I ran into an interesting problem. I was doing some high-bandwidth, low-latency data processing where a background task received the data and dispatched it to a central hub, which then forwarded the data to the appropriate processing and transmitting thread. The data absolutely had to travel through the main thread (the reasons for that are quite convoluted and not important for the story), which bothered me from a performance viewpoint: I could only send data to the main thread via Windows messages, and that could introduce unwanted overhead.

The simplest way to solve the problem (given the current codebase) was to introduce an asynchronous event handler. The main thread implements an event handler that is called for each received data block and passes this event handler to the receiver thread, which calls it from its own context. As a result, the code that logically belongs to the main thread is executed from the worker thread and there is (almost) no delay introduced. (The ‘almost’ part comes from the fact that I had to use some locking to synchronize operations inside the main thread.)
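To make the locking remark more concrete, here is a minimal sketch of a handler that is owned by one object but called from a worker thread. It is not the code from my project: THub, TBlockNotify, Asy_BlockReceived and the block size of 188 are all made up for illustration, and I'm assuming a plain TCriticalSection is an acceptable lock.

```delphi
program LockedHandlerSketch;

{$APPTYPE CONSOLE}

uses
  System.Classes, System.SyncObjs;

type
  // Hypothetical handler signature, much simpler than the real one.
  TBlockNotify = procedure(blockSize: integer) of object;

  // Hypothetical stand-in for the object that lives in the main thread.
  THub = class
  strict private
    FLock: TCriticalSection;
    FTotalBytes: int64;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Asy_BlockReceived(blockSize: integer);
    function TotalBytes: int64;
  end;

constructor THub.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
end;

destructor THub.Destroy;
begin
  FLock.Free;
  inherited;
end;

procedure THub.Asy_BlockReceived(blockSize: integer);
begin
  // Runs in the calling (worker) thread, so the shared field is guarded.
  FLock.Acquire;
  try
    Inc(FTotalBytes, blockSize);
  finally
    FLock.Release;
  end;
end;

function THub.TotalBytes: int64;
begin
  FLock.Acquire;
  try
    Result := FTotalBytes;
  finally
    FLock.Release;
  end;
end;

var
  hub: THub;
  handler: TBlockNotify;
  worker: TThread;
begin
  hub := THub.Create;
  try
    handler := hub.Asy_BlockReceived;
    // The worker calls the handler directly, from its own context.
    worker := TThread.CreateAnonymousThread(
      procedure
      var
        i: integer;
      begin
        for i := 1 to 1000 do
          handler(188);
      end);
    worker.FreeOnTerminate := false;
    worker.Start;
    worker.WaitFor;
    worker.Free;
    Writeln('Total bytes: ', hub.TotalBytes);
  finally
    hub.Free;
  end;
end.
```

The handler body executes in the worker thread, so every field it shares with the owner has to be accessed under the lock on both sides.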

During the implementation phase, I found out that it’s quite difficult to send an event handler over the OmniThreadLibrary communication channel. The solution was non-trivial and required some rather ugly hacks, so I’m posting it here for future reference.

There is an event handler defined by the TGpDVBDataReceivedNotify_Asy signature and implemented by the TGpDVBTeletextReceiver.Asy_DataReceivedNotify method.

type
  TGpDVBDataReceivedNotify_Asy = procedure(inputUID: integer;
    rxTime_ms: int64; rxBuffer: pointer; lnsRead: byte) of object;


procedure TGpDVBTeletextReceiver.Asy_DataReceivedNotify(inputUID: integer;
  rxTime_ms: int64; rxBuffer: pointer; lnsRead: byte);
begin

end;

The main thread first creates a task and then calls the nested procedure SetNotification to set the event handler parameter.

SetNotification first casts the event handler into a TMethod record and then uses the TOmniValue.FromRecord<TMethod> function to convert the TMethod into a TOmniValue.

procedure CreateReceiver;
var
  // Declared before SetNotification so that the nested procedure can see it.
  worker: IOmniTaskControl;

  procedure SetNotification(proc: TGpDVBDataReceivedNotify_Asy);
  begin
    worker.SetParameter('OnReceived',
      TOmniValue.FromRecord<TMethod>(TMethod(proc)));
  end;

begin
  worker := CreateTask(workerClass.Create(), 'Receiver worker');
  SetNotification(Asy_DataReceivedNotify);
  worker.Run;
end;

I couldn’t make the compiler accept my perfectly valid code without introducing this nested procedure.

Compiler error: "Invalid typecast"

The worker thread defines a field, FOnReceived.

  FOnReceived: TGpDVBDataReceivedNotify_Asy;

The worker thread initializer takes the OnReceived parameter and converts it into a TMethod record by using the AsRecord<TMethod> function; this record is later cast into the event handler.

function TGpDVBTeletextReceiverThread.Initialize: boolean;
var
  proc: TMethod;
begin
  Result := inherited Initialize;
  if Result then begin
    proc := Task.Param['OnReceived'].AsRecord<TMethod>;
    FOnReceived := TGpDVBDataReceivedNotify_Asy(proc);
  end;
end;

This round trip via the local variable is necessary, as the Delphi compiler wouldn’t accept the code without it.

Compiler error: "Variable required"

The event handler is later called in a normal Delphi manner.

FOnReceived(uid, time, rxBuffer, lnsRead); 

As you can see, the code would be fairly straightforward (FromRecord<TMethod>, AsRecord<TMethod>) if we didn’t have to code around Delphi’s idiosyncrasies.
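If you want to play with the casting part on its own, here is a minimal sketch without OmniThreadLibrary. TNotify and TReceiver are invented for this example; only TMethod (from the System unit) is real. Both casts go through variables, which is the form the compiler accepted in the code above.

```delphi
program MethodRoundTripSketch;

{$APPTYPE CONSOLE}

type
  // Hypothetical event signature, much simpler than the real one.
  TNotify = procedure(value: integer) of object;

  // Hypothetical class that implements the handler.
  TReceiver = class
    procedure Handle(value: integer);
  end;

procedure TReceiver.Handle(value: integer);
begin
  Writeln('Received ', value);
end;

var
  receiver: TReceiver;
  handler: TNotify;
  raw: TMethod;
  restored: TNotify;
begin
  receiver := TReceiver.Create;
  try
    // Store the method in a variable first, then cast the variable.
    handler := receiver.Handle;
    raw := TMethod(handler);

    // raw.Code is the address of the method, raw.Data is the instance.
    // A plain record like this can travel inside a TOmniValue.

    // Cast the record, again held in a variable, back into the event type.
    restored := TNotify(raw);
    restored(42);
  finally
    receiver.Free;
  end;
end.
```

The restored handler is only valid while the object in raw.Data is alive, so the receiver must outlive every thread that may still call it.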

3 comments:

  1. > I couldn’t make the compiler accept my perfectly valid code without introducing this internal function.

    I think it should work just by using a temp local variable to hold the event handler before doing the TMethod cast - just like you do further below.

    I agree the compiler should have handled this, but this has been a known shortcoming for quite some time.

    Replies
    1. You're correct, this also works:

      var
      proc: TGpDVBDataReceivedNotify_Asy;

      proc := Asy_DataReceivedNotify;
      worker.SetParameter('OnReceived', TOmniValue.FromRecord(TMethod(proc)));

      I've experimented with 'proc: TMethod', which didn't work, but never thought of that solution.
