Sunday, February 19, 2012

Blaise Pascal Magazine Rerun #9: High Level Multithreading

This article was originally written for the Blaise Pascal Magazine and was published in Issue #18.
Relevant Delphi code is available at
http://17slon.com/blogs/gabr/BPM/HighLevel.zip.

Working with threads on the low level is fine and well, but most programmers don’t want to do that – just as they don’t want to code in assembler. They want high-level languages, they want the VCL, they want packaged solutions.

For multithreading, OmniThreadLibrary is such a tool. Although the initial motivation for its design was to create an easy-to-use TThread wrapper, it turned out that this low-level TThread replacement is an excellent base for high-level constructs that allow any user to experience parallel processing without putting too much thought into it.

This article focuses on writing multithreaded code with the OmniThreadLibrary. As this library is constantly being developed, the article focuses on the latest stable release, 2.1.

In the 2.1 release, OmniThreadLibrary supports six high-level parallelization constructs: Async (simple background execution), Join (execution of multiple background tasks), Future (execution of background tasks that return results), ForEach (parallel “for” statement), Pipeline (parallelized pipeline) and ForkJoin (parallel “divide and conquer”). The implementation of those tools actively uses anonymous methods, which is why they are supported only in Delphi 2009 and newer.

To start using OmniThreadLibrary (OTL for short), download it from Google Code. Unpack it to some folder (c:\omnithreadlibrary, for example). Add this folder and its “src” subfolder (c:\omnithreadlibrary\src in this example) to Delphi’s library path or to your project’s search path. Add “OtlParallel” to the uses list. And that’s all folks! If you have any questions after reading this article, visit http://otl.17slon.com/ where you’ll get pointers to articles about the OTL and a web forum where you can discuss your problems.

Async

The Async method allows the programmer to create simple, one-shot background tasks that don’t require much interaction with the main thread.

To create a background task (that is, a piece of code that will execute in a background thread), call Parallel.Async and pass it a block of code. This can be a parameterless method, procedure or anonymous method. For short examples, such as those in the article, I like to stick with anonymous methods as they require the least typing.

Let’s write a simple background task that just beeps and nothing more.

Parallel.Async(
  procedure
  begin
    MessageBeep($FFFFFFFF);
  end
);

Yes, that’s it. Parallel.Async will create a background thread (or reuse a previously used thread that is waiting idle for some work) and run your code in it. If you don’t believe me, put a breakpoint on the MessageBeep call, run the program and check the Threads window (View, Debug windows, Threads).

Running unattended background tasks is fine, but sometimes you need additional information – for example, you may want to know when the task has completed its work. For such situations, Async accepts a second parameter, Parallel.TaskConfig, which you can configure in various ways. If you just want to know when the task has completed, write an OnTerminated handler, as in the example below.

procedure TfrmAsync.btnOnTerminatedClick(Sender: TObject);
begin
  btnOnTerminated.Enabled := false;
  Parallel.Async(
    procedure
    begin
      // executed in background thread
      Sleep(500);
      MessageBeep($FFFFFFFF);
    end,
    Parallel.TaskConfig.OnTerminated(
      procedure (const task: IOmniTaskControl)
      begin
        // executed in main thread
        btnOnTerminated.Enabled := true;
      end
    )
  );
end;

Clicking the button disables it and starts the background task. Method btnOnTerminatedClick then immediately exits and your app can proceed with other code. After sleeping for half a second, the task calls MessageBeep in the background thread and then terminates. At the end, the termination code that re-enables button btnOnTerminated is executed in the main thread. (Again, you can put a breakpoint on btnOnTerminated.Enabled := true to verify my claims.) If you’re wondering what the “const task” parameter is doing there – it represents the underlying “task control interface”, something you would use when working with the OmniThreadLibrary on the low level, and you can safely ignore it here. You just have to remember to add this parameter to the OnTerminated handler and to add the OtlTaskControl unit to the uses statement.

The final example shows how to execute some code in the context of the main thread. In other words, while running a background task you can schedule some code to run back in the main thread. This is very important if you want to interact with the VCL, as you must never do that from a background task! The VCL is written with the assumption that it will always run in the main thread, and very bad things can happen if you try to update a form or do other VCL work from a background thread!

The code below uses task.Invoke to execute a code fragment (again, this can be an anonymous method, normal method or classless procedure) in the main thread. In its operation it is similar to the Queue method, not Synchronize, as it doesn’t wait for the code to complete. The code is scheduled to be executed in the main thread, where it will run at the first opportunity. Meanwhile, the background task continues with its own agenda.

To use Invoke, you have to pass the IOmniTask parameter to the Async task and add unit OtlTask to the uses list.

procedure TfrmAsync.btnInvokeClick(Sender: TObject);
var
  formThreadID: DWORD;
begin
  formThreadID := GetCurrentThreadID;
  Parallel.Async(
    procedure (const task: IOmniTask)
    var
      taskThreadID: DWORD;
    begin
      // this will execute in the context of the worker thread
      taskThreadID := GetCurrentThreadID;
      task.Invoke(
        procedure
        begin
          // this will execute in the context of the main thread
          frmAsync.lbLog.Items.Add(Format(
            'Current thread ID: %d, task thread ID: %d, ' +
            ' form thread ID: %d',
            [GetCurrentThreadID, taskThreadID, formThreadID]));
        end
      );
    end
  );
end;

Further information about TaskConfig can be found on my blog.

Join

Another very simple tool is Join. It allows you to start multiple background tasks and wait until they have all completed. No result is returned – at least not directly, though you can always store results in shared variables, as sketched at the end of this section. If your code returns a result, a better approach may be to use a Future or Fork/Join.

A simple demonstration of Join (below) starts two tasks – one sleeps for two seconds, the other for three. When you run this code, Parallel.Join will create two background threads and run RunTask1 in the first and RunTask2 in the second. It will then wait for both threads to complete their work and only then will the execution of the main thread continue.

procedure TfrmJoin.btnParallelClick(Sender: TObject);
begin
  btnJoinMethods.Enabled := false; Update;
  Parallel.Join([RunTask1, RunTask2]);
  btnJoinMethods.Enabled := true;
end;

procedure TfrmJoin.RunTask1;
begin
  Sleep(2000);
end;

procedure TfrmJoin.RunTask2;
begin
  Sleep(3000);
end;

Join takes special care to stay compatible with single-core computers. If you run the above code on a single-core machine (or if you simply limit the process to one core), it will execute the tasks sequentially, without creating a thread.

Join accepts anonymous methods. The above demo could also be coded as a single method executing two anonymous methods.

procedure TfrmJoin.btnAnonymousClick(Sender: TObject);
begin
  btnJoinAnon.Enabled := false; Update;
  Parallel.Join(
    procedure begin
      Sleep(2000);
    end,
    procedure begin
      Sleep(3000);
    end
  );
  btnJoinAnon.Enabled := true;
end;

Similar to Async, Join accepts Parallel.TaskConfig as an optional second parameter. It also supports the IOmniTask parameter, which you can use to communicate with the main thread.
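
As mentioned at the beginning of this section, results can be collected through shared variables. Here is a minimal sketch of that approach (my own illustration, not part of the demo – the button and the listbox are hypothetical, and CountPrimesTo is the helper used in the Future example below). Anonymous methods capture local variables by reference, and since Join waits for both tasks, the variables are safe to read once it returns.

procedure TfrmJoin.btnJoinSharedClick(Sender: TObject);
var
  count1, count2: integer;
begin
  Parallel.Join(
    procedure begin
      // each task writes only to its own variable, so no locking is needed
      count1 := CountPrimesTo(500000);
    end,
    procedure begin
      count2 := CountPrimesTo(1000000);
    end
  );
  // Join has already waited for both tasks; reading the variables is safe
  lbLog.Items.Add(Format('%d and %d primes', [count1, count2]));
end;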

Although Join in release 2.1 is very simple, it was greatly improved after the release. The new features are described in the article Parallel.Join’s new clothes.

Future

Future is a tool that helps you start a background calculation and then forget about it until you need its result.

To start a background calculation, you simply create an IOmniFuture<T> instance of a specific type (indicating the type returned from the calculation).

Future := Parallel.Future<type>(calculation);

The calculation will start in the background and the main thread can continue with its work. When the calculation result is needed, simply query Future.Value. If the calculation has already completed its work, the value will be returned immediately. If not, the main thread will block until the background calculation is done.

The example below starts a background calculation that counts the prime numbers in the interval 1..1000000. While the calculation is running, the main thread is used for “creative” work – outputting numbers into a listbox and sleeping. At the end, the calculation result is retrieved by querying future.Value.

procedure TfrmOTLDemoFuture.btnCalcFutureClick(Sender: TObject);
const
  CMaxPrimeBound = 1000000;
var
  future: IOmniFuture<integer>;
  i     : integer;
begin
  // create the background calculation
  future := Parallel.Future<integer>(
    function: integer
    begin
      Result := CountPrimesTo(CMaxPrimeBound);
    end
  );
  // simulate another task
  for i := 1 to 10 do begin
    lbLog.Items.Add(IntToStr(i));
    Sleep(20);
    lbLog.Update;
  end;
  // get the result
  Log(Format('Num primes up to %d: %d', [CMaxPrimeBound, future.Value]));
end;

As with Join, there are two Future<T> overloads, one exposing the internal task parameter and another not. TaskConfig can be provided as the optional second parameter.

class function Future<T>(action: TOmniFutureDelegate<T>;
  taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload;
class function Future<T>(action: TOmniFutureDelegateEx<T>;
  taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload;

IOmniFuture<T> has some other useful features. You can cancel the calculation (Cancel) and check whether it has been cancelled (IsCancelled). You can also check whether the calculation has already completed (IsDone and TryValue).

IOmniFuture<T> = interface
  procedure Cancel;
  function IsCancelled: boolean;
  function IsDone: boolean;
  function TryValue(timeout_ms: cardinal; var value: T): boolean;
  function Value: T;
end;
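
TryValue is particularly handy when you don’t want to block the main thread outright. The following is a minimal sketch of that pattern (my own illustration, not part of the demo – the button is hypothetical): the future is polled in short slices and messages are processed in between, so the UI stays responsive. Application.ProcessMessages comes from the Forms unit.

procedure TfrmOTLDemoFuture.btnPollFutureClick(Sender: TObject);
var
  future   : IOmniFuture<integer>;
  numPrimes: integer;
begin
  future := Parallel.Future<integer>(
    function: integer
    begin
      Result := CountPrimesTo(1000000);
    end
  );
  // wait in 10-millisecond slices, pumping messages while waiting
  while not future.TryValue(10, numPrimes) do
    Application.ProcessMessages;
  Log(Format('Num primes: %d', [numPrimes]));
end;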

Some other information about futures can be found in a blog post. Also, there were some changes after the 2.1 release, mostly related to exception handling.

ForEach

Parallel for (actually called ForEach, because For would clash with the reserved word for) is a construct that enumerates over different containers in a parallel fashion. The most typical usage is enumerating over a range of integers (just like the classical for), but it can also be used like for..in – for enumerating over Delphi- or Windows-provided enumerators.

A very simple example loops over an integer range and increments a global counter for each number that is also a prime number. In other words, the code below counts the number of primes in the range 1..CHighPrimeBound.

procedure TfrmForEach.btnForEachIntClick(Sender: TObject);
var
  numPrimes: TGp4AlignedInt;
begin
  numPrimes.Value := 0;
  Parallel
    .ForEach(2, CHighPrimeBound)
    .Execute(
      procedure (const value: integer)
      begin
        if IsPrime(value) then
          numPrimes.Increment;
      end
    );
  lbLog.ItemIndex := lbLog.Items.Add(Format('%d primes', [numPrimes.Value]));
end;

As the code accesses a shared variable from multiple threads, it must make sure that they don’t step on each other’s toes. That’s why the shared variable (numPrimes) is not a simple integer but a special threadsafe counter, TGp4AlignedInt, provided by the GpStuff unit, which is included in the standard OmniThreadLibrary distribution.
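
An alternative to a shared counter is ForEach’s built-in aggregation, covered in the “implementation trilogy” articles referenced below. The sketch that follows is my reconstruction of that pattern and should be treated as an approximation of the 2.1 API: each iteration outputs a partial result (1 for a prime, nothing otherwise) and the aggregator folds the partial results into the final total, so no explicit threadsafe variable is needed.

numPrimes :=
  Parallel.ForEach(2, CHighPrimeBound)
    .Aggregate(0,
      procedure (var aggregate: TOmniValue; const value: TOmniValue)
      begin
        // fold one partial result into the running total
        aggregate := aggregate.AsInt64 + value.AsInt64;
      end)
    .Execute(
      procedure (const value: integer; var result: TOmniValue)
      begin
        if IsPrime(value) then
          result := 1;
      end).AsInteger;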

If you have data in a container that supports enumeration (with one limitation – the enumerator must be implemented as a class, not as an interface or a record), then you can enumerate over it in parallel.

// nodeList: TList and outQueue: IOmniBlockingCollection are assumed
// to be declared elsewhere in the demo
nodeList := TList.Create;
// …
Parallel.ForEach<integer>(nodeList).Execute(
  procedure (const elem: integer)
  begin
    if IsPrime(elem) then
      outQueue.Add(elem);
  end);

ForEach is extremely powerful and allows you to iterate over various containers, aggregate results, run without blocking the main thread and more. For a longer introduction, see my blog post and the “implementation trilogy” articles [1], [2], [3].

Pipeline

The Pipeline construct implements high-level support for multistage processes. The assumption is that the process can be split into stages (or subprocesses), connected by data queues. Data flows from the (optional) input queue into the first stage, where it is partially processed and then emitted into an intermediary queue. The first stage then continues execution, processes more input data and outputs more output data. This continues until the complete input is processed. The intermediary queue leads into the next stage, which does its processing in a similar manner, and so on and on. At the end, the data is output into a queue which can then be read and processed by the program that created this multistage process. As a whole, the multistage process functions as a pipeline – data comes in, data comes out.


What is important here is that no stage shares state with any other stage. The only interaction between stages happens through the data passed along the intermediary queues. The quantity of data, however, doesn’t have to be constant. It is entirely possible for a stage to generate more or less data than it received on input.

In a classical single-threaded program the execution plan for a multistage process is very simple.


In a multithreaded environment, however, we can do better than that. Because the stages are largely independent, they can be executed in parallel.


A pipeline is created by calling the Parallel.Pipeline function, which returns an IOmniPipeline interface. There are two overloaded versions – one for general pipeline building and another for simple pipelines that don’t require any special configuration.

class function Pipeline: IOmniPipeline; overload;
class function Pipeline(
  const stages: array of TPipelineStageDelegate;
  const input: IOmniBlockingCollection = nil):
  IOmniPipeline; overload;

The latter version takes two parameters – an array of processing stages and an optional input queue. The input queue can be used to provide initial data to the first stage (a sketch of this appears at the end of this section). It is also completely valid to pass nil for the input queue parameter and run the first stage without any input.

Stages are implemented as anonymous procedures, procedures or methods taking two queue parameters – one for input and one for output. Except in the first stage, where the input queue may not be defined, both are automatically created by the Pipeline implementation and passed to the stage delegate. To use the pipeline, you have to add OtlCollections to the uses list, as it is the home of the IOmniBlockingCollection.

TPipelineStageDelegate = reference to procedure
  (const input, output: IOmniBlockingCollection);

The next code fragment shows a simple pipeline containing five stages. The result of Parallel.Pipeline(...).Run is an IOmniBlockingCollection, which is a kind of single-ended queue. The result is accessed by reading an element from this queue (by calling pipeOut.Next), which will block until the element is ready.

procedure TfrmPipeline.btnPipelineClick(Sender: TObject);
var
  pipeOut: IOmniBlockingCollection;
begin
  pipeOut := Parallel.Pipeline([
    StageGenerate,
    StageMult2,
    StageMinus3,
    StageMod5,
    StageSum]
  ).Run;

  lbLog.ItemIndex := lbLog.Items.Add(Format('Pipeline result: %d',
    [pipeOut.Next.AsInteger]));
end;

The pipeline stages are shown below. The first stage ignores the input (which is not provided) and generates elements internally. Each element is written to the output queue.

procedure StageGenerate(const input, output: IOmniBlockingCollection);
var
  i: integer;
begin
  for i := 1 to CNumTestElements do
    if not output.TryAdd(i) then
      Exit;
end;

The next three stages read data from the input (using a for..in loop) and output modified data into the output queue. For..in will automatically terminate when the previous stage terminates and the input queue runs out of data (that’s a feature of the IOmniBlockingCollection enumerator).

As you can see from the code, values in the input/output queues are not integers but TOmniValue (declared in the OtlCommon unit), which is the OTL version of Delphi’s Variant.
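
As a quick illustration of this Variant-like behavior (a hypothetical snippet, not part of the demo), TOmniValue converts implicitly from common types and exposes explicit accessors such as the AsInteger used in the stages below.

var
  ov: TOmniValue;
  i : integer;
begin
  ov := 42;            // implicit conversion from integer
  i  := ov.AsInteger;  // explicit accessor
  ov := 'forty-two';   // the same variable can now hold a string
end;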

procedure StageMult2(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    if not output.TryAdd(2 * value.AsInteger) then
      Exit;
end;

procedure StageMinus3(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    if not output.TryAdd(value.AsInteger - 3) then
      Exit;
end;

procedure StageMod5(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    if not output.TryAdd(value.AsInteger mod 5) then
      Exit;
end;

The last stage also reads data from the input but outputs only one number – the sum of all input values.

procedure StageSum(const input, output: IOmniBlockingCollection);
var
  sum  : integer;
  value: TOmniValue;
begin
  sum := 0;
  for value in input do
    Inc(sum, value.AsInteger);
  output.TryAdd(sum);
end;
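
To illustrate the optional input queue mentioned earlier, here is a minimal sketch (my own illustration, reusing the stages above; the button is hypothetical). Initial data is written into a blocking collection, CompleteAdding marks the end of input so that the first stage’s for..in loop can terminate, and the collection is passed as Pipeline’s second parameter.

procedure TfrmPipeline.btnPipelineInputClick(Sender: TObject);
var
  i      : integer;
  inQueue: IOmniBlockingCollection;
  pipeOut: IOmniBlockingCollection;
begin
  inQueue := TOmniBlockingCollection.Create;
  for i := 1 to 100 do
    inQueue.Add(i);
  inQueue.CompleteAdding; // no more input will follow
  // no generator stage this time; the first stage reads from inQueue
  pipeOut := Parallel.Pipeline([StageMult2, StageSum], inQueue).Run;
  lbLog.Items.Add(Format('Pipeline result: %d', [pipeOut.Next.AsInteger]));
end;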

Read more about pipelines in the OmniThreadLibrary on my blog.

Fork/Join

Fork/Join is the most complex high-level parallel construct in the OmniThreadLibrary. It is an implementation of the “divide and conquer” technique. In short, Fork/Join allows you to execute multiple tasks, wait for them to terminate and collect the results.

The trick here is that subtasks may spawn new subtasks, and so on ad infinitum (well, probably a little less, or you’ll run out of stack ;) ). For optimal execution, Fork/Join must therefore guarantee that the code never runs too many background threads (the optimal value is usually equal to the number of cores in the system) and that those threads don’t run out of work.

Fork/Join subtasks are in many ways similar to Futures. They offer slightly less functionality (no cancellation support) but are enhanced in another way – when a Fork/Join subtask runs out of work, it starts executing some other subtask’s workload, keeping the system busy.

A typical way to use Fork/Join is to create an IOmniForkJoin<T> instance

forkJoin := Parallel.ForkJoin<integer>;

and then create computations owned by this instance

max1 := forkJoin.Compute(
  function: integer begin
    Result := … // first computation
  end);
max2 := forkJoin.Compute(
  function: integer begin
    Result := … // second computation
  end);

To access a computation’s result, simply call the computation object’s Value function.

Result := Max(max1.Value, max2.Value);

The code below shows how Fork/Join can be used to find the maximum element in an array. At each computation level, ParallelMax receives a slice of the original array. If the slice is small enough, a sequential function is called to determine its maximum element. Otherwise, two subcomputations are created, each working on one half of the slice. The results of both subcomputations are aggregated by calling the Max function and the result is returned to the upper level.

function TfrmForkJoin.ParallelMax(const forkJoin: IOmniForkJoin<integer>;
  left, right: integer): integer;
var
  computeLeft : IOmniCompute<integer>;
  computeRight: IOmniCompute<integer>;
  mid         : integer;

  function Compute(left, right: integer): IOmniCompute<integer>;
  begin
    Result := forkJoin.Compute(
      function: integer
      begin
        Result := ParallelMax(forkJoin, left, right);
      end
    );
  end;

begin
  if (right - left) < CSeqThreshold then
    Result := SequentialMax(left, right)
  else begin
    mid := (left + right) div 2;
    computeLeft := Compute(left, mid);
    computeRight := Compute(mid + 1, right);
    Result := Max(computeLeft.Value, computeRight.Value);
  end;
end;
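
For completeness, a possible top-level call could look like this (a hypothetical sketch – the demo’s data structures are not shown in the fragment above, so FData and maxValue are assumptions):

// FData: array of integer is assumed to be a field of the form;
// SequentialMax is assumed to scan FData[left..right]
maxValue := ParallelMax(Parallel.ForkJoin<integer>, Low(FData), High(FData));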

My blog post contains more information about Fork/Join and shows how to implement a parallel QuickSort with the help of this OTL construct.
