Friday, July 29, 2011

Life after 2.1: Parallel.Join’s new clothes

Parallel.Join has started its life as a very simple construct.
class procedure Join(const task1, task2: TProc); overload;
class procedure Join(const task1, task2: TOmniTaskDelegate); overload;
class procedure Join(const tasks: array of TProc); overload;
class procedure Join(const tasks: array of TOmniTaskDelegate); overload;
Later it got an optional parameter of the IOmnITaskConfig type, but that didn’t change its simplicity. You called Join, it executed some code in parallel, and only when all code blocks completed its execution, your main thread would proceed by executing the statement following the Join call.
Then I started to think about handling exceptions (just as I did for the Parallel.Future) and somehow this simplicity didn’t feel right to me anymore. At the same time I got involved in a prolonged discussion with Антон Алисов (Anton Alisov) and together we defined new features that new Join would have to have.
In post-2.1 world (i.e. you have to follow the trunk to use these improvements) Parallel.Join returns an IOmniParallelJoin interface, similar to all other methods of the Parallel class.
class function Join(const task1, task2: TProc): IOmniParallelJoin; overload;
class function Join(const task1, task2: TOmniJoinDelegate): 
  IOmniParallelJoin; overload;
class function Join(const tasks: array of TProc): IOmniParallelJoin; overload;
class function Join(const tasks: array of TOmniJoinDelegate):
  IOmniParallelJoin; overload;
This interface is in turn defined as
IOmniParallelJoin = interface
  function  Cancel: IOmniParallelJoin;
  function  DetachException: Exception;
  function  Execute: IOmniParallelJoin;
  function  FatalException: Exception;
  function  IsCancelled: boolean;
  function  IsExceptional: boolean;
  function  NumTasks(numTasks: integer): IOmniParallelJoin;
  function  Task(const task: TProc): IOmniParallelJoin; overload;
  function  Task(const task: TOmniJoinDelegate): IOmniParallelJoin; overload;
  function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelJoin;
  function  NoWait: IOmniParallelJoin;
  function  WaitFor(timeout_ms: cardinal): boolean;
end;
As you can see, there’s lots of new stuff here, some more important, some less. There is, however, the single most important function – Execute.

Breaking change

