Tuesday, September 06, 2011

Life after 2.1: Pimp My Pipeline

Although most of the work on the Pipeline went into exception handling, the basic functionality changed as well.

The most important (and code-breaking!) change is that the Input function was renamed to From. To pass an input queue to a pipeline, now use pipeline.From(queue).

The second code-breaking change affects the Run function, which now returns IOmniPipeline (i.e. the pipeline interface itself), not the output blocking collection. Luckily, the compiler catches both changes: old code simply no longer compiles.

The reason for both changes is that IOmniPipeline now implements Input and Output functions, which return the input and output queue, respectively. A default input queue is now always available (it stays empty if you don’t write anything to it), and you can replace it with your own queue by calling the From function. In other words, you can provide data to the pipeline either by writing to pipeline.Input or by specifying an input queue with pipeline.From(queue).
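As a minimal migration sketch (not taken from the library sources), the following console program feeds a pipeline through a caller-supplied queue and reads the result from pipeline.Output. The stage DoubleValue and the program name are hypothetical, and the unit names in the uses clause are an assumption about where the types live.

```delphi
program PipelineFromSketch;

{$APPTYPE CONSOLE}

uses
  OtlCommon, OtlCollections, OtlParallel; // assumed unit names

// Hypothetical simple stage, for illustration only.
procedure DoubleValue(const input: TOmniValue; var output: TOmniValue);
begin
  output := input.AsInteger * 2;
end;

var
  queue   : IOmniBlockingCollection;
  pipeline: IOmniPipeline;
  value   : TOmniValue;

begin
  // Caller-owned input queue, filled and closed up front.
  queue := TOmniBlockingCollection.Create;
  queue.Add(1);
  queue.Add(2);
  queue.CompleteAdding;

  pipeline := Parallel.Pipeline.Stage(DoubleValue);
  pipeline.From(queue); // old code called Input(queue) here
  pipeline.Run;         // returns the pipeline itself, not the output queue

  // Output is now read through the pipeline interface.
  for value in pipeline.Output do
    Writeln(value.AsInteger);
end.
```

Old code that assigned the result of Run to an IOmniBlockingCollection variable should switch to pipeline.Output, as in the loop above.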

Pipeline now supports simple stages. If you specify a stage delegate with the signature procedure (const input: TOmniValue; var output: TOmniValue), Pipeline provides its own wrapper, which reads data from the input queue, calls your stage procedure for each element and writes the result to the output queue. You can skip the writing phase for an element by not assigning anything to the output value.
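Skipping the write turns a simple stage into a filter. The sketch below is only an illustration: the stage KeepEven and the program name are hypothetical, and the unit names in the uses clause are an assumption. It passes even numbers through and drops odd ones by leaving output unassigned.

```delphi
program SimpleStageFilterSketch;

{$APPTYPE CONSOLE}

uses
  OtlCommon, OtlCollections, OtlParallel; // assumed unit names

// Hypothetical simple stage: forwards even values and produces
// nothing for odd ones, because output is left unassigned.
procedure KeepEven(const input: TOmniValue; var output: TOmniValue);
begin
  if not Odd(input.AsInteger) then
    output := input;
end;

var
  i       : integer;
  pipeline: IOmniPipeline;
  value   : TOmniValue;

begin
  pipeline := Parallel.Pipeline.Stage(KeepEven);
  pipeline.Run;

  // Feed the default input queue, then signal the end of input.
  for i := 1 to 6 do
    pipeline.Input.Add(i);
  pipeline.Input.CompleteAdding;

  // Only the values that the stage assigned to output arrive here.
  for value in pipeline.Output do
    Writeln(value.AsInteger);
end.
```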

The last change is support for exceptions, which I’ll discuss in a moment. Most of these features are demonstrated in the following example from the test 48_OtlParallelExceptions.

procedure StageException1(const input: TOmniValue; var output: TOmniValue);
begin
  output := input.AsInteger * 42;
end;

procedure StageException2(const input, output: IOmniBlockingCollection);
var
  outVal: TOmniValue;
  value : TOmniValue;
begin
  for value in input do begin
    if value.IsException then begin
      value.AsException.Free;
      outVal.Clear;
    end
    else
      outVal := 1 / value.AsInteger;
    if not output.TryAdd(outVal) then
      break; //for
  end;
end;

procedure TfrmOtlParallelExceptions.btnPipeline1Click(Sender: TObject);
var
  pipeline: IOmniPipeline;
  value   : TOmniValue;
begin
  Log('Should catch pipeline exception ' + 
    '"TOmniValue cannot be converted to int64" in stage 1');
  pipeline := Parallel.Pipeline
    .Stage(StageException1)
    .Stage(StageException2);
  pipeline.Run;

  // Provide input
  with pipeline.Input do begin
    // few normal elements
    Add(1);
    Add(2);
    // then trigger the exception in the first stage
    Add('three');
    // this should never reach the pipeline output
    Add(4);
    CompleteAdding;
  end;

  // Process output
  try
    for value in pipeline.Output do
      Log(value.AsString);
  except
    on E: Exception do
      Log('Caught pipeline exception %s:%s', [E.ClassName, E.Message]);
  end;
end;

As you can see from the code, StageException1 is a simple stage method. Also, btnPipeline1Click uses pipeline.Input to provide input to the pipeline and pipeline.Output to read output from the pipeline.

And then there’s the exception handling.

Pipelined Exceptions

I thought long and hard about possible ways to implement exceptions in the pipeline model and in the end decided that pipeline exceptions have to behave exactly like pipeline data: they have to flow from one stage to another. To achieve that (and to have existing code behave sensibly) I had to make two other improvements first.

1. TOmniValue now natively supports exceptions. You can assign an exception to a TOmniValue and read it back (property AsException), and you can test whether a TOmniValue contains an exception (IsException).

2. IOmniBlockingCollection can automatically re-raise exceptions. By default the collection still works exactly as before, but if you call the ReraiseExceptions method, it flips an internal flag that forces the following code to execute in every TryTake call:

  if Result and obcReraiseExceptions and value.IsException then
    raise value.AsException;

In other words, if the “reraise exceptions” flag is set and a queue element contains an exception, that exception is raised.
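The same behaviour can be tried on a standalone collection, without any pipeline. This is a hedged sketch rather than library sample code: it assumes that the AsException property is writable, that ReraiseExceptions accepts a boolean argument (as in the ReraiseExceptions(false) call used for the output queue), and that the unit names in the uses clause are correct.

```delphi
program ReraiseSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, OtlCommon, OtlCollections; // assumed unit names

var
  queue: IOmniBlockingCollection;
  value: TOmniValue;

begin
  queue := TOmniBlockingCollection.Create;
  queue.ReraiseExceptions(true); // turn the "reraise exceptions" flag on

  // Store one normal element and one exception in the queue.
  queue.Add(42);
  value.AsException := Exception.Create('stored in the queue'); // assumption: writable property
  queue.Add(value);
  queue.CompleteAdding;

  try
    // Taking the element that holds the exception raises it.
    while queue.TryTake(value) do
      Writeln(value.AsInteger);
  except
    on E: Exception do
      Writeln('Reraised by the collection: ', E.ClassName, ': ', E.Message);
  end;
end.
```

Once the collection has raised the stored exception, the usual Delphi rules apply: the exception object is released when the except block finishes, so the code must not free it again.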

In a pipeline, every queue (the input queue, all intermediate queues and the output queue) gets ReraiseExceptions called by default during pipeline setup. Also, every unhandled exception in a stage is caught and stored in that stage’s output queue.

This causes exceptions to flow naturally through the pipeline. For example, if the first stage raises an exception, it is caught and stored in that stage’s output queue. When the next stage reads this element from the queue, the collection detects that it is an exception and reraises it. The second stage’s wrapper then catches this exception and stores it in its own output queue, and so on until the exception reaches the pipeline’s output queue. When the main thread reads this element from the output queue, the exception is reraised for the last time.

[Figure: pipeline exception model]

If you want to handle exceptions manually in some stage (i.e. if you know what kind of exceptions the previous stage may produce and you want to do some special processing when they occur), you can disable automatic exception reraising for that stage by calling HandleExceptions. The stage can then test each value with the IsException function and process it as an exception. The following code demonstrates that (StageException1 and StageException2 are the same as in the code above).

procedure TfrmOtlParallelExceptions.btnPipeline3Click(Sender: TObject);
var
  pipeline: IOmniPipeline;
  value   : TOmniValue;
begin
  Log('Stage 2 should accept and correct stage 1 exception (third output will be empty)');
  pipeline := Parallel.Pipeline
    .Stage(StageException1)
    .Stage(StageException2)
      .HandleExceptions
    .Run;

  // Provide input
  with pipeline.Input do begin
    // few normal elements
    Add(1);
    Add(2);
    // then trigger the exception in the first stage; this exception should be 'corrected' in the second stage
    Add('three');
    Add(4);
    CompleteAdding;
  end;

  // Process output; there should be no exception in the output collection
  for value in pipeline.Output do
    Log(value.AsString);
end;

Handling the last (output) stage is slightly different. If you don’t want exceptions to be reraised when you read the pipeline output, you have to turn the “reraise exceptions” flag off on the output queue.

  // Process output; exceptions will be processed manually
  pipeline.Output.ReraiseExceptions(false);
  for value in pipeline.Output do
    if not value.IsException then
      Log(value.AsString)
    else begin
      Log('%s:%s', [value.AsException.ClassName, value.AsException.Message]);
      value.AsException.Free;
    end;

There will probably be some minor changes between the current pipeline model and the 2.2 release, but don’t expect anything drastic. If you want, you can go ahead and start using the new code; it has already proved itself in practice.

4 comments:

  1. The stage signature's a bit awkward. Why procedure (const input: TOmniValue; var output: TOmniValue) rather than function (const input: TOmniValue): TOmniValue?

  2. This comment has been removed by the author.

  3. I guess it's so that you can choose to not return an output? ... if the output is returned as a Result of a function, and you decide not to return a Result, the compiler will warn you that the value of the function might be undefined.

  4. @Mason, Exactly what Chee said. Saying that, another overload accepting what you suggested would not hurt.