Tuesday, October 23, 2012

Generating Reports with Background Worker

A long time ago, in August, JimVern asked on the OmniThreadLibrary forum:

I am creating a Windows Service to handle report requests for our clients.  Each client has multiple users who may run a report.  I want to serialize the report requests for one client so that only one report will run for a client at a time (FIFO), but reports can run simultaneously for different clients.

Since users can submit report requests anytime, I need to be able to append a new request to the end of an existing client FIFO queue without interrupting the execution of the current report.  Or, if the client doesn't have a queue yet (e.g., it's the first report request), then I need to be able to add a client queue without disrupting any reports running for other clients.  Once a client queue is empty, I want to destroy the client queue.

I am new to OTL, but it looks like it will easily do what I need. I'm just not sure where to focus my attention. Is there a way to create named task groups that can execute the top task of each group simultaneously, where tasks can be appended to an existing group, and where groups can auto-terminate when the tasks are all executed? Perhaps there is an example project or a similar post that I can look at to get started? I'm still somewhat of a novice when it comes to threads, so any advice is also welcome.

My answer at that time was just a pointer in the correct direction:

Instead of running multiple tasks for one client, why don't you consider running one task per client and sending workload requests to that task?

You could then schedule each such task into a thread pool. The task would run in a loop and process requests. When it runs out of requests, it would shut down. The main program would be responsible for starting a new task if one is not already running. [There's a small race condition here: you would have to maintain a shared structure enumerating live tasks, modified both from the task code and from the main program, and all access to it would have to be locked.]
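To make the race condition in that answer concrete, here is a minimal single-threaded sketch of the locked structure it calls for, written against the plain Delphi RTL (XE2 or later, console application) rather than OmniThreadLibrary. TClientQueues, Enqueue, TryDequeue and Scenario are hypothetical names. Enqueue (called by the main program) reports whether a new task must be started, and TryDequeue (called from the task loop) removes the client entry under the same lock when the queue is empty, so the main program can never append a request to a task that has already decided to exit.

```pascal
program ClientQueueSketch;
{$APPTYPE CONSOLE}
// Locked bookkeeping for "one task per client"; no threads are started, the
// loop in Scenario stands in for a client task. TClientQueues is hypothetical.
uses
  System.SysUtils, System.SyncObjs, System.Generics.Collections;

type
  TClientQueues = class
  strict private
    FLock  : TCriticalSection;
    FQueues: TObjectDictionary<string, TQueue<string>>; // live clients only
  public
    constructor Create;
    destructor Destroy; override;
    function Enqueue(const client, request: string): boolean;
    function TryDequeue(const client: string; out request: string): boolean;
  end;

constructor TClientQueues.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FQueues := TObjectDictionary<string, TQueue<string>>.Create([doOwnsValues]);
end;

destructor TClientQueues.Destroy;
begin
  FreeAndNil(FQueues);
  FreeAndNil(FLock);
  inherited;
end;

// Main program side: True means no task is live, so the caller must start one.
function TClientQueues.Enqueue(const client, request: string): boolean;
var
  queue: TQueue<string>;
begin
  FLock.Acquire;
  try
    Result := not FQueues.TryGetValue(client, queue);
    if Result then begin
      queue := TQueue<string>.Create;
      FQueues.Add(client, queue);
    end;
    queue.Enqueue(request);
  finally FLock.Release; end;
end;

// Task side: on an empty queue the client entry is removed under the same
// lock and False is returned, telling the task to exit.
function TClientQueues.TryDequeue(const client: string;
  out request: string): boolean;
var
  queue: TQueue<string>;
begin
  FLock.Acquire;
  try
    Result := FQueues.TryGetValue(client, queue) and (queue.Count > 0);
    if Result then
      request := queue.Dequeue
    else
      FQueues.Remove(client); // also frees the queue (doOwnsValues)
  finally FLock.Release; end;
end;

function Scenario: string;
var
  queues : TClientQueues;
  request: string;
begin
  Result := '';
  queues := TClientQueues.Create;
  try
    if queues.Enqueue('Client1', 'Report A') then
      Result := Result + 'start;';  // no live task: start one
    if not queues.Enqueue('Client1', 'Report B') then
      Result := Result + 'queued;'; // task already live
    while queues.TryDequeue('Client1', request) do // the task's loop
      Result := Result + request + ';';
    if queues.Enqueue('Client1', 'Report C') then
      Result := Result + 'start;';  // previous task has exited
  finally FreeAndNil(queues); end;
end;

begin
  Writeln(Scenario); // start;queued;Report A;Report B;start;
end.
```

In a real program the while loop would run inside the pooled task, and the main program would start such a task whenever Enqueue returns True.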

I promised Jim a working solution but then I got lost in all sorts of other, more important projects and it was only a few days ago that I was able to finish the job. Sorry, Jim :(

An example project is available in the OmniThreadLibrary repository in the folder examples/report generator. This post describes the inner workings of that project.

Report Generator Interface

The report generator manager (that is, the part that dynamically creates and destroys background workers and distributes the load) is implemented in the ReportGeneratorEngine unit. Generating a report is simulated by calling the Sleep function.

ReportGeneratorEngine implements a simple interface that the main program (reportGenerator1) uses to schedule reports.

type
  IReportGenerator = interface;

  IReportInfo = interface ['{CDF09A38-11B0-4571-8908-AA5486D94A9A}']
    property ClientName: string;
    property ReportName: string;
    property ReportDelay_ms: integer;
    property WorkerThread: cardinal;
  end;

  TReportGeneratorWorkerEvent = reference to procedure(
    Sender: IReportGenerator; const clientName: string);

  TReportGeneratorRequestDoneEvent = reference to procedure(
    Sender: IReportGenerator; const reportInfo: IReportInfo);

  IReportGenerator = interface
    procedure Schedule(const clientName, reportName: string;
      reportDelay_ms: integer);
    procedure Stop; // called from the main program's Stop button handler
    property OnCreateWorker: TReportGeneratorWorkerEvent;
    property OnDestroyWorker: TReportGeneratorWorkerEvent;
    property OnRequestDone_Asy: TReportGeneratorRequestDoneEvent;
  end;

function CreateReportGenerator: IReportGenerator;

[I have removed getters/setters from this example to make the code easier to understand.]

The application calls the Schedule method to put a report into a background queue, providing the name of the client (used to distribute reports into the appropriate queues), the name of the report (used for logging) and a duration passed to the Sleep call inside the worker (used for simulation). Three events are defined. OnCreateWorker is called when a background worker is created and OnDestroyWorker when it is destroyed (both are used for logging only). OnRequestDone_Asy is called whenever a report is generated, and it is called from the context of the worker thread.

The reason for this complication is that I didn’t want the report generator owner to depend on Windows message processing, as the original problem stated that the program would be running inside a Windows service. The synchronous versions of Background Worker (the workhorse inside the report generator) use OTL messaging to execute code in the correct thread, and all messaging in the ‘up’ direction (from a task to its owner) inside the OmniThreadLibrary uses Windows messages, so I simply couldn’t use this mechanism. The problem of receiving notifications in the context of a background thread must therefore be solved in the report generator owner (and I did provide a sample solution in the report generator project).

You may have noticed that event handlers are declared as ‘reference to procedure’. That gives me complete flexibility in how I declare an event handler: it can be a procedure, a method or an anonymous method.
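A small console sketch of that flexibility (Delphi XE2 or later). It assigns all three kinds of handler to one ‘reference to procedure’ variable. TLogEvent, TLogger, LogProc and CallAllKinds are hypothetical names and are not part of the report generator.

```pascal
program EventKinds;

{$APPTYPE CONSOLE}

// A 'reference to procedure' type accepts a standalone procedure, a method
// and an anonymous method. All names here are hypothetical.

uses
  System.SysUtils;

type
  TLogEvent = reference to procedure(const clientName: string);

  TLogger = class
  public
    procedure LogMethod(const clientName: string);
  end;

var
  GLog: string;

procedure LogProc(const clientName: string);
begin
  GLog := GLog + 'proc:' + clientName + ';';
end;

procedure TLogger.LogMethod(const clientName: string);
begin
  GLog := GLog + 'method:' + clientName + ';';
end;

// Calls the same event variable three times, each time bound to a
// different kind of handler, and returns what the handlers logged.
function CallAllKinds(const clientName: string): string;
var
  handler: TLogEvent;
  logger : TLogger;
begin
  GLog := '';
  logger := TLogger.Create;
  try
    handler := LogProc;                // 1. standalone procedure
    handler(clientName);
    handler := logger.LogMethod;       // 2. method of an object
    handler(clientName);
    handler :=                         // 3. anonymous method
      procedure(const name: string)
      begin
        GLog := GLog + 'anon:' + name + ';';
      end;
    handler(clientName);
  finally
    logger.Free;
  end;
  Result := GLog;
end;

begin
  Writeln(CallAllKinds('Client1')); // proc:Client1;method:Client1;anon:Client1;
end.
```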

Main Program

The main program is quite simple. The Start button handler creates a report generator and assigns event handlers. The OnCreateWorker and OnDestroyWorker handlers merely log the information, while the OnRequestDone_Asy handler passes the completion report to the main thread by posting a message. The reference count of the reportInfo interface is incremented before the message is posted so that the reportInfo object is not destroyed while the message is in transit.

Correspondingly, WMReportDone (the handler for the WM_REPORT_DONE message) converts WParam back to an IReportInfo interface, decrements the reference count and displays a message.

procedure TfrmReportGenerator.btnStartClick(Sender: TObject);
begin
  if not assigned(FReportGenerator) then begin
    FReportGenerator := CreateReportGenerator;
    FReportGenerator.OnCreateWorker :=
      procedure(Sender: IReportGenerator; const clientName: string)
      begin
        lbLog.Items.Add(Elapsed + 'Created worker for ' + clientName);
      end;
    FReportGenerator.OnDestroyWorker :=
      procedure(Sender: IReportGenerator; const clientName: string)
      begin
        lbLog.Items.Add(Elapsed + 'Destroyed worker for ' + clientName);
      end;
    FReportGenerator.OnRequestDone_Asy :=
      procedure(Sender: IReportGenerator; const reportInfo: IReportInfo)
      begin
        // Runs in the worker thread. Keep reportInfo alive until
        // WMReportDone releases this extra reference.
        reportInfo._AddRef;
        PostMessage(frmReportGenerator.Handle, WM_REPORT_DONE,
          WParam(reportInfo), 0);
      end;
  end;
  btnStart.Enabled := false;
  btnStop.Enabled := true;
  StartSimulator;
end;

procedure TfrmReportGenerator.btnStopClick(Sender: TObject);
begin
  StopSimulator;
  FReportGenerator.Stop;
  FReportGenerator := nil;
  btnStart.Enabled := true;
  btnStop.Enabled := false;
end;

procedure TfrmReportGenerator.WMReportDone(var msg: TMessage);
var
  reportInfo: IReportInfo;
begin
  reportInfo := IReportInfo(msg.WParam); // the assignment adds a reference
  reportInfo._Release;                   // drop the one taken before posting
  lbLog.Items.Add(Format(Elapsed +
    'Completed report %s for client %s; worker thread %d',
    [reportInfo.ReportName, reportInfo.ClientName, reportInfo.WorkerThread]));
end;

procedure TfrmReportGenerator.FormCloseQuery(Sender: TObject;
  var CanClose: boolean);
begin
  if assigned(FReportGenerator) then begin
    FReportGenerator.OnRequestDone_Asy := nil;
    btnStopClick(nil);
  end;
  CanClose := true;
end;

The report generator is destroyed in the Stop button handler. The same code is also executed from the OnCloseQuery handler (in case the user forgot to stop the report generator), but in that case the OnRequestDone_Asy event handler is cleared first, as we don’t want to post messages to the form while the application is shutting down.

You have probably noticed that the Start and Stop event handlers also start and stop ‘a simulator’. I’ll return to that later.
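The _AddRef/_Release hand-off between OnRequestDone_Asy and WMReportDone can be checked without a form. In this sketch (Delphi XE2 or later, console application) a NativeInt plays the role of WParam and no message is actually posted. IInfo, TInfo, PackInfo, ReceiveName and RunHandOff are hypothetical names. The sketch shows that the object survives after the sender drops its own reference and is destroyed once the receiver is done with it.

```pascal
program InterfaceThroughMessage;

{$APPTYPE CONSOLE}

// Keeping an interfaced object alive while only an integer (such as a
// message's WParam) refers to it. All names here are hypothetical.

uses
  System.SysUtils;

type
  IInfo = interface ['{8E1F2B6C-4C1D-4F0E-9A57-2D7C1B3E5A90}']
    function Name: string;
  end;

  TInfo = class(TInterfacedObject, IInfo)
  strict private
    FName: string;
  public
    constructor Create(const aName: string);
    destructor Destroy; override;
    function Name: string;
  end;

var
  GDestroyed: boolean; // set when a TInfo instance is destroyed

constructor TInfo.Create(const aName: string);
begin
  inherited Create;
  FName := aName;
end;

destructor TInfo.Destroy;
begin
  GDestroyed := true;
  inherited;
end;

function TInfo.Name: string;
begin
  Result := FName;
end;

// Sender side (like OnRequestDone_Asy): take an extra reference for the
// "message", then pass the raw interface pointer as an integer.
function PackInfo(const info: IInfo): NativeInt;
begin
  info._AddRef;
  Result := NativeInt(Pointer(info));
end;

// Receiver side (like WMReportDone): the assignment adds a reference,
// _Release drops the message's reference, and the local variable keeps
// the object alive until this function exits.
function ReceiveName(param: NativeInt): string;
var
  info: IInfo;
begin
  info := IInfo(Pointer(param));
  info._Release;
  Result := info.Name;
end;

function RunHandOff: string;
var
  info : IInfo;
  param: NativeInt;
begin
  GDestroyed := false;
  info := TInfo.Create('Report1');
  param := PackInfo(info);
  info := nil; // the sender no longer holds the object
  Result := 'alive=' + BoolToStr(not GDestroyed, true);
  Result := Result + ';name=' + ReceiveName(param);
  Result := Result + ';freed=' + BoolToStr(GDestroyed, true);
end;

begin
  Writeln(RunHandOff); // alive=True;name=Report1;freed=True
end.
```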

Report Generator Engine

The report generator engine internally uses a dictionary (FWorkerDict) that maps client names to background worker descriptors. The TReportWorkerInfo record stores a reference to a background worker abstraction and a counter holding the number of scheduled reports for that client.

type
  TReportWorkerInfo = record
    Worker       : IOmniBackgroundWorker;
    WorkItemCount: IOmniCounter;
  end;

constructor TReportGenerator.Create;
begin
  inherited;
  FWorkerDict := TDictionary<string,TReportWorkerInfo>.Create;
  FDestroyList := TStringList.Create;
  GlobalParallelPool.MaxExecuting := -1; // unlimited tasks
end;

destructor TReportGenerator.Destroy;
begin
  Stop;
  FreeAndNil(FDestroyList);
  FreeAndNil(FWorkerDict);
  inherited;
end;

procedure TReportGenerator.Stop;
var
  dictItem: TPair<string,TReportWorkerInfo>;
begin
  for dictItem in FWorkerDict do // tell all workers to stop
    dictItem.Value.Worker.Terminate(0);
  for dictItem in FWorkerDict do // wait for all workers to stop
    dictItem.Value.Worker.Terminate(INFINITE);
end;

Background workers are stopped in two passes. The first pass (Terminate(0)) merely tells each worker to stop, so that it will not accept new report generation requests, without waiting for it. The second pass (Terminate(INFINITE)) actually waits for each worker to finish its current report and stop.
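Why two passes? If each worker were told to stop and then waited for in the same loop, shutdown would take the sum of all workers' remaining work. The following plain-RTL sketch (Delphi XE2 or later, console application) uses TThread as a stand-in for the OTL background worker: TThread.Terminate only sets a flag, playing the role of Terminate(0), and TThread.WaitFor plays the role of Terminate(INFINITE). TWorker, StopInTwoPasses, GElapsed_ms and the 200 ms of ‘remaining work’ are assumptions for illustration.

```pascal
program TwoPassStop;

{$APPTYPE CONSOLE}

// Two-pass shutdown with plain TThread objects. Terminate only sets a flag
// (like Worker.Terminate(0)); WaitFor blocks (like Worker.Terminate(INFINITE)).

uses
  System.SysUtils,
  System.Classes,
  System.Diagnostics,
  System.Generics.Collections;

type
  TWorker = class(TThread) // hypothetical stand-in for a background worker
  protected
    procedure Execute; override;
  end;

var
  GElapsed_ms: Int64; // time spent inside the two passes

procedure TWorker.Execute;
begin
  while not Terminated do // idle until asked to stop
    Sleep(10);
  Sleep(200);             // then "finish the current report"
end;

// Starts 'count' workers, stops them in two passes and returns how many
// have stopped.
function StopInTwoPasses(count: integer): integer;
var
  workers: TObjectList<TWorker>;
  worker : TWorker;
  i      : integer;
  sw     : TStopwatch;
begin
  Result := 0;
  workers := TObjectList<TWorker>.Create(true); // frees the threads
  try
    for i := 1 to count do
      workers.Add(TWorker.Create(false));
    sw := TStopwatch.StartNew;
    for worker in workers do // pass 1: signal every worker, wait for none
      worker.Terminate;
    for worker in workers do begin // pass 2: wait for each one to finish
      worker.WaitFor;
      Inc(Result);
    end;
    GElapsed_ms := sw.ElapsedMilliseconds;
  finally FreeAndNil(workers); end;
end;

var
  stopped: integer;
begin
  stopped := StopInTwoPasses(4);
  // The four 200 ms tails overlap, so expect roughly 200 ms, not 800 ms.
  Writeln(stopped, ' workers stopped in ', GElapsed_ms, ' ms');
end.
```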

When scheduling a new report, Schedule first retrieves or creates a worker for the client clientName (a worker in this case being a background worker abstraction) and then schedules a new work item to be processed by that worker.

procedure TReportGenerator.Schedule(const clientName, reportName: string;
  reportDelay_ms: integer);
var
  data  : IReportInfo;
  worker: IOmniBackgroundWorker;
begin
  worker := AcquireWorker(clientName);
  data := TReportInfo.Create(clientName, reportName, reportDelay_ms);
  worker.Schedule(worker.CreateWorkItem(data));
end;

Putting the details of worker creation (AcquireWorker) aside for a moment, the next important method in the life of a report is GenerateReport, which executes in a background thread and creates the report (or, in our case, sleeps for the specified time).

procedure TReportGenerator.GenerateReport(const workItem: IOmniWorkItem);
var
  reportInfo: IReportInfo;
begin
  reportInfo := workItem.Data.AsInterface as IReportInfo;
  // Simulate heavy work
  Sleep(reportInfo.ReportDelay_ms);
  // Return result
  (reportInfo as IReportInfoEx).SetWorkerThread(GetCurrentThreadID);
  workItem.Result := reportInfo;
end;

When a report is ‘generated’, RequestDone_Asy is called from the context of the background worker thread. It releases the worker (details follow in a moment) and calls the OnRequestDone_Asy event handler.

procedure TReportGenerator.RequestDone_Asy(const Sender: IOmniBackgroundWorker;
  const workItem: IOmniWorkItem);
var
  reportInfo: IReportInfo;
begin
  reportInfo := (workItem.Result.AsInterface as IReportInfo);
  ReleaseWorker_Asy(reportInfo.ClientName);
  if assigned(FOnRequestDone_Asy) then
    FOnRequestDone_Asy(Self, reportInfo);
end;
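The _Asy suffix is a reminder of where this code runs. The sketch below uses a plain TThread (not the OTL background worker) to show that a callback invoked from a worker's Execute method runs in the worker thread, which is why OnRequestDone_Asy handlers must not touch the VCL directly. TDoneEvent, TReportThread and CallbackRanInWorker are hypothetical; GetCurrentThreadId comes from Winapi.Windows, so the sketch is Windows-only (Delphi XE2 or later).

```pascal
program AsyncCallbackContext;

{$APPTYPE CONSOLE}

// A callback fired from a worker thread executes in that worker thread,
// not in the thread that created the worker. All names are hypothetical.

uses
  Winapi.Windows,
  System.SysUtils,
  System.Classes;

type
  TDoneEvent = reference to procedure(workerThread: cardinal);

  TReportThread = class(TThread)
  strict private
    FOnDone: TDoneEvent;
  protected
    procedure Execute; override;
  public
    constructor Create(const onDone: TDoneEvent);
  end;

constructor TReportThread.Create(const onDone: TDoneEvent);
begin
  FOnDone := onDone;       // set before the thread starts running
  inherited Create(false);
end;

procedure TReportThread.Execute;
begin
  Sleep(50);               // simulate generating a report
  if assigned(FOnDone) then
    FOnDone(GetCurrentThreadId); // the '_Asy' call: still in the worker
end;

// Returns True when the callback reported a thread other than the main one.
function CallbackRanInWorker: boolean;
var
  callbackThread: cardinal;
  worker        : TReportThread;
begin
  callbackThread := 0;
  worker := TReportThread.Create(
    procedure(workerThread: cardinal)
    begin
      // Runs in the worker thread: no VCL access here, only thread-safe
      // work such as PostMessage or storing a value read after WaitFor.
      callbackThread := workerThread;
    end);
  try
    worker.WaitFor;        // also makes callbackThread safe to read
  finally FreeAndNil(worker); end;
  Result := (callbackThread <> 0) and (callbackThread <> MainThreadID);
end;

begin
  Writeln('Callback ran in a worker thread: ',
    BoolToStr(CallbackRanInWorker, true));
end.
```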

The Life Cycle of a Background Worker

The AcquireWorker function returns the background worker for a client, creating it first if necessary.

function TReportGenerator.AcquireWorker(
  const clientName: string): IOmniBackgroundWorker;
var
  workerInfo: TReportWorkerInfo;
begin
  FLock.Acquire;
  try
    if not FWorkerDict.TryGetValue(clientName, workerInfo) then begin
      workerInfo.Worker :=
        Parallel.BackgroundWorker.NumTasks(1)
          .OnRequestDone_Asy(RequestDone_Asy)
          .Execute(GenerateReport);
      workerInfo.WorkItemCount := CreateCounter(0);
      FWorkerDict.Add(clientName, workerInfo);
      if assigned(FOnCreateWorker) then
        FOnCreateWorker(Self, clientName);
    end;
    workerInfo.WorkItemCount.Increment;
    ProcessDestroyList(clientName);
    Result := workerInfo.Worker;
  finally FLock.Release; end;
end;

As FWorkerDict may be accessed from multiple threads (the one calling the Schedule method and various background worker threads via the ReleaseWorker_Asy method), access to it is protected by a critical section (FLock).

The code first tries to fetch the background worker information from the dictionary. If that fails, a new background worker is created and added to the dictionary, and the OnCreateWorker event is triggered.

In all cases the number of work items (WorkItemCount) is incremented and the ‘destroy list’ (more about that in a moment) is processed.

When a report is created, ReleaseWorker_Asy is called from RequestDone_Asy. The number of work items associated with the worker is decremented and, if it falls to zero, the client name is added to the ‘destroy list’. Again, everything happens under the protection of the critical section.

procedure TReportGenerator.ReleaseWorker_Asy(const clientName: string);
var
  workerInfo: TReportWorkerInfo;
begin
  FLock.Acquire;
  try
    workerInfo := FWorkerDict[clientName];
    if workerInfo.WorkItemCount.Decrement = 0 then
      FDestroyList.Add(clientName);
  finally FLock.Release; end;
end;

Because ReleaseWorker_Asy is called from the background worker’s notification handler, which runs in the worker’s own thread, we cannot destroy the worker at that point. The next best thing is to add it to a list, which is processed every time a new report is scheduled.

ProcessDestroyList is called from AcquireWorker and accepts a parameter holding the name of the ‘active’ client, the one that was passed to AcquireWorker. Obviously, we don’t want to destroy the worker for this client even if it appears on the destroy list, as it will be needed in a moment.

Apart from that exception, ProcessDestroyList iterates over the destroy list and destroys all background workers associated with the client names on the list. It also triggers the OnDestroyWorker event for each destroyed worker.

procedure TReportGenerator.ProcessDestroyList(const scheduledClient: string);
var
  clientName: string;
  workerInfo: TReportWorkerInfo;
begin
  // This method is only called when FLock is acquired.
  // 'scheduledClient' contains the name of the client being scheduled,
  // which must not be destroyed.

  for clientName in FDestroyList do begin
    if clientName = scheduledClient then
      continue;
    workerInfo := FWorkerDict[clientName];
    workerInfo.Worker.Terminate(INFINITE);
    workerInfo.Worker := nil;
    workerInfo.WorkItemCount := nil;
    FWorkerDict.Remove(clientName);
    if assigned(FOnDestroyWorker) then
      FOnDestroyWorker(Self, clientName);
  end;
  FDestroyList.Clear;
end;
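The counting and deferred destruction can be modelled without threads. This sketch is a deliberate simplification (Delphi XE2 or later, console application): an integer replaces the background worker and its IOmniCounter, a string log replaces the OnCreateWorker and OnDestroyWorker events, and there is no lock because nothing runs concurrently. TWorkerBook and Simulate are hypothetical. The scenario also shows a consequence of processing the list only from AcquireWorker: the last worker to go idle (Client2 here) stays on the destroy list until another report is scheduled.

```pascal
program DestroyListSketch;
{$APPTYPE CONSOLE}
// Single-threaded model of AcquireWorker / ReleaseWorker_Asy /
// ProcessDestroyList. TWorkerBook is hypothetical; no lock is needed here.
uses
  System.SysUtils, System.Classes, System.Generics.Collections;

type
  TWorkerBook = class
  strict private
    FCounts     : TDictionary<string, integer>; // client -> scheduled reports
    FDestroyList: TStringList;
    FLog        : TStringList; // stands in for OnCreate/OnDestroyWorker
    procedure ProcessDestroyList(const scheduledClient: string);
  public
    constructor Create;
    destructor Destroy; override;
    procedure Acquire(const clientName: string);
    procedure Release(const clientName: string);
    function Log: string;
  end;

constructor TWorkerBook.Create;
begin
  inherited Create;
  FCounts := TDictionary<string, integer>.Create;
  FDestroyList := TStringList.Create;
  FLog := TStringList.Create;
end;

destructor TWorkerBook.Destroy;
begin
  FreeAndNil(FLog);
  FreeAndNil(FDestroyList);
  FreeAndNil(FCounts);
  inherited;
end;

procedure TWorkerBook.Acquire(const clientName: string);
var
  count: integer;
begin
  if not FCounts.TryGetValue(clientName, count) then begin
    count := 0;
    FLog.Add('create ' + clientName);
  end;
  FCounts.AddOrSetValue(clientName, count + 1);
  ProcessDestroyList(clientName);
end;

procedure TWorkerBook.Release(const clientName: string);
var
  count: integer;
begin
  count := FCounts[clientName] - 1;
  FCounts[clientName] := count;
  if count = 0 then
    FDestroyList.Add(clientName); // deferred, not destroyed here
end;

procedure TWorkerBook.ProcessDestroyList(const scheduledClient: string);
var
  clientName: string;
begin
  for clientName in FDestroyList do
    if clientName <> scheduledClient then begin // keep the client in use
      FCounts.Remove(clientName);
      FLog.Add('destroy ' + clientName);
    end;
  FDestroyList.Clear;
end;

function TWorkerBook.Log: string;
var
  line: string;
begin
  Result := '';
  for line in FLog do
    Result := Result + line + ';';
end;

function Simulate: string;
var
  book: TWorkerBook;
begin
  book := TWorkerBook.Create;
  try
    book.Acquire('Client1'); // creates Client1
    book.Release('Client1'); // Client1 goes on the destroy list
    book.Acquire('Client2'); // creates Client2, then destroys Client1
    book.Release('Client2'); // Client2 waits on the destroy list
    Result := book.Log;
  finally FreeAndNil(book); end;
end;

begin
  Writeln(Simulate); // create Client1;create Client2;destroy Client1;
end.
```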

Simulator

The main program includes a simulator which uses a TTimer to schedule reports at predefined intervals. The script (marked in yellow) and the corresponding events are shown in the table below.

[Image: table of the simulator script and the resulting events]

3 comments:

  1. This sounds like a job for a database, a service and background processes. With background processes there would be no threading hassles (some report writers are not thread-safe), errant processes wouldn't hose the main service, cleanup is built in because each process exits memory when it finishes, and the system could easily scale across multiple servers if needed.

  2. The ‘Destroyed worker’ entry for the worker left on the ‘destroy’ list (Client1) never appears.

    Could calling a cleanup when nothing is running solve that?

    procedure TReportGenerator.Cleanup;
    begin
      FLock.Acquire; // ProcessDestroyList expects FLock to be held
      try
        ProcessDestroyList('');
      finally FLock.Release; end;
    end;
