TL;DR: Here is the documentation and here is the unit.
You probably know I do a lot of multithreaded programming😜 Quite a fun work with lots of fun problems, one of them being sending data across threads. OmniThreadLibrary uses queues and Windows messages for that but sometimes this is just not good enough.
Recently I was solving a problem of dispatching events across multiple threads. I have a UDP receiver that runs in a thread, does some processing (unpacks TS packets from the UDP) and sends unpacked data to receiver threads. I didn't want to use standard OTL messaging mechanisms for that as there are many packets received every second and Windows message queues would surely overflow.
I started working on a custom solution (something with a shared queue where the data is pushed and smart notification mechanism that is not triggered for every UDP packet) but then I got bored and I started thinking about a more general solution. I also wanted to move away from Windows messages as they require special work in the receiver thread. The general internet knowledge pointed me to Asynchronous Procedure Calls and an idea for a new implementation was born!
Example
GpEventBus subscription mechanism is based on events so firstly you have to define an event. An event is a record type that carries whatever data it needs to carry.
type
TDataReadyEvent = record
FileName: string;
BytesRead: Int64;
Success: Boolean;
class function Create(const AFileName: string; ABytes: Int64;
ASuccess: Boolean): TDataReadyEvent; static;
end;
class function TDataReadyEvent.Create(const AFileName: string;
ABytes: Int64; ASuccess: Boolean): TDataReadyEvent;
begin
Result.FileName := AFileName;
Result.BytesRead := ABytes;
Result.Success := ASuccess;
end;
To subscribe from a main thread, call EventBus.Subscribe<T>. (EventBus is a function returning the global event bus. You can create other event bus instances by calling CreateEventBus function). As a parameter to Subscribe you provide an event handler:
procedure TMainForm.AfterConstruction;
begin
inherited;
// Main thread doesn't need RegisterThread (auto-detected)
FSubscription := EventBus.Subscribe<TDataReadyEvent>(
procedure(const evt: TDataReadyEvent)
begin
// Executes in main thread - safe to update UI
if evt.Success then
StatusBar1.SimpleText := Format('Loaded %s (%d bytes)',
[evt.FileName, evt.BytesRead])
else
ShowMessage('Load failed: ' + evt.FileName);
end);
end;
You have to keep FSubcription (of type IEventSubscription) alive for the time the thread is subscribed.
To unsubscribe, call FSubscription.Unsubscribe. After that you can clear the FSubscription interface.
To subscribe for events from a background thread, you have to register the thread first by calling EventBus.RegisterThread. Before the thread is being destroyed, you have to unregister it by calling EventBus.UnregisterThread. Here is a simple example:
procedure TWorkerThread.Execute;
begin
// STEP 1: Register this thread for event dispatch
EventBus.RegisterThread;
try
// STEP 2: Subscribe to events
FSubscription := EventBus.Subscribe<TDataReadyEvent>(
procedure(const evt: TDataReadyEvent)
begin
// Executes in THIS background thread
ProcessFile(evt.FileName, evt.BytesRead);
end);
// STEP 3: Alertable wait loop (REQUIRED for QueueUserAPC)
while not Terminated do
begin
// Sleep in alertable state - APCs will wake us up
if SleepEx(100, True) = WAIT_IO_COMPLETION then
Continue; // APC executed, loop again
// Optional: Do periodic work here
CheckHeartbeat;
end;
finally
// STEP 4: Clean up before thread terminates
FSubscription := nil; // Unsubscribe
EventBus.UnregisterThread;
end;
end;
To fire an event, just call EventBus.Fire<T> from any thread - main or background.
procedure TFileLoader.LoadComplete(const fileName: string; bytesRead: Int64);
begin
EventBus.Fire<TDataReadyEvent>(
TDataReadyEvent.Create(fileName, bytesRead, True));
end;If you have subscribed to the event from the main thread, event will be dispatched via the TThread.Queue mechanism. You don't have to do any preparation in the code. Background threads, however, need to spend some time in alertable wait for the events to be dispatched.
In the example above, the code calls SleepEx with second parameter set to true. This creates aleartable wait which allows Asynchronous Procedure Calls to be executed which then allows event handlers to be called.
Let me simplify that: While the code is in SleepEx, subscribed event handlers may be executed.
Another option to achieve an alertable wait is to call WaitForSingleObjectEx, WaitForMultipleObjectsEx, or MsgWaitForMultipleObjectsEx and set bAlertable parameter to true.
If your worker thread is an OTL task then you just have to add .Alertable modifier to the task creation code.
Failure cases
As is usual in multithreaded programming, the biggest problems occur when the threads are shutting down. For example, what if the APC that carries an event has already been scheduled, but the receiving thread has just terminated? GpEventBus contains code that handles that! You can still run into problems, though, if you forget to call EventBus.UnregisterThread. (Because then EventBus would not know that your thread has terminated and it will still try to execute event handlers in its context.)
To help with such problems, there are some tests in the GpEventBus unit that are only compiled if DEBUG is defined:
- Subscribe will raise exception if called from a worker thread that did not register itself.
- When event bus is destroyed, it log a message (OutputDebugString) if a registered thread was not unregistered. Similar check is done in the APC dispatch. The program will stop in a breakpoint if you are running it in a debugger.
- UnregisterThread will raise exception if the thread was not registered.
- RegisterThread internally schedules a special APC. If it is not delivered in 5 seconds, the code raises an exception and tells you to put the thread in alertable state. (This test is part of the GpEventBus.AlertableWaitMonitor unit.)
For more information, I recommend reading the accompanying documentation and, of course, browsing the code.
No comments:
Post a Comment