Sunday, May 15, 2011

Divide and Conquer (in parallel)

One of the latest additions to the high-level OtlParallel constructs, Fork/Join, was implemented just before ADUG 2011 and presented there for the first time. Shortly after the ADUG I improved the Fork/Join a little but then forgot to blog about it as I was busy implementing Parallel.Async and Parallel.TaskConfig. Sorry :(
Fork/Join helps you solve just one class of problems, but it is an important one – the problems that can be solved by so-called Divide and Conquer algorithm.
For deeper discussion of D&C, read the Wikipedia article linked above. What interests us is that D&C works by subdividing the problem. Instead of trying to solve the big problem, we divide it into many smaller problems and then we try to solve them. Those subproblems may again be too big and have to be subdivided even further. The process continues until the subproblems are small enough that they can be solved easily.
Of particular interest to us are D&C algorithms where subproblems can be solved in parallel.
In some case, solving a subproblem will produce a result which has to be passed one level higher to the code that created the subproblem. At other times, subproblem solvers don’t return any result. Fork/Join in OtlParallel handles both cases equally well.
The problem with D&C algorithms is that number of tasks gets very large and it is not a good idea to create as many threads as there are tasks. Fork/Join in OtlParallel only creates one less thread than there as cores in the system. (One less because the caller thread – the one that created Fork/Join and spawned first level of subtasks – also participates in the execution.)
If you know better, you can always override this decision by calling the NumTasks method of the IOmniForkJoin (or IOmniForkJoin<T>) interface.
  IOmniForkJoin = interface
    function  Compute(action: TOmniForkJoinDelegate): IOmniCompute;
    function  NumTasks(numTasks: integer): IOmniForkJoin;
    function  TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin;
  end;

  IOmniForkJoin<T> = interface
    function  Compute(action: TOmniForkJoinDelegate<T>):
      IOmniCompute<T>;
    function  NumTasks(numTasks: integer): IOmniForkJoin<T>;
    function  TaskConfig(const config: IOmniTaskConfig):
      IOmniForkJoin<T>;
  end;
Both interfaces are very simple. Besides NumTasks they define TaskConfig (which you can read about in the previous article) and Compute (which I’ll describe later).

QuickSort

The first demo today is a well-known quicksort algorithm (see demo application 44_Fork-Join QuickSort for the full code).
Let’s start with a non-optimized single threaded sorter. I’m using simple quick sort implementation because it is very simple to convert it to the multi threaded form.

procedure TSequentialSorter.QuickSort(left, right: integer);
var
  pivotIndex: integer;
begin
  if right > left then begin
    if (right - left) <= CSortThreshold then
      InsertionSort(left, right)
    else begin
      pivotIndex := Partition(left, right, (left + right) div 2);
      QuickSort(left, pivotIndex - 1);
      QuickSort(pivotIndex + 1, right);
    end;
  end;
end;
As you can see, the code switches to an insertion sort when the dimension of the array drops below some threshold. This is not really important for the single threaded version (it only brings a small speedup) but it will help immensely with the multi threaded version (you’ll see why in a moment).
Converting this quicksort to a multithreaded version is quite simple.
First, you have to create Fork/Join manager. This is an IOmniForkJoin interface used to spawn subtasks. In this example, it is stored in a global field.

  FForkJoin := Parallel.ForkJoin;
The QuickSort itself is only changed a little.

procedure TParallelSorter.QuickSort(left, right: integer);
var
  pivotIndex: integer;
  sortLeft  : IOmniCompute;
  sortRight : IOmniCompute;
begin
  if right > left then begin
    if (right - left) <= CSortThreshold then
      InsertionSort(left, right)
    else begin
      pivotIndex := Partition(left, right, (left + right) div 2);
      sortLeft := FForkJoin.Compute(
        procedure
        begin
          QuickSort(left, pivotIndex - 1);
        end);
      sortRight := FForkJoin.Compute(
        procedure
        begin
          QuickSort(pivotIndex + 1, right);
        end);
      sortLeft.Await;
      sortRight.Await;
    end;
  end;
end;
The code looks much longer but changes are really simple – each recursive call to QuickSort is replaced with

      sortLeft := FForkJoin.Compute(
        procedure
        begin
          QuickSort(left, pivotIndex - 1);
        end);
followed by a call to .Await.
Instead of calling QuickSort directly, parallel version creates IOmniCompute interface by calling FForkJoin.Compute. This creates a subtask wrapping the anonymous function which was passed to the Compute and puts this subtask into the Fork/Join queue.
Subtask is then read from this queue by one of the Fork/Join workers and is processed in the background thread.
Calling Await checks if the subtask has finished its work. In that case, Await simply returns and the code can proceed. Otherwise (subtask is still working), Await tries to get one subtask from the Fork/Join queue, executes it, and then repeats from the beginning (by checking if the subtask has finished its work). This way, all threads are always busy either with executing their own code or a subtask from the Fork/Join queue.
Because two IOmniCompute interfaces are stored on the stack in the each QuickSort call, this code uses much more stack space than the single threaded version. That is the main reason why the parallel execution is stopped at some level and simple sequential version is used to sort remaining fields. If you remove the InsertionSort branch, the parallel version will be able to sort only relatively small arrays before running out of stack space.

Finding Max(array)

The second example locates maximum element in an array (demo application 45_Fork-Join max).

function TfrmQuickSortDemo.SequentialMax(left, right: integer): integer;
var
  iData: integer;
begin
  Result := FData[left];
  for iData := left + 1 to right do
    if FData[iData] > Result then
      Result := FData[iData];
end;
The parallel solution is similar to the quicksort example above with few important differences related to the fact that the code must return a value (the quicksort code merely sorted the array returning nothing).
This directly affects the interface usage – instead of working with IOmniForkJoin and IOmniCompute the code uses IOmniForkJoin<T> and IOmniCompute<T>. In the example above, function returns integer which is why the parallel code creates IOmniForkJoin<integer> and passes it to the ParallelMax function.

  max := ParallelMax(Parallel.ForkJoin<integer>, Low(FData), High(FData));
In this example Fork/Join manager is passed as a parameter (in the previous demo it was stored in a global field). This approach is more flexible but is also slightly slower and – more importantly – uses more stack space.

function TfrmQuickSortDemo.ParallelMax(
  const forkJoin: IOmniForkJoin<integer>; 
  left, right: integer): integer;
var
  computeLeft : IOmniCompute<integer>;
  computeRight: IOmniCompute<integer>;
  mid         : integer;
begin
  if (right - left) < CSeqThreshold then
    Result := SequentialMax(left, right)
  else begin
    mid := (left + right) div 2;
    computeLeft := forkJoin.Compute(
      function: integer
      begin
        Result := ParallelMax(forkJoin, left, mid);
      end);
    computeRight := forkJoin.Compute(
      function: integer
      begin
        Result := ParallelMax(forkJoin, mid + 1, right);
      end);
    Result := Max(computeLeft.Value, computeRight.Value);
  end;
end;
When the array subrange is small enough, ParallelMax calls the sequential (single threaded) version – just as the parallel quicksort did, and because of the same reason – not to run out of stack space.
With a big subrange, the code creates two IOmniCompute<integer> subtasks each wrapping a function returning an integer. This function in turn calls back ParallelMax (but with a smaller range).
To get the result of the anonymous function wrapped by the Compute, the code calls .Value function. Just as with the .Await, .Value either returns the result (if it was already computed) or executes a Fork/Join subtasks from the Fork/Join queue.
If you find this code too ugly, you can use a computation-creation function.

function TfrmQuickSortDemo.ParallelMax(
  const forkJoin: IOmniForkJoin<integer>; 
  left, right: integer): integer;
var
  computeLeft : IOmniCompute<integer>;
  computeRight: IOmniCompute<integer>;
  mid         : integer;

  function Compute(left, right: integer): IOmniCompute<integer>;
  begin
    Result := forkJoin.Compute(
      function: integer
      begin
        Result := ParallelMax(forkJoin, left, right);
      end
    );
  end;

begin
  if (right - left) < CSeqThreshold then
    Result := SequentialMax(left, right)
  else begin
    mid := (left + right) div 2;
    computeLeft := Compute(left, mid);
    computeRight := Compute(mid + 1, right);
    Result := Max(computeLeft.Value, computeRight.Value);
  end;
end; 
Semantically, the code is the same as the first ParallelMax but it is simpler to read.

Caveat

I have one last important point to make. This is wrong:

  Result := Max(Compute(left, mid).Value, 
    Compute(mid + 1, right).Value);
You must always create all subtasks before calling .Await or .Value on any one of them! Otherwise, your code will not execute in parallel at all – it will all be processed by a single thread!

4 comments:

  1. Anonymous06:37

    Do you have any benchmarks for these examples? How fast can parallel versions of classic algs be? How large are overheads? How good this scale on 2/4/etc CPUs? And what about Delphi memory manager? FastMM is not scale well for multi-threading. Does it affect your examples?

    ReplyDelete
  2. The only benchmarks are tests 44_Fork-Join QuickSort and 45_Fork-Join max. From them you can see that overhead is quite large as it takes a 8-core machine to QuickSort 10-million numbers faster than a single-threaded algorithm. I do not recommend using Fork/Join for something so "trivial" as QuickSort. You should use it on a problem with more CPU-intensive calculations in a workload.

    As for the memory manager - I don't know. FastMM is the only stable and free memory manager and that's why I'm sticking to it.

    ReplyDelete
  3. Hernan14:42

    I have a Phenom II t1100 X6 8gb windows 7 and delphi x2 update 1
    Im compile the test first
    app_44_Fork Join QuickSort
    and result is
    Generating 10000000 pseudorandom numbers
    Sequential sorted in 8607 ms
    Parallel sorted in 664 ms
    This test is good parallel is better but
    the test
    app_45_Fork Join Max
    Generating 10000000 pseudorandom numbers
    Sequential max=2147483447 found in 52 ms
    Parallel max=2147483447 found in 186 ms
    i dont undertand why sequential is better in my phenom x6 who is wrong
    the app_34_TreeScan is the same situation sequential found in 20ms and parallel found in 886ms

    ReplyDelete
  4. This is entirely expected. Tree scanning and finding a maximum element are such simple algorithms that it's hard to beat singlethreaded speed with the multithreaded application.

    ReplyDelete