Thursday, November 03, 2011

OmniThreadLibrary in Practice [2]–Background Worker and List Partitioning

Today’s question was asked by Žarko Gajić, the excellent maintainer of About.com’s Delphi section.
Here's my (simplified) task I would like to solve using OTL (Delphi XE) - my real-world task is somewhat more complicated but can be described as:
input: a string. output: a TStringList containing characters (one per entry) of the input string.
Example: input: "delphi"
A background thread ("master") grabs the input string and splits it into several pieces (let's say 2): "del" and "phi". For each of the split strings a new thread ("child") is created that fills in the TStringList (output) with characters from the section of the string it receives.
At any time, the "master" thread could be signaled (from the app's main thread) to terminate all child threads (and itself).
When everything is done the app's main thread processes the string list.
Preferably, the order of the characters should (when all ends) be 'd', 'e', 'l', 'p', 'h', 'i' (note that characters are actually items in the resulting string list).
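For comparison, the task itself is trivial to do sequentially. The Delphi sketch below uses only the RTL (SysUtils, Classes) and produces the reference ordering that any threaded version has to reproduce. BreakStringSequential is a hypothetical name, not part of the example project.

```pascal
program SequentialBaseline;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
uses
  SysUtils, Classes;

// Hypothetical helper: splits the whole string on the calling thread,
// adding one character per list entry, in input order.
procedure BreakStringSequential(const s: string; output: TStringList);
var
  ch: char;
begin
  for ch in s do
    output.Add(ch);
end;

var
  sl: TStringList;
begin
  sl := TStringList.Create;
  try
    BreakStringSequential('delphi', sl);
    Writeln(sl.CommaText); // d,e,l,p,h,i
  finally
    sl.Free;
  end;
end.
```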

I’m assuming that this is really just a simplified example, because there is no sense in splitting short strings into characters in multiple threads. We discussed the topic a bit more in the forum, and in the end I decided to write a background worker which waits for “split” requests and then runs multiple threads in parallel to split the input string.
My solution to this problem is stored in the SVN repository, in the new ‘examples’ folder, subfolder ‘stringlist parser’.

Low-level solution

One task, a background worker, is created in FormCreate and destroyed in FormDestroy. Messages from that task are routed back to the owner form. Before termination, the cancelation token is signaled to cancel the current operation.
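  // in FormCreate: start the background worker and route its messages to the form
  FStringProcessor := CreateTask(StringProcessorLL).OnMessage(Self).Run;

  // in FormDestroy: cancel the current operation, then stop the worker
  FStringProcessor.CancellationToken.Signal;
  FStringProcessor.Terminate(INFINITE);
  FStringProcessor := nil;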
StringProcessorLL is the worker procedure for the background worker task. It waits on two events, the termination request and the new-message signal, and processes a message whenever one arrives.
The message is read from the communication queue and passed to the BreakStringLL procedure. If the operation was canceled, a WM_PROCESSING_CANCELED message is sent to the owner; otherwise, the resulting TStringList is sent to the owner in a WM_PROCESSING_RESULT message.
procedure StringProcessorLL(const task: IOmniTask);
var
  input   : TOmniMessage;
  slOutput: TStringList;
begin
  while DSiWaitForTwoObjects(task.TerminateEvent, task.Comm.NewMessageEvent,
          false, INFINITE) = WAIT_OBJECT_1 do
begin
    task.Comm.Receive(input);
    slOutput := TStringList.Create;
    BreakStringLL(input.MsgData.AsString, slOutput, task.CancellationToken);
    if task.CancellationToken.IsSignalled then begin
      task.Comm.Send(WM_PROCESSING_CANCELED);
      FreeAndNil(slOutput);
    end
    else
      task.Comm.Send(WM_PROCESSING_RESULT, slOutput);
  end;
end;
To start a new operation, the main form clears the cancelation token and sends a string to the worker over the built-in communication channel.
  FStringProcessor.CancellationToken.Clear;
  FStringProcessor.Comm.Send(0, inpString.Text);
Both messages – cancelation and completion – are routed back to the form via Windows messages.
procedure WMProcessingCanceled(var msg: TOmniMessage); message WM_PROCESSING_CANCELED;
procedure WMProcessingResult(var msg: TOmniMessage); message WM_PROCESSING_RESULT;

procedure TfrmStringListParser.WMProcessingCanceled(var msg: TOmniMessage);
begin
  lbLog.Items.Add('Canceled');
end;

procedure TfrmStringListParser.WMProcessingResult(var msg: TOmniMessage);
begin
  ShowResult(TStringList(msg.MsgData.AsObject));
end;
To cancel the current request, the main form just signals the cancelation token. The string splitters check the token and abort the current operation.
  FStringProcessor.CancellationToken.Signal;
BreakStringLL is called for each input string that arrives over the communication channel. It first decides how many threads to use (the number of cores available to the process minus one; the assumption here is that one core is used to run the main thread). It then creates one string list for each string-splitting subtask; that list will hold the results generated by the subtask.
All subtasks are then started. Each gets two parameters: an output string list and the string to be split into characters. [A better solution, which keeps only one global copy of the string and sends each worker only the range to be processed, is left as an exercise for the reader.] All subtasks share the same cancelation token so that they can be canceled in one go, and all are put into one task group so that the code can wait for all of them to complete. Tasks are executed in a thread pool to minimize thread-creation overhead.
When all subtasks have completed, the partial results are collected into one TStringList object.
procedure BreakStringLL(s: string; const slOutput: TStringList; const cancellationToken: IOmniCancellationToken);
var
  breakTasks  : IOmniTaskGroup;
  charsPerTask: integer;
  iTask       : integer;
  numTasks    : integer;
  sPartial    : string;
  taskResults : array of TStringList;
begin
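  // one core is left for the main thread; always start at least one subtask
  numTasks := Environment.Process.Affinity.Count - 1;
  if numTasks < 1 then
    numTasks := 1;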

  SetLength(taskResults, numTasks);
  for iTask := Low(taskResults) to High(taskResults) do
    taskResults[iTask] := TStringList.Create;

  breakTasks := CreateTaskGroup;
  for iTask := 1 to numTasks do begin
    // divide the remaining part in as-equal-as-possible segments
    charsPerTask := Round(Length(s) / (numTasks - iTask + 1));
    CreateTask(BreakStringLLTask)
      .SetParameter('Output', taskResults[iTask-1])
      .SetParameter('Job',    Copy(s, 1, charsPerTask))
      .CancelWith(cancellationToken)
      .Join(breakTasks)
      .Schedule;
    Delete(s, 1, charsPerTask);
  end;
  breakTasks.WaitForAll(INFINITE);

  for iTask := Low(taskResults) to High(taskResults) do begin
    for sPartial in taskResults[iTask] do
      slOutput.Add(sPartial);
    taskResults[iTask].Free;
  end;
end;
The string-breaker subtask is trivial: it just fetches both parameters and runs SplitPartialList.
procedure BreakStringLLTask(const task: IOmniTask);
var
  job   : string;
  param : TOmniValue;
  result: TStringList;
begin
  param := task.Param['Output']; result := TStringList(param.AsObject);
  param := task.Param['Job'];    job := param.AsString;
  SplitPartialList(job, result, task.CancellationToken);
end;
The actual string breaking is implemented as a standalone procedure because it is used from both the low-level and the high-level code. It checks each input character and signals the cancelation token if the character is an exclamation mark (this exists purely as a mechanism for testing cancelation). It stops as soon as it sees that the cancelation token is signaled. After each character, Sleep(100) simulates heavy processing and gives the user time to click the Cancel button in the GUI before the operation completes.
procedure SplitPartialList(const input: string; output: TStringList;
  const cancel: IOmniCancellationToken);
var
  ch: char;
begin
  for ch in input do begin
    if ch = '!' then // for testing
      cancel.Signal;
    if cancel.IsSignalled then
      break; //for ch
    output.Add(ch);
    Sleep(100); // simulate workload
  end;
end;
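The exercise mentioned in the BreakStringLL description (keep one shared copy of the string and give each worker only a range) could look roughly like the following RTL-only sketch. MakeRanges, SplitRange and TCharRange are hypothetical names; MakeRanges applies the same as-equal-as-possible formula as BreakStringLL, and the ranges are processed one after another here instead of in OmniThreadLibrary tasks.

```pascal
program RangePartitioning;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
uses
  SysUtils, Classes;

type
  // one contiguous slice of the shared input string (1-based, like Delphi strings)
  TCharRange = record
    First: integer;
    Count: integer;
  end;
  TCharRanges = array of TCharRange;

// Hypothetical helper: the split that BreakStringLL performs with Copy/Delete,
// expressed as index ranges into one shared string.
function MakeRanges(totalLength, numTasks: integer): TCharRanges;
var
  iTask    : integer;
  remaining: integer;
  start    : integer;
begin
  SetLength(Result, numTasks);
  start := 1;
  remaining := totalLength;
  for iTask := 0 to numTasks - 1 do begin
    // divide the remaining part among the tasks that are still to be started
    Result[iTask].First := start;
    Result[iTask].Count := Round(remaining / (numTasks - iTask));
    Inc(start, Result[iTask].Count);
    Dec(remaining, Result[iTask].Count);
  end;
end;

// Hypothetical worker body: reads its range directly from the shared string,
// so no per-task copy of the input is made.
procedure SplitRange(const s: string; const range: TCharRange; output: TStringList);
var
  i: integer;
begin
  for i := range.First to range.First + range.Count - 1 do
    output.Add(s[i]);
end;

var
  source : string;
  ranges : TCharRanges;
  partial: TStringList;
  merged : TStringList;
  iTask  : integer;
begin
  source := 'delphi';
  ranges := MakeRanges(Length(source), 2);
  merged := TStringList.Create;
  try
    // sequential here; in the threaded version each SplitRange call would run
    // in its own task and fill its own partial list
    for iTask := 0 to High(ranges) do begin
      partial := TStringList.Create;
      try
        SplitRange(source, ranges[iTask], partial);
        merged.AddStrings(partial); // merge in task order to keep the characters ordered
      finally
        partial.Free;
      end;
    end;
    Writeln(merged.CommaText); // d,e,l,p,h,i
  finally
    merged.Free;
  end;
end.
```

Because the workers only read the shared string, no Copy or Delete calls are needed; merging the partial lists in task order is what preserves the original character order.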

High-level solution

The high-level solution is very similar to the low-level one. It requires the current version of the OmniThreadLibrary from SVN; it won’t work with release 2.2!
A single-stage pipeline simulates the background worker, and a separate cancelation token is used to cancel the current operation. Pipeline.Cancel cannot be used for this purpose because it puts all pipeline queues into the “completed” state, which cannot be reversed to the “normal” state. [IOW, if Pipeline.Cancel were used, the code would have to destroy and recreate the pipeline after each cancelation request.]
  FPipelineCancel := CreateOmniCancellationToken;
  FPipeline := Parallel.Pipeline.Stage(StringProcessorHL,
    Parallel.TaskConfig.OnMessage(Self)).Run;
Destruction is almost the same as the low-level code …
  FPipelineCancel.Signal;
  FPipeline.Input.CompleteAdding;
  FPipeline.WaitFor(INFINITE);
  FPipeline := nil;
… as is the code that starts new work item …
  FPipelineCancel.Clear;
  FPipeline.Input.Add(inpString.Text);
… and the code that cancels the current work.
  FPipelineCancel.Signal;
Result processing is identical; the same message-handling methods are used as above.
The high-level background worker reads requests from the input collection, sends each string to the breaker procedure, and reports the result to the owner. (FPipeline.Input.CompleteAdding in the destruction code above causes this for..in loop to terminate when the form is destroyed.)
procedure TfrmStringListParser.StringProcessorHL(const inputQueue, outputQueue:
  IOmniBlockingCollection; const task: IOmniTask);
var
  input   : TOmniValue;
  slResult: TStringList;
begin
  for input in inputQueue do begin
    slResult := TStringList.Create;
    BreakStringHL(input.AsString, slResult, FPipelineCancel);
    if FPipelineCancel.IsSignalled then begin
      task.Comm.Send(WM_PROCESSING_CANCELED);
      FreeAndNil(slResult);
    end
    else
      task.Comm.Send(WM_PROCESSING_RESULT, slResult);
  end;
end;
BreakStringHL uses the same approach as above: it creates N partial string lists and N workers, each of which processes its own substring of the original string. [Again, using a global string and partial indices would be a better solution.] This time, the subtasks are implemented using the ParallelTask abstraction. A blocking collection is used to send the substrings (i.e., the workload) to all subtasks.
procedure BreakStringHL(input: string; output: TStringList; cancel: IOmniCancellationToken);
var
  charsPerTask : integer;
  iTask        : integer;
  numTasks     : integer;
  partialQueue : IOmniBlockingCollection;
  s            : string;
  stringBreaker: IOmniParallelTask;
  taskResults  : array of TStringList;
begin
  partialQueue := TOmniBlockingCollection.Create;
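  // one core is left for the main thread; always start at least one subtask
  numTasks := Environment.Process.Affinity.Count - 1;
  if numTasks < 1 then
    numTasks := 1;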

  SetLength(taskResults, numTasks);
  for iTask := Low(taskResults) to High(taskResults) do
    taskResults[iTask] := TStringList.Create;

  stringBreaker := Parallel.ParallelTask.NumTasks(numTasks).NoWait
    .TaskConfig(Parallel.TaskConfig.CancelWith(cancel))
    .Execute(
      procedure
      var
        workItem: TOmniValue;
      begin
        workItem := partialQueue.Next;
        SplitPartialList(string(workItem[1]),
          taskResults[integer(workItem[0])], cancel);
      end
    );

  // provide input to the parallel tasks above; each work item is packed as
  // [index of the partial result list, substring to split]
  for iTask := 1 to numTasks do begin
    // divide the remaining part in as-equal-as-possible segments
    charsPerTask := Round(Length(input) / (numTasks - iTask + 1));
    partialQueue.Add(TOmniValue.Create([iTask-1, Copy(input, 1, charsPerTask)]));
    Delete(input, 1, charsPerTask);
  end;

  // process output
  stringBreaker.WaitFor(INFINITE);
  for iTask := Low(taskResults) to High(taskResults) do begin
    for s in taskResults[iTask] do
      output.Add(s);
    taskResults[iTask].Free;
  end;
end;
As you can see, the high-level solution is hardly shorter than the low-level one, if at all, due to the nature of the problem. Still, it was instructive to write both, because the high-level solution showed me that the OmniThreadLibrary is missing at least one high-level abstraction: a background worker.

Background worker

The background worker will encapsulate an asynchronous task which accepts work items, processes them, and delivers the results back to the caller. The central idea is individual cancelation: each work item can be canceled independently of the others (and you’ll also have the option to cancel all work items at once).
Current syntax (which will likely change before the release) is:
FBackgroundWorker := Parallel.BackgroundWorker
  .TaskConfig(...)
  .StopOn(cancellationToken)
  .OnRequestDone(HandleRequestDone)
  .Execute(
    procedure (workItem: IOmniWorkItem)
    begin
      // input: workItem.Data
      // output: workItem.Result
    end
  );

FBackgroundWorker.Terminate;
FBackgroundWorker := nil;

FBackgroundWorker.Schedule(
  FBackgroundWorker.CreateWorkItem(data));

procedure HandleRequestDone(workItem: IOmniWorkItem);
begin
  // check workItem.IsCanceled
  // process workItem.Result
end;

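IOmniWorkItem = interface
  function  GetData: TOmniValue;
  function  GetResult: TOmniValue;
  function  GetUniqueID: int64;
  procedure SetResult(const value: TOmniValue);
  procedure Cancel;
  function  IsCanceled: boolean;
  property Data: TOmniValue read GetData;
  property Result: TOmniValue read GetResult write SetResult;
  property UniqueID: int64 read GetUniqueID;
end;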

FBackgroundWorker.Cancel;
When this is implemented, I’ll post another version of the high-level solution to Žarko’s question.
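To illustrate what per-item cancelation means, here is a small, single-threaded Delphi sketch that uses only the RTL. IWorkItem and TWorkItem are hypothetical stand-ins for the proposed IOmniWorkItem (strings instead of TOmniValue, a plain Boolean flag instead of thread-safe state); they are not the OmniThreadLibrary API, and the loop simply plays both the worker role and the OnRequestDone role in sequence.

```pascal
program WorkItemCancelSketch;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
uses
  SysUtils;

type
  // Hypothetical, simplified stand-in for the proposed IOmniWorkItem
  IWorkItem = interface
    function  GetData: string;
    function  GetResult: string;
    function  GetUniqueID: int64;
    procedure SetResult(const value: string);
    procedure Cancel;
    function  IsCanceled: boolean;
    property Data: string read GetData;
    property Result: string read GetResult write SetResult;
    property UniqueID: int64 read GetUniqueID;
  end;

  TWorkItem = class(TInterfacedObject, IWorkItem)
  private
    FCanceled: boolean; // single-threaded demo; a real worker needs synchronized access
    FData    : string;
    FResult  : string;
    FUniqueID: int64;
  public
    constructor Create(const data: string; uniqueID: int64);
    function  GetData: string;
    function  GetResult: string;
    function  GetUniqueID: int64;
    procedure SetResult(const value: string);
    procedure Cancel;
    function  IsCanceled: boolean;
  end;

constructor TWorkItem.Create(const data: string; uniqueID: int64);
begin
  inherited Create;
  FData := data;
  FUniqueID := uniqueID;
end;

procedure TWorkItem.Cancel;                     begin FCanceled := true; end;
function  TWorkItem.GetData: string;            begin Result := FData; end;
function  TWorkItem.GetResult: string;          begin Result := FResult; end;
function  TWorkItem.GetUniqueID: int64;         begin Result := FUniqueID; end;
function  TWorkItem.IsCanceled: boolean;        begin Result := FCanceled; end;
procedure TWorkItem.SetResult(const value: string); begin FResult := value; end;

var
  items: array [0..2] of IWorkItem;
  i    : integer;
begin
  items[0] := TWorkItem.Create('del', 1);
  items[1] := TWorkItem.Create('phi', 2);
  items[2] := TWorkItem.Create('xyz', 3);
  items[1].Cancel; // cancel one request only; the other two are unaffected
  for i := Low(items) to High(items) do begin
    // worker side: skip canceled items, otherwise produce a result
    if not items[i].IsCanceled then
      items[i].Result := UpperCase(items[i].Data);
    // request-done side: check IsCanceled before using Result
    if items[i].IsCanceled then
      Writeln(items[i].UniqueID, ': canceled')
    else
      Writeln(items[i].UniqueID, ': ', items[i].Result);
  end;
end.
```

This prints "1: DEL", "2: canceled" and "3: XYZ": canceling item 2 has no effect on the other requests, which is the behavior the proposed background worker is meant to provide across threads.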

3 comments:

  1. Primož,

    Again thanks much for your solution.

    Here's my initial attempt using TThread class:
    http://delphi.about.com/od/kbthread/a/threaded-delphi-string-parser.htm

    -žarko

  2. Shouldn't "FBackupWorker.Schedule" be actually "FBackgroundWorker.Schedule"?

  3. Yes, of course.
