Monday, February 09, 2009

Building a connection pool

Recently, an OTL user asked me in the forum how to build a connection pool with the OTL. The answer, at the time, was – not possible. There was a crucial component missing.

It turned out that implementing thread-global data was not really hard to do so here it is – a tutorial on how to build a connection pool with the OTL (also included in the latest release as a demo 24_ConnectionPool). To run this code you’ll need OTL 1.03.

Let’s say we want to build a pool of some entities that take some time to initialize (database connections, for example). In a traditional sense, one would build a list of objects managing those entities and would then allocate them to the threads running the code. In practice, we can run in a big problem if such entities expect to always run from the thread in which they were created. (I had such problem once with TDBIB and Firebird Embedded.) To solve this, we would have to associate entities with threads and we’ll also have to monitor thread lifecycle (to deallocate entities when a thread is terminated).

With OTL, the logic is reversed. Threads will be managed by a thread pool and there will be no need for us to create/destroy them. We’ll just create a task and submit it into a thread pool. Thread pool will initialize the pool entity (database connection), associate it with a thread and pass it to all tasks that will run in this thread so that they can use it.

Furthermore, this solution allows you to use all the functionality of the OTL thread pool. You can set maximum number of concurrent tasks, idle thread timeout, maximum time the task will wait for execution and more and more.

So let’s see how we can code this in the OTL. All code was extracted from the demo 24_ConnectionPool.

Connection pool demo

In the OnCreate event the code creates a thread pool, assigns it a name and thread data factory. The latter is a function that will create and initialize new connection for each new thread. In the OnClose event the code terminates all waiting tasks (if any), allowing the application to shutdown gracefully. FConnectionPool is an interface and its lifetime is managed automatically so we don’t have to do anything explicit with it.

procedure TfrmConnectionPoolDemo.FormCreate(Sender: TObject);
begin
FConnectionPool := CreateThreadPool('Connection pool');
FConnectionPool.ThreadDataFactory := CreateThreadData;
end;

procedure TfrmConnectionPoolDemo.FormClose(Sender: TObject; var Action: TCloseAction);
begin
FConnectionPool.CancelAll;
end;

The magic CreateThreadData factory just creates a connection object (which would in a real program establish a database connection, for example).

function CreateThreadData: IInterface;
begin
Result := TConnectionPoolData.Create;
end;

There’s no black magic behind this connection object. It is an object which implements an interface. Any interface. This interface will be used only in your code. In this demo, TConnectionPoolData contains only one field – unique ID, which will help us follow the program execution.

type
IConnectionPoolData = interface ['{F604640D-6D4E-48B4-9A8C-483CA9635C71}']
function ConnectionID: integer;
end;

TConnectionPoolData = class(TInterfacedObject, IConnectionPoolData)
strict private
cpID: integer;
public
constructor Create;
destructor Destroy; override;
function ConnectionID: integer;
end; { TConnectionPoolData }

As this is not a code from a real world application, I didn’t bother connecting it to any specific database. TConnectionPoolData constructor will just notify the main form that it has begun its job, generate new ID and sleep for 5 seconds (to emulate establishing a slow connection). The destructor is even simpler, it just sends a notification to the main form.

constructor TConnectionPoolData.Create;
begin
PostToForm(WM_USER, MSG_CREATING_CONNECTION, integer(GetCurrentThreadID));
cpID := GConnPoolID.Increment;
Sleep(5000);
PostToForm(WM_USER, MSG_CREATED_CONNECTION, cpID);
end;

destructor TConnectionPoolData.Destroy;
begin
PostToForm(WM_USER, MSG_DESTROY_CONNECTION, cpID);
end;

Creating and running a task is really simple with the OTL:

procedure TfrmConnectionPoolDemo.btnScheduleClick(Sender: TObject);
begin
Log('Creating task');
CreateTask(TaskProc).MonitorWith(OTLMonitor).Schedule(FConnectionPool);
end;

We are monitoring the task with the TOmniEventMonitor component because a) we want to know when the task will terminate and b) otherwise we would have to keep reference to the IOmniTaskControl interface returned from the CreateTask.

