Tuesday, February 10, 2009

OmniThreadLibrary 1.03

OmniThreadLibrary 1.03 was silently released two days ago. Even without an announcement, it has been downloaded 133 times so far. Awesome!

The main new feature is per-thread initialized data in the thread pool. That allows you to create a connection pool with OTL. There’s a simple demo included in the distribution (24_ConnectionPool). I wrote a few words about it yesterday.

No bugs were fixed, so you don’t have to upgrade if you don’t need the new thread pool functionality.

As usual, you can get it via SVN or as a ZIP archive.


Monday, February 09, 2009

Building a connection pool

Recently, an OTL user asked me in the forum how to build a connection pool with the OTL. The answer, at the time, was – not possible. There was a crucial component missing.

It turned out that implementing thread-global data was not really hard to do, so here it is – a tutorial on how to build a connection pool with the OTL (also included in the latest release as demo 24_ConnectionPool). To run this code you’ll need OTL 1.03.

Let’s say we want to build a pool of entities that take some time to initialize (database connections, for example). In the traditional approach, one would build a list of objects managing those entities and would then allocate them to the threads running the code. In practice, we can run into a big problem if such entities expect to always run from the thread in which they were created. (I had such a problem once with TDBIB and Firebird Embedded.) To solve this, we would have to associate entities with threads and also monitor the thread lifecycle (to deallocate entities when a thread is terminated).

With OTL, the logic is reversed. Threads will be managed by a thread pool and there will be no need for us to create or destroy them. We’ll just create a task and submit it to the thread pool. The thread pool will initialize the pool entity (database connection), associate it with a thread and pass it to all tasks that will run in this thread so that they can use it.

Furthermore, this solution allows you to use all the functionality of the OTL thread pool: you can set the maximum number of concurrent tasks, the idle thread timeout, the maximum time a task will wait for execution, and more.

So let’s see how we can code this in the OTL. All code was extracted from the demo 24_ConnectionPool.

Connection pool demo

In the OnCreate event the code creates a thread pool and assigns it a name and a thread data factory. The latter is a function that will create and initialize a new connection for each new thread. In the OnClose event the code cancels all waiting tasks (if any), allowing the application to shut down gracefully. FConnectionPool is an interface and its lifetime is managed automatically, so we don’t have to do anything explicit with it.

procedure TfrmConnectionPoolDemo.FormCreate(Sender: TObject);
begin
  FConnectionPool := CreateThreadPool('Connection pool');
  FConnectionPool.ThreadDataFactory := CreateThreadData;
end;

procedure TfrmConnectionPoolDemo.FormClose(Sender: TObject; var Action: TCloseAction);
begin
  FConnectionPool.CancelAll;
end;

The magic CreateThreadData factory just creates a connection object (which would in a real program establish a database connection, for example).

function CreateThreadData: IInterface;
begin
  Result := TConnectionPoolData.Create;
end;

There’s no black magic behind this connection object. It is an object which implements an interface. Any interface. This interface will be used only in your code. In this demo, TConnectionPoolData contains only one field – a unique ID, which will help us follow the program execution.

type
  IConnectionPoolData = interface ['{F604640D-6D4E-48B4-9A8C-483CA9635C71}']
    function ConnectionID: integer;
  end;

  TConnectionPoolData = class(TInterfacedObject, IConnectionPoolData)
  strict private
    cpID: integer;
  public
    constructor Create;
    destructor Destroy; override;
    function ConnectionID: integer;
  end; { TConnectionPoolData }

As this is not code from a real-world application, I didn’t bother connecting it to any specific database. The TConnectionPoolData constructor just notifies the main form that it has begun its job, generates a new ID and sleeps for 5 seconds (to emulate establishing a slow connection). The destructor is even simpler; it just sends a notification to the main form.

constructor TConnectionPoolData.Create;
begin
  PostToForm(WM_USER, MSG_CREATING_CONNECTION, integer(GetCurrentThreadID));
  cpID := GConnPoolID.Increment;
  Sleep(5000);
  PostToForm(WM_USER, MSG_CREATED_CONNECTION, cpID);
end;

destructor TConnectionPoolData.Destroy;
begin
  PostToForm(WM_USER, MSG_DESTROY_CONNECTION, cpID);
end;
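
(A side note: PostToForm is just a tiny logging helper from the demo and its implementation is not shown here. A minimal sketch of what such a helper could look like follows – the form variable name and the use of PostMessage are my assumptions here, the real demo code may differ.)

// Sketch only, not the actual demo code. Posts asynchronously to the main
// form, because it may be called from a pool thread; the form then logs the
// message in its WM_USER handler. The form's window handle is assumed to be
// already allocated at this point.
procedure PostToForm(msg: cardinal; wParam, lParam: integer);
begin
  PostMessage(frmConnectionPoolDemo.Handle, msg, wParam, lParam);
end;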

Creating and running a task is really simple with the OTL:

procedure TfrmConnectionPoolDemo.btnScheduleClick(Sender: TObject);
begin
  Log('Creating task');
  CreateTask(TaskProc).MonitorWith(OTLMonitor).Schedule(FConnectionPool);
end;

We are monitoring the task with the TOmniEventMonitor component because a) we want to know when the task terminates and b) otherwise we would have to keep a reference to the IOmniTaskControl interface returned from CreateTask.
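
(Just for illustration, the termination handler on the monitor is a plain event handler; assuming the usual TOmniEventMonitor.OnTaskTerminated signature, it could look something like this.)

// Sketch: OnTaskTerminated handler of the OTLMonitor component
// (signature assumed; check the TOmniEventMonitor declaration in your OTL version).
procedure TfrmConnectionPoolDemo.OTLMonitorTaskTerminated(const task: IOmniTaskControl);
begin
  Log(Format('Task %d terminated', [task.UniqueID]));
end;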

The task worker procedure TaskProc is again really simple. First it pulls the connection data from the task interface (task.ThreadData as IConnectionPoolData), retrieves the connection ID and sends the task and connection IDs to the main form (for logging purposes); then it sleeps for three seconds, simulating some heavy database activity.

procedure TaskProc(const task: IOmniTask);
begin
  PostToForm(WM_USER + 1, task.UniqueID,
    (task.ThreadData as IConnectionPoolData).ConnectionID);
  Sleep(3000);
end;

Then … but wait! There’s no more! Believe it or not, that’s all. OK, there is some infrastructure code that is used only for logging, but you can look that up by yourself.

There is also code assigned to the second button (“Schedule and wait”), but it only demonstrates how you can schedule a task and wait for its execution. Useful if you’re running the task from a background thread (for example, an Indy thread, as specified by the author of the original question).
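
The gist of that second button is to keep the IOmniTaskControl reference and wait on it. A rough sketch (the button handler name is made up and the WaitFor method on IOmniTaskControl is assumed here; see the demo for the exact code):

procedure TfrmConnectionPoolDemo.btnScheduleAndWaitClick(Sender: TObject);
var
  task: IOmniTaskControl;
begin
  Log('Creating task');
  task := CreateTask(TaskProc).Schedule(FConnectionPool);
  // Block the calling thread until the task completes (or give up after 30 s).
  // In the Indy scenario this would be called from the Indy worker thread.
  if not task.WaitFor(30000) then
    Log('Task did not finish in 30 seconds');
end;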

Running the demo

Let’s run the demo and click the Schedule button.

[screenshot]

What happened here?

  • A task was created.
  • Immediately, it was scheduled for execution and the thread pool called our thread data factory.
  • The thread data factory waited for five seconds and returned.
  • The thread pool immediately started executing the task.
  • The task waited for three seconds and exited.

OK, nothing special. Let’s click the Schedule button again.

[screenshot]

Now a new task was created (with ID 4), scheduled for execution in the same thread as the previous task, and it reused the connection that was created when the first task was scheduled. There is no 5-second wait, just the 3-second wait implemented in the task worker procedure.

If you now leave the program running for 10 seconds, the message Destroying connection 1 will appear. The reason for this is that the default thread idle timeout in the OTL thread pool is 10 seconds. In other words, if a thread does nothing for 10 seconds, it will be stopped. You are, of course, free to set this value to any number or even to 0, which disables the idle thread termination mechanism.
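
Changing the timeout is a single property assignment, something like this (check the IOmniThreadPool declaration for the exact property name):

// Keep idle worker threads (and their connections) alive for a minute,
// or set the value to 0 to disable idle-thread termination altogether.
FConnectionPool.IdleWorkerThreadTimeout_sec := 60;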

If you now click the Schedule button again, a new thread will be created in the thread pool and a new connection will be created by our factory function (spending 5 seconds doing nothing).

[screenshot]

Let’s try something else. I was running the demo on my laptop with a dual-core CPU, which caused the OTL thread pool to limit the maximum number of concurrently executing threads to two. By default, the OTL thread pool uses as many threads as there are cores in the system, but again you can override the value. At the moment, you are limited to a maximum of 60 concurrent threads, which should not cause any problems in the next few years, I hope. (The 60-thread limit is not an arbitrary number; it is caused by the Windows limitation of allowing at most 64 handles in the WaitForMultipleObjects function.) Yes, you are allowed to set this limit to a value higher than the number of CPU cores in the system, but running 60 active concurrent threads is really not recommended.
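
Raising (or lowering) the limit is again just a property assignment, roughly like this (check the IOmniThreadPool declaration for the exact property name):

// Allow up to four tasks (and therefore four connections) to run concurrently,
// regardless of the number of cores; must stay below the 60-thread limit.
FConnectionPool.MaxExecuting := 4;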

To recap – when running the demo, the OTL thread pool was limited to two concurrent threads. When I clicked the Schedule button twice in quick succession, the first task was scheduled and the first connection started being established (translation: entered the Sleep function). Then the second task was created (as the connection is being established in the worker thread, the GUI is not blocked) and the second connection started being established in the second thread. Five seconds later, both connections were created and both tasks started running (waited for three seconds, and exited).

[screenshot]

Then I clicked the Schedule button two more times. Two tasks were scheduled and they immediately started execution in two worker threads.

[screenshot]

For the third demo, I restarted the app and clicked the Schedule button three times. Only two worker threads were created, two connections established, and two tasks started executing. The third task entered the thread pool queue and waited for the first task to terminate, after which it was immediately scheduled.

[screenshot]

So here you have it – a very simple way to build a connection pool. Have fun!

Friday, February 06, 2009

Adding connection pool mechanism to OmniThreadLibrary

I have a problem.

I have this thread pool, which needs to be enhanced a little. And I don’t know how to do it.

Well, actually I know. I have at least three approaches. I just can’t tell which one is the best :(

I’m talking about the OmniThreadLibrary – I know you guessed that already. The problem is that the thread pool in OTL doesn’t allow for per-thread resource initialization, and that’s something you need when you’re implementing a connection pool (more background info here). I started adding this functionality but soon found out that I didn’t have a good idea on how to implement it. Oh, I have a few ideas; they are just not very good :(

Current

At the moment, the OTL thread pool functionality is exposed through an interface. This interface is pretty high-level and doesn’t allow the programmer to mess with the underlying thread management. All thread information is hidden in the implementation section. There’s a notification event that’s triggered when a pool thread is created or destroyed, but it is only a notification and is triggered asynchronously (and possibly with a delay).

In short:

type
  TOTPWorkerThread = class(TThread)
  end;

  TOmniThreadPool = class(TInterfacedObject, IOmniThreadPool)
  end;

Event handlers

The first idea was to add OnThreadInitialization/OnThreadCleanup events to IOmniThreadPool. [Actually, something similar already exists – OnWorkerThreadCreated_Asy and OnWorkerThreadDestroyed_Asy – but those events are part of the previous implementation and will be removed very soon.] Those two events would receive a TThread parameter and you would do the proper initialization there.

There are some big problems, though. Let’s say you’ll be implementing a database connection pool. You’d have to open a database connection in OnThreadInitialization. Where would you store that info then? In an external structure, indexed by the TThread? Ugly! Even worse – how would you access the database info from the task that will be executing in the thread pool? By accessing that same structure? Eugh!

Rejected.

Thread subclassing

A better idea is to implement a subclassed thread class in your own code and then tell the thread pool to use this thread class when creating new thread objects. You’d then manage the database connection in overridden Initialize/Cleanup methods.

type
  TDBConnectionPoolThread = class(TOTPWorkerThread)
  strict private
    FDBConnection: TDBConnectionInfo;
  protected
    function Initialize: boolean; override;
    procedure Cleanup; override;
  end;

GlobalOmniThreadPool.ThreadClass := TDBConnectionPoolThread;

Looks much better, but there’s again a problem – I’d have to expose the TOTPWorkerThread class in the interface section and that’s just plain ugly. The worker thread mechanism should stay hidden; only a few people would ever be interested in it.

Thread data subclassing

An even better idea is to add an empty

TOTPWorkerThreadData = class
end;

definition to the interface section of the thread pool unit. IOmniThreadPool would contain a property ThreadDataClass which would point to this class. Each worker thread would then create/destroy an instance of this class in its Execute method.

You’d add database management as

type
  TDBConnectionPoolThreadData = class(TOTPWorkerThreadData)
  strict private
    FDBConnection: TDBConnectionInfo;
  protected
    constructor Create;
    destructor Destroy; override;
  end;

GlobalOmniThreadPool.ThreadDataClass := TDBConnectionPoolThreadData;

[Maybe the constructor has to be virtual here? I never know until I try.]

There’s still the question of accessing this information from the task (and the same goes for the previous approach – I just skipped the issue there). I’d have to extend the IOmniTask interface with a method to access per-thread data.

Thread data with interfaces

While writing this mind dump, a new idea crossed my mind – what if thread data were implemented as an interface, without the need for subclassing the thread or the thread data? In a way, it is the first idea, just reimplemented to remove all its problems.

The task interface would be extended with thread data access definitions, approximately like this:

type
  IOtlThreadData = interface
  end;

  IOtlTask = interface
    property ThreadData: IOtlThreadData;
  end;

The thread pool would get a property containing a factory method.

type
  TCreateThreadDataProc = function: IOtlThreadData;

  IOmniThreadPool = interface
    property ThreadDataFactory: TCreateThreadDataProc;
  end;

This factory method would be called when a thread is created, to initialize the thread data. Each task would get assigned that same interface into its ThreadData property just before starting its execution in the selected thread. The task would then access the ThreadData property to retrieve this information.

In the database connection pool scenario, you’d have to write a connection interface, object and factory.

type
  IDBConnectionPoolThreadData = interface(IOtlThreadData)
    function GetConnectionInfo: TDBConnectionInfo;
    property ConnectionInfo: TDBConnectionInfo read GetConnectionInfo;
  end;

  TDBConnectionPoolThreadData = class(TInterfacedObject, IDBConnectionPoolThreadData)
  strict private
    FDBConnection: TDBConnectionInfo;
    function GetConnectionInfo: TDBConnectionInfo;
  protected
    constructor Create;
    destructor Destroy; override;
  end;

function CreateConnectionPoolThreadData: IDBConnectionPoolThreadData;
begin
  Result := TDBConnectionPoolThreadData.Create;
end;

GlobalThreadPool.ThreadDataFactory := CreateConnectionPoolThreadData;

This approach requires slightly more work from the programmer, but I like it the most as it somehow seems the cleanest of them all (plus it is implemented with interfaces, which is pretty much the approach used in all OTL code).
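
To round the idea off, a task scheduled into such a pool would reach its per-thread connection through the extended task interface, approximately like this (a sketch that follows the naming from the snippets above):

procedure DBWorker(const task: IOtlTask);
var
  connInfo: TDBConnectionInfo;
begin
  // The thread pool assigned the shared thread data before starting the task;
  // all tasks executing in the same worker thread reuse the same connection.
  connInfo := (task.ThreadData as IDBConnectionPoolThreadData).ConnectionInfo;
  // ... use connInfo to talk to the database ...
end;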

So, dear reader, what do you think? If you have better idea, or see a big problem with any of those implementations that I didn’t think of, please do tell in the comments!

Tuesday, February 03, 2009

Hassle-free critical section

While writing multithreaded code, I sometimes need a fine-grained critical section that will synchronize access to some small (very small) piece of code. In the OmniThreadLibrary, for example, there’s a class TOmniTaskExecutor which has some of its internals (for example, a set of Option flags) exposed to both the task controller and the task itself (and those two, by definition, live in two different threads). Access to those internal fields is serialized with a critical section.

Usually, I need two or more such critical sections. And because I’m lazy and don’t want to write creation/destruction code every time I need a fine-grained lock, I usually create only one critical section and use it for all such accesses. In other words, when thread 1 is accessing field 1 (protected with that one critical section), thread 2 will be blocked from accessing field 2 (because it is protected with the same critical section). I can live with that, because the frequency of such accesses is very low (or I would not be reusing the same critical section).

Still, I was not happy with this status quo but I didn’t know what to do (except creating more critical sections, of course). Then, while developing the new OTL thread pool, I got a great idea – records! Records need no explicit .Create. Let’s make this new critical section a record!

Let’s start with the use scenario. I want to be able to declare the critical section object …

  TOmniTaskExecutor = class
  strict private
    oteInternalLock: TOmniCS;
    //...
  end;

… and then use it without any initialization.

  oteInternalLock.Acquire;
  try
    if not assigned(oteCommList) then
      oteCommList := TInterfaceList.Create;
    oteCommList.Add(comm);
    SetEvent(oteCommRebuildHandles);
  finally oteInternalLock.Release; end;

There are only two problems to be solved: I had to make sure that the critical section is created when the record is first used, and destroyed when the owning object is destroyed. It turned out that this is quite a big “only”.

Destruction

Let’s start with the simpler problem – destruction. The solution to automatic record cleanup is well-documented (at least if you follow Delphi blogs where they talk about such things …). In general, the Delphi compiler doesn’t guarantee what the initial state of record fields will be, but there are two exceptions to this rule – all strings are initialized to an empty string and all interfaces to nil (which in both cases means that the fields holding strings/interfaces are initialized to 0). In addition to that, the compiler will free the memory allocated for string fields and destroy interfaces (well, decrease their reference count) when the record goes out of scope. If the record is declared inside a method, this will happen when the method exits, and if it is declared as a class field, the cleanup will occur when the owning object is destroyed. In any case, you can be sure that the compiler will take care of strings and interfaces.

So we already know something – the TOmniCS record will contain an interface field, and an instance of an object implementing this interface will do the actual critical section allocation and access.

  IOmniCriticalSection = interface ['{AA92906B-B92E-4C54-922C-7B87C23DABA9}']
    procedure Acquire;
    procedure Release;
    function GetSyncObj: TSynchroObject;
  end; { IOmniCriticalSection }

  TOmniCS = record
  private
    ocsSync: IOmniCriticalSection;
    function GetSyncObj: TSynchroObject;
  public
    procedure Initialize;
    procedure Acquire; inline;
    procedure Release; inline;
    property SyncObj: TSynchroObject read GetSyncObj;
  end; { TOmniCS }

The implementation of the IOmniCriticalSection interface is trivial.

  TOmniCriticalSection = class(TInterfacedObject, IOmniCriticalSection)
  strict private
    ocsCritSect: TSynchroObject;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Acquire; inline;
    function GetSyncObj: TSynchroObject;
    procedure Release; inline;
  end; { TOmniCriticalSection }

constructor TOmniCriticalSection.Create;
begin
  ocsCritSect := TCriticalSection.Create;
end; { TOmniCriticalSection.Create }

destructor TOmniCriticalSection.Destroy;
begin
  FreeAndNil(ocsCritSect);
end; { TOmniCriticalSection.Destroy }

procedure TOmniCriticalSection.Acquire;
begin
  ocsCritSect.Acquire;
end; { TOmniCriticalSection.Acquire }

function TOmniCriticalSection.GetSyncObj: TSynchroObject;
begin
  Result := ocsCritSect;
end; { TOmniCriticalSection.GetSyncObj }

procedure TOmniCriticalSection.Release;
begin
  ocsCritSect.Release;
end; { TOmniCriticalSection.Release }

function CreateOmniCriticalSection: IOmniCriticalSection;
begin
  Result := TOmniCriticalSection.Create;
end; { CreateOmniCriticalSection }

Creation

The destruction part was trivial (once you know the trick, of course), but the creation is not. Delphi only guarantees that the ocsSync interface will be initialized to nil (or 0, if you prefer), nothing more than that.

The TOmniCS record offloads all the hard work to the Initialize method. It is called from Acquire and GetSyncObj (a method that returns the underlying critical section), but not from Release, and that’s for a reason. If you call Release before first calling Acquire, it is clearly a programming error and the program should crash – and it will, because ocsSync will be nil.

procedure TOmniCS.Acquire;
begin
  Initialize;
  ocsSync.Acquire;
end; { TOmniCS.Acquire }

function TOmniCS.GetSyncObj: TSynchroObject;
begin
  Initialize;
  Result := ocsSync.GetSyncObj;
end; { TOmniCS.GetSyncObj }

procedure TOmniCS.Release;
begin
  ocsSync.Release;
end; { TOmniCS.Release }

Let’s finally tackle the hard part. Before the critical section can be used, the ocsSync interface must be initialized. In a single-threaded world we would just create a TOmniCriticalSection object and store it in the ocsSync field. In the multi-threaded world this is not possible.

Let’s think about what can happen if two Acquire calls are made at the same time from two threads. Thread 1 checks whether ocsSync is initialized, finds that it’s not, and loses its CPU slice. Thread 2 checks whether ocsSync is initialized, finds that it’s not, initializes it, calls Acquire and loses its CPU slice. Thread 1 then creates another TOmniCriticalSection object, stores it in the ocsSync field (overwriting the previous value, which gets its reference count decremented, which destroys the implementing object) and calls Acquire. Because this Acquire will be using a critical section different from the one used by the Acquire in thread 2, it will succeed and both threads will have access to the protected data. Bad!

The trick is to store the TOmniCriticalSection in the ocsSync field with an atomic operation that will succeed if and only if ocsSync is empty (nil, zero). And that’s a job for InterlockedCompareExchange (ICE for short).

ICE takes three parameters. The first is the address of the memory area we are trying to modify, the second is the new value, and the third is the value we expect to find in that memory area. The function returns the current value of the affected memory area. If this value is not equal to the third parameter, ICE does nothing.

Quite a mouthful, I know. Here’s how it is used in practice:

procedure TOmniCS.Initialize;
var
  syncIntf: IOmniCriticalSection;
begin
  Assert(cardinal(@ocsSync) mod 4 = 0, 'TOmniCS.Initialize: ocsSync is not 4-aligned!');
  while not assigned(ocsSync) do begin
    syncIntf := CreateOmniCriticalSection;
    if InterlockedCompareExchange(PInteger(@ocsSync)^, integer(syncIntf), 0) = 0 then begin
      pointer(syncIntf) := nil;
      Exit;
    end;
    DSiYield;
  end;
end; { TOmniCS.Initialize }

Initialize checks whether ocsSync is allocated. If not, it creates a new instance of the IOmniCriticalSection interface and stores it in a local variable. Then it tries to store it in the ocsSync field with a call to ICE. The third parameter tells ICE that we expect ocsSync to contain all zeroes. If this is so, the interface will be stored in ocsSync and ICE will return 0 (otherwise, it will return the current value of the ocsSync field). If ICE succeeded, we have to clear the local variable without decrementing the interface reference count (the reference has simply been transferred to ocsSync) and we can exit. If ICE failed, we give the other thread a time slice (after all, the other thread just created the critical section, therefore we can assume it will Acquire it, therefore the current thread would not be able to Acquire it and it can sleep a little) and retry.

And that’s how you get a hassle-free critical section. Ugly, I know, but it works.

Just a word of warning – don’t try to pass a TOmniCS record around. Eventually you’ll do an assignment somewhere (newCS := oldCS) and that would screw things up. Just pass the underlying critical section (TOmniCS.SyncObj) around and all will be fine.
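
If some other piece of code needs to take the same lock, pass the underlying synchronization object, not the record; for example:

// Pass the TSynchroObject exposed by TOmniCS.SyncObj instead of the record;
// copying the record around could leave two copies that later initialize two
// different critical sections.
procedure ProcessSharedData(lock: TSynchroObject);
begin
  lock.Acquire;
  try
    // ... work with the shared data ...
  finally lock.Release; end;
end;

// caller (somewhere inside TOmniTaskExecutor):
//   ProcessSharedData(oteInternalLock.SyncObj);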


OmniThreadLibrary 1.02

If you haven’t noticed already – OmniThreadLibrary 1.02 was released a few days ago.

The main new feature is the reimplemented thread pool. The previous implementation was never meant to be a permanent solution anyway. The new thread pool uses OTL as an internal implementation mechanism. Talk about recursion! :) A longer article on the new implementation will follow.

There’s a new Enforced decorator which you can apply to IOmniTask or IOmniTaskControl. In short – OTL always tries to execute your task. If you call taskControl.Terminate before the task has even started, OTL will set the termination signal and start executing the task. This is not a good idea if the task was waiting in the thread pool queue and threadPool.CancelAll or threadPool.Terminate was executed. To bypass this auto-execute behaviour, you can call .Enforced(false).
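
In code that is just one extra call in the fluent chain, for example (BackgroundWorker is a placeholder for your own task procedure):

// Don't auto-execute this task if it is cancelled while still waiting
// in the thread pool queue.
CreateTask(BackgroundWorker, 'worker').Enforced(false).Schedule(threadPool);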

I’ve implemented a critical section which you only have to declare and can start using right away. No need for .Create or .Initialize or similar. An article will follow …

There’s a new demo that shows how to use OTL for background file scanning.

A few small and not-so-small bugs were fixed, and 3rd-party units were synced to fresh releases.

As usual, you can get it via SVN or as a ZIP archive.
