Tuesday, July 08, 2008

OmniThreadLibrary internals - OtlComm

WIP

Today I'll describe the inner workings of the OmniThreadLibrary communication subsystem. It lives in the OtlComm unit and is used extensively by the task control/task worker interfaces (described in the previous installment). Its use is not limited to the OTL as it has no knowledge of tasks and threads. Feel free to use it in your own non-OTL-related code.

At the surface, messaging subsystem looks deceptively simple.

image

IOmniTaskControl interface exposes property Comm: IOmniCommunicationEndpoint.

  IOmniCommunicationEndpoint = interface ['{910D329C-D049-48B9-B0C0-9434D2E57870}']
function GetNewMessageEvent: THandle;
//
procedure RemoveMonitor;
procedure Send(msgID: word; msgData: TOmniValue); overload;
procedure Send(msgID: word; msgData: array of const); overload;
procedure Send(const msg: TOmniMessage); overload;
procedure SetMonitor(hWindow: THandle; messageWParam, messageLParam: integer);
function Receive(var msgID: word; var msgData: TOmniValue): boolean; overload;
function Receive(var msg: TOmniMessage): boolean; overload;
property NewMessageEvent: THandle read GetNewMessageEvent;
end; { IOmniTaskCommunication }

This simple interface allows the owner to send messages, receive messages and wait on a new message. A property with a same name lives on the worker side, too (in the IOmniTask interface). Both endpoints are connected so that IOmniTaskControl.Comm.Send sends message to IOmniTask.Comm and vice versa. Simple.


But when you look under the surface ...


Here be dragons!


This is the real picture of objects and interfaces living inside the OtlComm unit. Although the surface view may give you an impression that the IOmniTaskCommunication is the most important part of the communication system, in reality everything revolves around the IOmniTwoWayChannel.


image 


To understand this picture, it's best to start at the bottom left (IOW, in the rectangle just above this line). The IOmniTaskControl interface is the one that is returned from the CreateTask procedure.


IOmniTaskControl is implemented by the TOmniTaskControl object, which owns an IOmniTwoWayChannel interface, created during TOmniTaskControl initialization.

procedure TOmniTaskControl.Initialize;
begin
//...
otcCommChannel := CreateTwoWayChannel;
//...
end; { TOmniTaskControl.Initialize }

This interface is also passed to the TOmniTask object (which implements IOmniTask interface) when task is run.

function TOmniTaskControl.Run: IOmniTaskControl;
var
task: IOmniTask;
begin
//...
task := TOmniTask.Create(..., otcCommChannel, ...);
//...
end; { TOmniTaskControl.Run }

TOmniTwoWayChannel owns two ring buffers of type TOmniRingBuffer (either the locking version, which is the default at the moment, or lock-free version if you compile the unit with /dOTL_LockFreBuffer). Those buffers are used as unidirectional message queues that store TOmniMessage records.

  TOmniMessage = record
MsgID : word;
MsgData: TOmniValue;
end; { TOmniMessage }

TOmniTwoWayChannel also owns two IOmniCommunicationEndpoint interfaces (implement by the TOmniCommunicationEndpoint class). One of them is exposed via the Enpoint1 property and another via Endpoint2.

  IOmniTwoWayChannel = interface ['{3ED1AB88-4209-4E01-AA79-A577AD719520}']
function Endpoint1: IOmniCommunicationEndpoint;
function Endpoint2: IOmniCommunicationEndpoint;
end; { IOmniTwoWayChannel }

Both endpoints are connected to both ring buffers. If we name those ring buffers A and B, endpoint 1 writes to A and reads from B while endpoint 2 writes to B and reads from A.


TOmniTaskControl.Comm and TOmniTask.Comm are simple mappers that return different endpoints of the same IOmniTwoWayChannel interface.

function TOmniTaskControl.GetComm: IOmniCommunicationEndpoint;
begin
Result := otcCommChannel.Endpoint1;
end; { TOmniTaskControl.GetComm }

function TOmniTask.GetComm: IOmniCommunicationEndpoint;
begin
Result := otCommChannel.Endpoint2;
end; { TOmniTask.GetComm }

And that's all folks ...


Autovivification