The task worker procedure TaskProc is again really simple. First it pulls the connection data from the task interface (task.ThreadData as IConnectionPoolData), retrieves the connection ID and sends task and connection ID to the main form (for logging purposes) and then it sleeps for three seconds, indicating some heavy database activity.

procedure TaskProc(const task: IOmniTask);
begin
PostToForm(WM_USER + 1, task.UniqueID,
(task.ThreadData as IConnectionPoolData).ConnectionID);
Sleep(3000);
end;

Then … but wait! There’s no more! Believe it or not, that’s all. OK, there is some infrastructure code that is used only for logging but that you can look up by yourself.

There is also a code assigned to the second button (“Schedule and wait”) but it only demonstrates how you can schedule a task and wait on its execution. Useful if you’re running the task from a background thread (for example, Indy thread, as specified by the author of the original question).

Running the demo

Let’s run the demo and click on the Schedule key.

image

What happened here?

  • Task was created.
  • Immediately, it was scheduled for execution and thread pool called our thread data factory.
  • Thread data waited for five seconds and returned.
  • Thread pool immediately started executing the task.
  • Task waited for three seconds and exited.

OK, nothing special. Let’s click the Schedule button again.

image

Now a new task was created (with ID 4), was scheduled for execution in the same thread as the previous task and reused the connection that was created when the first task was scheduled. There is no 5 second wait, just the 3 second wait implemented in the task worker procedure.

If you now leave the program running for 10 seconds, a message Destroying connection 1 will appear. The reason for this is that the default thread idle timeout in the OTL thread pool is 10 seconds. In other words, if a thread does nothing for 10 seconds, it will be stopped. You are, of course, free to set this value to any number or even to 0, which would disable the idle thread termination mechanism.

If you now click the Schedule button again, new thread will be created in the thread pool and new connection will be created in our factory function (spending 5 seconds doing nothing).

image

Let’s try something else. I was running the demo on my laptop with a dual core CPU, which caused the OTL thread pool to limit maximum number of currently executing threads to two. By default, OTL thread pool uses as much threads as there are cores in the system, but again you can override the value. At the moment, you are limited by a maximum 60 concurrent threads, which should not cause any problems in the next few years, I hope. (The 60 thread limit is not an arbitrary number but is caused by the Windows limitation of allowing only up to 64 handles in the WaitForMultipleObjects function.) Yes, you are allowed to set this limitation to a value higher than the number of CPU cores in the system but still, running 60 active concurrent threads is really not recommended.

To recap – when running the demo, OTL thread pool was limited to two concurrent threads. When I clicked the Schedule button two times in a quick succession, first task was scheduled and first connection started being established (translation: entered the Sleep function). Then the second task was created (as the connection is being established from the worker thread, GUI is not blocked) and second connection started being established in the second thread. Five seconds later, connections are created and task start running (and wait three seconds, and exit).

image

Then I clicked the Schedule button two more times. Two tasks were scheduled and they immediately started execution in two worker threads.

image

For the third demo, I restarted the app and clicked the Shedule button three times. Only two worker threads were created and two connections established and two tasks started execution. The third task entered the thread pool queue and waited for the first task to terminate, after which it was immediately scheduled.

image

So here you have it – a very simple way to build a connection pool. Have fun!

3 comments:

  1. Anonymous11:50

    Could you post this sample code? Thx

    ReplyDelete
  2. Download full release at http://omnithreadlibrary.googlecode.com/files/OmniThreadLibrary-1.03.zip, unpack and open test 24.

    ReplyDelete
  3. Anonymous14:31

    Your written "By default, OTL thread pool uses as much threads as there are cores in the system, but again you can override the value".

    I tried it by setting of MaxExecuting-Property:

    FConnectionPool := CreateThreadPool('Connection pool');
    FConnectionPool.MaxExecuting := 5; // <-- no function!

    But it doesn't work. After debugging of your code (Ver. 1.03) I found the cause (=bug) for this issue: function TOTPWorker.Initialize() always overrides this value with default value:

    MaxExecuting.Value := Length(DSiGetThreadAffinity);

    I hope you can fix this issue in next version.

    ReplyDelete