While the biggest focus on the Pipeline improvement was on the exception handling, there were also some changes in the basic functionality.
The most important (and code breaking!) change happened to the Input function, which was renamed to From. If you now want to pass a input queue to a pipeline, use pipeline.From(queue).
Second code breaking change happened to the Run function which now returns IOmniPipeline (i.e. the pipeline interface) itself, not the output blocking collection. Luckily, both changes will be caught by the compiler which would not want to compile the old code any more.
The reason for both changes was that the IOmniPipeline now implements Input and Output functions returning input and output queue, respectively. There’s now a default input queue available (but may be empty if you don’t write anything to it) which you can switch for your own queue by calling the From function. In other words, you can provide data to the pipeline by either writing to pipeline.Input or specifying an input queue in pipeline.From(queue).
Pipeline now supports simple stages. If you specify stage delegate with signature procedure (const input: TOmniValue; var output: TOmniValue), Pipeline will provide own wrapper which will read data from the input queue, call your stage procedure and write data to the output queue. You can skip the writing phase by not assigning anything to the output value.
The last change is support for exceptions which I’ll discuss in a moment. Most of those features are demonstrated in the following example from the test 48_OtlParallelExceptions.
procedure StageException1(const input: TOmniValue; var output: TOmniValue); begin output := input.AsInteger * 42; end; procedure StageException2(const input, output: IOmniBlockingCollection); var outVal: TOmniValue; value : TOmniValue; begin for value in input do begin if value.IsException then begin value.AsException.Free; outVal.Clear; end else outVal := 1 / value.AsInteger; if not output.TryAdd(outVal) then break; //for end; end; procedure TfrmOtlParallelExceptions.btnPipeline1Click(Sender: TObject); var pipeline: IOmniPipeline; value : TOmniValue; begin Log('Should catch pipeline exception ' + '"TOmniValue cannot be converted to int64" in stage 1'); pipeline := Parallel.Pipeline .Stage(StageException1) .Stage(StageException2); pipeline.Run; // Provide input with pipeline.Input do begin // few normal elements Add(1); Add(2); // then trigger the exception in the first stage Add('three'); // this should never reach the pipeline output Add(4); CompleteAdding; end; // Process output try for value in pipeline.Output do Log(value.AsString); except on E: Exception do Log('Caught pipeline exception %s:%s', [E.ClassName, E.Message]); end; end;
As you can see from the code, StageException1 is a simple stage method. Also, btnPipeline1Click uses pipeline.Input to provide input to the pipeline and pipeline.Output to read output from the pipeline.
And then there’s the exception handling.
I have thought long and hard about possible ways to implement exceptions in the pipeline model and at the end decided that pipeline exceptions have to behave exactly the same as the pipeline data – they have to flow from one stage to another. To achieve that (and to have existing code behave sensibly) I had to make some other improvements first. Two of them, to be exact.
1. TOmniValue now natively supports exceptions. You can assign exception to a TOmniValue and read it back (property AsException) and you can also test TOmniValue if it contains an exception (IsException).
2. IOmniBlockingCollection can automatically re-raise exceptions. By default the collection still works exactly as before but if you call ReraiseExceptions method, it will flip internal flag that forces following code to execute in every TryTake call:
if Result and obcReraiseExceptions and value.IsException then raise value.AsException;
IOW, if “reraise exceptions” flag is set and queue element contains an exception, this exception is raised.
In pipeline, every queue (input queue, all intermediate queues and output queue) get ReraiseException called by default during the pipeline setup. Also, all unhandled exceptions are caught and stored in the output queue.
This causes exceptions to flow naturally through the pipeline. For example, if first stage causes exceptions, it is caught and stored in the output queue. When the next stage reads this element from the queue, collections detects that it’s an exception and reraises it. Second stage then catches this exception and stores it in its output queue and so on until the exception is stored in the output queue. When the main thread reads this element from the output queue, exception is reraised for the last time.
If you want to manually handle exceptions in some stage (i.e. if you know what kind of exceptions previous stage may produce and you want to do some special processing when they occur), you can disable automatic exception reraising by calling HandleExceptions. If you do that, you can test IsException function and then process value as exception. The followin code demonstrates that (StageException1 and StageException2 are the same as in the code above).
procedure TfrmOtlParallelExceptions.btnPipeline3Click(Sender: TObject); var pipeline: IOmniPipeline; value : TOmniValue; begin Log('Stage 2 should accept and correct stage 1 exception (third output will be empty)'); pipeline := Parallel.Pipeline .Stage(StageException1) .Stage(StageException2) .HandleExceptions .Run; // Provide input with pipeline.Input do begin // few normal elements Add(1); Add(2); // then trigger the exception in the first stage; this exception should be 'corrected' in the second stage Add('three'); Add(4); CompleteAdding; end; // Process output; there should be no exception in the output collection for value in pipeline.Output do Log(value.AsString); end;
Handling the last (output) stage is slightly different. If you don’t want to reraise exceptions, you have to turn “reraise exception” flag off on the output queue.
// Process output; exceptions will be processed manually pipeline.Output.ReraiseExceptions(false); for value in pipeline.Output do if not value.IsException then Log(value.AsString) else begin Log('%s:%s', [value.AsException.ClassName, value.AsException.Message]); value.AsException.Free; end;
There will probably be some minor changes between the current pipeline model and the 2.2 release but don’t expect anything drastic. If you want, you can go ahead and start using the new code, it has already proved itself in practice.