Tuesday, January 10, 2012

Background Worker

When I wrote OmniThreadLibrary in Practice [2]–Background Worker and List Partitioning in November, I noticed that OmniThreadLibrary needed another high-level abstraction, a background worker. I published some ideas on how the background worker interface could look. Those ideas were later developed into a full-fledged high-level solution.

Background worker is designed around the concept of a work item. You create a worker, which spawns one or more background threads, and then schedule work items to it. When an item has been processed, the background worker notifies you so that you can handle the result. Work items are queued, so you can schedule many of them at once and the background thread will then process them one by one. [Actually, Parallel.Pipeline is used as a base for the implementation.]

A background worker is created by calling the Parallel.BackgroundWorker factory. Usually you’ll also set the main work item processing method and the completion method by calling Execute and OnRequestDone, respectively. As usual, you can provide OTL with a method, a procedure, or an anonymous method.

  FWorker := Parallel.BackgroundWorker
    .OnRequestDone(StringRequestDone)
    .Execute(StringProcessorHL);

To close the background worker, call the .Terminate method and set the reference (FWorker) to nil.

To create a work item, call the CreateWorkItem factory and pass the result to the Schedule method. You can pass data to the work item by passing a parameter to the CreateWorkItem method. If you have to pass multiple parameters, you can collect them in a record and wrap it with TOmniValue.FromRecord<T>.
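A minimal sketch of the record approach follows. It assumes a form with an FWorker: IOmniBackgroundWorker field, OtlCommon and OtlParallel in the uses clause, and an executor of the form procedure (const workItem: IOmniWorkItem). The TSearchRequest record, the form class and both method names are made up for illustration, and ToRecord<T> is assumed to be the unwrapping counterpart of FromRecord<T>.

```delphi
// Fragment of a form unit; needs OtlCommon and OtlParallel in the uses clause.
type
  // Hypothetical record that bundles several input parameters.
  TSearchRequest = record
    FileName: string;
    SearchFor: string;
  end;

procedure TfrmMain.ScheduleSearch(const fileName, searchFor: string);
var
  request: TSearchRequest;
begin
  request.FileName := fileName;
  request.SearchFor := searchFor;
  // Wrap the record in a TOmniValue and queue it as a new work item.
  FWorker.Schedule(
    FWorker.CreateWorkItem(TOmniValue.FromRecord<TSearchRequest>(request)));
end;

// Executor; runs in a background thread.
procedure TfrmMain.ProcessSearch(const workItem: IOmniWorkItem);
var
  request: TSearchRequest;
begin
  // Unwrap the record (assumption: ToRecord<T> mirrors FromRecord<T>).
  request := workItem.Data.ToRecord<TSearchRequest>;
  // Placeholder "work": a real executor would search the file here.
  workItem.Result := Length(request.FileName) + Length(request.SearchFor);
end;
```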

  IOmniBackgroundWorker = interface
    function  CreateWorkItem(const data: TOmniValue): IOmniWorkItem;
    procedure CancelAll; overload;
    procedure CancelAll(upToUniqueID: int64); overload;
    function  Config: IOmniWorkItemConfig;
    function  Execute(const aTask: TOmniBackgroundWorkerDelegate = nil):
      IOmniBackgroundWorker;
    function  NumTasks(numTasks: integer): IOmniBackgroundWorker;
    function  OnRequestDone(const aTask: TOmniWorkItemDoneDelegate):
      IOmniBackgroundWorker;
    function  OnRequestDone_Asy(const aTask: TOmniWorkItemDoneDelegate):
      IOmniBackgroundWorker;
    procedure Schedule(const workItem: IOmniWorkItem;
      const workItemConfig: IOmniWorkItemConfig = nil);
    function  StopOn(const token: IOmniCancellationToken):
      IOmniBackgroundWorker;
    function  TaskConfig(const config: IOmniTaskConfig): IOmniBackgroundWorker;
    function  Terminate(maxWait_ms: cardinal): boolean;
    function  WaitFor(maxWait_ms: cardinal): boolean;
  end;

You can pass additional configuration to the Schedule method by providing a configuration block (second parameter). Call the Config method to create the configuration parameter block. By using this approach, you can set a custom executor method or a custom completion method for each separate work item.

  IOmniWorkItemConfig = interface
    function OnExecute(const aTask: TOmniBackgroundWorkerDelegate):
      IOmniWorkItemConfig;
    function OnRequestDone(const aTask: TOmniWorkItemDoneDelegate):
      IOmniWorkItemConfig;
    function OnRequestDone_Asy(const aTask: TOmniWorkItemDoneDelegate):
      IOmniWorkItemConfig;
  end;
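As an illustration of a per-item configuration block, the sketch below overrides both the executor and the completion handler for a single work item. The form class, the ScheduleSpecial method and the lbLog list box are hypothetical, the delegate signatures are assumed to take the parameters shown, and the input data is assumed to be a string.

```delphi
// Fragment of a form unit; assumes FWorker: IOmniBackgroundWorker was
// created with Parallel.BackgroundWorker and lbLog is a TListBox.
procedure TfrmMain.ScheduleSpecial(const data: TOmniValue);
begin
  FWorker.Schedule(
    FWorker.CreateWorkItem(data),
    FWorker.Config
      .OnExecute(
        procedure (const workItem: IOmniWorkItem)
        begin
          // Custom executor, used for this work item only.
          workItem.Result := workItem.Data.AsString + ' (special)';
        end)
      .OnRequestDone(
        procedure (const Sender: IOmniBackgroundWorker;
          const workItem: IOmniWorkItem)
        begin
          // Custom synchronous completion handler for this work item only.
          lbLog.Items.Add(workItem.Result.AsString);
        end));
end;
```

Work items scheduled without the second parameter keep using the handlers set on the worker itself.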

Background worker supports two notification mechanisms. By calling OnRequestDone, you are setting a synchronous handler, which will be executed in the context of the thread that created the background worker (usually the main thread). In other words, if you call OnRequestDone, you don’t have to worry about thread synchronisation issues. The OnRequestDone_Asy handler, on the other hand, is executed asynchronously, in the context of the thread that processed the work item.

By calling NumTasks, you can set the degree of parallelism. Normally, background worker uses only one background task but you can override this behaviour.
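A small sketch of a worker that uses several tasks together with a synchronous completion handler. The task count of 4, the form class, the method names and the lbLog list box are all illustrative choices, not something prescribed by OTL.

```delphi
// Fragment of a form unit; needs SysUtils, OtlCommon and OtlParallel.
procedure TfrmMain.FormCreate(Sender: TObject);
begin
  FWorker := Parallel.BackgroundWorker
    .NumTasks(4)               // hypothetical choice: four worker tasks
    .OnRequestDone(HandleDone) // synchronous handler, see below
    .Execute(ProcessItem);
end;

// Executor; may now run in up to four background threads at once,
// so it must not touch shared state without protection.
procedure TfrmMain.ProcessItem(const workItem: IOmniWorkItem);
begin
  workItem.Result := workItem.Data.AsInteger * 2; // placeholder "work"
end;

// Runs in the thread that created the worker (here the main thread),
// so it can update the UI directly.
procedure TfrmMain.HandleDone(const Sender: IOmniBackgroundWorker;
  const workItem: IOmniWorkItem);
begin
  lbLog.Items.Add(Format('Item %d done', [workItem.UniqueID]));
end;
```

If HandleDone were registered with OnRequestDone_Asy instead, it would run in the worker thread and the direct lbLog access would no longer be safe.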

There are also some “standard” methods, namely Terminate and WaitFor, and something called StopOn which I, to be completely frank, have no idea what it is good for. Actually, it is not even implemented, because I cannot remember why I put it there :( It will probably be just silently removed at some point.

Before I wrap up this article with a description of cancellation and the CancelAll methods, let’s take a look at the work item interface, IOmniWorkItem.

  IOmniWorkItem = interface ['{3CE2762F-B7A3-4490-BF22-2109C042EAD1}']
    function  DetachException: Exception;
    function  FatalException: Exception;
    function  IsExceptional: boolean;
    property CancellationToken: IOmniCancellationToken read GetCancellationToken;
    property Data: TOmniValue read GetData;
    property Result: TOmniValue read GetResult write SetResult;
    property UniqueID: int64 read GetUniqueID;
  end;

It contains input data (Data property), result (Result) and a unique ID, which is assigned in the CreateWorkItem call. The first work item created gets ID 1, the second ID 2, and so on. This allows some flexibility when you want to cancel work items. You can cancel one specific item by calling workItem.CancellationToken.Signal, multiple items by calling backgroundWorker.CancelAll(highestIDToBeCancelled), or all items by calling backgroundWorker.CancelAll.

Cancellation is partly automatic and partly cooperative. If the work item that is to be cancelled has not started executing yet, the system will prevent it from ever being executed. If, however, the work item is already being processed, your code must occasionally check workItem.CancellationToken.IsSignalled and exit if it is set (provided that you want to support cancellation at all). Regardless of how the work item was cancelled, the completion handler will still be called, and it can check workItem.CancellationToken.IsSignalled to find out whether the work item was cancelled prematurely or not.
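The cooperative part could look roughly like this. The form class, the method names, the btnCancel button and the lbLog list box are hypothetical, and the work item data is assumed to hold an integer loop count.

```delphi
// Fragment of a form unit; needs SysUtils, OtlCommon and OtlParallel.

// Executor; runs in a background thread.
procedure TfrmMain.ProcessItem(const workItem: IOmniWorkItem);
var
  i, sum: integer;
begin
  sum := 0;
  for i := 1 to workItem.Data.AsInteger do begin
    // Poll the token and bail out early when the item was cancelled.
    if workItem.CancellationToken.IsSignalled then
      Exit;
    Inc(sum, i);
  end;
  workItem.Result := sum;
end;

// Completion handler; called for cancelled work items too.
procedure TfrmMain.HandleDone(const Sender: IOmniBackgroundWorker;
  const workItem: IOmniWorkItem);
begin
  if workItem.CancellationToken.IsSignalled then
    lbLog.Items.Add(Format('Item %d was cancelled', [workItem.UniqueID]))
  else
    lbLog.Items.Add(Format('Item %d = %d',
      [workItem.UniqueID, workItem.Result.AsInteger]));
end;

procedure TfrmMain.btnCancelClick(Sender: TObject);
begin
  // Cancels every item; CancelAll(id) would cancel only items up to that ID.
  FWorker.CancelAll;
end;
```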

Exception support is also built in. Any uncaught exception will be stored in the FatalException property. You can detach (and take ownership of) the exception by calling DetachException, and you can test whether there was an exception by calling IsExceptional. If IsExceptional is true, any access to the Result property will raise the exception stored in the FatalException property. [In other words, if an unhandled exception occurs in the executor code (in the background thread), it will propagate to the place where you access workItem.Result.]
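A completion handler that checks for a failed work item before touching the result might be sketched as follows. The form class, the handler name and the lbLog list box are hypothetical, and the result is assumed to be a string.

```delphi
// Fragment of a form unit; needs SysUtils, OtlCommon and OtlParallel.
procedure TfrmMain.HandleDone(const Sender: IOmniBackgroundWorker;
  const workItem: IOmniWorkItem);
var
  E: Exception;
begin
  if workItem.IsExceptional then begin
    // Detaching transfers ownership, so this code must free the exception.
    E := workItem.DetachException;
    try
      lbLog.Items.Add(Format('Item %d failed: %s',
        [workItem.UniqueID, E.Message]));
    finally
      E.Free;
    end;
  end
  else
    // Safe to read: no exception is pending for this work item.
    lbLog.Items.Add(workItem.Result.AsString);
end;
```

Testing IsExceptional first keeps the exception from being re-raised inside the handler when Result is read.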

If this seems slightly too abstract to you, don’t worry. I'm putting together a remake of the article mentioned at the very beginning of this post and I’ll show how background worker can be used in practice. In the meantime, check demo 52_BackgroundWorker, which demonstrates most of the BackgroundWorker abstraction.

6 comments:

  1. Neat implementation, Primoz! Thanks!
    Will there be any real life examples? What tasks is this pattern intended to solve?

  2. Yes, there will be an example.

    This pattern solves the problem of a background worker accepting work units which can be canceled at any time. In a way it is very similar to a single-stage pipeline but with added cancellation support.

  3. I am using the BackgroundWorker abstraction to process "orders". I get the orders that have been submitted and schedule a WorkItem for each order. When all the orders have been processed, I need to go back and check for new orders. I need a way to know when there are no more scheduled items. I have looked in the code, your book and webinars but don't see where this is a capability.
    How would you suggest I handle this use case?

  4. Why would the worker need to know whether there are more scheduled items? I don't see any good reason for that.

  5. The Worker doesn't need to know but the main thread that creates the worker does so that it can check for new unprocessed orders and then schedule more tasks. For now, I have a workaround that captures workitem.UniqueID and the done callback uses it to know when to check for the next batch of unprocessed orders. This is not guaranteed to work in all cases as the last task might complete quicker than previous ones.

    Also, the WaitFor method of BackgroundWorker is not working the way I understand it to be described on page 53 of your book. It seems to return before the done method for each task has been called and has returned. This is leading to a memory leak in my code, as I use the done method to free allocated objects that were passed into the tasks' Create method. Is there a better place to clean up after completed tasks?

    To reproduce the error, modify test_52_BackgroundWorker.pas as follows:
    - Create new Terminate button with below code
    - Click on Work x3! button
    - Immediately click on Terminate button
    - Message from the three scheduled tasks HandleRequestDone2 method is never written to the log

    procedure TForm16.btnTerminateClick(Sender: TObject);
    begin
      lbLog.ItemIndex := lbLog.Items.Add('Starting WaitFor / Terminate test');
      FBackgroundWorker2.WaitFor(INFINITE);
      lbLog.ItemIndex := lbLog.Items.Add('After WaitFor');
      FBackgroundWorker2 := nil;
    end;

  6. Main thread can always maintain a simple counter containing number of items that were sent to the worker but are yet unprocessed.

    As for the second problem, I'll check the code and report the findings here.
