Monday, February 14, 2011

Parallel for implementation [3]: Output

In the previous two installments I’ve tried to give a little insight into how OmniThreadLibrary’s Parallel ForEach is implemented (part 1 – Overview, part 2 – Input). Today I will conclude this short series with a description of output ordering – or what is known in the OTL as the .PreserveOrder modifier.

Ordering is usually used together with the .Into modifier. The reason lies in the integration between the Parallel infrastructure and your parallel code (the one that is executing as the Parallel.ForEach payload). In a “normal” foreach statement, the output from this parallel payload is not defined. You are allowed to do whatever you like in the foreach and to generate any output you need, but Parallel will know nothing about it. Therefore, the OTL has no way to preserve any ordering because – at least from the viewpoint of the library – the parallelized code produces no output.

When .Into is used, however, your code uses a different signature (different parameters). Let’s examine a simple example from OTL’s demo 38:

  Parallel.ForEach(1, CMaxTest)
    .PreserveOrder
    .Into(primeQueue)
    .Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        if IsPrime(value) then
          res := value;
      end);

Because .Into is specified, the parallel payload takes two parameters. The first is – as in the more common case – the input value, while the second receives the output value. As you can see from the example, the parallelized code can produce zero or one output value, but not more. [This covers – at least in my opinion – all common usage cases.]

This small modification changes everything. Because the Parallel infrastructure has control over the output parameter it can manage it internally, associate it with the input and make sure that output is generated in the same order as input was.
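To make the ordering contract concrete, here is a small sketch – in Python rather than Delphi, purely as a language-neutral model and not OTL code – of what .PreserveOrder combined with .Into guarantees: each input produces zero or one output, and the surviving outputs appear in input order, exactly as a sequential foreach would emit them.

```python
from concurrent.futures import ThreadPoolExecutor

def is_prime(n):
    if n < 2:
        return False
    return all(n % d for d in range(2, int(n ** 0.5) + 1))

def parallel_foreach_into(values, body, workers=4):
    # Model of ForEach(...).PreserveOrder.Into(...): the body may return
    # None (no output) or a value; results keep the input order because
    # they are collected by input position, not by completion time.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(body, values))  # ordered by input position
    return [r for r in results if r is not None]

primes = parallel_foreach_into(range(1, 100),
                               lambda v: v if is_prime(v) else None)
```

However the bodies are interleaved at run time, the result is indistinguishable from a sequential filter over the input.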

Let’s switch the viewpoint to the innermost code – the part that schedules parallel tasks. When .Into is used, InternalExecuteTask (see part 1) executes the following, quite complicated code.

  InternalExecuteTask(
    procedure (const task: IOmniTask)
    var
      localQueue      : TOmniLocalQueue;
      outputBuffer_ref: TOmniOutputBuffer;
      position        : int64;
      result          : TOmniValue;
      value           : TOmniValue;
    begin
      oplDataManager.SetOutput(oplIntoQueueIntf);
      localQueue := oplDataManager.CreateLocalQueue;
      try
        outputBuffer_ref := oplDataManager.AllocateOutputBuffer;
        try
          localQueue.AssociateBuffer(outputBuffer_ref);
          result := TOmniValue.Null;
          while (not Stopped) and
                localQueue.GetNext(position, value) do
          begin
            loopBody(task, value, result);
            if not result.IsEmpty then begin
              outputBuffer_ref.Submit(position, result);
              result := TOmniValue.Null;
            end;
          end;
        finally
          oplDataManager.ReleaseOutputBuffer(outputBuffer_ref);
        end;
      finally
        FreeAndNil(localQueue);
      end;
    end
  );

Important points here are:

  • The data manager (see part 2) is associated with the output queue. (The oplIntoQueueIntf field contains a value passed to the .Into method.)

  • A local queue is created, the same as when a “normal” foreach is executed (see part 2).

  • An output buffer is created by the data manager and associated with the local queue (see part 2).

  • User code is executed and each non-empty output value is written into the output buffer.

  • The output buffer is released, as is the local queue.

The interesting part is – as usual – hidden in the background; inside local queue, data manager and output buffer.

Putting it all together

The first modification lies in the data source. When .PreserveOrder is used, each data package (see part 2) knows the source position it was read from. To simplify matters, data package splitting is not used in this case. [And because of that, data stealing cannot be used, causing slightly less effective use of the CPU than in the simpler foreach case.]
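A hypothetical model of such position-aware packages (Python used only as sketch notation, with made-up names): each package records the (first, last) input positions it covers, so the range can later be matched against the next expected output position.

```python
def make_packages(count, package_size):
    # Toy model of position-aware data packages: with .PreserveOrder each
    # package records the (first, last) input positions it covers, and
    # packages are never split, so the ranges stay contiguous.
    return [(first, min(first + package_size, count) - 1)
            for first in range(0, count, package_size)]
```

Because the packages are never split, the ranges partition the input exactly, which is what makes the later position-based merge possible.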

Each local queue has an output buffer set associated with it.

Each output buffer set manages two output buffers. One is active and the task is writing into it; the other may be either empty or full. Each output buffer is associated with an input position – just as the data package is.

[image: output buffer organization]

When we look at data reading/writing from the perspective of a single task, everything is very simple. The task reads data from a local queue (which reads data from a data package associated with some position) and writes it to an output buffer (associated with the same position).
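The buffer-set states can be modeled in a few lines (again a Python toy model with illustrative names, not OTL’s API): writing goes to the active buffer, marking it full freezes it, and activating the alternate buffer swaps the roles – blocking, in the real implementation, when the alternate buffer is still full.

```python
class OutputBufferSet:
    # Toy model of the two-buffer set owned by each local queue:
    # one buffer is active, the other is free or full.
    def __init__(self):
        self.buffers = [[], []]        # two output buffers
        self.full = [False, False]
        self.active = 0                # index of the active buffer

    def mark_full(self):
        # the real code also notifies the data manager at this point
        self.full[self.active] = True

    def activate_buffer(self):
        other = 1 - self.active
        # the real implementation blocks here until `other` is free
        assert not self.full[other], "would block: alternate buffer still full"
        self.active = other
```

With only two buffers per task, a task can keep working while its previous buffer waits to be copied to the output, yet memory use stays bounded.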

The tricky part comes up when the data package is exhausted (the “if not Result” branch in the code below).

function TOmniLocalQueueImpl.GetNext(var position: int64; var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(position, value);
  if not Result then begin
    {$IFDEF Debug}Assert(assigned(lqiBufferSet));{$ENDIF Debug}
    lqiBufferSet.ActiveBuffer.MarkFull;
    // this will block if the alternate buffer is also full
    lqiBufferSet.ActivateBuffer;
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then begin
      Result := lqiDataPackage.GetNext(position, value);
      if Result then
        lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range;
    end;
  end;
end; { TOmniLocalQueueImpl.GetNext }

First, the currently active buffer is marked as full. This causes NotifyBufferFull to be called (see below). Then the alternate buffer is activated. This call (.ActivateBuffer) will actually block if the alternate buffer is not free. In that case, the current thread is blocked until one of its buffers is written into the output queue.

From this point on, GetNext proceeds in the same way as in the simple foreach, except that it sets the active buffer’s position whenever a new data package is read from the data manager.

The other part of the magic happens in NotifyBufferFull, the method that is called from MarkFull. It walks the buffer list and checks whether there are any output buffers that are a) full and b) destined for the current output position. Such buffers are copied to the output and returned into use.

procedure TOmniBaseDataManager.NotifyBufferFull(buffer: TOmniOutputBufferImpl);
begin
  // Remove the buffer from the list. Check if the next buffer is
  // waiting in the list; copy it if it is full and repeat the process.
  dmBufferRangeLock.Acquire;
  try
    while (dmBufferRangeList.Count > 0) and
          (BufferList[0].Range.First = dmNextPosition) and
          BufferList[0].IsFull do
    begin
      buffer := TOmniOutputBufferImpl(
        dmBufferRangeList.ExtractObject(0));
      dmNextPosition := buffer.Range.Last + 1;
      buffer.CopyToOutput;
    end;
  finally dmBufferRangeLock.Release; end;
end; { TOmniBaseDataManager.NotifyBufferFull }
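The ordering invariant of this loop – a buffer is flushed only when its range starts exactly at the next expected output position – can be sketched in Python (illustrative names; not OTL code):

```python
def flush_ready_buffers(waiting, next_position, output):
    # Model of NotifyBufferFull's loop: `waiting` maps the first input
    # position of each full buffer to (last_position, items). A buffer is
    # copied to `output` only when it starts exactly at `next_position`,
    # so the output order always matches the input order.
    while next_position in waiting:
        last, items = waiting.pop(next_position)
        output.extend(items)
        next_position = last + 1       # the next buffer must start here
    return next_position
```

A buffer that arrives “too early” simply waits in the list; as soon as the gap before it is filled, one call flushes the whole contiguous run.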

To recap:

  • Each data package has an associated position.

  • Each local queue has two output buffers; one is active and the other is either free or full.

  • Each output buffer also has an associated position.

  • The local queue writes data to the output buffer.

  • When a buffer is full, it is put into a list of waiting buffers. At that moment, all appropriate waiting buffers are copied to the output.

  • And that’s all folks!
