    class procedure Join(const task1, task2: TProc); overload;
    class procedure Join(const task1, task2: TOmniTaskDelegate); overload;
    class procedure Join(const tasks: array of TProc); overload;
    class procedure Join(const tasks: array of TOmniTaskDelegate); overload;

Later it got an optional parameter of the IOmniTaskConfig type, but that didn’t change its simplicity. You called Join, it executed some code in parallel, and only when all code blocks had completed their execution would your main thread proceed to the statement following the Join call.
Then I started to think about handling exceptions (just as I did for the Parallel.Future) and somehow this simplicity didn’t feel right to me anymore. At the same time I got involved in a prolonged discussion with Антон Алисов (Anton Alisov) and together we defined the features that the new Join would need.
In the post-2.1 world (i.e. you have to follow the trunk to use these improvements) Parallel.Join returns an IOmniParallelJoin interface, similar to all other methods of the Parallel class.
    class function Join(const task1, task2: TProc): IOmniParallelJoin; overload;
    class function Join(const task1, task2: TOmniJoinDelegate): IOmniParallelJoin; overload;
    class function Join(const tasks: array of TProc): IOmniParallelJoin; overload;
    class function Join(const tasks: array of TOmniJoinDelegate): IOmniParallelJoin; overload;

This interface is in turn defined as
    IOmniParallelJoin = interface
      function Cancel: IOmniParallelJoin;
      function DetachException: Exception;
      function Execute: IOmniParallelJoin;
      function FatalException: Exception;
      function IsCancelled: boolean;
      function IsExceptional: boolean;
      function NumTasks(numTasks: integer): IOmniParallelJoin;
      function Task(const task: TProc): IOmniParallelJoin; overload;
      function Task(const task: TOmniJoinDelegate): IOmniParallelJoin; overload;
      function TaskConfig(const config: IOmniTaskConfig): IOmniParallelJoin;
      function NoWait: IOmniParallelJoin;
      function WaitFor(timeout_ms: cardinal): boolean;
    end;

As you can see, there’s lots of new stuff here, some more important, some less. One function, however, matters more than all the others: Execute.
Breaking change

All those changes have introduced a very inconvenient breaking change to your code :(
This simple code, which worked well with OmniThreadLibrary 2.0 and 2.1, would now silently break.
    Parallel.Join(
      procedure begin
      end,
      procedure begin
      end);

Whereas the previous implementation ran all background tasks inside Join itself, the new implementation defers code execution to the Execute method. To fix the code above, you have to append .Execute at the end.
    Parallel.Join(
      procedure begin
      end,
      procedure begin
      end).Execute;

There’s another breaking change, but at least the compiler will catch this one for you. The following code does not compile anymore.
    Parallel.Join(
      procedure (const task: IOmniTask)
      begin
        task.Send(...);
      end,
      procedure (const task: IOmniTask)
      begin
        task.Send(...);
      end);

There is no overload accepting IOmniTask anymore. Instead, you have to use an (anonymous) method that accepts an IOmniJoinState parameter and then use the Task property of that interface. The reasons for that will become clear in the next section.
    Parallel.Join(
      procedure (const joinState: IOmniJoinState)
      begin
        joinState.Task.Send(...);
      end,
      procedure (const joinState: IOmniJoinState)
      begin
        joinState.Task.Send(...);
      end).Execute;

New ways to use Join are documented in the 37_ParallelJoin demo.
New functionality

You can now tell the scheduler how many background threads to use by calling the NumTasks method, for example:
    Parallel.Join(...).NumTasks(4).Execute;

You can add tasks dynamically by (repeatedly) calling the .Task function. In fact, that’s just how the Parallel.Join overloads are implemented.
    class function Parallel.Join(const task1, task2: TProc): IOmniParallelJoin;
    begin
      Result := TOmniParallelJoin.Create.Task(task1).Task(task2);
    end;

    class function Parallel.Join(const tasks: array of TProc): IOmniParallelJoin;
    var
      aTask: TProc;
    begin
      Result := TOmniParallelJoin.Create;
      for aTask in tasks do
        Result.Task(aTask);
    end;

IOmniTaskConfig is now provided via the TaskConfig function (see updated demo 47_TaskConfig).
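To see how these modifiers combine, here is a minimal sketch that starts from the two-task overload, appends a third task with Task and asks for two background threads with NumTasks. The program name, the RunThreeTasks procedure and the results array are made up for illustration, and the uses clause assumes the classic unit names (SysUtils, OtlParallel); treat it as a sketch, not as code taken from the library or its demos.

```delphi
program JoinChainSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,     // TProc
  OtlParallel;  // Parallel, IOmniParallelJoin

procedure RunThreeTasks;
var
  results: array [0..2] of integer; // hypothetical; each task writes only its own slot
begin
  Parallel.Join(
    procedure begin results[0] := 10; end,
    procedure begin results[1] := 20; end)
  .Task(procedure begin results[2] := 30; end) // add a third task dynamically
  .NumTasks(2)                                  // ask for two background threads
  .Execute;                                     // synchronous: returns when all tasks are done
  Writeln('Sum of task results: ', results[0] + results[1] + results[2]);
end;

begin
  RunThreeTasks;
end.
```

Because nothing runs until Execute is called, the order of the Task and NumTasks calls should not matter as long as they all come before Execute.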
Background tasks now support cooperative cancellation. If you are using TOmniJoinDelegate tasks (that is, tasks accepting the IOmniJoinState parameter), then any task can call the Cancel method of this interface. This, in turn, sets an internal cancellation flag which can be queried by calling the IsCancelled method. That way, one task can interrupt the other tasks, provided that they test IsCancelled repeatedly.
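A minimal sketch of one task cancelling another might look like the following. It uses only the IOmniJoinState methods described above; the program name, the CancelFromTask procedure and the steps counter are hypothetical, and the unit names in the uses clause are assumptions.

```delphi
program JoinCancelSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,     // Sleep
  OtlParallel;  // Parallel, IOmniJoinState

procedure CancelFromTask;
var
  steps: integer; // hypothetical counter, written by the second task only
begin
  steps := 0;
  Parallel.Join(
    // First task: pretends to finish its work after 300 ms and then
    // tells all other tasks in this Join to stop.
    procedure (const joinState: IOmniJoinState)
    begin
      Sleep(300);
      joinState.Cancel;
    end,
    // Second task: works in small steps and polls the cancellation flag.
    procedure (const joinState: IOmniJoinState)
    begin
      while not joinState.IsCancelled do begin
        Sleep(50);
        Inc(steps);
      end;
    end
  ).Execute; // synchronous: returns after both tasks have finished
  Writeln('Second task completed ', steps, ' steps before it noticed the cancellation');
end;

begin
  CancelFromTask;
end.
```

Cancellation is cooperative, so a task that never checks IsCancelled keeps running until it finishes on its own.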
Parallel.Join can optionally be called with the .NoWait modifier. In this version, Join returns immediately and the main thread can continue with execution. The main thread can also cancel its subtasks by calling IOmniParallelJoin.Cancel and can test the cancellation flag by calling IsCancelled. To wait for the background tasks to complete, the main thread must call IOmniParallelJoin.WaitFor, which takes a timeout in milliseconds (pass INFINITE to wait indefinitely).
The following demo code, taken from the 37_ParallelJoin demo, demonstrates most of the concepts mentioned above.
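As a small illustration of the timeout, the sketch below polls WaitFor in short slices so that the main thread can do something else between checks. It assumes that WaitFor returns True once all tasks have completed and False when the timeout expires; the interface declaration only tells us that the result is a boolean, so treat that as an assumption. The program and procedure names are hypothetical.

```delphi
program JoinWaitSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,     // Sleep
  OtlParallel;  // Parallel, IOmniParallelJoin

procedure PollUntilDone;
var
  join: IOmniParallelJoin;
begin
  join := Parallel.Join(
    procedure begin Sleep(1000); end,
    procedure begin Sleep(1500); end
  ).NoWait.Execute; // returns immediately, tasks keep running in the background

  // Assumption: WaitFor returns False on timeout and True when all tasks are done.
  while not join.WaitFor(250) do
    Writeln('Still waiting ...');

  Writeln('All tasks completed');
end;

begin
  PollUntilDone;
end.
```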
    var
      join: IOmniParallelJoin;
      time: int64;
    begin
      FJoinCount.Value := 0;
      FJoinCount2.Value := 0;
      join := Parallel.Join(
        procedure (const joinState: IOmniJoinState)
        var
          i: integer;
        begin
          for i := 1 to 10 do begin
            Sleep(100);
            FJoinCount.Increment;
            if joinState.IsCancelled then
              break; //for
          end;
        end,
        procedure (const joinState: IOmniJoinState)
        var
          i: integer;
        begin
          for i := 1 to 10 do begin
            Sleep(200);
            FJoinCount2.Increment;
            if joinState.IsCancelled then
              break; //for
          end;
        end
      ).NoWait.Execute;
      Sleep(500);
      time := DSiTimeGetTime64;
      join.Cancel.WaitFor(INFINITE);
      Log(Format('Waited %d ms for joins to terminate', [DSiElapsedTime64(time)]));
      Log(Format('Joins counted up to %d and %d', [FJoinCount.Value, FJoinCount2.Value]));
    end;

The call to Parallel.Join starts two tasks. Because NoWait is used, the call returns immediately and stores the resulting IOmniParallelJoin interface in the local variable join. The main code then sleeps for half a second, cancels the execution and immediately waits for the background tasks to terminate.
Both tasks execute a simple loop which waits a little, increments a counter and checks the cancellation flag. Because the cancellation flag is set after 500 ms, we would expect five or six repetitions of the first loop (five repetitions take exactly 500 ms, and we can’t tell which will execute first, Cancel or the fifth IsCancelled check) and three repetitions of the second loop. That is exactly what the program prints out.
IOmniJoinState is the interface that provides you with access to the underlying task interface (Task), to the cancellation mechanism (Cancel and IsCancelled) and to the exception handling mechanism (IsExceptional).
    IOmniJoinState = interface
      function GetTask: IOmniTask;
    //
      procedure Cancel;
      function IsCancelled: boolean;
      function IsExceptional: boolean;
      property Task: IOmniTask read GetTask;
    end;
Exception handling

New Join handles exceptions much better than the old one. (Not surprising if you know that they were just ignored in the old implementation.) Exceptions are now implemented in the same manner as in the Parallel.Future.
Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using the synchronous version of Join (without the NoWait modifier), then WaitFor is called at the end of the Execute method (in other words, Parallel.Join(…).Execute will re-raise task exceptions). If, however, you are using the asynchronous version (by calling Parallel.Join(…).NoWait.Execute), the exception will only be raised when you wait for the background tasks to complete by calling WaitFor. Both approaches are shown in the 48_OtlParallelExceptions demo.
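For the asynchronous case, a minimal sketch (not the code from the 48_OtlParallelExceptions demo) could look like this. The program and procedure names are hypothetical, and the uses clause assumes the classic unit names, with INFINITE coming from the Windows unit.

```delphi
program JoinAsyncExceptionSketch;

{$APPTYPE CONSOLE}

uses
  Windows,      // INFINITE
  SysUtils,     // Exception
  OtlParallel;  // Parallel, IOmniParallelJoin

procedure RunAsync;
var
  join: IOmniParallelJoin;
begin
  join := Parallel.Join(
    procedure begin raise Exception.Create('Task failed'); end,
    procedure begin Sleep(100); end
  ).NoWait.Execute; // nothing is raised here, tasks run in the background

  try
    join.WaitFor(INFINITE); // task exceptions surface in this call
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end;

begin
  RunAsync;
end.
```

In the synchronous version the same try..except block would simply wrap the Execute call instead.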
You can test for the exception by calling the FatalException function. It will first wait for all background tasks to complete (without raising the exception) and then return the exception object. You can also detach the exception object from the Parallel.Join and handle it yourself by using the DetachException function. All of these are also shown in the 48_OtlParallelExceptions demo.
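A hedged sketch of the non-raising route follows. It assumes that DetachException transfers ownership of the exception object to the caller, who must then free it; that is the usual meaning of detaching, but the text above does not spell it out, so check the demo before relying on it. The program and procedure names are hypothetical.

```delphi
program JoinDetachSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,     // Exception
  OtlParallel;  // Parallel, IOmniParallelJoin

procedure InspectWithoutRaising;
var
  join: IOmniParallelJoin;
  exc : Exception;
begin
  join := Parallel.Join(
    procedure begin raise Exception.Create('Task failed'); end,
    procedure begin Sleep(100); end
  ).NoWait.Execute;

  // FatalException waits for the tasks to finish and returns the
  // exception object (or nil) without raising it.
  if assigned(join.FatalException) then begin
    // Assumption: after DetachException the caller owns the object.
    exc := join.DetachException;
    try
      Writeln('Handled manually: ', exc.ClassName, ': ', exc.Message);
    finally
      exc.Free;
    end;
  end;
end;

begin
  InspectWithoutRaising;
end.
```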
There’s also an IsExceptional function (available in the IOmniParallelJoin and IOmniJoinState interfaces) which tells you if any background task has thrown an exception. [Most probably this function will also make it into the Parallel.Future construct.]
There’s an additional complication in Join: as it executes multiple tasks, there can be multiple background exceptions. To give you full access to those exceptions, Parallel.Join wraps them into an EJoinException object.
    EJoinException = class(Exception)
      constructor Create; reintroduce;
      destructor Destroy; override;
      procedure Add(iTask: integer; taskException: Exception);
      function Count: integer;
      property Inner[idxException: integer]: TJoinInnerException read GetInner; default;
    end;

This exception class contains combined error messages from all background tasks in its Message property and allows you to access exception information for all caught exceptions directly. The following code demonstrates this.
    var
      iInnerExc: integer;
    begin
      try
        Parallel.Join([
          procedure begin
            raise ETestException.Create('Exception 1 in Parallel.Join');
          end,
          procedure begin
          end,
          procedure begin
            raise ETestException.Create('Exception 2 in Parallel.Join');
          end]).Execute;
      except
        on E: EJoinException do begin
          Log('Join raised exception %s:%s', [E.ClassName, E.Message]);
          for iInnerExc := 0 to E.Count - 1 do
            Log(' Task #%d raised exception: %s:%s', [E[iInnerExc].TaskNumber,
              E[iInnerExc].FatalException.ClassName,
              E[iInnerExc].FatalException.Message]);
        end;
      end;
    end;

The iInnerExc variable loops over all caught exceptions and for each such exception displays the task number (the first parameter to Parallel.Join has task number 0, the second has task number 1 and so on), the exception class and the exception message.
This approach allows you to either just log the exception or, if you are interested in details, examine specific inner exceptions and handle them appropriately.
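Handling specific inner exceptions could be sketched as follows. ERetryable is a hypothetical exception class invented for this example, as are the program and procedure names; the TaskNumber and FatalException members are the ones used in the demo code above.

```delphi
program JoinInnerExceptionSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,     // Exception
  OtlParallel;  // Parallel, EJoinException

type
  ERetryable = class(Exception); // hypothetical, for illustration only

procedure RunAndInspect;
var
  i: integer;
begin
  try
    Parallel.Join([
      procedure begin raise ERetryable.Create('temporary failure'); end,
      procedure begin raise Exception.Create('something else'); end
    ]).Execute;
  except
    on E: EJoinException do
      for i := 0 to E.Count - 1 do
        // Decide per task, based on the class of the inner exception.
        if E[i].FatalException is ERetryable then
          Writeln('Task #', E[i].TaskNumber, ' can be retried')
        else
          Writeln('Task #', E[i].TaskNumber, ' failed: ', E[i].FatalException.Message);
  end;
end;

begin
  RunAndInspect;
end.
```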