Monday, September 12, 2011

Life after 2.1: Parallel data production [Introducing Parallel.Task]

An interesting problem appeared on StackOverflow recently: how to generate large quantities of data as fast as possible and store it in a file. As one could expect, I wrote a parallel solution using OmniThreadLibrary, more specifically the Parallel.ForEach high-level primitive. I’m not posting the complete solution here, just the important part: a method that accepts two parameters, the requested file size and the output stream, and generates the data. The actual initialization of data buffers is delegated to the FillBuffer method, which you can see in the StackOverflow post.
The logic behind the solution is:
  • Split the data into multiple blocks of constant size (with the last block being smaller than the rest).
  • Calculate the number of such blocks.
  • Iterate in parallel from 1 to the number of blocks.
    • In the Parallel.ForEach executor, first find out how much data the current iteration should generate.
    • Generate the data.
    • Store the data into a TMemoryStream and put it into the output queue.
  • As ForEach is executed with the NoWait modifier, it won’t block execution in the current thread. Instead, the current thread proceeds by reading from the queue and writing all data to the output stream.
There are a few tricks worth mentioning.
  • Each thread must use its own random generator object, which is created in Initialize and destroyed in Finalize. Those methods are called for each ForEach worker thread before it starts and after it completes execution.
  • When ForEach completes its work it must “close” the output queue by calling its CompleteAdding method so that the for..in loop in the main thread stops enumerating. The OnStop handler accomplishes that.
procedure CreateRandomFile(fileSize: integer; output: TStream);
const
  CBlockSize = 1 * 1024 * 1024 {1 MB};
var
  buffer        : TOmniValue;
  lastBufferSize: integer;
  memStr        : TMemoryStream;
  numBuffers    : integer;
  outQueue      : IOmniBlockingCollection;
begin
  outQueue := TOmniBlockingCollection.Create;
  numBuffers := (fileSize - 1) div CBlockSize + 1;
  lastBufferSize := (fileSize - 1) mod CBlockSize + 1;
  Parallel.ForEach(1, numBuffers).NoWait
    .NumTasks(Environment.Process.Affinity.Count)
    .OnStop(
      procedure
      begin
        outQueue.CompleteAdding;
      end)
    .Initialize(
      procedure(var taskState: TOmniValue)
      begin
        taskState := TGpRandom.Create;
      end)
    .Finalize(
      procedure(const taskState: TOmniValue)
      begin
        taskState.AsObject.Free;
      end)
    .Execute(
      procedure(const value: integer; var taskState: TOmniValue)
      var
        buffer      : TMemoryStream;
        bytesToWrite: integer;
      begin
        if value = numBuffers then
          bytesToWrite := lastBufferSize
        else
          bytesToWrite := CBlockSize;
        buffer := TMemoryStream.Create;
        buffer.Size := bytesToWrite;
        FillBuffer(buffer.Memory, bytesToWrite, 
          taskState.AsObject as TGpRandom);
        outQueue.Add(buffer);
      end);
  for buffer in outQueue do begin
    memStr := buffer.AsObject as TMemoryStream;
    output.CopyFrom(memStr, 0);
    FreeAndNil(memStr);
  end;
end;
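The block arithmetic at the top of CreateRandomFile is easy to misread, so here is a minimal sketch that isolates it. NumBlocks and LastBlockSize are hypothetical helper names introduced only for this illustration; they wrap the same two expressions used above and assume a positive file size.

```pascal
program BlockSplitSketch;

{$APPTYPE CONSOLE}

const
  CBlockSize = 1 * 1024 * 1024 {1 MB};

// Number of blocks needed to hold fileSize bytes (rounds up).
// Hypothetical helper; assumes fileSize > 0.
function NumBlocks(fileSize, blockSize: integer): integer;
begin
  Result := (fileSize - 1) div blockSize + 1;
end;

// Size of the final block, always in the range 1..blockSize.
// Hypothetical helper; assumes fileSize > 0.
function LastBlockSize(fileSize, blockSize: integer): integer;
begin
  Result := (fileSize - 1) mod blockSize + 1;
end;

begin
  // 2.5 MB: two full blocks plus a half-sized tail.
  Writeln(NumBlocks(2621440, CBlockSize), ' ', LastBlockSize(2621440, CBlockSize));
  // Exactly 2 MB: the last block is a full block, not an empty one.
  Writeln(NumBlocks(2097152, CBlockSize), ' ', LastBlockSize(2097152, CBlockSize));
end.
```

Subtracting one before the div and mod and adding one afterwards is what keeps the exact-multiple case from producing an extra, zero-sized block.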
Although the solution was well received (just think about how much code you would have to write to implement it with standard Delphi tools, that is, with TThread), I was not satisfied with it. For starters, using a parallel for loop to solve the problem looked like a cheat. Also, it is clumsy (three one-liner anonymous methods) and it uses the weird concept of iterating over block numbers instead of iterating over the actual byte count.
You can probably guess what I did. I expanded OmniThreadLibrary so that this class of problems can be solved without resorting to hacks.

IOmniCounter.Take

Running multiple tasks in parallel is not a problem but I got stuck in a completely unexpected place. You see, I wanted to write a solution working on the following principle:

  • do in parallel
    • repeat
      • find out how many bytes to process in this iteration
        • if there’s no more work to do, exit from the loop
      • prepare the buffer
      • send it to the output queue
    • until all work is done
The gotcha here is the third line. I wanted something like the following code, but thread safe.
if fileSize > CBlockSize then
  numBytes := CBlockSize
else
  numBytes := fileSize;
fileSize := fileSize - numBytes;
As this code fragment is completely thread-unsafe, I had to find a different solution. After exploring many options, I’ve decided to enhance IOmniCounter with a Take function. A thread-unsafe implementation of Take would be very similar to the code fragment above.
function TOmniCounter.Take(count: integer): integer;
begin
  if Value > count then
    Result := count
  else
    Result := Value;
  Value := Value - Result;
end;
Of course, the implementation is much more complicated as it must be thread-safe. In case you’re interested, this is it:
function TOmniCounterImpl.Take(count: integer): integer;
var
  current : integer;
  newValue: integer;
begin
  repeat
    current := Value;
    if current <= 0 then begin
      Result := 0;
      Exit;
    end;
    newValue := current - count;
    if newValue < 0 then
      newValue := 0;
    if ocValue.CAS(current, newValue) then begin
      Result := current - newValue;
      Exit;
    end;
  until false;
end; { TOmniCounterImpl.Take }
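To see the compare-and-swap retry loop at work outside the library, here is a minimal sketch that applies the same idea to a plain integer. It is not the OmniThreadLibrary code: it assumes a Delphi version that ships TInterlocked in System.SyncObjs and TThread.CreateAnonymousThread, and the names Remaining, TotalTaken and this free-standing Take are made up for the illustration.

```pascal
program TakeSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

var
  Remaining : integer = 10 * 1000 * 1000 + 123; // shared "bytes left" counter
  TotalTaken: integer = 0;                      // sum of everything handed out

// Hypothetical stand-in for the counter's Take: claims up to count units.
function Take(var counter: integer; count: integer): integer;
var
  current : integer;
  newValue: integer;
begin
  repeat
    current := counter;
    if current <= 0 then
      Exit(0);
    newValue := current - count;
    if newValue < 0 then
      newValue := 0;
    // The swap succeeds only if no other thread changed counter meanwhile;
    // otherwise loop and recompute from the fresh value.
    if TInterlocked.CompareExchange(counter, newValue, current) = current then
      Exit(current - newValue);
  until false;
end;

var
  i      : integer;
  threads: array [1..4] of TThread;

begin
  for i := Low(threads) to High(threads) do begin
    threads[i] := TThread.CreateAnonymousThread(
      procedure
      var
        taken: integer;
      begin
        repeat
          taken := Take(Remaining, 4096);
          TInterlocked.Add(TotalTaken, taken);
        until taken = 0;
      end);
    threads[i].FreeOnTerminate := false; // we want to WaitFor and free them ourselves
    threads[i].Start;
  end;
  for i := Low(threads) to High(threads) do begin
    threads[i].WaitFor;
    threads[i].Free;
  end;
  // If no unit was lost or handed out twice, this equals the initial counter value.
  Writeln(TotalTaken);
end.
```

Because every successful swap moves the counter from exactly the value the thread observed, the amounts handed out to all threads always add up to the initial value, no matter how the threads interleave.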
There’s another version of Take which returns the number of units taken in a var parameter and returns True if that number is larger than zero.
function TOmniCounterImpl.Take(count: integer; var taken: integer): boolean;
begin
  taken := Take(count);
  Result := (taken > 0);
end; { TOmniCounterImpl.Take }
This version of Take allows you to write elegant iteration code which also works when multiple tasks are accessing the same counter instance.
counter := CreateCounter(numBytes);
while counter.Take(blockSize, blockBytes) do begin
  // process blockBytes bytes
end;

CompleteQueue

My new solution still needs to “complete” the output queue when all workers are done. To simplify the job (and make the code less ugly), I’ve written a simple helper function returning an appropriate OnStop delegate.
class function Parallel.CompleteQueue(
  const queue: IOmniBlockingCollection): TProc;
begin
  Result :=
    procedure
    begin
      queue.CompleteAdding;
    end;
end;
You can use the CompleteQueue class function in any OnStop handler (for example in the Parallel.ForEach code at the beginning of this post).
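As an illustration of how the helper replaces a hand-written OnStop handler, here is a minimal sketch of a ForEach loop that feeds a queue drained by the main thread. It assumes an OmniThreadLibrary revision that already contains Parallel.CompleteQueue, that the units are named OtlCommon, OtlCollections and OtlParallel, and that Execute accepts a delegate taking only the loop value; the loop body itself is invented for the example.

```pascal
program CompleteQueueSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  OtlCommon,       // TOmniValue (assumed unit name)
  OtlCollections,  // IOmniBlockingCollection (assumed unit name)
  OtlParallel;     // Parallel (assumed unit name)

var
  item : TOmniValue;
  queue: IOmniBlockingCollection;
  sum  : integer;

begin
  queue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 100).NoWait
    // Replaces the three-line anonymous method that called CompleteAdding.
    .OnStop(Parallel.CompleteQueue(queue))
    .Execute(
      procedure(const i: integer)
      begin
        queue.Add(i); // workers produce, the main thread consumes
      end);
  sum := 0;
  // Enumeration ends once CompleteAdding was called and the queue is empty.
  for item in queue do
    Inc(sum, item.AsInteger);
  Writeln(sum);
end.
```

Without the OnStop handler the for..in loop would never terminate, because the queue would stay open after the last worker finished.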

ParallelTask

The main burden of processing is now carried by the new Parallel.ParallelTask primitive, which takes one task delegate and executes it in multiple threads. It is a very lightweight wrapper around Parallel.Join, which takes care of the actual execution. [Parallel.Join also had to be enhanced a little as it didn’t implement the OnStop handler.]
At the moment you should treat ParallelTask as highly experimental code. It has not been tested much (only with the test below) and doesn’t yet have all the bells and whistles implemented (exception support is completely missing). You can, however, start playing with it by syncing to the latest SVN revision.
procedure CreateRandomFile(fileSize: integer; output: TStream);
const
  CBlockSize = 1 * 1024 * 1024 {1 MB};
var
  buffer   : TOmniValue;
  memStr   : TMemoryStream;
  outQueue : IOmniBlockingCollection;
  unwritten: IOmniCounter;
begin
  outQueue := TOmniBlockingCollection.Create;
  unwritten := CreateCounter(fileSize);
  Parallel.ParallelTask.NoWait
    .NumTasks(Environment.Process.Affinity.Count)
    .OnStop(Parallel.CompleteQueue(outQueue))
    .Execute(
      procedure
      var
        buffer      : TMemoryStream;
        bytesToWrite: integer;
        randomGen   : TGpRandom;
      begin
        randomGen := TGpRandom.Create;
        try
          while unwritten.Take(CBlockSize, bytesToWrite) do begin
            buffer := TMemoryStream.Create;
            buffer.Size := bytesToWrite;
            FillBuffer(buffer.Memory, bytesToWrite, randomGen);
            outQueue.Add(buffer);
          end;
        finally FreeAndNil(randomGen); end;
      end
    );
  for buffer in outQueue do begin
    memStr := buffer.AsObject as TMemoryStream;
    output.CopyFrom(memStr, 0);
    FreeAndNil(memStr);
  end;
end;
As you can see, it is now much more obvious how the solution works and where the initialization points are. The code is also shorter and more readable.

4 comments:

  1. Weird problem with SVN :(

    Update
    Could not open the requested SVN filesystem

    -TP-

  2. SVN is entirely under Google control, let's hope they'll fix this soon.

  3. very elegant solution and library. congrats!
