Saturday, July 04, 2015

Using OmniThreadLibrary’s Message Queue with a TThread Worker

A reader recently asked about using OmniThreadLibrary’s communication channels with a TThread-based worker thread so I prepared a simple example, now part of the OTL repository (stored in the folder examples\TThread communication).
Two separate topics are covered in this example:
  • Sending data from any thread (main or background) to a TThread-based worker.
  • Sending data from a TThread-based worker to a form.
Let’s deal with them one by one.
1. Sending data from multiple producers to a single worker
To send data form a form to a thread, we need a message queue. This example uses a TOmniMessageQueue object for that purpose. An instance of this object is created in the main thread. All threads – the main thread, the worker threads, and possible other data-producing threads – use the same shared object which is written with thread-safety in mind.
1.a Initialization and cleanup
The TOmniMessageQueue constructor takes a maximum queue size for a parameter. TWorker is just a simple TThread descendant which accepts the instance of the message queue as a parameter so it can read from the queue.
FCommandQueue := TOmniMessageQueue.Create(1000);FWorker := TWorker.Create(FCommandQueue);

The shutdown sequence is fairly standard. Stop is used instead of Terminate so it can set internal event which is used to signal the thread to stop.
if assigned(FWorker) then begin  FWorker.Stop;

1.b Sending data to the worker

To put some data into a queue, use its Enqueue method. It accepts a TOmniMessage record. Each TOmniMessage contains an integer message ID (not used in this example) and a TOmniValue data which, in turn, can hold any data type.
procedure TfrmTThreadComm.Query(value: integer);begin  if not FCommandQueue.Enqueue(TOmniMessage.Create(0 {ignored}, value)) then    raise Exception.Create('Command queue is full!');end;

Enqueue returns False if the queue is full. (A TOmniMessageQueue can only hold as much elements as specified in the constructor call.)

The example also shows how everything works correctly if two threads are started at the same time and both write to the message queue.
var  th1: TThread;  th2: TThread;begin  th1 := TThread.CreateAnonymousThread(    procedure    begin      Query(Random(1000));    end);  th2 := TThread.CreateAnonymousThread(    procedure    begin      Query(Random(1000));    end);  th1.Start;  th2.Start;end;

1.c Receiving the data

The worker’s Execute method waits on two handles in a loop. If a FStopEvent (an internal event) is signalled, the loop will exit. If the message queue’s GetNewMessageEvent (a THandle-returning method) gets signalled, a new data has arrived to the queue. In that case, the code loops to empty the message queue and then waits again for something to happen.
procedure TWorker.Execute;var  handles: array [0..1] of THandle;  msg    : TOmniMessage;begin  handles[0] := FStopEvent.Handle;  handles[1] := FCommandQueue.GetNewMessageEvent;  while WaitForMultipleObjects(2, @handles, false, INFINITE) = 
          (WAIT_OBJECT_0 + 1) do 
  begin    while FCommandQueue.TryDequeue(msg) do begin      //process the message …    end;  end;end;

2. Sending data from a worker to the form

To send messages from a worker thread to a form we need another instance of TOmniMessageQueue. As we can’t wait on a handle in the main thread (that would block the user interface), we’ll use a different notification mechanism – a window message observer.

2.a Initialization and cleanup

We create the queue just as in the first part. Then we create a window message observer and at the end we Attach it to the message queue. A window message observer sends a window message to some window each time a message queue changes. The four parameters passed to CreateContainerWindowsMessageObserver are the handle of the window that will receive those messages, a message ID, WPARAM, and LPARAM.
FResponseQueue := TOmniMessageQueue.Create(1000, false);
FResponseObserver := CreateContainerWindowsMessageObserver(Handle,

While shutting down, we first have to Detach the observer from the queue. Then we destroy the observer and empty the response queue (ProcessResults) to process any results that may still be waiting inside.

2.b Sending data to the form

To send a data, we use exactly the same approach as in 1.b.
if not FResponseQueue.Enqueue(TOmniMessage.Create(0 {ignored},
     Format('= %d', [msg.MsgData.AsInteger * 2]))) then  raise Exception.Create('Response queue is full!');

2.c Receiving the data

On the receiving side (the form) we have to set up a message function associated with the message that is sent from the window message observer. In this method we’ll call another method ProcessResults.
procedure WorkerResult(var msg: TMessage); message MSG_WORKER_RESULT;
procedure TfrmTThreadComm.WorkerResult(var msg: TMessage);begin  ProcessResults;end;

As a final step, ProcessResults reads data from the message queue and displays each element in a listbox.
procedure TfrmTThreadComm.ProcessResults;var  msg: TOmniMessage;begin  while FResponseQueue.TryDequeue(msg) do begin    //msg.MsgID is ignored in this demo    //msg.MsgData contains a string, generated by the worker    lbLog.ItemIndex := lbLog.Items.Add(msg.MsgData);

3. Using a TOmniBlockingCollection instead of TOmniMessageQueue

Alternatively, you can use the blocking collection implementation from OtlCollections instead. A blocking collection would be appropriate in case you have to handle large number of work requests or responses stored in a queue as a blocking collection grows and shrinks dynamically.

The only important change to the code would be in part 1 as you’d have to create an event observer manually while TOmniMessageQueue does it automatically. For details you can check TOmniMessageQueue.AttachWinEventObserver and TOmniMessageQueue.Destroy.

No comments:

Post a Comment