Tuesday, January 17, 2012

OmniThreadLibrary in Practice [2a]–Background Worker and List Partitioning

Today I’m revisiting an example from November 2011. This time I’ll solve it using the new Parallel.BackgroundWorker abstraction.

Part of Zarko’s requirements (see the original post for the full text) was cancellation support:

At any time the "master" thread could be signaled to terminate (from the app's main thread) all child threads (and itself).

When I was originally implementing this using the Parallel.Pipeline abstraction, I had to put some work into the cancellation support. The main reason for this was an inappropriate abstraction – Parallel.Pipeline is designed around data-flow processing and supports only basic cancellation of the “stop everything” type. To be fair, that would comply with Zarko’s requirements, but I wanted a nicer solution where you can stop processing and then continue with a new work item without rebuilding the background thread mechanism. [The Pipeline solution inherently supports cancellation, but you cannot recover from it – to continue processing, one would have to destroy the pipeline and build a new one.]

The BackgroundWorker solution is very similar to the Pipeline solution, except that it is simpler and requires less coding. OmniThreadLibrary 3.0 is required to use the BackgroundWorker abstraction, while you’ll need the current SVN checkout to get the reimplemented “Stringlist parser” example.

To set up a background worker, simply call Parallel.BackgroundWorker and provide it with code that will process work items (BreakStringHL) and code that will process the results of the work item processor (ShowResultHL). It is important to keep in mind that the former (BreakStringHL) executes in the background thread while the latter (ShowResultHL) executes in the main thread. [Actually, it executes in the thread that calls Parallel.BackgroundWorker, but in most cases that will be the main thread.]

  FBackgroundWorker := Parallel.BackgroundWorker
    .Execute(BreakStringHL)
    .OnRequestDone(ShowResultHL);

Tearing it down is also simple.

  FBackgroundWorker.CancelAll;
  FBackgroundWorker.Terminate(INFINITE);
  FBackgroundWorker := nil;

CancelAll is called to cancel any pending work requests, Terminate stops the worker (and waits for it to complete execution), and the assignment clears the interface variable and destroys the last pieces of the worker.

If you look at the original high-level code, you’ll see the StringProcessorHL method, which wraps the pipeline and handles cancellation. There’s no need for this method (or its equivalent) in the new version, as all that is handled by the background worker itself.
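The post doesn’t show how work items reach the worker or how a user interrupts one. A minimal sketch of the cancel-and-continue behaviour follows. It assumes two hypothetical buttons (btnProcess, btnCancel) and a hypothetical memo (inpText) on the same form, and it assumes the CreateWorkItem and Schedule methods of IOmniBackgroundWorker as found in OmniThreadLibrary 3.0 (check the OtlParallel unit of your version). It is a form fragment, not a standalone program.

```pascal
// Hypothetical event handlers on TfrmStringListParser. btnProcess, btnCancel
// and inpText are illustrative names, not part of the original demo.
// Assumes CreateWorkItem/Schedule exist on IOmniBackgroundWorker (OTL 3.0).

procedure TfrmStringListParser.btnProcessClick(Sender: TObject);
begin
  // Cancel whatever is still queued or running, then queue a new request.
  // The worker itself keeps running, so nothing has to be rebuilt.
  FBackgroundWorker.CancelAll;
  FBackgroundWorker.Schedule(
    FBackgroundWorker.CreateWorkItem(inpText.Text)); // string -> TOmniValue
end;

procedure TfrmStringListParser.btnCancelClick(Sender: TObject);
begin
  // Signals the cancellation tokens of the work items scheduled so far;
  // BreakStringHL checks workItem.CancellationToken and skips building a result.
  FBackgroundWorker.CancelAll;
end;
```

Because CancelAll only signals cancellation and does not stop the worker (that is Terminate’s job), the next Schedule call is served by the same background worker, which is exactly what the Pipeline version could not do.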

BreakStringHL, however, is very similar to the original code.

  procedure TfrmStringListParser.BreakStringHL(const workItem: IOmniWorkItem);
  var
    charsPerTask : integer;
    input        : string;
    iTask        : integer;
    numTasks     : integer;
    output       : TStringList;
    partialQueue : IOmniBlockingCollection;
    s            : string;
    stringBreaker: IOmniParallelTask;
    taskResults  : array of TStringList;
  begin
    partialQueue := TOmniBlockingCollection.Create;
    numTasks := Environment.Process.Affinity.Count - 1;
    // always start at least one task; on a single-core machine Count - 1 is 0
    if numTasks < 1 then
      numTasks := 1;

    // one result list per task, indexed by the task's segment number
    SetLength(taskResults, numTasks);
    for iTask := Low(taskResults) to High(taskResults) do
      taskResults[iTask] := TStringList.Create;

    // start numTasks workers; each takes one [index, substring] pair from
    // partialQueue and splits it into its own TStringList
    stringBreaker := Parallel.ParallelTask.NumTasks(numTasks).NoWait
      .TaskConfig(Parallel.TaskConfig.CancelWith(workItem.CancellationToken))
      .Execute(
        procedure (const task: IOmniTask)
        var
          partial: TOmniValue; // renamed so it doesn't shadow the outer workItem
        begin
          partial := partialQueue.Next;
          SplitPartialList(partial[1].AsString,
            taskResults[partial[0].AsInteger], task.CancellationToken);
        end
      );

    // provide input to the ParallelTask workers above
    input := workItem.Data;
    for iTask := 1 to numTasks do begin
      // divide the remaining part in as-equal-as-possible segments
      charsPerTask := Round(Length(input) / (numTasks - iTask + 1));
      partialQueue.Add(TOmniValue.Create([iTask-1,
        Copy(input, 1, charsPerTask)]));
      Delete(input, 1, charsPerTask);
    end;

    // process output: merge partial results in segment order
    stringBreaker.WaitFor(INFINITE);
    if not workItem.CancellationToken.IsSignalled then begin
      output := TStringList.Create;
      for iTask := Low(taskResults) to High(taskResults) do begin
        for s in taskResults[iTask] do
          output.Add(s);
      end;
      workItem.Result := output;
    end;
    for iTask := Low(taskResults) to High(taskResults) do
      taskResults[iTask].Free;
  end;

If you compare it with the original code, you’ll see that both versions are almost step-by-step identical, except that the new version takes the data to be processed from workItem.Data, returns the result in workItem.Result and uses the cancellation token in workItem.CancellationToken.
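The partitioning loop in BreakStringHL divides whatever is left of the input by the number of tasks that still need a segment, so the last task always receives the remainder and no characters are lost. The standalone console program below isolates that loop for a quick check; PartitionString is a hypothetical helper written only for this illustration and does not use OmniThreadLibrary.

```pascal
program PartitionDemo;

{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF} // lets Free Pascal accept Result and long strings

type
  TSegmentArray = array of string;

// Hypothetical helper: same "as-equal-as-possible" formula as BreakStringHL.
function PartitionString(const input: string; numTasks: integer): TSegmentArray;
var
  charsPerTask: integer;
  iTask       : integer;
  remaining   : string;
begin
  SetLength(Result, numTasks);
  remaining := input;
  for iTask := 1 to numTasks do begin
    // divide what is left by the number of tasks still waiting for a segment;
    // in the last iteration the divisor is 1, so the remainder is taken whole
    charsPerTask := Round(Length(remaining) / (numTasks - iTask + 1));
    Result[iTask - 1] := Copy(remaining, 1, charsPerTask);
    Delete(remaining, 1, charsPerTask);
  end;
end;

var
  iSeg    : integer;
  segments: TSegmentArray;
begin
  segments := PartitionString('abcdefghij', 3);
  for iSeg := Low(segments) to High(segments) do
    Writeln(iSeg, ': ', segments[iSeg], ' (', Length(segments[iSeg]), ' chars)');
end.
```

For a 10-character input and three tasks this yields segments of 3, 4 and 3 characters (“abc”, “defg”, “hij”): Round(10/3) = 3, then Round(7/2) = Round(3.5) = 4, then 3/1 = 3.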

The parallelized list splitter (SplitPartialList) has not changed at all since last time, as this part of the code doesn’t have any knowledge of the upper-layer abstraction (pipeline or background worker).

The only missing piece is the result processing method.

  procedure TfrmStringListParser.ShowResultHL(const Sender: IOmniBackgroundWorker;
    const workItem: IOmniWorkItem);
  begin
    if workItem.CancellationToken.IsSignalled then
      lbLog.Items.Add('Canceled')
    else
      ShowResult(workItem.Result.AsObject as TStringList);
  end;

It receives the IOmniBackgroundWorker interface (useful if you are sharing one method between several background workers) and the work item that was processed (or cancelled). The code simply checks whether the work item was cancelled and, if it wasn’t, displays the result by using ShowResult from the original code.

I hope this example is not too complicated and that the beauty of the background worker can be appreciated from it.

1 comment:

  1. It can be understood. It is impossible to create it unless you are an expert with the framework, though.

    Do you think it is possible to create a more, hm, standard set of objects to accomplish this? For example, some form of ParallelTask class with event handlers, like OnCreate, OnInputCreate, OnOutputProcess, OnCancellation...

    On a different note, don't you happen to have an example around on how to use OTL for some more menial task, like downloading a file to implement an auto-update feature? :-)