Tuesday, November 23, 2010

Multistage processes with the OmniThreadLibrary

The OtlParallel unit in OmniThreadLibrary offers some high-level solutions that allow you to easily run some kinds of processes in parallel. Up to now it supported self-contained background calculations (Parallel.Future), independent parallel processes (Parallel.Join) and loop calculations where the background task is stateless (Parallel.ForEach), i.e. where each iteration depends only on its own input, the loop value, and not on calculations done for other loop values. But many processes don’t fall into any of those categories.

Mason Wheeler recently suggested adding support for multistage processes. Over the weekend I implemented it in the OmniThreadLibrary and the feature is now ready for you to start testing. Many thanks to Mason for his suggestion and also for his work on the implementation and design of the newly introduced Parallel.Pipeline.


The assumption is that the process can be split into stages (or subprocesses), connected with data queues. Data flows from the (optional) input queue into the first stage, where it is partially processed and then emitted into an intermediary queue. The first stage then continues execution, processes more input data and emits more output data. This continues until the complete input is processed. The intermediary queue leads into the next stage, which does its processing in a similar manner, and so on. At the end, the data is output into a queue which can then be read and processed by the program that created this multistage process. As a whole, a multistage process functions as a pipeline – data comes in, data comes out (and a miracle occurs in-between).


What is important here is that no stage shares state with any other stage.  The only interaction between stages is done with the data passed through the intermediary queues. The quantity of data, however, doesn’t have to be constant. It is entirely possible for a stage to generate more or less data than it received on input.

In a classical single-threaded program the execution plan for a multistage process is very simple.
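
As a rough illustration, the single-threaded plan can be sketched with nothing but blocking collections: each stage runs to completion before the next one starts, so every intermediary queue holds the complete output of the previous stage. The stage names (Generate, Times2) and the ten-element input are made up for this sketch; it relies only on Add, CompleteAdding and the for..in enumerator of IOmniBlockingCollection, all of which are described later in this post.

```delphi
program SequentialStagesSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  OtlCommon,       // TOmniValue
  OtlCollections;  // IOmniBlockingCollection, TOmniBlockingCollection

// Hypothetical first stage: ignores its input and emits 1..10.
procedure Generate(const input, output: IOmniBlockingCollection);
var
  i: integer;
begin
  for i := 1 to 10 do
    output.Add(i);
end;

// Hypothetical second stage: emits one doubled value per input value.
procedure Times2(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    output.Add(2 * value.AsInteger);
end;

var
  q1, q2: IOmniBlockingCollection;
  value : TOmniValue;
begin
  q1 := TOmniBlockingCollection.Create;
  q2 := TOmniBlockingCollection.Create;

  Generate(nil, q1);   // stage 1 runs to the end ...
  q1.CompleteAdding;   // ... and only then is its queue closed
  Times2(q1, q2);      // stage 2 starts after stage 1 has finished
  q2.CompleteAdding;

  for value in q2 do   // the "main program" reads the final queue
    Writeln(value.AsInteger);
end.
```

Nothing overlaps here: the second stage sits idle while the first one works, and the whole intermediate result is buffered in q1 before anyone consumes it.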


In a multithreaded environment, however, we can do better than that. Because the stages are largely independent, they can be executed in parallel.


Setting up the tasks and intermediary queues is conceptually simple but far from trivial to get right. It would therefore be best if it could be automated in some way. That was the general idea that Mason came to me with, and together we designed a new part of the OmniThreadLibrary: Parallel.Pipeline.


A pipeline is created by calling the Parallel.Pipeline function, which returns the IOmniPipeline interface. There are two overloaded versions – one for general pipeline building and another for simple pipelines that don’t require any special configuration.

class function Pipeline: IOmniPipeline; overload;
class function Pipeline(
  const stages: array of TPipelineStageDelegate;
  const input: IOmniBlockingCollection = nil): 
  IOmniPipeline; overload;

The latter version takes two parameters – an array of processing stages and an optional input queue. The input queue can be used to provide initial data to the first stage. It is also completely valid to pass ‘nil’ for the input queue parameter and run the first stage without any input.
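
To make the input-queue variant concrete, here is a minimal sketch that pre-fills a queue, closes it and hands it to the two-parameter overload. The unit name, the RunWithInput helper and the two stages (Times2, SumAll) are hypothetical; the sketch assumes that the caller is responsible for calling CompleteAdding on a queue it created itself, otherwise the first stage would never see the end of input.

```delphi
unit PipelineInputSketch;

interface

uses
  Classes;

// Hypothetical helper: feeds 1..100 into a two-stage pipeline and logs the sum.
procedure RunWithInput(log: TStrings);

implementation

uses
  SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel;

// Emits one doubled value for every input value.
procedure Times2(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    output.Add(2 * value.AsInteger);
end;

// Consumes the whole input and emits a single total.
procedure SumAll(const input, output: IOmniBlockingCollection);
var
  sum  : integer;
  value: TOmniValue;
begin
  sum := 0;
  for value in input do
    Inc(sum, value.AsInteger);
  output.Add(sum);
end;

procedure RunWithInput(log: TStrings);
var
  i      : integer;
  pipeIn : IOmniBlockingCollection;
  pipeOut: IOmniBlockingCollection;
begin
  pipeIn := TOmniBlockingCollection.Create;
  for i := 1 to 100 do
    pipeIn.Add(i);
  pipeIn.CompleteAdding; // assumption: the caller closes its own input queue

  pipeOut := Parallel.Pipeline([Times2, SumAll], pipeIn).Run;
  log.Add(Format('Pipeline result: %d', [pipeOut.Next.AsInteger]));
end;

end.
```

With the input 1..100 the doubled values add up to 10100, so that is the number the log line should show.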

Blocking collections are used for data queuing in the Parallel.Pipeline implementation.

Stages are implemented as anonymous procedures, normal procedures or methods taking two queue parameters – one for input and one for output. Both queues are automatically created by the Pipeline implementation and passed to the stage delegate, except in the first stage, where the input queue may not be defined.

TPipelineStageDelegate = reference to procedure 
  (const input, output: IOmniBlockingCollection);
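
Because the delegate is a "reference to procedure", a stage can also be written inline as an anonymous procedure. The following sketch is only an illustration under that assumption; the unit name and the RunAnonymousStages helper are made up, and it uses the Stage and Run functions of IOmniPipeline that are described further down in this post.

```delphi
unit PipelineAnonymousStageSketch;

interface

uses
  Classes;

// Hypothetical helper: builds a two-stage pipeline from anonymous procedures.
procedure RunAnonymousStages(log: TStrings);

implementation

uses
  SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel;

procedure RunAnonymousStages(log: TStrings);
var
  pipeOut: IOmniBlockingCollection;
begin
  pipeOut := Parallel.Pipeline
    .Stage(
      procedure (const input, output: IOmniBlockingCollection)
      var
        i: integer;
      begin
        // first stage: no input queue was provided, so 'input' is not used
        for i := 1 to 10 do
          output.Add(i);
      end)
    .Stage(
      procedure (const input, output: IOmniBlockingCollection)
      var
        sum  : integer;
        value: TOmniValue;
      begin
        // last stage: reduces everything it receives to one value
        sum := 0;
        for value in input do
          Inc(sum, value.AsInteger);
        output.Add(sum);
      end)
    .Run;
  log.Add(Format('Sum of 1..10: %d', [pipeOut.Next.AsInteger]));
end;

end.
```

Named procedures are usually easier to read and reuse; inline stages are mostly handy for short glue code between two larger stages.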

The next code fragment shows the simple Pipeline function in action. It is taken from the OmniThreadLibrary test 41_Pipeline.

procedure TfrmPipelineDemo.btnSimpleClick(Sender: TObject);
var
  pipeOut: IOmniBlockingCollection;
begin
  pipeOut := Parallel.Pipeline(
    [StageGenerate, StageMult2, StageMinus3, StageMod5, StageSum]).Run;
  lbLog.Items.Add(Format('Pipeline result: %d',
    [pipeOut.Next.AsInteger]));
end;

Parallel.Pipeline accepts an array of stage delegates and returns the IOmniPipeline interface. Run is then called on the interface to set up the infrastructure, start all tasks and return the output queue.

The main program waits for the last stage to produce a result. Because it knows there will be exactly one value produced, it can wait for it by calling the Next function (which was also implemented recently and is just a simple wrapper around the Take function).

The full power of the IOmniPipeline interface is usually accessed via the parameterless Parallel.Pipeline function.

IOmniPipeline = interface
  function  Input(const queue: IOmniBlockingCollection):
    IOmniPipeline;
  function  NumTasks(numTasks: integer): IOmniPipeline;
  function  Run: IOmniBlockingCollection;
  function  Stage(pipelineStage: TPipelineStageDelegate):
    IOmniPipeline;
  function  Stages(const pipelineStages: array of
    TPipelineStageDelegate): IOmniPipeline;
  function  Throttle(numEntries: integer;
    unblockAtCount: integer = 0): IOmniPipeline;
end;

Input sets the input queue. If it is not called, the input queue will not be assigned and the first stage will receive nil for the input parameter.

Stage adds one pipeline stage.

Stages adds multiple pipeline stages.

NumTasks sets the number of parallel execution tasks for the stage(s) just added with the Stage(s) function (in other words, call Stage followed by NumTasks to do that). If it is called before any stage is added, it specifies the default for all stages. The number of parallel execution tasks for a specific stage can then still be overridden by calling NumTasks after Stage is called. [Read more about parallel execution in the section Parallel stages below.]

Throttle sets the throttling parameters for the stage(s) just added with the Stage(s) function. Just like NumTasks, it affects either the global defaults or just the currently added stage(s). By default, throttling is set to 10240 elements. [See the Throttling section below for more info.]

Run does all the hard work: it creates the queues and sets up the OmniThreadLibrary tasks. It returns the output queue, which can then be used in your program to receive the result of the computation. Even if the last stage doesn’t produce any result, this queue can be used to signal the end of computation. [When each stage ends, CompleteAdding is automatically called on its output queue. This allows the next stage to detect the end of input (the blocking collection enumerator will exit or TryTake will return false). The same goes for the final output queue.]
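
When the last stage emits more than one value, the automatic CompleteAdding call is what lets the caller read "until done" without knowing the count in advance. A minimal sketch under that assumption follows; the unit name, the DrainOutput helper and both stages are hypothetical. Note that the for..in loop blocks the calling thread until the pipeline has finished.

```delphi
unit PipelineDrainSketch;

interface

uses
  Classes;

// Hypothetical helper: logs every value that leaves the pipeline.
procedure DrainOutput(log: TStrings);

implementation

uses
  SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel;

// First stage: emits 1..5 and then simply returns.
procedure Generate(const input, output: IOmniBlockingCollection);
var
  i: integer;
begin
  for i := 1 to 5 do
    output.Add(i);
end;

// Second stage: emits the square of every input value.
procedure Square(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    output.Add(value.AsInteger * value.AsInteger);
end;

procedure DrainOutput(log: TStrings);
var
  pipeOut: IOmniBlockingCollection;
  value  : TOmniValue;
begin
  pipeOut := Parallel.Pipeline
    .Stage(Generate)
    .Stage(Square)
    .Run;
  // The enumerator exits once the last stage has ended and the pipeline
  // has called CompleteAdding on the output queue.
  for value in pipeOut do
    log.Add(IntToStr(value.AsInteger));
end;

end.
```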

An example (also taken from the 41_Pipeline) will help explain all this.

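procedure TfrmPipelineDemo.btnExtended2Click(Sender: TObject);
var
  pipeOut: IOmniBlockingCollection;
begin
  pipeOut := Parallel.Pipeline
    .Throttle(102400) // global throttle; value assumed, lost from this listing
    .Stage(StageGenerate).Stage(StageMult2)
    .Stages([StageMinus3, StageMod5]).NumTasks(2)
    .Stage(StageSum).Run;
  lbLog.Items.Add(Format('Pipeline result: %d', [pipeOut.Next.AsInteger]));
end;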

First, a global throttling parameter is set. It will be applied to all stages. Two stages are then added, each with a separate call to the Stage function.

Another two stages are then added with one call. They are both set to execute in two parallel tasks. At the end another stage is added and the whole setup is executed.

The complete process will use seven tasks (one for StageGenerate, one for StageMult2, two for StageMinus3, two for StageMod5 and one for StageSum).

Generators, mutators, and summarizers

Let’s take a look at three different examples of multiprocessing stages before I take a turn for the deep waters.

The first example shows a first stage that accepts no input and just generates output, which is passed to the next stage in the chain. (All examples are again taken from 41_Pipeline.)

procedure StageGenerate(const input, output: IOmniBlockingCollection);
var
  i: integer;
begin
  for i := 1 to 1000000 do
    output.Add(i);
end;

The second example reads data from input and for each input generates and transmits one output value.

procedure StageMult2(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    output.Add(2 * value.AsInteger);
end;

The last example reads data from the input, adds all values together and produces only one summary value.

procedure StageSum(const input, output: IOmniBlockingCollection);
var
  sum  : integer;
  value: TOmniValue;
begin
  sum := 0;
  for value in input do
    Inc(sum, value);
  output.Add(sum); // the single summary value
end;

All examples are just special cases of the general principle – there’s no correlation required between the amount of input data and the amount of output data. There’s also absolutely no requirement that data must be all numbers. Feel free to pass around anything that can be contained in TOmniValue.
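
Two more shapes may help to illustrate that point: a stage that emits fewer values than it receives (a filter) and one that emits more (an expander). Both are hypothetical stages written for this sketch, not part of 41_Pipeline; they follow the same delegate signature as the stages above.

```delphi
unit PipelineStageShapesSketch;

interface

uses
  OtlCommon,
  OtlCollections;

// Hypothetical filter: passes on even values only (fewer outputs than inputs).
procedure StageKeepEven(const input, output: IOmniBlockingCollection);

// Hypothetical expander: emits each value and its negation
// (two outputs per input).
procedure StageExpand(const input, output: IOmniBlockingCollection);

implementation

procedure StageKeepEven(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    if not Odd(value.AsInteger) then
      output.Add(value);
end;

procedure StageExpand(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do begin
    output.Add(value);
    output.Add(-value.AsInteger);
  end;
end;

end.
```

Either stage can be dropped between a generator and a summarizer without changing anything else in the pipeline, because neighbouring stages only see the queues.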


Throttling

In my test case (41_Pipeline) a large amount of data (one million numbers) is passed through the multistage process. If one thread is suspended for some time – or if it performs a calculation that is slower than that of the previous thread – this thread’s input queue may fill up with data, which can cause lots of memory to be allocated and later released. To even out the data flow, Pipeline uses throttling, a new functionality in the blocking collection.

Throttling sets the maximum size (in TOmniValue units) of the blocking collection. When the specified number of data items is stored in the collection, no more data can be added. The Add function will simply block until the collection is empty enough again or CompleteAdding has been called. The collection is deemed empty enough when the data count drops below a threshold, which is either passed as the second parameter to the Throttle function or, if that parameter is not provided, calculated as 3/4 of the maximum size limit.
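
A small sketch of how the two Throttle parameters might be used: a global default, then an override for one stage. The unit name, helper and stage names are hypothetical, and the limits are arbitrary illustration values. Which of a stage's queues ends up bounded is not spelled out above, so treat the comments as an assumption-laden reading of the rules rather than a description of the implementation.

```delphi
unit PipelineThrottleSketch;

interface

uses
  Classes;

// Hypothetical helper: runs a throttled two-stage pipeline and logs the sum.
procedure RunThrottled(log: TStrings);

implementation

uses
  SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel;

procedure Generate(const input, output: IOmniBlockingCollection);
var
  i: integer;
begin
  for i := 1 to 10000 do
    output.Add(i); // Add blocks while a throttled queue is full
end;

procedure SumAll(const input, output: IOmniBlockingCollection);
var
  sum  : integer;
  value: TOmniValue;
begin
  sum := 0;
  for value in input do
    Inc(sum, value.AsInteger);
  output.Add(sum);
end;

procedure RunThrottled(log: TStrings);
var
  pipeOut: IOmniBlockingCollection;
begin
  pipeOut := Parallel.Pipeline
    .Throttle(1000)       // called before any stage: default for all stages;
                          // no second parameter, so the 3/4 rule applies
    .Stage(Generate)
      .Throttle(100, 50)  // Generate only: block at 100 items,
                          // unblock once the count drops below 50
    .Stage(SumAll)
    .Run;
  log.Add(Format('Sum: %d', [pipeOut.Next.AsInteger]));
end;

end.
```

Throttling only changes how far the producer can run ahead of the consumer; the logged sum of 1..10000 (50005000) should be the same with any limits.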

Parallel stages

Usually, one task is started for each stage in the pipeline. In some specialized cases, however, it may be desirable to run more than one parallel task for each stage. An example of that was given in the btnExtended2Click method, above. The pipeline generated in this example can be represented as follows (each arrow is one queue).

StageGenerate -> StageMult2 -> StageMinus3 (2 tasks) -> StageMod5 (2 tasks) -> StageSum -> output queue


As you can see, there’s always only one queue sitting between stages even if there are multiple processing units for a stage. This is easily accomplished because IOmniBlockingCollection supports multiple readers and multiple writers in a thread-safe manner.

There’s an important caveat, though. If you split a stage into multiple tasks, data will be processed in an indeterminate order. You cannot know how many items will be processed by each task and in which order they will be processed. Even worse, data will exit a multitask stage in an indeterminate order (data output from one task will be interleaved with the data from the other task). As of this moment there’s no way to enforce the original ordering the way it is done in the Parallel.ForEach implementation.

Completed or not?

Although Parallel.Pipeline has been committed and is ready for testing, there may still be some quirks in the code. I’ll follow this with a post or two on testing and performance, and you may want to wait until then before you start using it.

If you want to help me with development, then please go ahead, use it and report any problems back to me, either here in the comments or in the forum.


  1. Great stuff .. waiting to be able to use it in production code .. until we switch to D2009 or newer :)

  2. Really a great piece of code, Primoz!

  3. Anonymous, 13:29

    wonderful job...I always enjoy reading your blog. I only dream at the day when I could use your framework in production code ...