Sunday, July 27, 2008

OmniThreadLibrary alpha

OmniThreadLibrary, still a work in progress, is slowly approaching the alpha stage. Most of the interfaces have been defined, and most of the functionality that I want to see in the 1.0 release is present and tested. I have not yet decided what to do with messaging (the current Variant-based solution can be rough around the edges), but the rest is mostly complete.

Most important changes since the last release:

  • First of all, big thanks to fellow Slovenian programmer Lee_Nover (the guy who wrote the SpinLock unit, BTW) who did a major cleanup job of the OTL code:
    • Added the const prefix to parameters in many places (mostly where interfaces are passed - I tend to miss that). OTL should be slightly faster because of that, as a const interface parameter skips the reference-count increment and decrement on each call.
    • Removed CreateTask(TOmniWorker) as CreateTask(IOmniWorker) is good enough for all occasions.
    • Cleaned the tests\* tree and fixed tests to compile with new Otl* units.
    • Removed IOmniTaskControl.FreeOnTerminate as it's no longer needed.
  • Created project group containing all test projects. To make it work, all projects in the tests tree were renamed.
  • *** IMPORTANT *** OtlTaskEvents unit was renamed to OtlEventMonitor and TOtlTaskEventDispatcher component was renamed to TOmniEventMonitor. Recompile the package!
  • *** IMPORTANT *** If you're working with a snapshot, you should delete the OmniThreadLibrary folder before unpacking new version as many files were renamed since the last snapshot!
  • Task UniqueID is now int64 (was integer). Somehow I wasn't happy with the fact that the unique ID would roll over after merely 4.3 billion tasks ...
  • Thread priority can be controlled with IOmniTaskControl.SetPriority.
  • Exceptions are trapped and mapped into the predefined EXIT_EXCEPTION exit code (OtlCommon.pas). See the 13_Exceptions test.
  • IOmniTaskControl.WithLock provides a simple way to share one lock (critical section, spinlock) between the task owner and the task worker. The lock can be accessed via the Lock property, implemented in both the IOmniTaskControl and IOmniTask interfaces. See the 12_Lock test for an example.
  • Eye candy - instead of calling omniTaskEventDispatch.Monitor(task) you can now use task.MonitorWith(omniTaskEventDispatch). The old way is still available. An example from the (new) 8_RegisterComm test:
    procedure TfrmTestOtlComm.FormCreate(Sender: TObject);
    begin
      FCommChannel := CreateTwoWayChannel(1024);
      FClient1 := CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))
        .MonitorWith(OmniTED)
        .Run;
      FClient2 := CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))
        .MonitorWith(OmniTED)
        .Run;
    end;
  • IOmniTaskControl.TerminateWhen lets you add an additional termination event to the task. One event can be shared between multiple tasks, allowing you to stop them all at once. See the 14_TerminateWhen test for an example.
  • Basic support for task groups has been added. At the moment you can start/stop multiple tasks and not much more. To add a task to a group, use group.Add(task) or task.Join(group) syntax. Demo in 15_TaskGroup.
    IOmniTaskGroup = interface ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
      function Remove(taskControl: IOmniTaskControl): IOmniTaskGroup;
      function Add(taskControl: IOmniTaskControl): IOmniTaskGroup;
      function RunAll: IOmniTaskGroup;
      function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
      function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
    end; { IOmniTaskGroup }
  • Added support for task chaining. When one task completes, the next task in the chain is started. An excerpt from the 16_ChainTo demo:
    procedure TfrmTestTaskGroup.btnStartTasksClick(Sender: TObject);
    var
      task1: IOmniTaskControl;
      task2: IOmniTaskControl;
      task3: IOmniTaskControl;
      taskA: IOmniTaskControl;
    begin
      lbLog.Clear;
      task3 := CreateTask(BgTask, '3').MonitorWith(OmniTED);
      task2 := CreateTask(BgTask, '2').MonitorWith(OmniTED).ChainTo(task3);
      task1 := CreateTask(BgTask, '1').MonitorWith(OmniTED).ChainTo(task2);
      taskA := CreateTask(BgTask, 'A').MonitorWith(OmniTED).ChainTo(
        CreateTask(BgTask, 'B').MonitorWith(OmniTED).ChainTo(
          CreateTask(BgTask, 'C').MonitorWith(OmniTED)));
      task1.Run;
      taskA.Run;
    end;
  • A thread pool has been implemented. See the 11_ThreadPool demo. I'll write a longer article on using it, but for now the demo should do.
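
The idea behind TerminateWhen (one event shared between several tasks, stopping them all at once) can be illustrated without OTL. The following is only a minimal sketch using plain Win32 calls and TThread; TPollingWorker and its members are hypothetical names invented for this illustration and are not part of the OTL API. It assumes a manual-reset event, because an auto-reset event would release only one waiter.

```pascal
program SharedTerminateEventSketch;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, Classes;

type
  // Hypothetical worker thread, used here only for illustration (not OTL).
  TPollingWorker = class(TThread)
  private
    FStopEvent: THandle;
    FIterations: integer;
  protected
    procedure Execute; override;
  public
    constructor Create(stopEvent: THandle);
    property Iterations: integer read FIterations;
  end;

constructor TPollingWorker.Create(stopEvent: THandle);
begin
  FStopEvent := stopEvent;
  inherited Create(false); // start running immediately
end;

procedure TPollingWorker.Execute;
begin
  // A zero timeout turns the wait into a non-blocking "should I stop?" poll.
  while WaitForSingleObject(FStopEvent, 0) <> WAIT_OBJECT_0 do begin
    Inc(FIterations);
    Sleep(10); // stands in for one unit of real work
  end;
end;

var
  stopEvent: THandle;
  workers: array [1..3] of TPollingWorker;
  i: integer;
begin
  // Manual-reset (second parameter = true), initially non-signalled.
  stopEvent := CreateEvent(nil, true, false, nil);
  try
    for i := Low(workers) to High(workers) do
      workers[i] := TPollingWorker.Create(stopEvent);
    Sleep(200);
    SetEvent(stopEvent); // one call asks every worker to stop
    for i := Low(workers) to High(workers) do begin
      workers[i].WaitFor;
      Writeln('Worker ', i, ' stopped after ', workers[i].Iterations, ' iterations');
      workers[i].Free;
    end;
  finally
    CloseHandle(stopEvent);
  end;
end.
```

The same zero-timeout WaitForSingleObject poll works inside any worker loop that has access to a termination event handle.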

As usual, code is available in the repository and as a snapshot.

14 comments:

  1. Kudos to you and Lee Nover, this has helped me before.


    Cesar Romero

  2. Hi,

    I find your library very interesting, but I have some trouble using it. I have a task created from a method with a loop, and in the loop I check for task.terminated. But when I call Terminate from the main thread, the main thread locks until my task has finished. task.terminated never gets set. Why is that? Am I using it all wrong?

    regards,
    -Vegar

  3. Show me the code.

  4. This is a simplification of my case:

    unit Unit1;

    interface

    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, OtlTaskControl, OtlEventMonitor, OtlTask, StdCtrls;

    type
      TForm1 = class(TForm)
        Button1: TButton;
        Button2: TButton;
        Label1: TLabel;
        procedure Button1Click(Sender: TObject);
        procedure Button2Click(Sender: TObject);
        procedure FormCreate(Sender: TObject);
        procedure FTaskMessage(const task: IOmniTaskControl);
        procedure FTaskTerminated(const task: IOmniTaskControl);
      private
        { Private declarations }
        FTaskControl: IOmniTaskControl;
        FEventMonitor: TOmniEventMonitor;
        procedure UpdateExecutor(const task: IOmniTask);
      public
        { Public declarations }
      end;

    var
      Form1: TForm1;

    implementation

    uses
      OtlComm;

    {$R *.dfm}

    procedure TForm1.Button1Click(Sender: TObject);
    begin
      FTaskControl :=
        FEventMonitor.Monitor(CreateTask(UpdateExecutor, 'UpdateExecutor'))
        .Run;
    end;

    procedure TForm1.Button2Click(Sender: TObject);
    begin
      FTaskControl.Terminate;
    end;

    procedure TForm1.FormCreate(Sender: TObject);
    begin
      FEventMonitor := TOmniEventMonitor.Create(nil);
      FEventMonitor.OnTaskMessage := FTaskMessage;
      FEventMonitor.OnTaskTerminated := FTaskTerminated;
    end;

    procedure TForm1.FTaskMessage(const task: IOmniTaskControl);
    var
      msg: TOmniMessage;
    begin
      task.Comm.Receive(msg);

      if msg.MsgID = 1 then
        label1.Caption := IntToStr(msg.MsgData)
      else if msg.MsgID = 2 then
        ShowMessage('Completed');
    end;

    procedure TForm1.FTaskTerminated(const task: IOmniTaskControl);
    begin
      ShowMessage('Terminated');
    end;

    procedure TForm1.UpdateExecutor(const task: IOmniTask);
    var
      i: Integer;
      list: TStringList;
      stamp: Integer;
    begin
      for i := 0 to 100000 - 1 do
      begin
        stamp := GetTickCount + 100;
        while GetTickCount < stamp do ;

        task.Comm.Send(1, i);

        if task.Terminated then
          Exit;
      end;

      task.comm.Send(2);
    end;

    end.

  5. Instead of 'if task.Terminated' use

    if WaitForSingleObject(task.TerminateEvent, 0) = WAIT_OBJECT_0 then

    I do agree that this is my problem - bad object interface. Terminated signals whether the task has been fully stopped, not whether it should terminate.

    I'll do something about that in the next release.

  6. Thanks! That solved my problem.

    I'm quite new to threads, and have some problems understanding some of the concepts. A documentation project would be great ;-)

    Again, thanks,
    -Vegar

  7. I totally agree. For starters, you should read all my blog articles on that topic.

    I'm also setting up a web site with introduction and tutorials. A forum is also being prepared.

    All that will of course be on standby for the next two weeks ...

  8. Anonymous 17:47

    Very nice job!

    Just one question. Communication between tasks seems to go through a fixed-size buffer, so we must specify an initial size for the queue.
    To avoid any problems, wouldn't dynamic allocation for queues or stacks be better?

    Renaud

  9. Yes, you have to limit the queue. When running multithreaded solutions you usually know in advance how big the queue will be - either there is a limited number of messages passing through, or you'll be running queues in producer/consumer mode and you'll want to implement some kind of throttling mechanism to temporarily pause the producer if there is too much data waiting for the consumer to process.

    The second scenario will probably get more support in the OTL in future releases but not in v1.0.

    Of course, you can always use your own message transfer mechanism ...

  10. Anonymous 23:08

    Hmm... Implementing throttles for producers seems more complicated than implementing a simple dynamic queue for the communication channels. Also, I am not sure that a throttle mechanism can be applied to all use cases. But maybe using a dynamic queue would have some drawbacks that I am not aware of?

    Renaud

  11. Dynamic queues are just slower. Current queue was designed for speed.

    Maybe some future incarnation of OTL will have dynamic queues. Who knows :) It will depend on user needs.

    Implementing a throttle is not so hard if there is good support for it - in reality it could happen inside Comm.Send without the producer knowing it.

  12. Anonymous 23:57

    Thanks for your quick reply!

    Do you have a solution of this kind in mind?


    procedure TOmniCommunicationEndpoint.Send(const msg: TOmniMessage);
    begin
      while not ceWriter_ref.Enqueue(msg) do begin
        Sleep(1);
      end;
    end;

    If we use the ThreadPool, maybe it's not so good for a task to wait and monopolize one thread?

    Renaud

  13. No, definitely not that way. I was thinking more along the lines of Comm.Send() silently blocking, but that would cause problems with windows message processing (if activated). I'll have to implement this in a real-world problem to see what the best approach would be.

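
To make the "Send silently blocking" idea from the discussion above more concrete, here is a minimal sketch of a fixed-size queue whose Send waits on a semaphore while the queue is full, instead of polling with Sleep. It is not OTL code: TBoundedQueue, ProducerProc and the constants are hypothetical names made up for this illustration, and the sketch assumes Delphi on Windows with exactly one sending thread and one receiving thread. It also ignores the Windows message processing problem mentioned above, since a thread blocked in Send cannot pump messages.

```pascal
program BlockingSendSketch;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

const
  CCapacity = 4;   // deliberately small so that the producer has to wait
  CMessages = 20;

type
  // Hypothetical fixed-size queue (not an OTL class). Assumes ONE sender and
  // ONE receiver: FTail is touched only by the sender, FHead only by the
  // receiver, and the two semaphores order access to the slots.
  TBoundedQueue = class
  private
    FItems: array [0..CCapacity - 1] of integer;
    FHead, FTail: integer;
    FFreeSlots: THandle; // counts empty slots
    FUsedSlots: THandle; // counts filled slots
  public
    constructor Create;
    destructor Destroy; override;
    procedure Send(value: integer);
    function Receive: integer;
  end;

constructor TBoundedQueue.Create;
begin
  inherited Create;
  FFreeSlots := CreateSemaphore(nil, CCapacity, CCapacity, nil);
  FUsedSlots := CreateSemaphore(nil, 0, CCapacity, nil);
end;

destructor TBoundedQueue.Destroy;
begin
  CloseHandle(FUsedSlots);
  CloseHandle(FFreeSlots);
  inherited;
end;

procedure TBoundedQueue.Send(value: integer);
begin
  // Throttling happens here: the sender sleeps until a slot is free.
  WaitForSingleObject(FFreeSlots, INFINITE);
  FItems[FTail] := value;
  FTail := (FTail + 1) mod CCapacity;
  ReleaseSemaphore(FUsedSlots, 1, nil);
end;

function TBoundedQueue.Receive: integer;
begin
  // The receiver sleeps until at least one slot is filled.
  WaitForSingleObject(FUsedSlots, INFINITE);
  Result := FItems[FHead];
  FHead := (FHead + 1) mod CCapacity;
  ReleaseSemaphore(FFreeSlots, 1, nil);
end;

// Producer thread: sends the numbers 1..CMessages as fast as the queue allows.
function ProducerProc(param: pointer): integer;
var
  i: integer;
begin
  for i := 1 to CMessages do
    TBoundedQueue(param).Send(i);
  Result := 0;
end;

var
  queue: TBoundedQueue;
  producer: THandle;
  threadID: LongWord;
  i, sum: integer;
begin
  queue := TBoundedQueue.Create;
  try
    producer := BeginThread(nil, 0, ProducerProc, queue, 0, threadID);
    sum := 0;
    for i := 1 to CMessages do begin
      Sleep(5); // slow consumer, so the queue fills up and Send blocks
      Inc(sum, queue.Receive);
    end;
    WaitForSingleObject(producer, INFINITE);
    CloseHandle(producer);
    Writeln('Sum of received values: ', sum);
  finally
    queue.Free;
  end;
end.
```

The producer never needs to know that it is being throttled; it simply spends more time inside Send when the consumer falls behind. With several senders or receivers, the index updates would additionally need a lock.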