Tuesday, February 13, 2018

Data-driven Multithreading

About a week ago, Craig Chapman posted a vlog, "Lockless Multi-Threading in Delphi", in which he programmed a lockless communication channel that transfers messages between the main thread and a worker thread (or, actually, between any two threads).

I do like Craig's implementation of a lockless queue. It is small, neat, and it works. I also like that he approached multithreading from a communication viewpoint. I do, however, have several issues with how it is integrated into the application. While I wholly understand the need for a simple demo that viewers can understand, I feel that the Delphi world is full of such examples. That makes it hard for a newcomer to the multithreaded world to find appropriate patterns to copy.

Hence I decided to rewrite Craig's code with different objectives in mind. Instead of speed I focused on flexibility, ease of use and good multithreaded programming patterns. Before I jump into my solution, however, I must articulate the bad programming practices in the Lockless demo. (That is strictly subjective reasoning. Your mileage may vary. It is, alas, a reasoning supported by many many years of writing bad multithreaded code - and not yet enough years of writing good code.)


So, what are my peeves here:
  1. The queue Craig implemented supports only one-to-one communication. In other words, it is a single-producer, single-consumer queue. That is useful from time to time, but in a multithreaded world we constantly run into situations where we would like to have multiple writers and/or multiple readers. In other words, I prefer a multiple-producer, multiple-consumer queue over a one-to-one approach even if the latter is faster.
  2. To read from the queue, the thread constantly polls it for data. That's the equivalent of a small child asking "Are we there yet?" on repeat. I believe that this programming practice - which I absolutely hate - comes as a consequence of the TThread design and the original TThread demos, which are all written as a "while not Terminated do" loop.
  3. The main thread also polls the queue. Even more, it uses the OnIdle event handler. This event handler fires very fast when you move the mouse over the form or when you type, but is only called once or twice per second when the mouse and keyboard are idle. It is also not called at all if the application doesn't have focus. All that makes it a bad candidate for polling. You never know when the message you sent from the worker thread will actually be processed.
If you don't believe my OnIdle claims, run the OnIdle demo from my GitHub.

If you don't want to use OmniThreadLibrary, there's a way to achieve all that with standard Delphi RTL. Even more, the solution will support all platforms. The queue will not be as fast as Craig's implementation, but - as I always say - if the speed of your program depends on processing millions of messages per second, your architecture is all wrong anyway.

I will move away from a lockless solution. Writing a lockless multiple-producer, multiple-consumer queue is hard, as you quickly run into the ABA problem. OmniThreadLibrary nevertheless implements two such queues: TOmniBaseBoundedQueue from the OtlContainers unit implements a constant-size queue, and TOmniBaseQueue from the same unit implements a dynamically allocated queue. In this example, however, I promised to stay with the Delphi RTL, so I will stay away from these implementations.

My code addresses previous complaints with three simple changes:
  1. TThreadedQueue from System.Generics.Collections implements a multiple-producer, multiple-consumer queue.
  2. The worker thread is written so that it responds to external events and uses no CPU time when nothing is going on. In our case, "a message has arrived" is such an event and is implemented with TEvent from System.SyncObjs.
  3. Delphi also offers a good way to interrupt the main thread by using TThread.Queue.
If you want to follow my post, you'll need the source code as I'm not publishing each small detail here. It is wrapped in the ThreadedQueue demo on my GitHub.
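
The third point relies on something running the queued calls in the main thread. In a VCL or FireMonkey application the framework takes care of that. The console sketch below is an illustration written for this text, not part of the demo, so it has to call CheckSynchronize itself. The names RunQueueToMainDemo and Received are made up for the example.

```delphi
program QueueToMainSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

var
  // Written only by the queued callback, which runs in the main thread.
  Received: string;

// Starts a worker that queues one callback, then runs pending callbacks.
function RunQueueToMainDemo: string;
var
  worker: TThread;
begin
  Received := '';
  worker := TThread.CreateAnonymousThread(
    procedure
    begin
      // nil: the queued call is not tied to the lifetime of this worker.
      TThread.Queue(nil,
        procedure
        begin
          Received := 'hello from worker';
        end);
    end);
  worker.FreeOnTerminate := False; // the worker is freed manually below
  worker.Start;
  try
    worker.WaitFor;
  finally
    worker.Free;
  end;
  // A console program has no message loop, so run pending calls by hand.
  CheckSynchronize(0);
  Result := Received;
end;

begin
  Writeln(RunQueueToMainDemo);
end.
```

The worker never touches Received directly. It only hands a piece of code to the main thread, which is the same idea the message queue in this post builds on.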

All queueing and threading logic is implemented in the CommThread unit. Most of the implementation is wrapped in a queue class TMessageQueue and base thread class TCommThread.

TMessageQueue implements a simple message queue that can store elements of type T. It wraps a TThreadedQueue<T> which does the actual storing, a TEvent which notifies a thread that something is going on and an anonymous method TMessageProc<T> which is used to process messages in the main thread.

type
  TMessageProc<T> = reference to procedure (const data: T);
  TMessageQueue<T> = class
    constructor Create(numItems: integer;
      const messageReceiver: TMessageProc<T> = nil);
    function Receive(var value: T): boolean;
    function Send(const value: T): boolean;
    property Event: TEvent read FEvent;
  end;

The TMessageQueue<T> class works in two modes. If you provide messageReceiver parameter to the constructor, the queue will assume that you want to receive messages (read from the queue) in the main thread. For each message that is posted to the queue, a messageReceiver procedure will be called. In this mode, the queue effectively becomes a single-consumer queue.

If you don't provide the messageReceiver parameter, the queue will assume that you want to receive messages in one or more background threads. In this mode, TMessageQueue<T> will create an internal TEvent object which is exposed through the Event property. A thread (or multiple threads) can wait on this Event and call the Receive function to read from the queue.

In both cases, anybody can call Send to write data to the queue. You can even use the queue to send messages back to yourself.

function TMessageQueue<T>.MakeCallback(const value: T): TThreadProcedure;
begin
  // value is a parameter, so every closure captures its own copy
  Result :=
    procedure
    begin
      FReceiver(value);
    end;
end;

procedure TMessageQueue<T>.DispatchMessages;
var
  value: T;
begin
  // forward everything currently in the queue to the main thread
  while FQueue.PopItem(value) = wrSignaled do
    TThread.Queue(nil, MakeCallback(value));
end;

function TMessageQueue<T>.Send(const value: T): boolean;
begin
  Result := (FQueue.PushItem(value) = wrSignaled);
  if Result then begin
    // background-thread mode: wake up waiting workers
    if assigned(FEvent) then
      FEvent.SetEvent;
    // main-thread mode: schedule the receiver via TThread.Queue
    if assigned(FReceiver) then
      DispatchMessages;
  end;
end;
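
Send can return False because the wrapped TThreadedQueue has a fixed depth. The following console sketch shows that behaviour in isolation. It assumes the queue is created with push and pop timeouts of 0 ms; the constructor of TMessageQueue<T> is not shown in this post, so the timeouts and the helper name CountAcceptedPushes are assumptions made for the example.

```delphi
program BoundedQueueSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs, System.Generics.Collections;

// Pushes `count` items into a queue of the given depth without blocking
// and returns how many pushes were accepted.
function CountAcceptedPushes(depth, count: Integer): Integer;
var
  queue: TThreadedQueue<string>;
  i: Integer;
begin
  Result := 0;
  // Push and pop timeouts of 0 ms (assumed here) make the calls non-blocking.
  queue := TThreadedQueue<string>.Create(depth, 0, 0);
  try
    for i := 1 to count do
      // A full queue answers with wrTimeout instead of wrSignaled.
      if queue.PushItem('message #' + IntToStr(i)) = wrSignaled then
        Inc(Result);
  finally
    queue.Free;
  end;
end;

begin
  Writeln('Accepted pushes: ', CountAcceptedPushes(2, 3));
end.
```

With a depth of 2 and nobody reading from the queue, the third push should time out, which is the situation the sender has to be prepared to handle.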

To simplify writing a thread that uses two such communication channels (one channel can only transfer data in one direction - to the queue or from the queue) I wrote a TCommThread<TToThread, TToMain> class. It has two generic parameters. TToThread specifies the type of data that is sent to the background thread and TToMain specifies the type of data that is sent to the main thread. If you want to send different types of data, you can use TValue (System.Rtti) as the type parameter.
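
As a small sketch of the TValue idea, the receiver can check which type a message actually carries before reading it. The Describe function and the message contents are hypothetical and not taken from the demo.

```delphi
program TValueSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Rtti;

// Returns a short description of a message, depending on the stored type.
function Describe(const msg: TValue): string;
begin
  if msg.TypeInfo = TypeInfo(Integer) then
    Result := 'integer ' + IntToStr(msg.AsInteger)
  else if msg.TypeInfo = TypeInfo(string) then
    Result := 'string ' + msg.AsString
  else
    Result := 'unsupported';
end;

begin
  Writeln(Describe(TValue.From<Integer>(42)));
  Writeln(Describe(TValue.From<string>('hello')));
  Writeln(Describe(TValue.From<Boolean>(True)));
end.
```

A ProcessMessage implementation for a thread declared with TValue as its TToThread parameter could branch in the same way.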

type
  TCommThread<TToThread, TToMain> = class(TThread)
  strict private
    FToThread: TMessageQueue<TToThread>;
    FToMain  : TMessageQueue<TToMain>;
  protected
    procedure ProcessMessage(const data: TToThread); virtual; abstract;
    function SendToMain(const value: TToMain): boolean;
    procedure TerminatedSet; override;
  public
    constructor Create(AQueueToThread: TMessageQueue<TToThread>;
      AQueueToMain: TMessageQueue<TToMain>);
    procedure Execute; override;
  end;

The only code of interest here is Execute. It waits for the event to become signalled and then either exits or reads from the queue (until it is empty) and calls ProcessMessage for every piece of data read from the queue.

procedure TCommThread<TToThread, TToMain>.Execute;
var
  data: TToThread;
begin
  while FToThread.Event.WaitFor = wrSignaled do begin
    if Terminated then
      break;
    FToThread.Event.ResetEvent;
    while FToThread.Receive(data) do
      ProcessMessage(data);
  end;
end;

To be able to stop the worker without sending it a message, the code overrides the TerminatedSet method, which is called when you call TThread.Terminate in the code. The overridden version sets the event on which Execute waits. Because of that, WaitFor in Execute stops waiting, the code sees that the Terminated flag is set, and exits.

procedure TCommThread<TToThread, TToMain>.TerminatedSet;
begin
  inherited;
  FToThread.Event.SetEvent;
end;
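
The same pattern can be tried out without the queue classes. The console sketch below uses a hypothetical TWaitingThread that owns its own event. It is a simplified stand-in for TCommThread, not the code from the demo.

```delphi
program TerminatedSetSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

type
  // Hypothetical worker that sleeps on an event until it is terminated.
  TWaitingThread = class(TThread)
  strict private
    FWakeUp: TEvent;
  protected
    procedure Execute; override;
    procedure TerminatedSet; override;
  public
    constructor Create;
    destructor Destroy; override;
  end;

constructor TWaitingThread.Create;
begin
  // Auto-reset event, initially not signalled, created before the thread runs.
  FWakeUp := TEvent.Create(nil, False, False, '');
  inherited Create(False);
end;

destructor TWaitingThread.Destroy;
begin
  inherited; // terminates the thread and waits for it if it is still running
  FWakeUp.Free;
end;

procedure TWaitingThread.Execute;
begin
  // Blocks without using CPU time until the event is set.
  while FWakeUp.WaitFor = wrSignaled do
    if Terminated then
      Break;
end;

procedure TWaitingThread.TerminatedSet;
begin
  inherited;
  FWakeUp.SetEvent; // wake Execute so that it can notice Terminated
end;

// Starts a worker, stops it again and reports whether it has finished.
function StopWaitingThread: Boolean;
var
  worker: TWaitingThread;
begin
  worker := TWaitingThread.Create;
  try
    Sleep(100);       // give the worker time to start waiting
    worker.Terminate; // calls TerminatedSet
    worker.WaitFor;
    Result := worker.Finished;
  finally
    worker.Free;
  end;
end;

begin
  Writeln('Worker stopped: ', StopWaitingThread);
end.
```

Note that the destructor calls inherited first, so that the event still exists while the thread is being terminated.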

To implement a worker thread one has to write a derived class implementing a ProcessMessage method.

type
  TCopyThread = class(TCommThread<string,string>)
  protected
    procedure ProcessMessage(const data: string); override;
  end;

The implementation in the demo code just passes data back to the main thread and reports an error if the queue is full.

procedure TCopyThread.ProcessMessage(const data: string);
begin
  if not SendToMain('Processed: ' + data) then
    TThread.Queue(nil,
      procedure
      begin
        frmThreadedQueue.Log(Format(
          '*** Thread failed to post message [%s]',
          [data]));
      end);
end;

To test all that, the main form creates two communication channels and multiple worker threads.

procedure TfrmThreadedQueue.FormCreate(Sender: TObject);
var
  i: integer;
begin
  FQueueToWorkers := TMessageQueue<string>.Create(100);
  FQueueToMain := TMessageQueue<string>.Create(100, HandleWorkerMessage);
  SetLength(FWorkers, TThread.ProcessorCount);
  for i := Low(FWorkers) to High(FWorkers) do
    FWorkers[i] := TCopyThread.Create(FQueueToWorkers, FQueueToMain);
end;

procedure TfrmThreadedQueue.FormDestroy(Sender: TObject);
var
  i: integer;
begin
  for i := Low(FWorkers) to High(FWorkers) do begin
    FWorkers[i].Terminate;
    FWorkers[i].Free;
  end;
  FreeAndNil(FQueueToWorkers);
  FreeAndNil(FQueueToMain);
end;

A click on a button then sends multiple messages to FQueueToWorkers.

procedure TfrmThreadedQueue.Button1Click(Sender: TObject);
var
  i: integer;
  msg: string;
begin
  ListBox1.Clear;
  msg := Edit1.Text;
  for i := 1 to SpinEdit1.Value do
    if not FQueueToWorkers.Send(msg + ' #' + i.ToString) then
      Log(Format('*** Main failed to post message [%d]', [i]));
end;

These messages are then processed by the workers on a first-come, first-served basis. Because there are multiple workers, messages will arrive back at the main thread shuffled.

If you only need 1:1 communication, the CommThread unit also implements a wrapper class TSingleCommThread<TToThread, TToMain> which internally manages its own communication channels. As the comm channels are not exposed to the owner, it also implements a SendToThread method which sends a message to the thread.

type
  TSingleCommThread<TToThread, TToMain> = class(TCommThread<TToThread, TToMain>)
  strict private
    FToThreadQueue: TMessageQueue<TToThread>;
    FToMainQueue: TMessageQueue<TToMain>;
  public
    constructor Create(numItems: integer;
      const messageReceiver: TMessageProc<TToMain> = nil);
    destructor Destroy; override;
    function SendToThread(const value: TToThread): boolean;
  end;

Let's recap:
  1. This implementation allows multiple producers to send data to multiple workers over one communication channel. It is indeed slower than an optimized lockless solution, but this is rarely important.
  2. Worker threads don't spend CPU time waiting for messages. Instead, each worker thread waits on an event, which uses zero CPU time.
  3. The main thread also doesn't spend CPU time waiting for a message. Instead, a message is pushed to it via TThread.Queue.
And the most important fact:
  1. Message processing is decoupled from thread logic. Thread logic is handled in the base class, which you only have to derive from and write one message-processing method. The same goes for the main thread - you only have to write one message-processing method and that is that.

7 comments:

  1. Yes, a multiple producer, multiple consumer queue that is event driven and self contained, is the solution to many threading applications. Once you learn how to express your multitasking problem into those terms, the rest is much easier.

    Using TThreadedQueue brings back the horrors of the TMonitor bugs in the early generic versions, that made my implementation of a similar framework using TThreadedQueue fall flat on the ground. It was finally fixed in XE2 update 4.

    Thanks for a well written article.

  2. Anonymous, 10:34

    Hi,
    Thank you for this article. Nice one.

    I read Gary's article, and I was a bit confused too, mainly for 2 reasons:
    - The same one your article exposes (the utility of one-to-one)
    - And the fact that the current implementation easily exposes the developer to hard concurrency problems in a "bad use" case. It is *very easy* to end up there.
    I usually like Gary's articles a lot, and I hope he'll post a more advanced implementation later.

  3. Man, why is this thread stuff still hard? :-/

  4. Hello

    Thank you for this article and your book (Delphi High Performance). I am using them to guide myself into parallel processing.

    I am using your code to implement a queued database worker thread. When I send a query to the input queue, and the output queue signals a result, I want to be sure I get the results of my query and not somebody else's query. I can include an identifier to tag my results, but if I receive a result that is not mine, that result set is already popped and no one else will get a chance to examine it.

    I could send the result back to the output queue if I don't want it, and let the output queue re-signal. I run the risk of re-examining the same result repeatedly until the result I want appears.

    I could possibly try to peek at the result before I pop it, but that seems to be moving away from the spirit of your code.

    Do you have thoughts on how I might solve this problem?

    Thanks

    David

  5. You could, for example, create one output queue per client and include it in the request data. The DB worker thread would then fetch a request, process it, and write the output to the queue referenced in the request record.

    BTW, a better place for such discussions is https://en.delphipraxis.net/forum/32-omnithreadlibrary/
