Thursday, February 16, 2012

Blaise Pascal Magazine Rerun #7: Intraprocess Communication

This article was originally written for the Blaise Pascal Magazine and was published in Issue #12.
Relevant Delphi code is available at
http://17slon.com/blogs/gabr/BPM/Communication.zip.

Welcome back to the “multithreading” series. By now you already know how to create a thread and synchronize multiple threads, but there’s one small part of the puzzle I’ve been ignoring: communication. And that’s not just any communication. When personal computers and programming are in question, communication is a very broad topic. It could mean communication between different computers (not the topic of today’s article) or communication between multiple processes on one computer (also something we’ll ignore today), but in the context of multithreaded programming it can only mean communication between different threads inside a process. In other words, all communication happens inside one process.

The problems here are very different from those in the first two scenarios. When talking between computers, we have to send data across some external medium (in most cases an intranet or the Internet, running TCP/IP as the protocol). When multiple processes on one computer are in question, we can use the same solution (TCP/IP) or we can establish some communication channel that uses computer memory for data exchange. (I’ll hint at a few such solutions later in the article.) The problem there is the barrier: in the former case the network and in the latter case the memory protection enforced by the operating system. (One process normally cannot “see” memory belonging to another process.) In the multithreading scenario, however, no such barrier exists.

Inside one process, all threads can access memory belonging to other threads. That’s partly a great strength, as it is very simple (and fast!) to pass large blocks of memory between threads. Just think: you don’t have to send the memory over the wire and you don’t have to copy it into a part of memory that is accessible by another process; you just have to send the address of the memory (a four-byte number in a 32-bit program) to another thread and it can immediately start reading data from that memory. Or writing to it. And herein lies the problem: as I have already stated in previous articles, it is very easy to shoot yourself in the foot with multithreaded code. Writing a multithreaded program that will crash at random times is really, really simple.

We’ve seen that synchronization can be a partial answer to the problem. Before working on shared data (data that is accessed from multiple threads), the programmer must lock this memory by using a critical section, spinlock, mutex or whichever synchronization primitive from the rich Windows API. But that’s only part of the answer. Frequent memory locking can lead to speed degradation. Instead of happily running on multiple threads, the program could block in most threads, waiting for shared data to become unlocked, while only one thread would be running (and accessing that data). In other words, a lot of effort put into a multithreaded program could lead to zero or negligible speed improvement. Communication is the key to this problem. Instead of locking the shared data, one thread can send a copy of the data to another thread and they can work on the data in parallel, each on its own copy.

You may ask why we need any communication tools at all. After all, all the memory inside the process is shared. One thread can just write a message into some part of memory and another thread would read from that part and get the message. True, but you’re forgetting that access to this memory block must again be locked. Even more, the second thread must somehow learn that a new message is waiting in that area. One possibility is to constantly check whether a new message was written there (polling, typically a very bad idea); another option is to use events, the Windows synchronization primitive we learned about in the previous article.
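To make the previous paragraph concrete, here is a minimal sketch of such a hand-made channel: a one-slot mailbox in shared memory, protected by a critical section, with an auto-reset event telling the reader that a message is waiting. It assumes a Delphi console application and uses TCriticalSection and TEvent from the SyncObjs unit instead of raw API calls; the TMailbox and TReader classes are hypothetical names introduced only for this illustration.

```pascal
program LockedMailbox;
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes, SyncObjs;

type
  // Hypothetical one-slot mailbox: the lock protects the slot, the event
  // tells the reader that a message is waiting, so nobody has to poll.
  TMailbox = class
  private
    FLock: TCriticalSection;
    FHasMessage: TEvent;
    FMessage: string;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Put(const msg: string);
    function Take(timeout: Cardinal; out msg: string): Boolean;
  end;

  // Hypothetical reader thread that waits for a single message
  TReader = class(TThread)
  private
    FBox: TMailbox;
  protected
    procedure Execute; override;
  public
    constructor Create(box: TMailbox);
  end;

constructor TMailbox.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FHasMessage := TEvent.Create(nil, False, False, ''); // auto-reset, not set
end;

destructor TMailbox.Destroy;
begin
  FHasMessage.Free;
  FLock.Free;
  inherited;
end;

procedure TMailbox.Put(const msg: string);
begin
  FLock.Enter;
  try
    FMessage := msg; // a second Put before Take overwrites the first message
  finally
    FLock.Leave;
  end;
  FHasMessage.SetEvent; // wake up the reader
end;

function TMailbox.Take(timeout: Cardinal; out msg: string): Boolean;
begin
  // a successful wait also resets the auto-reset event
  Result := FHasMessage.WaitFor(timeout) = wrSignaled;
  if Result then
  begin
    FLock.Enter;
    try
      msg := FMessage;
    finally
      FLock.Leave;
    end;
  end;
end;

constructor TReader.Create(box: TMailbox);
begin
  FBox := box; // set before the thread starts running
  inherited Create(False);
end;

procedure TReader.Execute;
var
  msg: string;
begin
  if FBox.Take(5000, msg) then
    Writeln('Reader got: ', msg)
  else
    Writeln('Reader timed out');
end;

var
  box: TMailbox;
  reader: TReader;
begin
  box := TMailbox.Create;
  try
    reader := TReader.Create(box);
    try
      box.Put('hello from the main thread');
      reader.WaitFor;
    finally
      reader.Free;
    end;
  finally
    box.Free;
  end;
end.
```

The overwrite noted in Put is the weak spot of a single shared slot: a fast writer loses messages unless it waits for the reader, which is why the later sections move to message queues.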

What can we learn from that? Firstly, that locking is very important in a multithreaded environment. Secondly, that communication between threads is mostly just managed access to a common memory block. And thirdly, that a communication tool must encapsulate all the messiness and give the programmer only a simple interface that doesn’t allow him or her to introduce too many bugs into the code.

Events

As we’ve already seen, events can be used to implement locked access to a shared resource. More typically, events are used to send one bit of information, a state, from one thread to another (or from one process to another). Most importantly, a thread can wait for an event: it can enter a Windows API call that will return only when the event is set (or signaled).

To create an event, you have to call the CreateEvent API. The code in the Events demo project does that in the OnCreate event handler.

procedure TfrmEvents.FormCreate(Sender: TObject);
begin
  FStartWorker := CreateEvent(nil, false, false, nil);
  FHasResult := CreateEvent(nil, false, false, nil);
  FWorker := TEventThread.Create(@FSource, @FResult, FStartWorker, FHasResult);
end;

The CreateEvent API accepts four parameters. The first represents security attributes and will typically be nil in the intraprocess scenario. The second specifies whether the event is reset manually (when the parameter is True) or automatically. In the former case, the programmer must call ResetEvent to reset the event, and in the latter case (which we are using) the event is reset automatically when a waiting thread is released. The third parameter specifies the initial state (False = not set) and the fourth parameter contains the name of the event object. When working inside one process only, event names are not needed.
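The difference between the two reset modes is easy to observe. The following sketch (a Delphi console program using the same Windows unit as the article) sets one auto-reset and one manual-reset event and then checks each one twice with a zero timeout. The IsSet and Report helpers are names made up for this illustration.

```pascal
program EventResetModes;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Hypothetical helper: True if the event is set. A zero timeout only checks
// the state, but for an auto-reset event a successful check also resets it,
// exactly like a real wait would.
function IsSet(evt: THandle): Boolean;
begin
  Result := WaitForSingleObject(evt, 0) = WAIT_OBJECT_0;
end;

procedure Report(const caption: string; first, second: Boolean);
begin
  Writeln(caption, ': ', BoolToStr(first, True), ' ', BoolToStr(second, True));
end;

var
  autoEvent, manualEvent: THandle;
  first, second: Boolean;
begin
  autoEvent := CreateEvent(nil, False, False, nil);   // auto-reset, not set
  manualEvent := CreateEvent(nil, True, False, nil);  // manual-reset, not set
  try
    SetEvent(autoEvent);
    first := IsSet(autoEvent);
    second := IsSet(autoEvent);
    Report('auto-reset', first, second);    // auto-reset: True False

    SetEvent(manualEvent);
    first := IsSet(manualEvent);
    second := IsSet(manualEvent);
    Report('manual-reset', first, second);  // manual-reset: True True

    ResetEvent(manualEvent);                // only ResetEvent clears it
    Writeln('after ResetEvent: ', BoolToStr(IsSet(manualEvent), True)); // False
  finally
    CloseHandle(manualEvent);
    CloseHandle(autoEvent);
  end;
end.
```

This is why the demo can use auto-reset events without ever calling ResetEvent: the successful wait itself clears them.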

The code above creates two events and one worker thread. The worker thread receives four parameters: the addresses of two integer fields declared inside the form class and the handles of both events. The reason for this will become clear in a minute.

Events are released by calling CloseHandle. This is done in the OnDestroy handler.

procedure TfrmEvents.FormDestroy(Sender: TObject);
begin
  FreeAndNil(FWorker); // stop the worker before closing the handles it waits on
  CloseHandle(FHasResult);
  CloseHandle(FStartWorker);
end;

The Events demo has one input field (inpValue) and one button (btnCalc). The user enters a number into the input field and clicks the button. At that moment the following code executes.

procedure TfrmEvents.btnCalcClick(Sender: TObject);
begin
  btnCalc.Enabled := false;
  try
    FSource := inpValue.Value;
    Log('Calculating ...');
    SetEvent(FStartWorker);
    if WaitForSingleObject(FHasResult, 10 * 1000) = WAIT_OBJECT_0 then
      // wait will automatically reset event
      Log(Format('2 * %d = %d', [FSource, FResult]))
    else
      Log('No result');
  finally
    btnCalc.Enabled := true;
  end;
end;

The code copies the value from the input field into the FSource field. Then it sets the FStartWorker event, which signals the background FWorker thread to start calculating. Next it waits for the FHasResult event, which the worker thread will signal when it completes the calculation. At the end, the result is displayed.

As you can see, WaitForSingleObject accepts a timeout parameter. In the example above it was set to 10 seconds, which is definitely long enough to complete the calculation. In theory you could set it to INFINITE (a constant defined in Windows.pas), but in practice it is better to select a large number that can never be reached if the program is working correctly and to report an error if the timeout is exceeded.
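One way to follow this advice is to look at every possible result of the wait, not just WAIT_OBJECT_0. The sketch below is a Delphi console program with a hypothetical DescribeWait helper; WAIT_TIMEOUT, WAIT_ABANDONED and WAIT_FAILED are the standard constants from Windows.pas. It waits for 100 ms on an event that nobody ever sets, so the timeout branch is taken.

```pascal
program WaitWithTimeout;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Hypothetical helper: turns a WaitForSingleObject result into text, so that
// a timeout (a bug, if the timeout was chosen generously) gets reported.
function DescribeWait(waitResult: DWORD): string;
begin
  if waitResult = WAIT_OBJECT_0 then
    Result := 'signaled'
  else if waitResult = WAIT_TIMEOUT then
    Result := 'timed out'
  else if waitResult = WAIT_ABANDONED then
    Result := 'abandoned mutex'
  else if waitResult = WAIT_FAILED then
    Result := 'failed: ' + SysErrorMessage(GetLastError)
  else
    Result := 'unexpected result ' + IntToStr(Int64(waitResult));
end;

var
  neverSet: THandle;
begin
  neverSet := CreateEvent(nil, False, False, nil);
  try
    // Nobody sets this event, so the wait gives up after 100 ms
    Writeln(DescribeWait(WaitForSingleObject(neverSet, 100))); // timed out
  finally
    CloseHandle(neverSet);
  end;
end.
```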

That’s all from the main program thread’s perspective: set the content of one shared memory location, set an event, wait for another event, display the content of another shared memory location.

The background thread is also not very complicated.

constructor TEventThread.Create(srcAddr, destAddr: PInteger; startWorker,
  hasResult: THandle);
begin
  inherited Create(false);
  FSrcAddr := srcAddr;
  FDestAddr := destAddr;
  FStartWorker := startWorker;
  FHasResult := hasResult;
  FTerminate := CreateEvent(nil, false, false, nil);
end;

destructor TEventThread.Destroy;
begin
  SetEvent(FTerminate);    // wake up Execute and make it exit
  inherited;               // TThread.Destroy waits for Execute to finish
  CloseHandle(FTerminate); // the internal event is no longer needed
end;

procedure TEventThread.Execute;
var
  handles: array [1..2] of THandle;
begin
  handles[1] := FTerminate;
  handles[2] := FStartWorker;
  // WAIT_OBJECT_0 + 1 means FStartWorker was set; FTerminate (WAIT_OBJECT_0)
  // or any error ends the loop and with it the thread
  while WaitForMultipleObjects(2, @handles, false, INFINITE) =
        (WAIT_OBJECT_0 + 1) do
  begin
    FDestAddr^ := 2 * FSrcAddr^;
    SetEvent(FHasResult);
  end;
end;

The constructor only stores the parameters (pointers to the shared memory blocks and both events) for later use and creates one event that will be used internally. The destructor just sets this event and calls the inherited destructor, which waits for the thread to finish. Together with a properly coded Execute, this allows us to just destroy the thread object and it will take care of its own termination.

Execute waits on two events at the same time. In this case INFINITE is used because we have absolutely no idea how much time will pass between two clicks on the Calculate button. WaitForMultipleObjects returns as soon as either the first or the second event is set. If the first event is set, the API returns WAIT_OBJECT_0 (a constant in Windows.pas); if the second, WAIT_OBJECT_0 + 1, and similarly for other handles (if provided). When FTerminate is set, the result is WAIT_OBJECT_0, the loop condition fails and Execute exits.

If FStartWorker was set, the code takes the value from the shared memory area, multiplies it by two and stores the result in the other shared memory area. At the end, the FHasResult event is set.

As both events are auto-reset, we don’t have to reset them manually; WaitForMultipleObjects takes care of that.
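The index arithmetic described above can be wrapped into a small helper. This is a sketch under our own naming (WaitForAny is hypothetical, not part of Windows.pas): it returns the zero-based index of the signaled handle, or -1 on timeout or error. The demo also shows the automatic reset at work: the second wait on the same pair of events times out. Note that WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS (64) handles.

```pascal
program WaitForAnyDemo;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Hypothetical helper: waits until any of the handles is set and returns
// its zero-based index, or -1 on timeout or failure.
// The caller must pass between 1 and MAXIMUM_WAIT_OBJECTS handles.
function WaitForAny(const handles: array of THandle; timeout: DWORD): Integer;
var
  res: DWORD;
begin
  res := WaitForMultipleObjects(Length(handles), @handles[0], False, timeout);
  if res < WAIT_OBJECT_0 + DWORD(Length(handles)) then
    Result := Integer(res - WAIT_OBJECT_0)
  else
    Result := -1; // WAIT_TIMEOUT, WAIT_FAILED or an abandoned mutex
end;

var
  terminateEvent, startEvent: THandle;
begin
  terminateEvent := CreateEvent(nil, False, False, nil);
  startEvent := CreateEvent(nil, False, False, nil);
  try
    SetEvent(startEvent);
    // prints 1, the equivalent of WAIT_OBJECT_0 + 1 in TEventThread.Execute
    Writeln(WaitForAny([terminateEvent, startEvent], 0));
    // startEvent was auto-reset by the previous wait, so this prints -1
    Writeln(WaitForAny([terminateEvent, startEvent], 0));
  finally
    CloseHandle(startEvent);
    CloseHandle(terminateEvent);
  end;
end.
```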

This simple example implements the most trivial form of communication: shared memory is used to exchange data and two events are used for resource protection and for signalling message availability.

For simple applications this may be enough. If, however, you want to send a larger number of messages to the thread without waiting for the answer, you need something else: a message queue. The simplest path to a message queue is to use the one provided by Windows.

Windows messages

Messages are the basis of the Windows graphical user interface. Everything is governed by them, from application window size and position to captions on buttons and so on. They are, however, not limited to the main thread. One can also send messages to a windowless background thread. And vice versa, it is very simple to send a message from a background thread to any window inside the program. The WinMessages demo project does it all.

The basic functionality is the same as before: enter a number, click the button and the background thread will multiply it by two. The implementation is, and you’ll be glad to hear that, much simpler.

Setting up the worker consists of creating a thread and passing the Handle of the main window to it; tearing it down consists of destroying that thread. The window handle will be used to send messages from the thread back to the window.

procedure TfrmMessages.FormCreate(Sender: TObject);
begin
  FWorker := TMessageThread.Create(Handle);
end;

procedure TfrmMessages.FormDestroy(Sender: TObject);
begin
  FreeAndNil(FWorker);
end;

The Calculate button just sends a message to the worker thread. The value to be multiplied is passed as a parameter of this message. Instead of the “usual” PostMessage, PostThreadMessage must be used to post the message directly to the thread.

procedure TfrmMessages.btnCalcClick(Sender: TObject);
begin
  PostThreadMessage(FWorker.ThreadID, WM_START, inpValue.Value, 0);
end;

Much has changed in the worker thread.

procedure TMessageThread.Execute;
var
  handles: array [1..1] of THandle;
begin
  handles[1] := FTerminate;
  // WAIT_OBJECT_0 + 1 (one past the last handle) means "a message arrived"
  while MsgWaitForMultipleObjects(1, handles, false, INFINITE,
          QS_ALLPOSTMESSAGE) = (WAIT_OBJECT_0 + 1) do
    ProcessThreadMessages;
end;

procedure TMessageThread.ProcessThreadMessages;
var
  msg: TMsg;
begin
  // drain everything that is waiting in this thread's message queue
  while PeekMessage(msg, 0, 0, 0, PM_REMOVE) do begin
    if msg.message = WM_START then
      Start(msg.wParam);
  end;
end;

procedure TMessageThread.Start(value: integer);
begin
  // wParam = multiplied value, lParam = original value
  PostMessage(FOwnerHandle, WM_RESULT, 2 * value, value);
end;

In this case, Execute uses a different version of wait, MsgWaitForMultipleObjects. Besides waiting for handles (only one handle in this case, the terminate event), this version also waits for some subset of Windows messages passed to this thread. Here, QS_ALLPOSTMESSAGE was used as a filter because we are only interested in posted messages. (QS_ALLEVENTS is always a good selection if you’re not sure which filter to use.) When a message is received, WAIT_OBJECT_0 + &lt;length of handles array&gt; is returned from the function.

When this happens, ProcessThreadMessages is called. It removes messages from the message queue and calls Start method for each WM_START message.

Start multiplies the value (taken from the WM_START message) by 2 and sends both the original and the multiplied value to the main window as a WM_RESULT message. This time PostMessage is used, as we are sending a message to a window and not to a thread.

Processing completes in the WM_RESULT message handler.

// TfrmMessages declaration contains
procedure WMResult(var msg: TMessage); message WM_RESULT;

procedure TfrmMessages.WMResult(var msg: TMessage);
begin
  // WParam holds the multiplied value, LParam the original one
  Log(Format('2 * %d = %d', [msg.LParam, msg.WParam]));
end;

The main difference between this and the previous example is that now the processing is completely decoupled. The main program does not wait for the result. When the result is available, it will be passed to the WMResult method. Multiple WM_START commands can be sent to the thread (Windows will keep them waiting in the thread’s message queue until they are processed). Similarly, WM_RESULT messages accumulate in the window’s message queue. This allows for great flexibility in multithreaded programming.
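The same decoupling can be tried without any window at all. The sketch below is a hypothetical console variant of the WinMessages demo (the TDoublerThread class and the queueReady event belong to this sketch only): the main thread posts three WM_START messages in a row without waiting, and the worker posts WM_RESULT messages back with PostThreadMessage, because a console program has no window to receive them. It also deals with a detail that is easy to overlook: PostThreadMessage fails if the target thread has not created its message queue yet, so each thread forces queue creation with PeekMessage before anything is posted to it.

```pascal
program ThreadMessages;
{$APPTYPE CONSOLE}

uses
  Windows, Messages, SysUtils, Classes;

const
  WM_START  = WM_USER;      // to the worker: wParam = value
  WM_RESULT = WM_USER + 1;  // to the main thread: wParam = 2 * value, lParam = value

type
  // Hypothetical windowless worker with its own thread message queue
  TDoublerThread = class(TThread)
  private
    FOwnerThreadID: DWORD;
    FQueueReady: THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(ownerThreadID: DWORD; queueReady: THandle);
  end;

constructor TDoublerThread.Create(ownerThreadID: DWORD; queueReady: THandle);
begin
  FOwnerThreadID := ownerThreadID;
  FQueueReady := queueReady;
  inherited Create(False);
end;

procedure TDoublerThread.Execute;
var
  msg: TMsg;
begin
  // The first call to a message function creates this thread's queue
  PeekMessage(msg, 0, WM_USER, WM_USER, PM_NOREMOVE);
  SetEvent(FQueueReady);
  // GetMessage returns False when WM_QUIT is retrieved
  while GetMessage(msg, 0, 0, 0) do
    if msg.message = WM_START then
      PostThreadMessage(FOwnerThreadID, WM_RESULT, 2 * msg.wParam, msg.wParam);
end;

var
  queueReady: THandle;
  worker: TDoublerThread;
  msg: TMsg;
  i: Integer;
begin
  PeekMessage(msg, 0, WM_USER, WM_USER, PM_NOREMOVE); // main thread's queue
  queueReady := CreateEvent(nil, True, False, nil);
  worker := TDoublerThread.Create(GetCurrentThreadId, queueReady);
  try
    WaitForSingleObject(queueReady, 10 * 1000);
    // Three requests are queued at once; nobody waits for an answer here
    for i := 1 to 3 do
      PostThreadMessage(worker.ThreadID, WM_START, i * 10, 0);
    // Collect the answers from this thread's own queue
    for i := 1 to 3 do
      if GetMessage(msg, 0, WM_RESULT, WM_RESULT) then
        Writeln(Format('2 * %d = %d', [msg.lParam, msg.wParam]));
  finally
    PostThreadMessage(worker.ThreadID, WM_QUIT, 0, 0); // ends the worker's loop
    worker.Free; // TThread.Destroy waits for Execute to return
    CloseHandle(queueReady);
  end;
end.
```

Because posted messages are retrieved in the order they were posted, the program should print `2 * 10 = 20`, `2 * 20 = 40` and `2 * 30 = 60`.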

Other communication mechanisms

There are some other communication mechanisms that can be used for intraprocess communication, although they are more suitable for the other two scenarios: multiple processes or multiple computers. Still, they deserve to be mentioned here as they may find practical use in multithreaded programming too.

First on the list are pipes. A pipe is a one-way or duplex (bidirectional) communication channel between a client and a server. Multiple clients may connect to one server. Pipes can be used over the network. The biggest problem with pipes is that the API is quite clumsy. If you want to use pipes, it would be better to start with a Delphi wrapper, like the one written by Russell Libby.
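For the simplest case, an anonymous pipe between two threads of the same process, the raw API is still manageable. (Anonymous pipes are one-way only and cannot be used over the network, unlike the named client/server pipes described above.) The following is a minimal sketch assuming Delphi with the Windows unit; TPipeWriter and ReadAllFromPipe are hypothetical names. The writer thread sends a string and closes its end of the pipe, and the main thread reads until ReadFile fails because the write end was closed.

```pascal
program PipeBetweenThreads;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, Classes;

type
  // Hypothetical writer thread: sends one string and closes its end
  TPipeWriter = class(TThread)
  private
    FWriteEnd: THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(writeEnd: THandle);
  end;

constructor TPipeWriter.Create(writeEnd: THandle);
begin
  FWriteEnd := writeEnd;
  inherited Create(False);
end;

procedure TPipeWriter.Execute;
var
  data: AnsiString;
  written: DWORD;
begin
  data := 'hello through a pipe';
  WriteFile(FWriteEnd, data[1], Length(data), written, nil);
  CloseHandle(FWriteEnd); // lets the reader detect the end of the data
end;

// Reads until the write end is closed (ReadFile then fails with
// ERROR_BROKEN_PIPE) and returns everything that was received.
function ReadAllFromPipe(readEnd: THandle): AnsiString;
var
  buffer: array [0..255] of AnsiChar;
  bytesRead: DWORD;
  chunk: AnsiString;
begin
  Result := '';
  while ReadFile(readEnd, buffer, SizeOf(buffer), bytesRead, nil) do
  begin
    SetString(chunk, PAnsiChar(@buffer[0]), bytesRead);
    Result := Result + chunk;
  end;
end;

var
  readEnd, writeEnd: THandle;
  writer: TPipeWriter;
begin
  if not CreatePipe(readEnd, writeEnd, nil, 0) then
    RaiseLastOSError;
  writer := TPipeWriter.Create(writeEnd); // the writer now owns writeEnd
  try
    Writeln(ReadAllFromPipe(readEnd)); // hello through a pipe
  finally
    writer.Free;
    CloseHandle(readEnd);
  end;
end.
```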

Another such mechanism is the mailslot. Mailslots are unidirectional only and are typically used over the network. Their main power is that they support broadcasting: one application can broadcast a single mailslot message which will be received by multiple processes.

We cannot skip sockets, either. TCP/IP is typically used to communicate between processes, but it can easily be used between threads. Mostly, you would set up such a scenario for testing TCP clients or servers, as other methods (directly sharing memory, Windows messages) are much more efficient.

OmniThreadLibrary queues

To wrap up the communication article, I’ve prepared two examples using the OmniThreadLibrary multithreading library, of which, as you already know, I’m very fond. The first example compiles with the official release 1.05, while the second requires the most recent version from SVN.

The first example, OTLQueue, uses OmniThreadLibrary communication queue combined with a standard TThread worker. The worker does most of the work (sorry, bad pun).

type
  TOTLCommThread = class(TThread)
  private
    FChannel  : IOmniTwoWayChannel;
    FTerminate: THandle;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Execute; override;
    procedure Start(value: integer);
    property Channel: IOmniTwoWayChannel read FChannel;
  end;

constructor TOTLCommThread.Create;
begin
  inherited Create(false);
  FTerminate := CreateEvent(nil, false, false, nil);
  FChannel := CreateTwoWayChannel(1000, FTerminate);
end;

destructor TOTLCommThread.Destroy;
begin
  SetEvent(FTerminate);    // make Execute exit
  inherited;               // TThread.Destroy waits for Execute to finish
  FChannel := nil;         // release the channel before closing its event
  CloseHandle(FTerminate);
end;

procedure TOTLCommThread.Execute;
var
  handles: array [1..2] of THandle;
  msg    : TOmniMessage;
begin
  handles[1] := FTerminate;
  handles[2] := FChannel.Endpoint2.NewMessageEvent;
  while WaitForMultipleObjects(2, @handles, false, INFINITE) =
        (WAIT_OBJECT_0 + 1) do
  begin
    // more than one message may be waiting
    while FChannel.Endpoint2.Receive(msg) do begin
      if msg.MsgID = WM_START then
        Start(msg.MsgData.AsInteger);
    end;
  end;
end;

procedure TOTLCommThread.Start(value: integer);
begin
  FChannel.Endpoint2.Send(WM_RESULT, 2 * value);
end;

Besides creating a terminate event, worker also creates an OTL communication channel. This channel is also available via the Channel property so it can be used from the main thread. Each IOmniTwoWayChannel has two endpoints. Whatever is sent on the Endpoint1 is received on Endpoint2 and vice versa. In the example Endpoint1 is owned by the main thread and Endpoint2 by the worker thread but that is just a convention, not something enforced by the OmniThreadLibrary.

The background worker’s Execute method waits on two events, one indicating thread termination and another signalling that a new message was received. If the latter happens, messages (there can be more than one waiting) are read from Endpoint2 and processed. When WM_START is received, the appropriate WM_RESULT is sent back through Endpoint2, so it will be received on Endpoint1.

The calculation model in the main thread is very similar to the Events project. (The other example shows a solution similar to the WinMessages example.)

procedure TfrmMessages.btnCalcClick(Sender: TObject);
var
  msgID : word;
  result: TOmniValue;
begin
  btnCalc.Enabled := false;
  try
    FChannel.Endpoint1.Send(WM_START, inpValue.Value);
    if FChannel.Endpoint1.ReceiveWait(msgID, result, 10 * 1000) then
      Log(Format('2 * %d = %d', [inpValue.Value, result.AsInteger]))
    else
      Log('No result');
  finally
    btnCalc.Enabled := true;
  end;
end;

To start the calculation, the program sends a WM_START message to the channel's Endpoint1. The content of the input field is passed as the message parameter. Then the code waits for the result for up to 10 seconds. The resulting message ID is ignored, as the code knows that only WM_RESULT can be sent from the worker. At the end, the result is displayed.

Why would one want to use OTL queues instead of Windows message queues? The OTL queue has many advantages over the Windows model. The queue can be made very large (the Windows queue is quite limited; by default, Windows allows at most 10,000 posted messages per queue), it is much faster and it can pass non-integer data such as strings, floating point numbers and interfaces.

OmniThreadLibrary worker

The last example shows a pure OmniThreadLibrary solution and, because of its simplicity, is shown here almost in full.

unit otlWorker1;

interface

uses
  OtlCommon, OtlComm, OtlTask, OtlTaskControl;

const
  WM_START = WM_USER;
  WM_RESULT = WM_USER + 1;

type
  TfrmMessages = class(TForm)

  private
    FWorker: IOmniTaskControl;
    procedure WMResult(var msg: TOmniMessage); message WM_RESULT;
  end;

implementation

type
  TOTLCommThread = class(TOmniWorker)
  protected
    procedure Start(var value: TOmniMessage); message WM_START;
  end;

procedure TfrmMessages.FormCreate(Sender: TObject);
begin
  FWorker := CreateTask(TOTLCommThread.Create())
    .OnMessage(Self)
    .Run;
end;

procedure TfrmMessages.FormDestroy(Sender: TObject);
begin
  FWorker.Terminate;
  FWorker := nil;
end;

procedure TfrmMessages.btnCalcClick(Sender: TObject);
begin
  FWorker.Comm.Send(WM_START, inpValue.Value);
end;

procedure TfrmMessages.WMResult(var msg: TOmniMessage);
begin
  Log(Format('2 * %d = %d',
    [integer(msg.MsgData[0]), integer(msg.MsgData[1])]));
end;

procedure TOTLCommThread.Start(var value: TOmniMessage);
begin
  Task.Comm.Send(WM_RESULT,
    [value.MsgData.AsInteger, 2 * value.MsgData.AsInteger]);
end;

end.

The worker is created in FormCreate (the CreateTask call). OnMessage(Self) specifies that all messages sent from the thread will be dispatched directly to the form (frmMessages), and Run starts the background worker.

The Calculate button (btnCalcClick) just sends a WM_START message on the communication channel associated with the worker.

This message is automatically dispatched to TOTLCommThread.Start (because of the ‘message WM_START’ part in the declaration). This method sends back a WM_RESULT message with two parameters: the original and the multiplied value.

WM_RESULT is again automatically dispatched to the WMResult method where both message parameters are read from the MsgData field and logged on the screen.

As in the WinMessages example, this is a fully asynchronous solution with message queues on both the background worker and the main window side. The difference here is that there was less coding to be done and that the message queue is more powerful (see the advantages noted in the previous section).

Conclusion

I cannot repeat it enough: multithreading is hard. If you’re going to write multithreaded code, I can only recommend using message passing (be it classical Windows messages or any other form) as much as possible. Shared data access will always cause you problems, while message passing by itself will not. Of course, you may abuse a message queue to pass around addresses of shared memory and then use locking, but then you should see item one: multithreading is hard.

See you next time in the last part of this series when I’ll write about debugging multithreading solutions.
