Here's my (simplified) task that I would like to solve using OTL (Delphi XE). My real-world task is somewhat more complicated, but it can be described as:
Input: a string. Output: a TStringList containing the characters of the input string, one per entry.
Example: input: "delphi"
A background thread ("master") grabs the input string and splits it into several pieces (let's say 2): "del" and "phi". For each of the split strings a new thread ("child") is created that fills in the TStringList (output) with characters from the section of the string it receives.
At any time, the app's main thread can signal the "master" thread to terminate all child threads (and itself).
When everything is done, the app's main thread processes the string list.
Preferably, when everything ends, the characters should be in the order 'd', 'e', 'l', 'p', 'h', 'i' (note that the characters are actually items in the resulting string list).
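For reference, the expected result can be pinned down with a plain single-threaded sketch. No OmniThreadLibrary is involved, and `BreakStringSequential` is a hypothetical name. Any parallel version should produce the same list for the same input.

```pascal
program SequentialBaseline;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  Classes;

// Hypothetical single-threaded baseline: one list entry per character,
// in input order.
procedure BreakStringSequential(const s: string; output: TStringList);
var
  ch: char;
begin
  for ch in s do
    output.Add(ch);
end;

var
  sl: TStringList;
begin
  sl := TStringList.Create;
  try
    BreakStringSequential('delphi', sl);
    Writeln(sl.CommaText); // d,e,l,p,h,i
  finally
    sl.Free;
  end;
end.
```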
I’m assuming that this is really just a simplified example, because there is no sense in splitting short strings into characters in multiple threads. In the forum we discussed the topic a little more, and in the end I decided to write a background worker that waits for “split” requests and then runs multiple threads in parallel to split the input string.
My solution is stored in SVN, in the new ‘examples’ folder, subfolder ‘stringlist parser’.
Low-level solution

One task, a background worker, is created in FormCreate and destroyed in FormDestroy. Messages from that task are routed back to the owner form. Before termination, the cancelation token is signaled to cancel the current operation.
    // FormCreate: start the background worker; its messages are routed to the form
    FStringProcessor := CreateTask(StringProcessorLL).OnMessage(Self).Run;

    // FormDestroy: cancel the current operation, stop the worker, release it
    FStringProcessor.CancellationToken.Signal;
    FStringProcessor.Terminate(INFINITE);
    FStringProcessor := nil;

StringProcessorLL is the worker procedure for the background worker task. It waits on two events, the termination request and the new-message signal, and if there’s a new message, it processes it.
The message is read from the communication queue and its string is passed to the BreakStringLL procedure. If the operation was canceled, a “processing canceled” message is sent to the owner; otherwise, the resulting TStringList is sent to the owner.
    procedure StringProcessorLL(const task: IOmniTask);
    var
      input   : TOmniMessage;
      slOutput: TStringList;
    begin
      // WAIT_OBJECT_1: a new message arrived; WAIT_OBJECT_0 (termination) ends the loop
      while DSiWaitForTwoObjects(task.TerminateEvent, task.Comm.NewMessageEvent,
              false, INFINITE) = WAIT_OBJECT_1 do
      begin
        task.Comm.Receive(input);
        slOutput := TStringList.Create;
        BreakStringLL(input.MsgData.AsString, slOutput, task.CancellationToken);
        if task.CancellationToken.IsSignalled then begin
          task.Comm.Send(WM_PROCESSING_CANCELED);
          FreeAndNil(slOutput);
        end
        else
          task.Comm.Send(WM_PROCESSING_RESULT, slOutput); // list is passed to the owner
      end;
    end;

To start a new operation, the main form clears the cancelation token and sends a string to the worker over the built-in communication channel.
    FStringProcessor.CancellationToken.Clear;
    FStringProcessor.Comm.Send(0, inpString.Text);

Both messages, cancelation and completion, are routed back to the form via Windows messages.
    // in the form class declaration
    procedure WMProcessingCanceled(var msg: TOmniMessage); message WM_PROCESSING_CANCELED;
    procedure WMProcessingResult(var msg: TOmniMessage); message WM_PROCESSING_RESULT;

    procedure TfrmStringListParser.WMProcessingCanceled(var msg: TOmniMessage);
    begin
      lbLog.Items.Add('Canceled');
    end;

    procedure TfrmStringListParser.WMProcessingResult(var msg: TOmniMessage);
    begin
      ShowResult(TStringList(msg.MsgData.AsObject));
    end;

To cancel the current request, the main form just signals the cancelation token. The string splitters check it and abort the current operation.
    FStringProcessor.CancellationToken.Signal;

BreakStringLL is called for each input string that arrives over the communication channel. It first decides how many threads to use: the number of cores available to the process minus one (the assumption here is that one core is used to run the main thread). One string list is then created for each string-splitting subtask; it will contain the results generated by that task.
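The “cores minus one” rule needs a floor. On a single-core machine (or when the process affinity mask allows only one core), `Environment.Process.Affinity.Count - 1` evaluates to 0, which would leave no subtask to do the work. Below is a minimal sketch of a guarded calculation. `NumWorkerTasks` is a hypothetical helper that takes the core count as a parameter, so it runs without the OmniThreadLibrary.

```pascal
program WorkerCount;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

// Hypothetical helper: leave one core for the main thread,
// but never return fewer than one worker task.
function NumWorkerTasks(coreCount: integer): integer;
begin
  Result := coreCount - 1;
  if Result < 1 then
    Result := 1;
end;

var
  cores: integer;
begin
  for cores := 1 to 4 do
    Writeln(cores, ' core(s) -> ', NumWorkerTasks(cores), ' task(s)');
end.
```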
All subtasks are then started. Each gets two parameters: an output string list and the string to be split into characters. [A better solution, which keeps only one global copy of the string and sends only the range to be processed to the worker thread, is left as an exercise for the reader.] All subtasks share the same cancelation token so that they can be canceled in one go, and all are put into one task group so that the code can wait for all of them to complete. Tasks are started with Schedule, so they execute in a thread pool, which minimizes thread-creation overhead.
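One way to approach the bracketed exercise is to compute index ranges instead of copying substrings. The sketch below is one possible approach, not the author's solution. It assumes a hypothetical `TSegment` record and a hypothetical `ComputeSegments` function, and it applies the same `Round(remaining / tasksLeft)` rule as BreakStringLL. Each subtask would then need only a start index and a character count into one shared string.

```pascal
program SegmentRanges;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Hypothetical descriptor: which part of one shared string a subtask handles
  TSegment = record
    Start: integer; // 1-based index into the shared string
    Count: integer; // number of characters in this segment
  end;
  TSegments = array of TSegment;

// Same splitting rule as BreakStringLL, but returns index ranges
// instead of copying and deleting substrings.
function ComputeSegments(totalLength, numTasks: integer): TSegments;
var
  iTask    : integer;
  nextStart: integer;
  remaining: integer;
begin
  SetLength(Result, numTasks);
  remaining := totalLength;
  nextStart := 1;
  for iTask := 1 to numTasks do begin
    // divide the remaining part in as-equal-as-possible segments
    Result[iTask-1].Start := nextStart;
    Result[iTask-1].Count := Round(remaining / (numTasks - iTask + 1));
    Inc(nextStart, Result[iTask-1].Count);
    Dec(remaining, Result[iTask-1].Count);
  end;
end;

var
  i   : integer;
  s   : string;
  segs: TSegments;
begin
  s := 'delphi';
  segs := ComputeSegments(Length(s), 2);
  // prints "1 3 del" and "4 3 phi"
  for i := 0 to High(segs) do
    Writeln(segs[i].Start, ' ', segs[i].Count, ' ',
      Copy(s, segs[i].Start, segs[i].Count));
end.
```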
When all subtasks are completed, partial results are collected into one TStringList object.
    procedure BreakStringLL(s: string; const slOutput: TStringList;
      const cancellationToken: IOmniCancellationToken);
    var
      breakTasks  : IOmniTaskGroup;
      charsPerTask: integer;
      iTask       : integer;
      numTasks    : integer;
      sPartial    : string;
      taskResults : array of TStringList;
    begin
      numTasks := Environment.Process.Affinity.Count - 1;
      if numTasks < 1 then // single-core machine: still use one subtask
        numTasks := 1;
      SetLength(taskResults, numTasks);
      for iTask := Low(taskResults) to High(taskResults) do
        taskResults[iTask] := TStringList.Create;
      breakTasks := CreateTaskGroup;
      for iTask := 1 to numTasks do begin
        // divide the remaining part in as-equal-as-possible segments
        charsPerTask := Round(Length(s) / (numTasks - iTask + 1));
        CreateTask(BreakStringLLTask)
          .SetParameter('Output', taskResults[iTask-1])
          .SetParameter('Job', Copy(s, 1, charsPerTask))
          .CancelWith(cancellationToken)
          .Join(breakTasks)
          .Schedule;
        Delete(s, 1, charsPerTask);
      end;
      breakTasks.WaitForAll(INFINITE);
      // merge partial results in segment order, which preserves character order
      for iTask := Low(taskResults) to High(taskResults) do begin
        for sPartial in taskResults[iTask] do
          slOutput.Add(sPartial);
        taskResults[iTask].Free;
      end;
    end;

The string-breaker subtask is trivial: it just fetches both parameters and runs SplitPartialList.
    procedure BreakStringLLTask(const task: IOmniTask);
    var
      job   : string;
      param : TOmniValue;
      result: TStringList;
    begin
      param := task.Param['Output'];
      result := TStringList(param.AsObject);
      param := task.Param['Job'];
      job := param.AsString;
      SplitPartialList(job, result, task.CancellationToken);
    end;

The actual string breaking is implemented as a standalone procedure because it is used from both the low-level and the high-level code. It checks each input character and signals the cancelation token if the character is an exclamation mark (this exists only as a mechanism for testing cancelation). It exits if the cancelation token is signaled. After each character, Sleep(100) simulates heavy processing and gives the user time to click the Cancel button in the GUI before the operation is completed.
    procedure SplitPartialList(const input: string; output: TStringList;
      const cancel: IOmniCancellationToken);
    var
      ch: char;
    begin
      for ch in input do begin
        if ch = '!' then // for testing
          cancel.Signal;
        if cancel.IsSignalled then
          break; //for ch
        output.Add(ch);
        Sleep(100); // simulate workload
      end;
    end;
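The cancelation logic of SplitPartialList can be tried out without threads. The sketch below replaces IOmniCancellationToken with a hypothetical `TSimpleCancelToken` class. The class is built on a manual-reset `TEvent` from the SyncObjs unit and exposes the same Signal/Clear/IsSignalled trio used throughout this post. The Sleep call is dropped. Because the token can be cleared, the same token can serve the next request. That resettable property is what the high-level solution relies on when it keeps a separate token.

```pascal
program CancelTokenDemo;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  {$IFDEF FPC}{$IFDEF UNIX}cthreads,{$ENDIF}{$ENDIF}
  Classes, SyncObjs;

type
  // Hypothetical stand-in for IOmniCancellationToken (not the OTL implementation).
  TSimpleCancelToken = class
  private
    FEvent: TEvent;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Signal;
    procedure Clear;
    function IsSignalled: boolean;
  end;

constructor TSimpleCancelToken.Create;
begin
  inherited Create;
  FEvent := TEvent.Create(nil, true, false, ''); // manual reset, initially cleared
end;

destructor TSimpleCancelToken.Destroy;
begin
  FEvent.Free;
  inherited;
end;

procedure TSimpleCancelToken.Signal;
begin
  FEvent.SetEvent;
end;

procedure TSimpleCancelToken.Clear;
begin
  FEvent.ResetEvent;
end;

function TSimpleCancelToken.IsSignalled: boolean;
begin
  Result := FEvent.WaitFor(0) = wrSignaled; // poll without blocking
end;

// Same logic as SplitPartialList, minus the simulated workload.
procedure SplitPartialList(const input: string; output: TStringList;
  cancel: TSimpleCancelToken);
var
  ch: char;
begin
  for ch in input do begin
    if ch = '!' then // for testing
      cancel.Signal;
    if cancel.IsSignalled then
      break;
    output.Add(ch);
  end;
end;

var
  cancel: TSimpleCancelToken;
  sl    : TStringList;
begin
  cancel := TSimpleCancelToken.Create;
  sl := TStringList.Create;
  try
    SplitPartialList('ab!cd', sl, cancel);
    Writeln(sl.CommaText); // a,b  ('!' canceled the rest)
    cancel.Clear;          // reset the token for the next request
    sl.Clear;
    SplitPartialList('xy', sl, cancel);
    Writeln(sl.CommaText); // x,y
  finally
    sl.Free;
    cancel.Free;
  end;
end.
```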
High-level solution

The high-level solution is very similar to the low-level one. It requires the current version of the OmniThreadLibrary from SVN; it won’t work with release 2.2!
A pipeline is used to simulate a background worker, and a separate cancelation token is used to cancel the current operation. Pipeline.Cancel cannot be used for this purpose because it puts all pipeline queues into the “completed” state, which cannot be reversed to the “normal” state. [IOW, if Pipeline.Cancel were used, the code would have to destroy and recreate the pipeline after each cancelation request.]
    FPipelineCancel := CreateOmniCancellationToken;
    FPipeline := Parallel.Pipeline
      .Stage(StringProcessorHL, Parallel.TaskConfig.OnMessage(Self))
      .Run;

Destruction is almost the same as in the low-level code …
    FPipelineCancel.Signal;
    FPipeline.Input.CompleteAdding;
    FPipeline.WaitFor(INFINITE);
    FPipeline := nil;

… as is the code that starts a new work item …
    FPipelineCancel.Clear;
    FPipeline.Input.Add(inpString.Text);

… and the code that cancels the current work.
    FPipelineCancel.Signal;

Result processing is identical; the same message-handling methods are used as above.
The high-level background worker reads requests from the input collection, sends the strings to the breaker procedure, and reports the results to the owner. The FPipeline.Input.CompleteAdding call in the destruction code causes this for..in loop to terminate when the program is closed.
    procedure TfrmStringListParser.StringProcessorHL(const inputQueue,
      outputQueue: IOmniBlockingCollection; const task: IOmniTask);
    var
      input   : TOmniValue;
      slResult: TStringList;
    begin
      // the loop ends after FPipeline.Input.CompleteAdding is called
      for input in inputQueue do begin
        slResult := TStringList.Create;
        BreakStringHL(input.AsString, slResult, FPipelineCancel);
        if FPipelineCancel.IsSignalled then begin
          task.Comm.Send(WM_PROCESSING_CANCELED);
          FreeAndNil(slResult);
        end
        else
          task.Comm.Send(WM_PROCESSING_RESULT, slResult);
      end;
    end;

BreakStringHL uses the same approach as above to create N partial string lists and N workers, each of which processes its own substring of the original string. [Again, using a global string and partial indices would be a better solution.] This time, the subtasks are implemented using the ParallelTask abstraction. A blocking collection is used to send the substrings (i.e., the workload) to all subtasks.
    procedure BreakStringHL(input: string; output: TStringList;
      cancel: IOmniCancellationToken);
    var
      charsPerTask : integer;
      iTask        : integer;
      numTasks     : integer;
      partialQueue : IOmniBlockingCollection;
      s            : string;
      stringBreaker: IOmniParallelTask;
      taskResults  : array of TStringList;
    begin
      partialQueue := TOmniBlockingCollection.Create;
      numTasks := Environment.Process.Affinity.Count - 1;
      if numTasks < 1 then // single-core machine: still use one subtask
        numTasks := 1;
      SetLength(taskResults, numTasks);
      for iTask := Low(taskResults) to High(taskResults) do
        taskResults[iTask] := TStringList.Create;
      stringBreaker := Parallel.ParallelTask.NumTasks(numTasks).NoWait
        .TaskConfig(Parallel.TaskConfig.CancelWith(cancel))
        .Execute(
          procedure
          var
            workItem: TOmniValue;
          begin
            // each task takes one work item: a [result index, substring] pair
            workItem := partialQueue.Next;
            SplitPartialList(string(workItem), taskResults[integer(workItem)], cancel);
          end
        );
      // provide input to the parallel tasks started above
      for iTask := 1 to numTasks do begin
        // divide the remaining part in as-equal-as-possible segments
        charsPerTask := Round(Length(input) / (numTasks - iTask + 1));
        partialQueue.Add(TOmniValue.Create([iTask-1, Copy(input, 1, charsPerTask)]));
        Delete(input, 1, charsPerTask);
      end;
      // process output
      stringBreaker.WaitFor(INFINITE);
      for iTask := Low(taskResults) to High(taskResults) do begin
        for s in taskResults[iTask] do
          output.Add(s);
        taskResults[iTask].Free;
      end;
    end;

As you can see, due to the nature of the problem, the high-level solution is not much shorter than the low-level one, if at all. Still, it was instructive to write both, because the high-level solution showed me that the OmniThreadLibrary is missing at least one high-level abstraction: a background worker.
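Both solutions meet the ordering requirement from the question by construction. Each subtask writes only to its own list, and the lists are appended in segment order, regardless of which subtask finishes first. The sketch below shows the same pattern with a plain TThread, without the OmniThreadLibrary and without cancelation. `TSplitThread` and `BreakStringThreaded` are hypothetical names.

```pascal
program OrderedMerge;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  {$IFDEF FPC}{$IFDEF UNIX}cthreads,{$ENDIF}{$ENDIF}
  Classes;

type
  // Hypothetical worker: splits its own substring into its own list.
  TSplitThread = class(TThread)
  private
    FJob   : string;
    FOutput: TStringList;
  protected
    procedure Execute; override;
  public
    constructor Create(const job: string; output: TStringList);
  end;

constructor TSplitThread.Create(const job: string; output: TStringList);
begin
  FJob := job;
  FOutput := output;
  inherited Create(false); // start immediately
end;

procedure TSplitThread.Execute;
var
  ch: char;
begin
  for ch in FJob do
    FOutput.Add(ch); // no locking needed: this list belongs to this thread only
end;

procedure BreakStringThreaded(s: string; numTasks: integer; output: TStringList);
var
  charsPerTask: integer;
  iTask       : integer;
  partial     : array of TStringList;
  threads     : array of TSplitThread;
begin
  SetLength(partial, numTasks);
  SetLength(threads, numTasks);
  for iTask := 0 to numTasks - 1 do begin
    // same as-equal-as-possible split as in BreakStringLL (iTask is 0-based here)
    charsPerTask := Round(Length(s) / (numTasks - iTask));
    partial[iTask] := TStringList.Create;
    threads[iTask] := TSplitThread.Create(Copy(s, 1, charsPerTask), partial[iTask]);
    Delete(s, 1, charsPerTask);
  end;
  // wait for each worker in segment order and append its list,
  // so the final order never depends on completion order
  for iTask := 0 to numTasks - 1 do begin
    threads[iTask].WaitFor;
    threads[iTask].Free;
    output.AddStrings(partial[iTask]);
    partial[iTask].Free;
  end;
end;

var
  sl: TStringList;
begin
  sl := TStringList.Create;
  try
    BreakStringThreaded('delphi', 2, sl);
    Writeln(sl.CommaText); // d,e,l,p,h,i
  finally
    sl.Free;
  end;
end.
```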
Background worker

The background worker will encapsulate an asynchronous task that accepts work items, processes them, and delivers the results back to the caller. The central idea is individual cancelation: each work item can be canceled independently of the others (and you’ll also have the option to cancel all work items).
The current syntax (which will likely change before the release) is:
    // create
    FBackgroundWorker := Parallel.BackgroundWorker
      .TaskConfig(...)
      .StopOn(cancellationToken)
      .OnRequestDone(HandleRequestDone)
      .Execute(
        procedure (workItem: IOmniWorkItem)
        begin
          // input: workItem.Data
          // output: workItem.Result
        end
      );

    // destroy
    FBackgroundWorker.Terminate;
    FBackgroundWorker := nil;

    // schedule a work item
    FBackgroundWorker.Schedule(
      FBackgroundWorker.CreateWorkItem(data));

    // completion handler
    procedure HandleRequestDone(workItem: IOmniWorkItem);
    begin
      // check workItem.IsCanceled
      // process workItem.Result
    end;

    // work item (outline)
    IOmniWorkItem = interface
      UniqueID: int64;
      Data: TOmniValue;
      Result: TOmniValue;
      procedure Cancel;
      function IsCanceled: boolean;
    end;

    // cancel all work items
    FBackgroundWorker.Cancel;

When this is implemented, I’ll post another version of the high-level solution to Žarko’s question.
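The idea of per-item cancelation can already be illustrated with a hypothetical single-threaded sketch. `TWorkItem` and `ProcessAll` are illustrative names only and are not the planned OmniThreadLibrary API. Each item carries its own canceled flag, and the worker skips canceled items instead of computing a result for them, while the other items are processed normally.

```pascal
program WorkItemDemo;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Hypothetical work item, loosely modeled on the IOmniWorkItem outline;
  // not an OmniThreadLibrary type.
  TWorkItem = class
  private
    FCanceled: boolean;
  public
    UniqueID  : int64;
    Data      : string;
    ResultText: string;
    procedure Cancel;
    function IsCanceled: boolean;
  end;

procedure TWorkItem.Cancel;
begin
  FCanceled := true;
end;

function TWorkItem.IsCanceled: boolean;
begin
  Result := FCanceled;
end;

// Hypothetical single-threaded worker: handles items in order and skips
// the ones that were canceled individually.
procedure ProcessAll(const items: array of TWorkItem);
var
  i: integer;
begin
  for i := Low(items) to High(items) do
    if not items[i].IsCanceled then
      items[i].ResultText := UpperCase(items[i].Data); // stand-in for real work
end;

var
  i    : integer;
  items: array[0..2] of TWorkItem;
begin
  for i := Low(items) to High(items) do begin
    items[i] := TWorkItem.Create;
    items[i].UniqueID := i + 1;
  end;
  items[0].Data := 'delphi';
  items[1].Data := 'pascal';
  items[2].Data := 'omni';
  items[1].Cancel; // cancel one request without affecting the others
  ProcessAll(items);
  // report each item, as a request-done handler would
  for i := Low(items) to High(items) do begin
    if items[i].IsCanceled then
      Writeln(items[i].UniqueID, ': canceled')
    else
      Writeln(items[i].UniqueID, ': ', items[i].ResultText);
    items[i].Free;
  end;
end.
```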