All those changes have introduced a very inconvenient breaking change to your code :(
This simple code, which worked well with OmniThreadLibrary 2.0 and 2.1, would now silently break.
Parallel.Join(
  procedure 
  begin
  end,
  procedure 
  begin
  end);
When the previous implementation has run all background tasks in the Join itself, the new implementation defers code execution to the Execute method. To fix the code above, you have to append .Execute at the end.
Parallel.Join(
  procedure
  begin
  end,
  procedure    begin
  end).Execute;
There’s another breaking change, but at least the compiler will warn you about it. The following code does not compile anymore.
Parallel.Join(
  procedure (const task: IOmniTask)
  begin
    task.Send(...);
  end,
  procedure (const task: IOmniTask)
  begin
    task.Send(...);
  end);
There is no overload accepting IOmniTask anymore. Instead of that, you have to use an (anonymous) method that accepts IOmniJoinState parameter and then use the Task method of that interface. The reasons for that will become clear in the next section.
Parallel.Join(
  procedure (const joinState: IOmniJoinState)
  begin
    joinState.Task.Send(...);
  end,
  procedure (const joinState: IOmniJoinState)
  begin
    joinState.Task.Send(...);
  end).Execute;
New ways to use Join are documented in the 37_ParallelJoin demo.

New functionality

You can now tell the scheduler how many background threads to use by calling the NumTasks method, for example:
Parallel.Join(...).NumTasks(4).Execute;
You can add tasks dynamically by (repeatedly) calling the .Task function. In fact, that’s just how the Parallel.Join overloads are implemented.
class function Parallel.Join(const task1, task2: TProc): IOmniParallelJoin;
begin
  Result := TOmniParallelJoin.Create.Task(task1).Task(task2);
end;

class function Parallel.Join(const tasks: array of TProc): IOmniParallelJoin;
var
  aTask: TProc;
begin
  Result := TOmniParallelJoin.Create;
  for aTask in tasks do
    Result.Task(aTask);
end;
IOmniTaskConfig is now provided via the TaskConfig function (see updated demo 47_TaskConfig).
Background tasks now support cooperative cancellation. If you are using TOmniJoinDelegate tasks (that is, tasks accepting the IOmniJoinState parameter), then any task can call the Cancel method of this interface. This, in turn, sets internal cancellation flag which may be queried by calling the IsCancelled method. That way, one task can interrupt other tasks provided that they are testing IsCancelled repeatedly.
Parallel.Join can optionally be called with the .NoWait modifier. In this version, Join returns immediately and main thread can continue with execution. Main thread can also cancel its subtasks by calling IOmniParallelJoin.Cancel and can test the cancellation flag by calling IsCancelled. To wait on background tasks to complete the execution, main thread must call IOmniParallelJoin.WaitFor which accepts an optional timeout parameter.
The following demo code, taken from the 37_ParallelJoin demo, demonstrates most of concepts mentioned above.
var
  join: IOmniParallelJoin;
  time: int64;
begin
  FJoinCount.Value := 0;
  FJoinCount2.Value := 0;
  join := Parallel.Join(
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 10 do begin
        Sleep(100);
        FJoinCount.Increment;
        if joinState.IsCancelled then
          break; //for
      end;
    end,
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 10 do begin
        Sleep(200);
        FJoinCount2.Increment;
        if joinState.IsCancelled then
          break; //for
      end;
    end
  ).NoWait.Execute;
  Sleep(500);
  time := DSiTimeGetTime64;
  join.Cancel.WaitFor(INFINITE);
  Log(Format('Waited %d ms for joins to terminate', [DSiElapsedTime64(time)]));
  Log(Format('Joins counted up to %d and %d', 
    [FJoinCount.Value, FJoinCount2.Value]));
end;
The call to Parallel.Join starts two tasks. Because the NoWait is used, the call returns immediately and stores resulting IOmniParallelJoin interface in the local variable join. Main code then sleeps for half a second, cancels the execution and immediately waits for background tasks to terminate.
Both tasks execute a simple loop which waits a little, increments a counter and checks the cancellation flag. Because the cancellation flag is set after 500 ms, we would except five or six repetitions of the first loop (five repetitions take exactly 500 ms and we can’t tell exactly what will execute first – Cancel or fifth IsCancelled) and three repetitions of the second loop. That is exactly what the program prints out.
IOmniJoinState is the interface that provides you with access to the underlying task interface (Task), to the cancellation mechanism (Cancel and IsCancelled) and to the exception handling mechanism (IsExceptional).
IOmniJoinState = interface
  function  GetTask: IOmniTask;
//
  procedure Cancel;
  function  IsCancelled: boolean;
  function  IsExceptional: boolean;
  property Task: IOmniTask read GetTask;
end; 

Exception handling

New Join handles exceptions much better than the old one. (Not surprising if you know that they were just ignored in the old implementation.) Exceptions are now implemented in the same manner as in the Parallel.Future.
Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using synchronous version of Join (without the NoWait modifier), then WaitFor is called at the end of the Execute method (in other words, Parallel.Join(…).Execute will re-raise task exceptions). If, however, you are using the asynch version (by calling Parallel.Join(…).NoWait.Execute), exception will only be raised when you wait on the background tasks to complete by calling WaitFor. Both approaches are shown in the 48_OtlParallelExceptions demo.
You can test for the exception by calling FatalException function. It will first wait on all background task to complete (without raising the exception) and then return the exception object.You can also detach the exception object from the Parallel.Join and handle it yourself by using the DetachException function. All of these are also shown in the 48_OtlParallelExceptions demo.
There’s also an IsExceptional function (available in IOmniParallelJoin and IOmniJoinState interfaces) which tells you if any background task has thrown an exception. [Most probably this function will also make it into Parallel.Future construct.]
There’s an additional complication in Join – as it executes multiple tasks, there can be multiple background exceptions. To get you full access to those exceptions, Parallel,Join wraps them into EJoinException object.
EJoinException = class(Exception)
  constructor Create; reintroduce;
  destructor  Destroy; override;
  procedure Add(iTask: integer; taskException: Exception);
  function  Count: integer;
  property Inner[idxException: integer]: TJoinInnerException 
    read GetInner; default;
end;
This exception class contains combined error messages from all background tasks in its Message property and allows you to access exception information for all caught exceptions directly. The following code demonstrates this.
var
  iInnerExc: integer;
begin
  try
    Parallel.Join([
      procedure begin
        raise ETestException.Create('Exception 1 in Parallel.Join');
      end,
      procedure begin
      end,
      procedure begin
        raise ETestException.Create('Exception 2 in Parallel.Join');
      end]).Execute;
  except
    on E: EJoinException do begin
      Log('Join raised exception %s:%s', [E.ClassName, E.Message]);
      for iInnerExc := 0 to E.Count - 1 do
        Log('  Task #%d raised exception: %s:%s', [E[iInnerExc].TaskNumber,
          E[iInnerExc].FatalException.ClassName,
          E[iInnerExc].FatalException.Message]);
    end;
  end;
end;
The iInnerExc variable loops over all caught exceptions and for each such exception displays the task number (first parameter to Parallel.Join has task number 0, second task number 1 and so on), exception class and exception message.
This approach allows you to either just log the exception or, if you are interested in details, examine specific inner exceptions and handle them appropriately.

3 comments:

  1. Anonymous10:16

    Wow...that's create...and easy up the implementation. Thank you soo much for sharing this great thread library...8-)))

    ReplyDelete
  2. Hi! I m plan to translate some of your OTL posts into russian! Do you plan to have some wiki, so it will be possible to have multylanguage pages?

    ReplyDelete
  3. Great!

    Yes, there's http://code.google.com/p/omnithreadlibrary/w/list which could hold such documents, but the better way would probably be to link to them from original articles and from otl.17slon.com.

    ReplyDelete