Tuesday, January 11, 2011

Parallel for implementation [1]: Overview

Aeons ago I promised to write a blog post about all the magic that happens behind the scenes in Parallel.ForEach. That never happened (sorry); I was busy with other stuff and had to put it on the back burner.

Today is the day to fulfill that promise.

This article starts the journey that will (hope, hope) explain the murky waters of parallel loop data management. Three parts are planned – Overview (which you are reading now), Input and Output.

Even if you are interested in parallel programming, you may think that such low-level stuff is of no interest to you. Well, you may be right, but let me state three reasons why you should read this three-part series.

1) Because you will then know what the OtlDataManager unit is good for and will be able to use it in your application.

2) Because it’s an interesting topic ;)

3) So I can convince you that Parallel.ForEach is better than home-brew multithreaded parallel loops and that you should always use OmniThreadLibrary ;)


Let’s start with a scary diagram. The picture below shows the internals of the OtlDataManager unit. I’ll refer to it a lot but for now you can safely ignore it.

[Figure: internals of the OtlDataManager unit]

Got it? Ignore the nasty picture above. It is not important. Focus on the code instead.

Parallel.ForEach(1, 1000).Execute(
  procedure (const elem: integer)
  begin
  end);

This simple code iterates from 1 to 1000 on all available cores in parallel and executes a simple procedure that contains no workload. All in all, the code will do nothing – but it will do it in a very complicated manner.
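
Not OTL code, but the same behaviour can be modelled with a short Python sketch (parallel_for_each and its static chunking are invented for illustration; the real Parallel.ForEach hands out work dynamically through a data manager, as described later in this post):

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_for_each(low, high, body, num_tasks=4):
    """Run body(elem) for every elem in low..high on a pool of worker tasks.

    A conceptual model only: the range is split statically into one chunk
    per task, while the real implementation distributes work dynamically.
    """
    def worker(chunk):
        for elem in chunk:
            body(elem)

    elems = range(low, high + 1)
    # Give every task an interleaved slice of the range.
    chunks = [elems[i::num_tasks] for i in range(num_tasks)]
    with ThreadPoolExecutor(max_workers=num_tasks) as pool:
        list(pool.map(worker, chunks))

# Iterate from 1 to 1000 in parallel, executing an empty body:
# a lot of machinery that, all in all, does nothing.
parallel_for_each(1, 1000, lambda elem: None)
```

Note that, just like in the Delphi code, no ordering of the iterations is guaranteed.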

The ForEach method creates a new TOmniParallelLoop&lt;integer&gt; object (that’s the object that will coordinate the parallel tasks) and passes it a source provider – an object that knows how to access the values being enumerated (integers from 1 to 1000 in this example).

The OtlDataManager unit contains four different source providers – one for each type of source that can be passed to the ForEach method (more on them in the next installment). If there were ever a need to extend ForEach with a new enumeration source, I would only have to add a few simple methods to the OtlParallel unit and write a new source provider.
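
In Python terms, an integer-range source provider could be modelled like this (the class and method names are invented; they mirror the idea, not the actual OtlDataManager interface):

```python
import threading

class IntegerRangeSourceProvider:
    """Hands out consecutive packages from a low..high integer range.

    Only the provider is shared and locked; consumers pull whole
    packages, not single values, which keeps contention low.
    """
    def __init__(self, low, high, step=1):
        self._next = low
        self._high = high
        self._step = step
        self._lock = threading.Lock()

    def get_package(self, count):
        """Return up to `count` values; an empty list means 'drained'."""
        with self._lock:
            package = []
            while len(package) < count and self._next <= self._high:
                package.append(self._next)
                self._next += self._step
            return package
```

For a provider covering 1..10, successive get_package(4) calls return [1, 2, 3, 4], [5, 6, 7, 8], [9, 10] and finally [].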

class function Parallel.ForEach(low, high: integer; step: integer):
  IOmniParallelLoop<integer>;
begin
  Result := TOmniParallelLoop<integer>.Create(
    CreateSourceProvider(low, high, step), true);
end; { Parallel.ForEach }

Next, a few dull things happen and at the end InternalExecuteTask is called. This method is responsible for creating and starting the parallel loop tasks.

InternalExecuteTask first creates a data manager and attaches it to the source provider (compare this with the picture above – there is one source provider and one data manager). Next it creates an appropriate number of tasks and calls the task-specific delegate method from each one. [This delegate wraps your parallel code and provides it with proper input (and sometimes, output). There are many calls to InternalExecuteTask in OtlParallel, each with a different taskDelegate and each providing support for a different kind of loop.]

procedure TOmniParallelLoopBase.InternalExecuteTask(
  taskDelegate: TOmniTaskDelegate);
var
  dmOptions    : TOmniDataManagerOptions;
  iTask        : integer;
  numTasks     : integer;
  task         : IOmniTaskControl;
begin
  // ... set up dmOptions and numTasks ...
  oplDataManager := CreateDataManager(oplSourceProvider,
    numTasks, dmOptions);
  for iTask := 1 to numTasks do begin
    task := CreateTask(
      procedure (const task: IOmniTask)
      begin
        taskDelegate(task);
      end);
    // ... configure and schedule the task ...
  end;
  // ... wait for the tasks to complete ...
end; { TOmniParallelLoopBase.InternalExecuteTask }

The data manager is stored in a field of the TOmniParallelLoop&lt;T&gt; object so that it can simply be reused from the task delegate. [A cleaner design would be to pass it to the task delegate as a parameter. Maybe I’ll fix this in the future.] The simplest possible task delegate (below) just creates a local queue and fetches values from the local queue one by one. This results in many local queues – one per task – all connected to the same data manager.

In case you’re wondering what loopBody is – it is the anonymous method you have passed to Parallel.ForEach’s Execute method.

procedure InternalExecuteTask(const task: IOmniTask);
var
  localQueue: TOmniLocalQueue;
  value     : TOmniValue;
begin
  localQueue := oplDataManager.CreateLocalQueue;
  try
    while (not Stopped) and localQueue.GetNext(value) do
      loopBody(task, value);
  finally FreeAndNil(localQueue); end;
end;
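
The same task-side loop can be sketched in Python (again with invented names; the real TOmniLocalQueue also does buffering and stealing that this toy version omits):

```python
import threading

class DataManager:
    """Shared front end over a data source; parcels values out in packages."""
    def __init__(self, data, package_size=3):
        self._data = list(data)
        self._size = package_size
        self._lock = threading.Lock()

    def get_package(self):
        with self._lock:
            package = self._data[:self._size]
            del self._data[:self._size]
            return package

class LocalQueue:
    """Per-task view of the data manager; refills itself one package at a time."""
    def __init__(self, manager):
        self._manager = manager
        self._buffer = []

    def get_next(self):
        """Return the next value, or None when the source is exhausted."""
        if not self._buffer:
            self._buffer = self._manager.get_package()
        return self._buffer.pop(0) if self._buffer else None

def task_body(manager, body):
    """Rough analogue of the task delegate: create a local queue, drain it."""
    queue = LocalQueue(manager)
    while (value := queue.get_next()) is not None:
        body(value)
```

Several task_body workers sharing one DataManager give the many-queues-one-manager shape described above.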

Let’s reiterate:

  1. Source provider is created.
  2. Data manager is created and associated with the source provider.
  3. Each task creates its own local queue and uses it to access the source data.
  4. As you’ll see next time, the local queue retrieves data in packages (data packages) and sends output to an output buffer, which makes sure that the output is produced in the correct order. (The output-buffer part happens only if the .PreserveOrder method is called in the high-level code.)
  5. If a task runs out of work, it requests a new data package from the data manager, which gets the data from the source provider (more on that in the next post). If the source provider runs out of data, the data manager will attempt to steal some data from other tasks.
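
The stealing step in particular can be sketched like this (a toy version under a single lock, with invented names; the real implementation splits a victim task’s data package using lock-free structures):

```python
import threading

class StealingDataManager:
    """Toy data manager: hands out packages and, once the source is
    drained, steals half of the fullest task buffer for the requester."""
    def __init__(self, data, package_size=4):
        self._data = list(data)
        self._size = package_size
        self._buffers = []            # one buffer (list) per registered task
        self._lock = threading.Lock()

    def register_task(self):
        with self._lock:
            buf = []
            self._buffers.append(buf)
            return buf

    def refill(self, buffer):
        """Refill `buffer`; return False when no work is left anywhere."""
        with self._lock:
            if self._data:                           # normal path: new package
                buffer.extend(self._data[:self._size])
                del self._data[:self._size]
                return True
            victim = max(self._buffers, key=len)     # stealing path
            if not victim:
                return False                         # everything is drained
            half = len(victim) // 2 or 1
            buffer.extend(victim[-half:])            # take the victim's tail
            del victim[-half:]
            return True
```

If task A holds values 0..7 and task B runs dry, B’s refill steals 4, 5, 6 and 7 from A, so both tasks keep busy until everything is processed.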

[Figure: data flow in the OtlDataManager]

All of this was designed to provide fast data access (blocking is limited to the source provider; all other interactions are lock-free), good workload distribution (a task that runs out of work before the others will steal some work from them) and output ordering (when required).