Well, that's almost all. The last trick in the OtlComm is creation of communication infrastructure on as-needed basis. As you can see from the diagram, the whole system is quite "heavy". If you're running a simple fire-and-forget tasks, you may not need it at all. That's why the ring buffers and communication endpoints are not created until they are used.


The TOmniTwoWayChannel object is created whenever a task is created (see TOmniTaskControl.Initialize, above), but it does not create other parts of the infrastructure. This is only done in the endpoint accessor.

procedure TOmniTwoWayChannel.CreateBuffers;
begin
if twcUnidirQueue[1] = nil then
twcUnidirQueue[1] := TOmniRingBuffer.Create(twcMessageQueueSize);
if twcUnidirQueue[2] = nil then
twcUnidirQueue[2] := TOmniRingBuffer.Create(twcMessageQueueSize);
end; { TOmniTwoWayChannel.CreateBuffers }


function
TOmniTwoWayChannel.Endpoint1: IOmniCommunicationEndpoint;
begin
Assert((cardinal(@twcEndpoint[1]) AND 3) = 0);
if twcEndpoint[1] = nil then begin
twcLock.Acquire;
try
if twcEndpoint[1] = nil then begin
CreateBuffers;
twcEndpoint[1] := TOmniCommunicationEndpoint.Create(twcUnidirQueue[1], twcUnidirQueue[2]);
end;
finally twcLock.Release; end;
end;
Result := twcEndpoint[1];
end; { TOmniTwoWayChannel.Endpoint1 }

The code to create Endpoint2 is similar except that it uses twcEndpoint[2] and reverses parameters passed to the TOmniCommunicationEndpoint.Create.


The Endpoint1 method uses some tricks that may not be obvious.



  • Testing if interfaces have been already initialized is optimistic. The code first tests if endpoint is nil and if that is true (and that will be very rarely, only the first time), it locks access to internal structures and the retests the same condition before creating buffers and the endpoint.
  • The Assert checks if two least important bits of the endpoint address are 0. This makes the endpoint variable DWORD-aligned (its address is divisible by 4), which in turn causes reads/writes from/to that address to be atomic on the Intel architecture (the only one that concerns us). In other words, even if another thread is modifying the same variable on another CPU (or core), we know that we will read either the old value or the new value and not some mixture of both.

That concludes the OtlComm tour. The only remaining part (until the thread pool is implemented) is the TOmniTaskEventDispatch component, which I'll cover in a day or two.

12 comments:

  1. Hi Gabr,

    Been following your posts with interest, and am learning a lot of new things with your latest OmniThreadLibrary!

    Thanks again!

    ReplyDelete
  2. Anonymous13:36

    > DWORD-aligned ... atomic on the Intel

    Is this absolutely true even for multicore/multiprocessor computers ? (Just to be sure ...)

    ReplyDelete
  3. Do you have any example using OtlComm with non-OTL-code?

    ReplyDelete
    Replies
    1. No, but I can create one. What functionality would you like to see in such an example?

      Delete
  4. Thanks for answering.

    I'm testing the THttpApiServer class from mORMot framework, it creates some clones to be able to answer calls http simultaneously in OnRequest event. I want my procedure, associated with this event, to send a log object to another thread that makes the persistence in the database.

    function TMyWebServer.processRequest(Ctxt: THttpServerRequest): cardinal;
    begin
    // get request params
    // create my log obj
    // send to threadDB
    end;

    The ThreadDb can be a TThread or TOmniWorker descendent

    Thank you

    ReplyDelete
    Replies
    1. Is this `processRequest` executed in a main thread or in some worker thread?

      Delete
    2. In a worker thread

      Delete
    3. is this possible?

      Delete
    4. Yes, it is entirely possible. I just didn't have time to put together an answer yet.

      Delete
    5. I have committed such example to the GitHub: https://github.com/gabr42/OmniThreadLibrary/commit/df56e6df7be72b03b54c5795783b45510273f2e2

      You can just sync to the latest version and it will be downloaded to the "examples\TThread communication" subfolder.

      A blog article will follow in few days.

      Delete
    6. Article posted: http://www.thedelphigeek.com/2015/07/using-omnithreadlibrarys-message-queue.html

      Delete