Tuesday, April 26, 2011

Configuring background OtlParallel tasks

High-level OmniThreadLibrary parallelism (i.e. the OtlParallel unit) is great when you are running mostly independent parts of code, but what can you do when you want to communicate with the main thread from a task?
A few weeks ago my answer would have been along the lines of setting up a communication channel manually or dropping down to low-level parallelism (i.e. CreateTask), but both are very complicated solutions. There was certainly room for a better one.
Since release 910, published on April 17th, OtlParallel contains a much simpler way to configure tasks and to set them up for communication with the owner.
Implementation details vary from one high-level structure to the other and we’ll look at them later. What they all have in common is the IOmniTaskConfig interface returned from the Parallel.TaskConfig function.
class function Parallel.TaskConfig: IOmniTaskConfig;
begin
  Result := TOmniTaskConfig.Create;
end;
This interface contains functions to set up message handlers (OnMessage) and termination handlers (OnTerminated), to pass a cancellation token to the task (CancelWith), to attach a monitor to the task (MonitorWith), and to assign a counter (WithCounter) and a lock (WithLock) to the task. All those functions return the interface itself so they can be used in a fluent (chained) manner.
IOmniTaskConfig = interface
  procedure Apply(const task: IOmniTaskControl);
  function  CancelWith(const token: IOmniCancellationToken): 
    IOmniTaskConfig;
  function  MonitorWith(const monitor: IOmniTaskControlMonitor): 
    IOmniTaskConfig;
  function  OnMessage(eventDispatcher: TObject): 
    IOmniTaskConfig; overload;
  function  OnMessage(eventHandler: TOmniTaskMessageEvent): 
    IOmniTaskConfig; overload;
  function  OnMessage(msgID: word; 
    eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload;
  function  OnMessage(msgID: word; eventHandler: TOmniMessageExec): 
    IOmniTaskConfig; overload;
  function  OnTerminated(eventHandler: TOmniTaskTerminatedEvent): 
    IOmniTaskConfig; overload;
  function  OnTerminated(eventHandler: TOmniOnTerminatedFunction): 
    IOmniTaskConfig; overload;
  function  WithCounter(const counter: IOmniCounter): IOmniTaskConfig;
  function  WithLock(const lock: TSynchroObject; 
    autoDestroyLock: boolean = true): IOmniTaskConfig; overload;
  function  WithLock(const lock: IOmniCriticalSection): 
    IOmniTaskConfig; overload;
end;
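Because every configuration function returns IOmniTaskConfig, several settings can be combined in one expression. The following is only a sketch of such a chain: BuildConfig and the FCancelToken field (of type IOmniCancellationToken) are hypothetical names introduced for illustration and are not part of the demo.

```pascal
// Sketch, not demo code. Assumes the OtlParallel, OtlTaskControl and
// OtlSync units are in the uses clause and that FCancelToken is a
// (hypothetical) form field of type IOmniCancellationToken.
function TfrmDemoParallelTaskConfig.BuildConfig: IOmniTaskConfig;
begin
  Result := Parallel.TaskConfig
    .OnMessage(Self)                       // dispatch task messages to the form
    .CancelWith(FCancelToken)              // share a cancellation token with the task
    .WithLock(CreateOmniCriticalSection);  // give the task a lock (task.Lock)
end;
```

The resulting configuration block can then be passed wherever a high-level construct accepts an IOmniTaskConfig parameter.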
As I’ve said, the actual usage of IOmniTaskConfig depends on the high-level construct you are using. To make it simpler, I’ve collected the usage patterns in the new demo application 47_TaskConfig.

Async

All Parallel.Async overloads accept an optional parameter of type IOmniTaskConfig. If it is set, the background task will be configured with the specified configuration.
class procedure Async(task: TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Async(task: TOmniTaskDelegate; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Async(task: TProc; onTermination: TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Async(task: TOmniTaskDelegate; onTermination: TProc;
   taskConfig: IOmniTaskConfig = nil); overload;
The demo application configures the task to send messages to Self (i.e. to the form) and sends some logging messages from the background task.
  Parallel.Async(
    procedure (const task: IOmniTask)
    var
      i: integer;
    begin
      task.Comm.Send(WM_LOG, 'Starting');
      for i := 1 to 10 do begin
        task.Comm.Send(WM_LOG, i);
        Sleep(200);
      end;
      task.Comm.Send(WM_LOG, 'Completed');
    end,

    procedure
    begin
      btnAsync.Enabled := true;
    end,

    Parallel.TaskConfig.OnMessage(Self)
    );
Those messages are handled in the main form by a WM_LOG message handler, which is declared in the form class and implemented as follows.
procedure WMLog(var msg: TOmniMessage); message WM_LOG;
procedure TfrmDemoParallelTaskConfig.WMLog(var msg: TOmniMessage);
begin
  lbLog.ItemIndex := lbLog.Items.Add('BGTASK: ' + msg.MsgData);
end;
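The message identifiers themselves are not shown in this post. A minimal sketch of how they could be declared, assuming they are plain offsets from WM_USER (the exact values are an assumption; any two distinct values that do not collide with other messages handled by the form would do):

```pascal
// Sketch: message IDs used by the handlers in this post.
// The concrete values are assumed, not taken from the article.
// WM_USER is declared in the Messages unit.
const
  WM_LOG           = WM_USER;      // logging messages sent by background tasks
  WM_FUTURE_RESULT = WM_USER + 1;  // notification that a future has its result
```

OnMessage(Self) relies on these IDs: a message sent with task.Comm.Send(WM_LOG, ...) ends up in the form method marked with message WM_LOG.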

Join

Similarly to Async, Parallel.Join accepts an optional parameter of the IOmniTaskConfig type.
class procedure Join(const task1, task2: TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Join(const task1, task2: TOmniTaskDelegate;
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Join(const tasks: array of TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Join(const tasks: array of TOmniTaskDelegate;
  taskConfig: IOmniTaskConfig = nil); overload;
The demo application uses TaskConfig.WithLock to pass a shared lock to two Join tasks accessing a shared resource.
procedure TfrmDemoParallelTaskConfig.btnJoinClick(Sender: TObject);
begin
  FSharedValue := 42;
  Parallel.Join(
    procedure (const task: IOmniTask)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do begin
        task.Lock.Acquire;
        FSharedValue := FSharedValue + 17;
        task.Lock.Release;
      end;
    end,
    procedure (const task: IOmniTask)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do begin
        task.Lock.Acquire;
        FSharedValue := FSharedValue - 17;
        task.Lock.Release;
      end;
    end
    ,Parallel.TaskConfig.WithLock(CreateOmniCriticalSection)
  );
  lbLog.ItemIndex := lbLog.Items.Add(Format(
    'JOIN: Shared value = %d (should be 42)', [FSharedValue]));
end;

Future

Parallel.Future works in the same way – IOmniTaskConfig is an optional parameter.
class function Future<T>(action: TOmniFutureDelegate<T>; 
  taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload;
class function Future<T>(action: TOmniFutureDelegateEx<T>; 
  taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload;
The demo application shows how you can send a message from the Future task.
procedure TfrmDemoParallelTaskConfig.btnFutureClick(Sender: TObject);
begin
  btnFuture.Enabled := false;
  FFuture := Parallel.Future<integer>(
    function (const task: IOmniTask): integer
    begin
      Sleep(500);
      Result := 42;
      task.Comm.Send(WM_FUTURE_RESULT);
    end,
    Parallel.TaskConfig.OnMessage(Self)
  )
end;
This message is handled in the main thread by providing a WM_FUTURE_RESULT message handler.
procedure WMFutureResult(var msg: TOmniMessage); 
  message WM_FUTURE_RESULT;
procedure TfrmDemoParallelTaskConfig.WMFutureResult(var msg: TOmniMessage);
begin
  lbLog.ItemIndex := lbLog.Items.Add('FUTURE: ' + IntToStr(FFuture.Value));
  FFuture := nil;
  btnFuture.Enabled := true;
end;

Pipeline

With Parallel.Pipeline, a different configuration block can be passed to each stage.
IOmniPipeline = interface
  function  Stage(pipelineStage: TPipelineStageDelegate; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stage(pipelineStage: TPipelineStageDelegateEx; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(const pipelineStages: 
    array of TPipelineStageDelegate; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(const pipelineStages: 
    array of TPipelineStageDelegateEx; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
end;
The demo application again just sends messages from pipeline stages to the main thread.
procedure TfrmDemoParallelTaskConfig.btnPipelineClick(Sender: TObject);
var
  pipeOut: IOmniBlockingCollection;
  value  : TOmniValue;
begin
  pipeOut := Parallel.Pipeline
    .Stages([PipelineStage1, PipelineStage2], 
       Parallel.TaskConfig.OnMessage(Self))
    .Run;
  while not pipeOut.TryTake(value) do
    Application.ProcessMessages;
  lbLog.ItemIndex := lbLog.Items.Add('PIPELINE: ' + 
    IntToStr(value) + ' (should be 500500)');
end;
procedure TfrmDemoParallelTaskConfig.PipelineStage1(const input, output:
    IOmniBlockingCollection; const task: IOmniTask);
var
  i: integer;
begin
  task.Comm.Send(WM_LOG, 'Pipeline stage 1 starting');
  for i := 1 to 1000 do
    output.Add(i);
  task.Comm.Send(WM_LOG, 'Pipeline stage 1 stopped');
end;
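The second stage is not listed above. Given the expected result of 500500 (the sum of the numbers 1 to 1000), it presumably adds up everything produced by the first stage; the following is a sketch under that assumption rather than a copy of the demo code.

```pascal
// Sketch of the second stage, reconstructed from the expected output.
procedure TfrmDemoParallelTaskConfig.PipelineStage2(const input, output:
    IOmniBlockingCollection; const task: IOmniTask);
var
  sum  : integer;
  value: TOmniValue;
begin
  task.Comm.Send(WM_LOG, 'Pipeline stage 2 starting');
  sum := 0;
  // the enumerator blocks until data arrives and ends when stage 1 completes
  for value in input do
    sum := sum + value.AsInteger;
  output.Add(sum);  // a single value is passed to the pipeline output
  task.Comm.Send(WM_LOG, 'Pipeline stage 2 stopped');
end;
```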

ForEach

Parallel.ForEach uses yet another approach. IOmniParallelLoop (the interface returned from the ForEach method) implements the TaskConfig function, which accepts a configuration block.
IOmniParallelLoop = interface
  function  TaskConfig(
    const config: IOmniTaskConfig): IOmniParallelLoop;
end; 
The demo application illustrates how to use TaskConfig in ForEach and at the same time demonstrates another interesting fact: although the interface returned from Parallel.ForEach is usually consumed immediately by calling Execute, this doesn’t have to be the case. It can also be stored in a variable or a field. This is especially useful if you’re running ForEach with the NoWait option. In that case, the reference to the interface returned from Parallel.ForEach must be stored somewhere until the parallel loop completes its execution.
The code below demonstrates this. The Parallel.ForEach result is stored in a global FParallel field. The OnStop method is used to clear this field when the loop ends. FParallel.Execute is then called as a separate statement.
var
    FParallel: IOmniParallelLoop<integer>;
procedure TfrmDemoParallelTaskConfig.btnForEachClick(Sender: TObject);
begin
  FParallel := Parallel.ForEach(1, 17)
    .TaskConfig(Parallel.TaskConfig.OnMessage(Self))
    .NoWait
    .OnStop(procedure begin FParallel := nil; end);
  FParallel
    .Execute(
      procedure (const task: IOmniTask; const value: integer)
      begin
        task.Comm.Send(WM_LOG, value);
      end);
end;

Fork/Join

Parallel.ForkJoin offers only limited support for task configuration. Although you can pass the task configuration to the IOmniForkJoin interface by calling the TaskConfig method (just as in the ForEach case), the IOmniTask interface is not exposed in the parallel computations (in IOmniForkJoin.Compute tasks) and therefore cannot be used for messaging. This is a consequence of the ForkJoin implementation and probably won’t change in the near future.
IOmniForkJoin = interface
  function  TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin;
end;
Because of this limitation, the demo application uses task configuration only to send a notification when a fork/join task has completed its execution.
procedure TfrmDemoParallelTaskConfig.btnForkJoinClick(Sender: TObject);
var
  data: TArray<integer>;
  max : integer;
begin
  data := TArray<integer>.Create(1, 17, 4, 99, -250, 7, 13, 132, 101);
  max := ParallelMax(
    data,
    Parallel.ForkJoin<integer>
      .TaskConfig(Parallel.TaskConfig.OnTerminated(
        procedure (const task: IOmniTaskControl)
        begin
          lbLog.ItemIndex := lbLog.Items.Add(Format(
            'COMPUTE: Task %d terminated', [task.UniqueID]));
        end
      )),
    Low(data),
    High(data));
  lbLog.ItemIndex := lbLog.Items.Add('FORKJOIN: ' + IntToStr(max) + 
    ' (expected 132)');
end;
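ParallelMax itself is not listed above. A possible shape for it, assuming the usual fork/join pattern of splitting the range in half and combining the results of two IOmniForkJoin<integer>.Compute calls, is sketched below. The parameter names and the exact splitting rule are assumptions; Max comes from the Math unit.

```pascal
// Sketch of a recursive fork/join maximum; not a copy of the demo code.
function TfrmDemoParallelTaskConfig.ParallelMax(const data: TArray<integer>;
  const forkJoin: IOmniForkJoin<integer>; left, right: integer): integer;
var
  computeLeft : IOmniCompute<integer>;
  computeRight: IOmniCompute<integer>;
  mid         : integer;

  // schedules the computation of the maximum over data[l..r]
  function Compute(l, r: integer): IOmniCompute<integer>;
  begin
    Result := forkJoin.Compute(
      function: integer
      begin
        Result := ParallelMax(data, forkJoin, l, r);
      end);
  end;

begin
  if left = right then
    Result := data[left]  // a single element is its own maximum
  else begin
    mid := (left + right) div 2;
    computeLeft  := Compute(left, mid);
    computeRight := Compute(mid + 1, right);
    // Value waits until the corresponding subtask has finished
    Result := Max(computeLeft.Value, computeRight.Value);
  end;
end;
```

Note that the anonymous function passed to Compute takes no IOmniTask parameter, which is exactly why the subtasks cannot send messages to the owner.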

4 comments:

  1. Very nice!

    Now that I can monitor a parallel task and attach an IOmniTaskControl to it, is there any way for the code running in the parallel task to gain access to its IOmniTaskControl?

  2. @Mason: No, why would you want to do that? Are you missing any IOmniTaskControl functionality on the IOmniTask side?

  3. What I need is a way for a task to pause its own execution for either a specific amount of time or until an external event has completed. The obvious method, using Sleep or the WaitFor* APIs, will leave a suspended thread sitting around in the task pool taking up space.

    IOmniTaskControl provides equivalent methods that appear to make the task pool aware that this task is suspended for the moment. (Unless I'm reading this all wrong.) So I'm looking for a way to gain access to this functionality from within the task.

  4. I believe you're reading it wrong. Which methods?

    What you're looking for is at the moment supported at the low-level API (i.e. SetTimer for timed triggers and RegisterWaitObject for WaitFor* integration). At the moment it would be quite hard to add support for those into OtlParallel as high-level tasks are not based on the TOmniWorker.
