Fork/Join helps you solve just one class of problems, but it is an important one – the problems that can be solved by so-called Divide and Conquer algorithm.
For deeper discussion of D&C, read the Wikipedia article linked above. What interests us is that D&C works by subdividing the problem. Instead of trying to solve the big problem, we divide it into many smaller problems and then we try to solve them. Those subproblems may again be too big and have to be subdivided even further. The process continues until the subproblems are small enough that they can be solved easily.
Of particular interest to us are D&C algorithms where subproblems can be solved in parallel.
In some case, solving a subproblem will produce a result which has to be passed one level higher to the code that created the subproblem. At other times, subproblem solvers don’t return any result. Fork/Join in OtlParallel handles both cases equally well.
The problem with D&C algorithms is that number of tasks gets very large and it is not a good idea to create as many threads as there are tasks. Fork/Join in OtlParallel only creates one less thread than there as cores in the system. (One less because the caller thread – the one that created Fork/Join and spawned first level of subtasks – also participates in the execution.)
If you know better, you can always override this decision by calling the NumTasks method of the IOmniForkJoin (or IOmniForkJoin<T>) interface.
IOmniForkJoin = interface function Compute(action: TOmniForkJoinDelegate): IOmniCompute; function NumTasks(numTasks: integer): IOmniForkJoin; function TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin; end; IOmniForkJoin<T> = interface function Compute(action: TOmniForkJoinDelegate<T>): IOmniCompute<T>; function NumTasks(numTasks: integer): IOmniForkJoin<T>; function TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin<T>; end;Both interfaces are very simple. Besides NumTasks they define TaskConfig (which you can read about in the previous article) and Compute (which I’ll describe later).
QuickSortThe first demo today is a well-known quicksort algorithm (see demo application 44_Fork-Join QuickSort for the full code).
Let’s start with a non-optimized single threaded sorter. I’m using simple quick sort implementation because it is very simple to convert it to the multi threaded form.
procedure TSequentialSorter.QuickSort(left, right: integer); var pivotIndex: integer; begin if right > left then begin if (right - left) <= CSortThreshold then InsertionSort(left, right) else begin pivotIndex := Partition(left, right, (left + right) div 2); QuickSort(left, pivotIndex - 1); QuickSort(pivotIndex + 1, right); end; end; end;As you can see, the code switches to an insertion sort when the dimension of the array drops below some threshold. This is not really important for the single threaded version (it only brings a small speedup) but it will help immensely with the multi threaded version (you’ll see why in a moment).
Converting this quicksort to a multithreaded version is quite simple.
First, you have to create Fork/Join manager. This is an IOmniForkJoin interface used to spawn subtasks. In this example, it is stored in a global field.
FForkJoin := Parallel.ForkJoin;The QuickSort itself is only changed a little.
procedure TParallelSorter.QuickSort(left, right: integer); var pivotIndex: integer; sortLeft : IOmniCompute; sortRight : IOmniCompute; begin if right > left then begin if (right - left) <= CSortThreshold then InsertionSort(left, right) else begin pivotIndex := Partition(left, right, (left + right) div 2); sortLeft := FForkJoin.Compute( procedure begin QuickSort(left, pivotIndex - 1); end); sortRight := FForkJoin.Compute( procedure begin QuickSort(pivotIndex + 1, right); end); sortLeft.Await; sortRight.Await; end; end; end;The code looks much longer but changes are really simple – each recursive call to QuickSort is replaced with
sortLeft := FForkJoin.Compute( procedure begin QuickSort(left, pivotIndex - 1); end);followed by a call to .Await.
Instead of calling QuickSort directly, parallel version creates IOmniCompute interface by calling FForkJoin.Compute. This creates a subtask wrapping the anonymous function which was passed to the Compute and puts this subtask into the Fork/Join queue.
Subtask is then read from this queue by one of the Fork/Join workers and is processed in the background thread.
Calling Await checks if the subtask has finished its work. In that case, Await simply returns and the code can proceed. Otherwise (subtask is still working), Await tries to get one subtask from the Fork/Join queue, executes it, and then repeats from the beginning (by checking if the subtask has finished its work). This way, all threads are always busy either with executing their own code or a subtask from the Fork/Join queue.
Because two IOmniCompute interfaces are stored on the stack in the each QuickSort call, this code uses much more stack space than the single threaded version. That is the main reason why the parallel execution is stopped at some level and simple sequential version is used to sort remaining fields. If you remove the InsertionSort branch, the parallel version will be able to sort only relatively small arrays before running out of stack space.
Finding Max(array)The second example locates maximum element in an array (demo application 45_Fork-Join max).
function TfrmQuickSortDemo.SequentialMax(left, right: integer): integer; var iData: integer; begin Result := FData[left]; for iData := left + 1 to right do if FData[iData] > Result then Result := FData[iData]; end;The parallel solution is similar to the quicksort example above with few important differences related to the fact that the code must return a value (the quicksort code merely sorted the array returning nothing).
This directly affects the interface usage – instead of working with IOmniForkJoin and IOmniCompute the code uses IOmniForkJoin<T> and IOmniCompute<T>. In the example above, function returns integer which is why the parallel code creates IOmniForkJoin<integer> and passes it to the ParallelMax function.
max := ParallelMax(Parallel.ForkJoin<integer>, Low(FData), High(FData));In this example Fork/Join manager is passed as a parameter (in the previous demo it was stored in a global field). This approach is more flexible but is also slightly slower and – more importantly – uses more stack space.
function TfrmQuickSortDemo.ParallelMax( const forkJoin: IOmniForkJoin<integer>; left, right: integer): integer; var computeLeft : IOmniCompute<integer>; computeRight: IOmniCompute<integer>; mid : integer; begin if (right - left) < CSeqThreshold then Result := SequentialMax(left, right) else begin mid := (left + right) div 2; computeLeft := forkJoin.Compute( function: integer begin Result := ParallelMax(forkJoin, left, mid); end); computeRight := forkJoin.Compute( function: integer begin Result := ParallelMax(forkJoin, mid + 1, right); end); Result := Max(computeLeft.Value, computeRight.Value); end; end;When the array subrange is small enough, ParallelMax calls the sequential (single threaded) version – just as the parallel quicksort did, and because of the same reason – not to run out of stack space.
With a big subrange, the code creates two IOmniCompute<integer> subtasks each wrapping a function returning an integer. This function in turn calls back ParallelMax (but with a smaller range).
To get the result of the anonymous function wrapped by the Compute, the code calls .Value function. Just as with the .Await, .Value either returns the result (if it was already computed) or executes a Fork/Join subtasks from the Fork/Join queue.
If you find this code too ugly, you can use a computation-creation function.
function TfrmQuickSortDemo.ParallelMax( const forkJoin: IOmniForkJoin<integer>; left, right: integer): integer; var computeLeft : IOmniCompute<integer>; computeRight: IOmniCompute<integer>; mid : integer; function Compute(left, right: integer): IOmniCompute<integer>; begin Result := forkJoin.Compute( function: integer begin Result := ParallelMax(forkJoin, left, right); end ); end; begin if (right - left) < CSeqThreshold then Result := SequentialMax(left, right) else begin mid := (left + right) div 2; computeLeft := Compute(left, mid); computeRight := Compute(mid + 1, right); Result := Max(computeLeft.Value, computeRight.Value); end; end;Semantically, the code is the same as the first ParallelMax but it is simpler to read.
CaveatI have one last important point to make. This is wrong:
Result := Max(Compute(left, mid).Value, Compute(mid + 1, right).Value);You must always create all subtasks before calling .Await or .Value on any one of them! Otherwise, your code will not execute in parallel at all – it will all be processed by a single thread!