A few weeks ago my answer would be along the lines of setting communication channel manually or dropping down to low-level parallelism (i.e. the CreateTask) but both are very complicated solutions. There was certainly a room for better solution.
Since the release 910, released on April 17th, OtlParallel contains much simpler way to configure tasks and to set them up for communication with the owner.
Implementation details vary from one high-level structure to the other and we’ll look at them later. What they all have in common is the IOmniTaskConfig interface returned from the Parallel.TaskConfig function.
class function Parallel.TaskConfig: IOmniTaskConfig; begin Result := TOmniTaskConfig.Create; end;This interface contains functions to set up messaging handlers (OnMessage), termination handlers (OnTerminated), pass cancellation token to the task (CancelWith), assign the task with a monitor (MonitorWith), or assign a counter (WithCounter) and a lock (WithLock) to the task. All those function return interface itself so they can be used in a fluent (chained) manner.
IOmniTaskConfig = interface procedure Apply(const task: IOmniTaskControl); function CancelWith(const token: IOmniCancellationToken): IOmniTaskConfig; function MonitorWith(const monitor: IOmniTaskControlMonitor): IOmniTaskConfig; function OnMessage(eventDispatcher: TObject): IOmniTaskConfig; overload; function OnMessage(eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload; function OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload; function OnMessage(msgID: word; eventHandler: TOmniMessageExec): IOmniTaskConfig; overload; function OnTerminated(eventHandler: TOmniTaskTerminatedEvent): IOmniTaskConfig; overload; function OnTerminated(eventHandler: TOmniOnTerminatedFunction): IOmniTaskConfig; overload; function WithCounter(const counter: IOmniCounter): IOmniTaskConfig; function WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskConfig; overload; function WithLock(const lock: IOmniCriticalSection): IOmniTaskConfig; overload; end;As I’ve said the actual usage of IOmniTaskConfig depends on the high-level parallelism you are using. To make it simpler, I’ve collected usage patterns in the new demo application 47_TaskConfig.
Async
Parallel.Async methods all accept optional parameter of type IOmniTaskConfig. If set, the background task will be configured with the specified configuration.class procedure Async(task: TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Async(task: TOmniTaskDelegate; taskConfig: IOmniTaskConfig = nil); overload; class procedure Async(task: TProc; onTermination: TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Async(task: TOmniTaskDelegate; onTermination: TProc; taskConfig: IOmniTaskConfig = nil); overload;The demo application configures the task to send messages to Self (i.e. to the form) and sends some logging messages from the background task.
Parallel.Async( procedure (const task: IOmniTask) var i: integer; begin task.Comm.Send(WM_LOG, 'Starting'); for i := 1 to 10 do begin task.Comm.Send(WM_LOG, i); Sleep(200); end; task.Comm.Send(WM_LOG, 'Completed'); end, procedure begin btnAsync.Enabled := true; end, Parallel.TaskConfig.OnMessage(Self) );Those messages are handled in the main form by providing the WM_LOG handlers.
procedure WMLog(var msg: TOmniMessage); message WM_LOG;
procedure TfrmDemoParallelTaskConfig.WMLog(var msg: TOmniMessage); begin lbLog.ItemIndex := lbLog.Items.Add('BGTASK: ' + msg.MsgData); end;
Join
Similarly to Async, Parallel.Join accepts an optional parameter of the IOmniTaskConfig type.class procedure Join(const task1, task2: TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Join(const task1, task2: TOmniTaskDelegate; taskConfig: IOmniTaskConfig = nil); overload; class procedure Join(const tasks: array of TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Join(const tasks: array of TOmniTaskDelegate; taskConfig: IOmniTaskConfig = nil); overload;The demo application uses TaskConfig.WithLock to pass a shared lock to two Join tasks accessing a shared resource.
procedure TfrmDemoParallelTaskConfig.btnJoinClick(Sender: TObject); begin FSharedValue := 42; Parallel.Join( procedure (const task: IOmniTask) var i: integer; begin for i := 1 to 1000000 do begin task.Lock.Acquire; FSharedValue := FSharedValue + 17; task.Lock.Release; end; end, procedure (const task: IOmniTask) var i: integer; begin for i := 1 to 1000000 do begin task.Lock.Acquire; FSharedValue := FSharedValue - 17; task.Lock.Release; end; end ,Parallel.TaskConfig.WithLock(CreateOmniCriticalSection) ); lbLog.ItemIndex := lbLog.Items.Add(Format( 'JOIN: Shared value = %d (should be 42)', [FSharedValue])); end;
Future
Parallel.Future works in the same way – IOmniTaskConfig is an optional parameter.class function Future<T>(action: TOmniFutureDelegate<T>; taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload; class function Future<T>(action: TOmniFutureDelegateEx<T>; taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload;The demo application shows how you can send a message from the Future task.
procedure TfrmDemoParallelTaskConfig.btnFutureClick(Sender: TObject); begin btnFuture.Enabled := false; FFuture := Parallel.Future<integer>( function (const task: IOmniTask): integer begin Sleep(500); Result := 42; task.Comm.Send(WM_FUTURE_RESULT); end, Parallel.TaskConfig.OnMessage(Self) ) end;This message is handled in the main thread by providing a WM_FUTURE_RESULT message handler.
procedure WMFutureResult(var msg: TOmniMessage); message WM_FUTURE_RESULT;
procedure TfrmDemoParallelTaskConfig.WMFutureResult(var msg: TOmniMessage); begin lbLog.ItemIndex := lbLog.Items.Add('FUTURE: ' + IntToStr(FFuture.Value)); FFuture := nil; btnFuture.Enabled := true; end;
Pipeline
With a Parallel.Pipeline, different configuration block can be sent to each stage.IOmniPipeline = interface function Stage(pipelineStage: TPipelineStageDelegate; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; function Stage(pipelineStage: TPipelineStageDelegateEx; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; function Stages(const pipelineStages: array of TPipelineStageDelegate; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; function Stages(const pipelineStages: array of TPipelineStageDelegateEx; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; end;The demo application again just sends messages from pipeline stages to the main thread.
procedure TfrmDemoParallelTaskConfig.btnPipelineClick(Sender: TObject); var pipeOut: IOmniBlockingCollection; value : TOmniValue; begin pipeOut := Parallel.Pipeline .Stages([PipelineStage1, PipelineStage2], Parallel.TaskConfig.OnMessage(Self)) .Run; while not pipeOut.TryTake(value) do Application.ProcessMessages; lbLog.ItemIndex := lbLog.Items.Add('PIPELINE: ' + IntToStr(value) + ' (should be 500500)'); end;
procedure TfrmDemoParallelTaskConfig.PipelineStage1(const input, output: IOmniBlockingCollection; const task: IOmniTask); var i: integer; begin task.Comm.Send(WM_LOG, 'Pipeline stage 1 starting'); for i := 1 to 1000 do output.Add(i); task.Comm.Send(WM_LOG, 'Pipeline stage 1 stopped'); end;
ForEach
Parallel.ForEach uses yet another approach. IOmniParallelLoop (the interface returned from the ForEach method) implements the TaskConfig function which accepts configuration block.IOmniParallelLoop = interface function TaskConfig( const config: IOmniTaskConfig): IOmniParallelLoop; end;The demo application illustrates how to use TaskConfig in ForEach and at the same time demonstrates another interesting fact – although the interface returned from Parallel.ForEach is most of the time consumed in the Execute method, this doesn’t have to be the case. It can also be stored in a variable/field. This is especially useful if you’re running ForEach with the NoWait option. In this case, the reference to the interface returned from Parallel.ForEach must be stored somewhere until the parallel loop completes its execution.
The code below demonstrates this. Parallel.ForEach result is stored in a global FParallel field. OnStop method is used to clear out this field at the end. FParallel.Execute is then called as a separate statement.
var
FParallel: IOmniParallelLoop<integer>;
procedure TfrmDemoParallelTaskConfig.btnForEachClick(Sender: TObject); begin FParallel := Parallel.ForEach(1, 17) .TaskConfig(Parallel.TaskConfig.OnMessage(Self)) .NoWait .OnStop(procedure begin FParallel := nil; end); FParallel .Execute( procedure (const task: IOmniTask; const value: integer) begin task.Comm.Send(WM_LOG, value); end); end;
Fork/Join
Parallel.ForkJoin implements only a limited support for task configuration. Although you can pass the task configuration to the IOmniForkJoin interface by calling the TaskConfig method (just as in the ForEach case), the IOmniTask interface is not exposed in the parallel computations (in IOmniForkJoin.Compute tasks) and therefore cannot be used for messaging. This is caused by the ForkJoin implementation and probably wouldn’t change in the near future.IOmniForkJoin = interface function TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin; end;Because of this limitation, demo application only uses task configuration to send a notification that fork/join method has completed its execution.
procedure TfrmDemoParallelTaskConfig.btnForkJoinClick(Sender: TObject); var data: TArray<integer>; max : integer; begin data := TArray<integer>.Create(1, 17, 4, 99, -250, 7, 13, 132, 101); max := ParallelMax( data, Parallel.ForkJoin<integer> .TaskConfig(Parallel.TaskConfig.OnTerminated( procedure (const task: IOmniTaskControl) begin lbLog.ItemIndex := lbLog.Items.Add(Format( 'COMPUTE: Task %d terminated', [task.UniqueID])); end )), Low(data), High(data)); lbLog.ItemIndex := lbLog.Items.Add('FORKJOIN: ' + IntToStr(max) + ' (expected 132)'); end;
Very nice!
ReplyDeleteNow that I can monitor a parallel task and attach an IOmniTaskControl to it, is there any way for the code running in the parallel task to gain access to its IOmniTaskControl?
@Mason: No, why would you want to do that? Are you missing any IOmniTaskControl functionality on the IOmniTask side?
ReplyDeleteWhat I need is a way for a task to pause its own execution for either a specific amount of time or until an external event has completed. The obvious method, using Sleep or the WaitFor* APIs, will leave a suspended thread sitting around in the task pool taking up space.
ReplyDeleteIOmniTaskControl provides equivalent methods that appear to make the task pool aware that this task is suspended for the moment. (Unless I'm reading this all wrong.) So I'm looking for a way to gain access to this functionality from within the task.
I believe you're reading it wrong. Which methods?
ReplyDeleteWhat you're looking for is at the moment supported at the low-level API (i.e. SetTimer for timed triggers and RegisterWaitObject for WaitFor* integration). At the moment it would be quite hard to add support for those into OtlParallel as high-level tasks are not based on the TOmniWorker.