Tuesday, January 25, 2011

Parallel for implementation [2]: Input

In my last post, I presented an overview of the classes hiding behind the parallel for implementation. Today I’ll focus on the input part of the parallel for – the part that fetches the values the loop is iterating over and passes them to parallel tasks. More specifically, I’ll present source providers, data managers and local queues.

Source Provider

A source provider is an object that fetches data from the enumeration source (the data that was passed to the parallel for) and repackages it into a format suitable for parallel consumption. Currently there are three source providers defined in the OtlDataManager unit:

  • TOmniIntegerRangeProvider
    Iterates over integer ranges (just like a “normal” for statement does). As such, it doesn’t really fetch data from the enumeration source but generates it internally.
  • TOmniValueEnumeratorProvider
    Iterates over IOmniValueEnumerator, which is a special enumerator that can be accessed from multiple readers and doesn’t require locking. Currently it is only provided by IOmniBlockingCollection.
  • TOmniEnumeratorProvider
    Iterates over Windows enumerators (IEnumerator) or Delphi enumerators (GetEnumerator, wrapped into TOmniValueEnumerator class).

All source providers descend from an abstract class, TOmniSourceProvider, which defines the common source provider interface. In theory, an interface should be used for that purpose, but in practice source providers are very performance-sensitive, and not using interfaces speeds up the program by a measurable amount.

  TOmniSourceProvider = class abstract
    function Count: int64; virtual; abstract;
    function CreateDataPackage: TOmniDataPackage; virtual; abstract;
    function GetCapabilities: TOmniSourceProviderCapabilities; virtual; abstract;
    function GetPackage(dataCount: integer;
      package: TOmniDataPackage): boolean; virtual; abstract;
    function GetPackageSizeLimit: integer; virtual; abstract;
  end; { TOmniSourceProvider }

Not all source providers are created equal and that’s why function GetCapabilities returns source provider capabilities:

  TOmniSourceProviderCapability = (
    spcCountable,  // source provider that knows how much data it holds
    spcFast,       // source provider operations are O(1)
    spcDataLimit   // data package can only hold limited amount of data
  );

  TOmniSourceProviderCapabilities = set of TOmniSourceProviderCapability;

In practice, TOmniIntegerRangeProvider is both countable (it’s very simple to know how many values there are between 1 and 10, for example) and fast (it takes the same amount of time to fetch 10 values or 10,000 values), while the other two source providers are neither countable nor fast. And what about spcDataLimit, you may ask? Well, it is obsolete, no longer used, and should be removed from the codebase soon. It was replaced by the GetPackageSizeLimit method.
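To make the capability set a bit more tangible, here is a minimal, self-contained sketch of how a caller could test it. The two types mirror the declarations above; the Describe helper and the small program around it are hypothetical and exist only for illustration, they are not part of OtlDataManager.

```pascal
program CapabilitiesSketch;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

type
  TOmniSourceProviderCapability = (spcCountable, spcFast, spcDataLimit);
  TOmniSourceProviderCapabilities = set of TOmniSourceProviderCapability;

// Hypothetical helper: turns a capability set into readable text.
function Describe(const caps: TOmniSourceProviderCapabilities): string;
begin
  if spcCountable in caps then
    Result := 'countable'
  else
    Result := 'not countable';
  if spcFast in caps then
    Result := Result + ', fast'
  else
    Result := Result + ', not fast';
end;

begin
  // Integer range provider: both countable and fast.
  Writeln('integer range: ', Describe([spcCountable, spcFast]));
  // Enumerator-based providers: neither countable nor fast.
  Writeln('enumerator:    ', Describe([]));
end.
```

Using a set keeps the check down to a single `in` test, which fits the performance concerns mentioned earlier.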

The other important aspect of a source provider is the GetPackage method. It accesses the source (ensuring locked access if necessary), retrieves data and returns it in the data package. The implementation is highly dependent on the source data. For example, the integer source provider just advances the current low field value and returns a data package that doesn’t contain a bunch of values but just the low and high boundaries (and that’s why it is considered to be fast). The enumerator source provider locks the source, fetches the data and builds the data package value by value. And in the simplest case, TOmniValueEnumeratorProvider just fetches values (its source needs no locking) and builds the data package.

function TOmniValueEnumeratorProvider.GetPackage(dataCount: integer; package:
  TOmniDataPackage): boolean;
var
  iData     : integer;
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  timeout   : cardinal;
  value     : TOmniValue;
begin
  Assert(not StorePositions);
  Result := false;
  dataCount := intPackage.Prepare(dataCount);
  // wait for the first value only; later values are taken without waiting
  timeout := INFINITE;
  for iData := 1 to dataCount do begin
    if not vepEnumerator.TryTake(value, timeout) then
      break; //for
    intPackage.Add(value); // store the fetched value in the package
    timeout := 0;
    Result := true;
  end;
end; { TOmniValueEnumeratorProvider.GetPackage }
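
For contrast, the integer range case described earlier (advance the low value, hand out only boundaries) could look roughly like the sketch below. TRangePackage and TRangeProvider are hypothetical stand-ins, not the actual OTL classes. The sketch also assumes an ascending range with a step of 1 and a single thread, whereas the real provider has to be safe for concurrent callers.

```pascal
program IntegerRangeSketch;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

type
  // Hypothetical integer data package: only inclusive boundaries are stored.
  TRangePackage = record
    LowBound, HighBound: int64;
  end;

  // Hypothetical single-threaded stand-in for an integer range provider.
  TRangeProvider = class
  private
    FLow, FHigh: int64;
  public
    constructor Create(aLow, aHigh: int64);
    function GetPackage(dataCount: integer; var pkg: TRangePackage): boolean;
  end;

constructor TRangeProvider.Create(aLow, aHigh: int64);
begin
  inherited Create;
  FLow := aLow;
  FHigh := aHigh;
end;

function TRangeProvider.GetPackage(dataCount: integer; var pkg: TRangePackage): boolean;
begin
  // Nothing to hand out if the range is exhausted or no data was requested.
  Result := (dataCount > 0) and (FLow <= FHigh);
  if not Result then
    Exit;
  pkg.LowBound := FLow;
  if FHigh - FLow + 1 <= dataCount then
    pkg.HighBound := FHigh
  else
    pkg.HighBound := FLow + dataCount - 1;
  // Advancing the low boundary is all the "fetching" there is: O(1).
  FLow := pkg.HighBound + 1;
end;

var
  provider: TRangeProvider;
  pkg     : TRangePackage;
begin
  provider := TRangeProvider.Create(1, 10);
  try
    // Prints 1..4, 5..8 and 9..10, one package per line.
    while provider.GetPackage(4, pkg) do
      Writeln(pkg.LowBound, '..', pkg.HighBound);
  finally
    provider.Free;
  end;
end.
```

The cost of GetPackage here does not depend on dataCount, which is the property the spcFast capability describes.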

Data Manager

A data manager is the central hub in the OtlDataManager hierarchy. It sits between multiple local queues and the single source provider and makes sure that all parallel tasks always have some work to do.

At the moment, two different data managers are implemented – a countable data manager and a heuristic data manager. The former is used if the source provider is countable and the latter if it is not. Both descend from the abstract class TOmniDataManager.

  TOmniDataManager = class abstract
    function CreateLocalQueue: TOmniLocalQueue; virtual; abstract;
    function AllocateOutputBuffer: TOmniOutputBuffer; virtual; abstract;
    function GetNext(package: TOmniDataPackage): boolean; virtual; abstract;
    procedure ReleaseOutputBuffer(buffer: TOmniOutputBuffer); virtual; abstract;
    procedure SetOutput(const queue: IOmniBlockingCollection);
      overload; virtual; abstract;
  end; { TOmniDataManager }

The main difference between them lies in the function GetNextFromProvider, which reads data from the source provider (by calling its GetPackage method). In the countable data manager this is just a simple forwarder, while in the heuristic data manager this function tries to find a good package size that will allow all parallel tasks to work at full speed.

function TOmniHeuristicDataManager.GetNextFromProvider(package: TOmniDataPackage;
  generation: integer): boolean;
const
  CDataLimit = Trunc(High(integer) / CFetchTimeout_ms);
var
  dataPerMs: cardinal;
  dataSize : integer;
  time     : int64;
begin
  // the goal is to fetch as much (but not exceeding <fetch_limit>) data as
  // possible in <fetch_timeout> milliseconds; highest amount of data is
  // limited by the GetDataCountForGeneration method.
  dataSize := GetDataCountForGeneration(generation);
  if dataSize > hdmEstimatedPackageSize.Value then
    dataSize := hdmEstimatedPackageSize.Value;
  time := DSiTimeGetTime64;
  Result := SourceProvider.GetPackage(dataSize, package);
  time := DSiTimeGetTime64 - time;
  if Result then begin
    if time = 0 then
      dataPerMs := CDataLimit
    else begin
      dataPerMs := Round(dataSize / time);
      if dataPerMs >= CDataLimit then
        dataPerMs := CDataLimit;
    end;
    // average over last four fetches for dynamic adaptation
    hdmEstimatedPackageSize.Value := Round
      ((hdmEstimatedPackageSize.Value / 4 * 3) +
       (dataPerMs / 4) * CFetchTimeout_ms);
  end;
end; { TOmniHeuristicDataManager.GetNextFromProvider }
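
The last statement of that function keeps three quarters of the old estimate and adds one quarter of the freshly measured throughput (scaled to the fetch timeout), which effectively behaves like an exponentially weighted moving average. The small program below replays only that formula. The value 10 for CFetchTimeout_ms, the starting estimate and the constant throughput of 20 items per millisecond are all assumptions chosen for illustration; the post does not state the real constant.

```pascal
program EstimateSketch;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

const
  CFetchTimeout_ms = 10; // assumed value, for illustration only

// Same weighting as in the listing: 3/4 old estimate, 1/4 new measurement.
function UpdateEstimate(oldEstimate: int64; dataPerMs: cardinal): int64;
begin
  Result := Round((oldEstimate / 4 * 3) + (dataPerMs / 4) * CFetchTimeout_ms);
end;

var
  estimate: int64;
  i       : integer;
begin
  estimate := 1000; // arbitrary starting estimate
  // Pretend the provider keeps delivering 20 items per millisecond.
  for i := 1 to 8 do begin
    estimate := UpdateEstimate(estimate, 20);
    Writeln('fetch ', i, ': estimated package size = ', estimate);
  end;
end.
```

With a steady throughput the estimate drifts toward dataPerMs * CFetchTimeout_ms (about 200 items under these assumptions), that is, toward a package that takes roughly one fetch timeout to retrieve.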

Local Queue

As you’ve seen in the previous post, each parallel task reads data from a local queue, which is just a simple interface to the data manager. The most important part of a local queue is its GetNext method, which provides the task with the next value.

function TOmniLocalQueueImpl.GetNext(var value: TOmniValue): boolean;
begin
  // try the local data package first
  Result := lqiDataPackage.GetNext(value);
  if not Result then begin
    // local package is empty; ask the data manager to refill it
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then
      Result := lqiDataPackage.GetNext(value);
  end;
end; { TOmniLocalQueueImpl.GetNext }

Each local queue contains a local data package. GetNext first tries to read the next value from that data package. If that fails (the data package is empty – it was already fully processed), it tries to get a new data package from the data manager and (if successful) retries fetching the next value from the (refreshed) data package.

GetNext in the data manager first tries to get the next package from the source provider (via the private method GetNextFromProvider, which calls the source provider’s GetPackage method). If that fails, it tries to steal part of the workload from another task.

Stealing is the feature that allows all parallel tasks to stay active up to the last value being enumerated. To implement it, the data manager iterates over all local queues and tries to split each local queue’s data package in half. If that succeeds, half of the data package is left in the original local queue and the other half is returned to the local queue that requested more data.

Package splitting is highly dependent on the data type. For example, an integer data package just recalculates its boundaries, while enumerator-based packages must copy data around.

function TOmniValueEnumeratorDataPackage.Split(
  package: TOmniDataPackage): boolean;
var
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  iValue    : integer;
  value     : TOmniValue;
begin
  Result := false;
  // move about half of the remaining values into the requesting package
  for iValue := 1 to intPackage.Prepare(vedpApproxCount.Value div 2)
  do begin
    if not GetNext(value) then
      break; //for
    intPackage.Add(value); // copy the value into the other package
    Result := true;
  end;
end; { TOmniValueEnumeratorDataPackage.Split }
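
The integer case, where a split only recalculates boundaries, can be sketched together with the stealing loop described above. Everything here is hypothetical: TRangePackage, SplitRange and TrySteal are illustrative names, the choice to give the upper half to the requester is an assumption, and the sketch ignores the synchronization the real data manager needs.

```pascal
program StealSketch;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

type
  // Hypothetical integer package holding only inclusive boundaries.
  TRangePackage = record
    LowBound, HighBound: int64;
  end;

// Moves the upper half of 'source' into 'target'. Returns false when
// 'source' holds fewer than two values and therefore cannot be split.
function SplitRange(var source, target: TRangePackage): boolean;
var
  middle: int64;
begin
  Result := source.HighBound > source.LowBound;
  if not Result then
    Exit;
  middle := source.LowBound + (source.HighBound - source.LowBound) div 2;
  target.LowBound := middle + 1;
  target.HighBound := source.HighBound;
  source.HighBound := middle;
end;

// Tries every other queue in turn and stops at the first successful split.
function TrySteal(var queues: array of TRangePackage; requester: integer): boolean;
var
  i: integer;
begin
  Result := false;
  for i := Low(queues) to High(queues) do
    if (i <> requester) and SplitRange(queues[i], queues[requester]) then begin
      Result := true;
      Exit;
    end;
end;

var
  queues: array[0..2] of TRangePackage;
begin
  queues[0].LowBound := 1;  queues[0].HighBound := 0;  // empty, asks for work
  queues[1].LowBound := 5;  queues[1].HighBound := 5;  // one value, cannot be split
  queues[2].LowBound := 11; queues[2].HighBound := 20; // ten values left
  if TrySteal(queues, 0) then
    // Prints: queue 0: 16..20, queue 2: 11..15
    Writeln('queue 0: ', queues[0].LowBound, '..', queues[0].HighBound,
            ', queue 2: ', queues[2].LowBound, '..', queues[2].HighBound);
end.
```

No values are copied in this variant, which is why splitting an integer package is so much cheaper than splitting an enumerator-based one.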

Half the Work is Done …

Providing input to parallel tasks is just a part in the grand scheme of parallel for. Next time we’ll explore the other half – making sure output is correctly ordered.


  1. I had to read this post like 3 times to START understanding it :/ I must be a dummy, or you a genius.

  2. It's more that my explanation skills are not as good as my programming skills ;)

  3. Teo Bon, 18:21

    Primož, first of all, thank you for OTL, it's great!

    What do you think of new Java 7 concurrent features like fork/join?


    There is a part about work stealing and other interesting features

  4. @Teo: Thanks for the link, it's an interesting read. OTL already implements fork/join (OtlParallel.Join) except there is no facility that will help you return result from the forked code. Futures (OtlParallel.Future) are more result-oriented. Maybe there's an opportunity to mix the two and get simple to use syntax for fork/join - I'll think some more about it.