Wednesday, April 01, 2009

Fluent XML [1]

A few days ago I was writing a very boring piece of code that had to generate an XML document. It was full of function calls that created nodes in the XML document and set attributes. Boooooring stuff. But even worse than that – the structure of the XML document was totally lost in the code. It was hard to tell which node was a child of which and how it was all structured.

Then I did what every programmer does when he/she has to write some boring code – I wrote a tool to simplify the process. [That process usually takes more time than the original approach but at least it is interesting ;) .]

I started by writing the end code. In other words, I started by thinking about how I wanted to create this XML document at all. Quickly I decided on the fluent interface approach. I had already used it in the OmniThreadLibrary, where it proved to be quite useful.

That’s how the first draft looked (Actually, it was much longer but that’s the important part.):

xmlWsdl := CreateFluentXml
.AddProcessingInstruction('xml', 'version="1.0" encoding="UTF-8"')
.AddChild('definitions')
.SetAttr('xmlns', 'http://schemas.xmlsoap.org/wsdl/')
.SetAttr('xmlns:xs', 'http://www.w3.org/2001/XMLSchema')
.SetAttr('xmlns:soap', 'http://schemas.xmlsoap.org/wsdl/soap/')
.SetAttr('xmlns:soapenc', 'http://schemas.xmlsoap.org/soap/encoding/')
.SetAttr('xmlns:mime', 'http://schemas.xmlsoap.org/wsdl/mime/');

This short fragment looks quite nice but in the full version (about 50 lines) all those SetAttr calls visually merged together with AddChild calls and the result was still unreadable (although shorter than the original code with explicit calls to XML interface).

My first idea was to merge at least some SetAttr calls into the AddChild by introducing two versions – one which takes only a node name and another which takes node name, attribute name and attribute value – but that didn’t help the code at all. Even worse – it was hard to see which AddChild calls were setting attributes and which not :(

That got me started in a new direction. If the main problem is visual clutter, I had to do something to make setting attributes stand out. Briefly I considered a complicated scheme which would use smart records and operator overloading but I couldn’t imagine XML-creating code which would use operators and be more readable than this so I rejected this approach. [It may still be a valid approach – it’s just that I cannot make it work in my head.]

Then I thought about arrays. In “classical” code I could easily add array-like support to attributes so that I could write xmlNode[attrName] := 'some value', but how could I make this conform to my fluent architecture?

To get or not to get

In order to be able to chain anything after the [], the indexed property hiding behind must return Self, i.e. the same interface it is living in. And because I want to use attribute name/value pairs, this property has to have two indices.

property Attrib[const name, value: XmlString]: IGpFluentXmlBuilder 
read GetAttrib; default;

That would allow me to write such code:

.AddSibling('service')['name', serviceName]
.AddChild('port')
['name', portName]
['binding', 'fs:' + bindingName]
.AddChild('soap:address')['location', serviceLocation];

As you can see, attributes can be chained and I can write attribute assignment in the same line as node creation and it is still obvious which is which and who is who.

But … assignment? In a getter? Why not! You can do anything in the property getter. To make this more obvious, my code calls this ‘getter’ SetAttrib. As a nice side effect, SetAttrib is exactly the same as it was defined in the first draft and can even be used instead of the [] approach.
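Here is a stripped-down sketch of the trick, for anyone who wants to try it in isolation. IMiniBuilder, TMiniBuilder and CreateMiniBuilder are made-up names for this example (they are not part of the real fluent XML unit), and the ‘builder’ only collects a trace string instead of creating XML nodes. The point is just the default property with two indices whose ‘getter’ does the work and returns Self.

```pascal
program FluentAttribSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Hypothetical mini-builder, not the real IGpFluentXmlBuilder.
  IMiniBuilder = interface
    function AddChild(const name: string): IMiniBuilder;
    function GetTrace: string;
    function SetAttrib(const name, value: string): IMiniBuilder;
    // Two indices, and the 'getter' is really a setter that returns Self.
    property Attrib[const name, value: string]: IMiniBuilder
      read SetAttrib; default;
    property Trace: string read GetTrace;
  end;

  TMiniBuilder = class(TInterfacedObject, IMiniBuilder)
  strict private
    FTrace: string;
  public
    function AddChild(const name: string): IMiniBuilder;
    function GetTrace: string;
    function SetAttrib(const name, value: string): IMiniBuilder;
  end;

function TMiniBuilder.AddChild(const name: string): IMiniBuilder;
begin
  FTrace := FTrace + '<' + name + '>';
  Result := Self;
end;

function TMiniBuilder.GetTrace: string;
begin
  Result := FTrace;
end;

function TMiniBuilder.SetAttrib(const name, value: string): IMiniBuilder;
begin
  FTrace := FTrace + '[' + name + '=' + value + ']';
  Result := Self; // returning Self is what keeps the chain going
end;

function CreateMiniBuilder: IMiniBuilder;
begin
  Result := TMiniBuilder.Create;
end;

var
  builder: IMiniBuilder;
begin
  // Attributes chain directly after the node they belong to.
  builder := CreateMiniBuilder
    .AddChild('port')
      ['name', 'p1']
      ['binding', 'fs:b1']
    .AddChild('soap:address')['location', 'http://localhost'];
  Writeln(builder.Trace);
end.
```

The trace shows each attribute landing right after the node it was chained to, which is all the [] syntax is meant to guarantee.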

I’ll end today’s instalment with the complete ‘fluent XML builder’ interface and with sample code that uses this interface to build an XML document. Tomorrow I’ll wrap things up by describing the interface and its implementation in all boring detail.

type
IGpFluentXmlBuilder = interface ['{91F596A3-F5E3-451C-A6B9-C5FF3F23ECCC}']
function GetXml: IXmlDocument;
//
function Anchor(var node: IXMLNode): IGpFluentXmlBuilder;
function AddChild(const name: XmlString): IGpFluentXmlBuilder;
function AddComment(const comment: XmlString): IGpFluentXmlBuilder;
function AddSibling(const name: XmlString): IGpFluentXmlBuilder;
function AddProcessingInstruction(const target, data: XmlString): IGpFluentXmlBuilder;
function Back: IGpFluentXmlBuilder;
function Here: IGpFluentXmlBuilder;
function Parent: IGpFluentXmlBuilder;
function SetAttrib(const name, value: XmlString): IGpFluentXmlBuilder;
property Attrib[const name, value: XmlString]: IGpFluentXmlBuilder
read SetAttrib; default;
property Xml: IXmlDocument read GetXml;
end; { IGpFluentXmlBuilder }
 
  xmlWsdl := CreateFluentXml
.AddProcessingInstruction('xml', 'version="1.0" encoding="UTF-8"')
.AddChild('definitions')
['xmlns', 'http://schemas.xmlsoap.org/wsdl/']
['xmlns:xs', 'http://www.w3.org/2001/XMLSchema']
['xmlns:soap', 'http://schemas.xmlsoap.org/wsdl/soap/']
['xmlns:soapenc', 'http://schemas.xmlsoap.org/soap/encoding/']
['xmlns:mime', 'http://schemas.xmlsoap.org/wsdl/mime/']
['name', serviceName]
['xmlns:ns1', 'urn:' + intfName]
['xmlns:fs', 'http://online.com/soap/']
['targetNamespace', 'http://online.com/soap/']
.AddChild('message')['name', 'fs:' + baseName + 'Request'].Anchor(nodeRequest)
.AddSibling('message')['name', 'fs:' + baseName + 'Response'].Anchor(nodeResponse)
.AddSibling('portType')['name', baseName]
.Here
.AddChild('operation')['name', baseName]
.AddChild('input')['message', 'fs:' + baseName + 'Request']
.AddSibling('output')['message', 'fs:' + baseName + 'Response']
.Back
.AddSibling('binding')
.Here
['name', bindingName]
['type', 'fs:' + intfName]
.AddChild('soap:binding')
['style', 'rpc']
['transport', 'http://schemas.xmlsoap.org/soap/http']
.AddChild('operation')['name', baseName]
.AddChild('soap:operation')
['soapAction', 'urn:' + baseName]
['style', 'rpc']
.AddSibling('input')
.AddChild('soap:body')
['use', 'encoded']
['encodingStyle', 'http://schemas.xmlsoap.org/soap/encoding/']
['namespace', 'urn:' + intfName + '-' + baseName]
.Parent
.AddSibling('output')
.AddChild('soap:body')
['use', 'encoded']
['encodingStyle', 'http://schemas.xmlsoap.org/soap/encoding/']
['namespace', 'urn:' + intfName + '-' + baseName]
.Back
.AddSibling('service')['name', serviceName]
.AddChild('port')
['name', portName]
['binding', 'fs:' + bindingName]
.AddChild('soap:address')['location', serviceLocation];

What do you think? Does my approach make any sense?

Tuesday, February 10, 2009

OmniThreadLibrary 1.03

OmniThreadLibrary 1.03 was silently released two days ago. Even without the announcement, it was downloaded 133 times to this point. Awesome!

The main new feature is per-thread initialized data in the thread pool. That allows you to create a connection pool with OTL. There’s a simple demo included in the distribution (24_ConnectionPool). I wrote a few words about it yesterday.

No bugs were fixed so you don’t have to upgrade if you don’t need new thread pool functionality.

As usual, you can get it via SVN or as a ZIP archive.


Monday, February 09, 2009

Building a connection pool

Recently, an OTL user asked me in the forum how to build a connection pool with the OTL. The answer, at the time, was – not possible. There was a crucial component missing.

It turned out that implementing thread-global data was not really hard to do so here it is – a tutorial on how to build a connection pool with the OTL (also included in the latest release as a demo 24_ConnectionPool). To run this code you’ll need OTL 1.03.

Let’s say we want to build a pool of some entities that take some time to initialize (database connections, for example). In the traditional approach, one would build a list of objects managing those entities and would then allocate them to the threads running the code. In practice, we can run into a big problem if such entities expect to always run from the thread in which they were created. (I had such a problem once with TDBIB and Firebird Embedded.) To solve this, we would have to associate entities with threads and we would also have to monitor the thread lifecycle (to deallocate entities when a thread is terminated).

With OTL, the logic is reversed. Threads will be managed by a thread pool and there will be no need for us to create/destroy them. We’ll just create a task and submit it into a thread pool. Thread pool will initialize the pool entity (database connection), associate it with a thread and pass it to all tasks that will run in this thread so that they can use it.

Furthermore, this solution allows you to use all the functionality of the OTL thread pool. You can set the maximum number of concurrent tasks, the idle thread timeout, the maximum time a task will wait for execution and more.

So let’s see how we can code this in the OTL. All code was extracted from the demo 24_ConnectionPool.

Connection pool demo

In the OnCreate event the code creates a thread pool and assigns it a name and a thread data factory. The latter is a function that will create and initialize a new connection for each new thread. In the OnClose event the code terminates all waiting tasks (if any), allowing the application to shut down gracefully. FConnectionPool is an interface and its lifetime is managed automatically so we don’t have to do anything explicit with it.

procedure TfrmConnectionPoolDemo.FormCreate(Sender: TObject);
begin
FConnectionPool := CreateThreadPool('Connection pool');
FConnectionPool.ThreadDataFactory := CreateThreadData;
end;

procedure TfrmConnectionPoolDemo.FormClose(Sender: TObject; var Action: TCloseAction);
begin
FConnectionPool.CancelAll;
end;

The magic CreateThreadData factory just creates a connection object (which would in a real program establish a database connection, for example).

function CreateThreadData: IInterface;
begin
Result := TConnectionPoolData.Create;
end;

There’s no black magic behind this connection object. It is an object which implements an interface. Any interface. This interface will be used only in your code. In this demo, TConnectionPoolData contains only one field – unique ID, which will help us follow the program execution.

type
IConnectionPoolData = interface ['{F604640D-6D4E-48B4-9A8C-483CA9635C71}']
function ConnectionID: integer;
end;

TConnectionPoolData = class(TInterfacedObject, IConnectionPoolData)
strict private
cpID: integer;
public
constructor Create;
destructor Destroy; override;
function ConnectionID: integer;
end; { TConnectionPoolData }

As this is not code from a real-world application, I didn’t bother connecting it to any specific database. The TConnectionPoolData constructor will just notify the main form that it has begun its job, generate a new ID and sleep for 5 seconds (to emulate establishing a slow connection). The destructor is even simpler, it just sends a notification to the main form.

constructor TConnectionPoolData.Create;
begin
PostToForm(WM_USER, MSG_CREATING_CONNECTION, integer(GetCurrentThreadID));
cpID := GConnPoolID.Increment;
Sleep(5000);
PostToForm(WM_USER, MSG_CREATED_CONNECTION, cpID);
end;

destructor TConnectionPoolData.Destroy;
begin
PostToForm(WM_USER, MSG_DESTROY_CONNECTION, cpID);
end;

Creating and running a task is really simple with the OTL:

procedure TfrmConnectionPoolDemo.btnScheduleClick(Sender: TObject);
begin
Log('Creating task');
CreateTask(TaskProc).MonitorWith(OTLMonitor).Schedule(FConnectionPool);
end;

We are monitoring the task with the TOmniEventMonitor component because a) we want to know when the task will terminate and b) otherwise we would have to keep reference to the IOmniTaskControl interface returned from the CreateTask.

The task worker procedure TaskProc is again really simple. First it pulls the connection data from the task interface (task.ThreadData as IConnectionPoolData), retrieves the connection ID and sends task and connection ID to the main form (for logging purposes) and then it sleeps for three seconds, indicating some heavy database activity.

procedure TaskProc(const task: IOmniTask);
begin
PostToForm(WM_USER + 1, task.UniqueID,
(task.ThreadData as IConnectionPoolData).ConnectionID);
Sleep(3000);
end;
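The cast in TaskProc works because the factory hands back a plain IInterface and the task asks it for the interface it really wants. Here is a minimal sketch of just that mechanism, without OTL. IConnData, TConnData, CreateData and the GUID are hypothetical names invented for this example. The interface needs a GUID, otherwise neither ‘as’ nor Supports can query for it.

```pascal
program ThreadDataCastSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Hypothetical stand-in for IConnectionPoolData; the GUID is made up.
  IConnData = interface ['{5F0C2B6E-3A4D-4E9B-8C17-6D2A9F41B3E0}']
    function ConnectionID: integer;
  end;

  TConnData = class(TInterfacedObject, IConnData)
  strict private
    FID: integer;
  public
    constructor Create(id: integer);
    function ConnectionID: integer;
  end;

constructor TConnData.Create(id: integer);
begin
  inherited Create;
  FID := id;
end;

function TConnData.ConnectionID: integer;
begin
  Result := FID;
end;

// Same shape as the thread data factory: the caller only sees IInterface.
function CreateData: IInterface;
begin
  Result := TConnData.Create(1);
end;

var
  conn: IConnData;
  data: IInterface;
begin
  data := CreateData;
  // 'as' raises an exception if the interface is not supported ...
  Writeln('via as: ', (data as IConnData).ConnectionID);
  // ... while Supports lets the caller test for it first.
  if Supports(data, IConnData, conn) then
    Writeln('via Supports: ', conn.ConnectionID);
end.
```

In a task that may run in a pool without a thread data factory, the Supports form is probably the safer of the two.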

Then … but wait! There’s no more! Believe it or not, that’s all. OK, there is some infrastructure code that is used only for logging but that you can look up by yourself.

There is also code assigned to the second button (“Schedule and wait”) but it only demonstrates how you can schedule a task and wait on its execution. Useful if you’re running the task from a background thread (for example, an Indy thread, as specified by the author of the original question).

Running the demo

Let’s run the demo and click the Schedule button.

image

What happened here?

  • Task was created.
  • Immediately, it was scheduled for execution and thread pool called our thread data factory.
  • Thread data waited for five seconds and returned.
  • Thread pool immediately started executing the task.
  • Task waited for three seconds and exited.

OK, nothing special. Let’s click the Schedule button again.

image

Now a new task was created (with ID 4), was scheduled for execution in the same thread as the previous task and reused the connection that was created when the first task was scheduled. There is no 5 second wait, just the 3 second wait implemented in the task worker procedure.

If you now leave the program running for 10 seconds, a message Destroying connection 1 will appear. The reason for this is that the default thread idle timeout in the OTL thread pool is 10 seconds. In other words, if a thread does nothing for 10 seconds, it will be stopped. You are, of course, free to set this value to any number or even to 0, which would disable the idle thread termination mechanism.

If you now click the Schedule button again, new thread will be created in the thread pool and new connection will be created in our factory function (spending 5 seconds doing nothing).

image

Let’s try something else. I was running the demo on my laptop with a dual core CPU, which caused the OTL thread pool to limit the maximum number of concurrently executing threads to two. By default, the OTL thread pool uses as many threads as there are cores in the system, but again you can override the value. At the moment, you are limited to a maximum of 60 concurrent threads, which should not cause any problems in the next few years, I hope. (The 60 thread limit is not an arbitrary number but is caused by the Windows limitation of allowing only up to 64 handles in the WaitForMultipleObjects function.) Yes, you are allowed to set this limit to a value higher than the number of CPU cores in the system but still, running 60 active concurrent threads is really not recommended.

To recap – when running the demo, the OTL thread pool was limited to two concurrent threads. When I clicked the Schedule button two times in quick succession, the first task was scheduled and the first connection started being established (translation: entered the Sleep function). Then the second task was created (as the connection is being established from the worker thread, the GUI is not blocked) and the second connection started being established in the second thread. Five seconds later, the connections were created and the tasks started running (and waited three seconds, and exited).

image

Then I clicked the Schedule button two more times. Two tasks were scheduled and they immediately started execution in two worker threads.

image

For the third demo, I restarted the app and clicked the Schedule button three times. Only two worker threads were created, two connections were established and two tasks started execution. The third task entered the thread pool queue and waited for the first task to terminate, after which it was immediately scheduled.

image

So here you have it – a very simple way to build a connection pool. Have fun!

Friday, February 06, 2009

Adding connection pool mechanism to OmniThreadLibrary

I have a problem.

I have this thread pool, which needs to be enhanced a little. And I don’t know how to do it.

Well, actually I know. I have at least three approaches. I just can’t tell which one is the best :(

I’m talking about the OmniThreadLibrary – I know you guessed that already. The problem is that the thread pool in OTL doesn’t allow for per-thread resource initialization and that’s something that you need when you’re implementing a connection pool (more background info here). I started adding this functionality but soon found out that I don’t have a good idea on how to implement it. Oh, I have a few ideas, they are just not very good :(

Current

At the moment, OTL thread pool functionality is exposed through an interface. This interface is pretty high-level and doesn’t allow the programmer to mess with the underlying thread management. All thread information is hidden in the implementation section. There’s a notification event that’s triggered when pool thread is created or destroyed, but it is only a notification and is triggered asynchronously (and possibly with a delay).

In short:

type
TOTPWorkerThread = class(TThread)
end;

TOmniThreadPool = class(TInterfacedObject, IOmniThreadPool)
end;

Event handlers

The first idea was to add OnThreadInitialization/OnThreadCleanup to the IOmniThreadPool. [Actually something similar already exists - OnWorkerThreadCreated_Asy and OnWorkerThreadDestroyed_Asy – but those events are part of the previous implementation and will be removed very soon.] Those two events would receive a TThread parameter and do the proper initialization there.

There are some big problems though. Let’s say you’ll be implementing database connection pool. You’ll have to open a database connection in OnThreadInitialization. Where would you store that info then? In an external structure, indexed by the TThread? Ugly! Even worse – how would you access the database info from the task that will be executing in the thread pool? By accessing that same structure? Eugh!

Rejected.

Thread subclassing

A better idea is to implement a subclassed thread class in your own code and then tell the thread pool to use this thread class when creating new thread object. You’d then manage database connection in overridden Initialize/Cleanup methods.

type
TDBConnectionPoolThread = class(TOTPWorkerThread)
strict private
FDBConnection: TDBConnectionInfo;
protected
function Initialize: boolean; override;
procedure Cleanup; override;
end;

GlobalOmniThreadPool.ThreadClass := TDBConnectionPoolThread;

Looks much better but there’s again a problem – I’d have to expose the TOTPWorkerThread object in the interface section and that’s just plain ugly. The worker thread mechanism should be hidden. Only a few people would ever be interested in it.

Thread data subclassing

An even better idea is to add an empty

TOTPWorkerThreadData = class
end;

definition to the interface section of the thread pool unit. IOmniThreadPool would contain a property ThreadDataClass which would point to this definition. And each worker thread would create/destroy an instance of this class in its Execute method.

You’d add database management as

type
TDBConnectionPoolThreadData = class(TOTPWorkerThreadData)
strict private
FDBConnection: TDBConnectionInfo;
protected
constructor Create;
destructor Destroy; override;
end;

GlobalOmniThreadPool.ThreadDataClass := TDBConnectionPoolThreadData;

[Maybe the constructor has to be virtual here? I never know until I try.]
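For what it’s worth, here is a small sketch that answers the question in isolation. The class names are simplified stand-ins for the ones above, not OTL code. When an object is created through a class reference (class of …), the descendant’s constructor runs only if the base constructor is virtual and the descendant overrides it.

```pascal
program ThreadDataClassSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Simplified stand-in for TOTPWorkerThreadData.
  TWorkerThreadData = class
  public
    constructor Create; virtual;
  end;

  // What a ThreadDataClass property would hold.
  TWorkerThreadDataClass = class of TWorkerThreadData;

  TDBThreadData = class(TWorkerThreadData)
  public
    constructor Create; override;
  end;

constructor TWorkerThreadData.Create;
begin
  inherited Create;
end;

constructor TDBThreadData.Create;
begin
  inherited Create;
  // A real implementation would open the database connection here.
  Writeln('TDBThreadData.Create called');
end;

var
  data: TWorkerThreadData;
  dataClass: TWorkerThreadDataClass;
begin
  dataClass := TDBThreadData;
  // Dispatched virtually, so TDBThreadData.Create is the one that runs.
  data := dataClass.Create;
  try
    Writeln(data.ClassName);
  finally
    data.Free;
  end;
end.
```

Without virtual/override the object would still be a TDBThreadData, but only TWorkerThreadData.Create would run and the connection would never be opened.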

There’s still a question of accessing this information from the task (and it goes the same for the previous attempt – I just skipped the issue then). I’d have to extend the IOmniTask interface with a method to access per-thread data.

Thread data with interfaces

While writing this mind dump a new idea crossed my mind – what if thread data were implemented as an interface, without the need for subclassing the thread or thread data? In a way it is the first idea, just reimplemented to remove all its problems.

Task interface would be extended with thread data access definitions, approximately like this:

type
IOtlThreadData = interface
end;

IOtlTask = interface
property ThreadData: IOtlThreadData;
end;

Thread pool would get a property containing a factory method.

type
TCreateThreadDataProc = function: IOtlThreadData;

IOmniThreadPool = interface
property ThreadDataFactory: TCreateThreadDataProc;
end;

This factory method would be called when thread is created to initialize thread data. Each task would get assigned that same interface into its ThreadData property just before starting its execution in a selected thread. Task would then access ThreadData property to retrieve this information.

In the database connection pool scenario, you’d have to write a connection interface, object and factory.

type
IDBConnectionPoolThreadData = interface(IOtlThreadData)
property ConnectionInfo: TDBConnectionInfo read GetConnectionInfo;
end;

TDBConnectionPoolThreadData = class(TInterfacedObject, IDBConnectionPoolThreadData)
strict private
FDBConnection: TDBConnectionInfo;
protected
constructor Create;
destructor Destroy; override;
end;

function CreateConnectionPoolThreadData: IDBConnectionPoolThreadData;
begin
Result := TDBConnectionPoolThreadData.Create;
end;

GlobalOmniThreadPool.ThreadDataFactory := CreateConnectionPoolThreadData;

This approach requires slightly more work from the programmer but I like it most as it somehow seems the cleanest of them all (plus it is implemented with interfaces which is pretty much the approach used in all OTL code).

So, dear reader, what do you think? If you have a better idea, or see a big problem with any of those implementations that I didn’t think of, please do tell in the comments!

Tuesday, February 03, 2009

Hassle-free critical section

While writing multithreaded code I sometimes need a fine-grained critical section that will synchronize access to some small (very small) piece of code. In the OmniThreadLibrary, for example, there’s a class TOmniTaskExecutor which has some of its internals (for example a set of Option flags) exposed to both the task controller and the task itself (and those two by definition live in two different threads). Access to those internal fields is serialized with a critical section.

Usually, I need two or more such critical sections. And because I’m lazy and I don’t want to write creation/destruction code every time I need a fine-grained lock, I usually create only one critical section and use it for all such accesses. In other words, when thread 1 is accessing field 1 (protected with that one critical section), thread 2 will be blocked from accessing field 2 (because it is protected with the same critical section). I can live with that, because the frequency of such accesses is very low (or I would not be reusing the same critical section).

Still, I was not happy with this status quo but I didn’t know what to do (except creating more critical sections, of course). Then, while developing the new OTL thread pool, I got a great idea – records! Records need no explicit .Create. Let’s make this new critical section a record!

Let’s start with the use scenario. I want to be able to declare the critical section object …

  TOmniTaskExecutor = class
strict private
oteInternalLock: TOmniCS;
//...
end;

… and then use it without any initialization.

  oteInternalLock.Acquire;
try
if not assigned(oteCommList) then
oteCommList := TInterfaceList.Create;
oteCommList.Add(comm);
SetEvent(oteCommRebuildHandles);
finally oteInternalLock.Release; end;

There are only two problems to be solved. I had to make sure that the critical section is created when the record is first used and destroyed when the owning object is destroyed. It turned out that this is quite a big “only”.

Destruction

Let’s start with the simpler problem – destruction. The solution to automatic record cleanup is well-documented (at least if you follow Delphi blogs where they talk about such things …). In general, the Delphi compiler doesn’t guarantee what the initial state of record fields will be, but there are two exceptions to this rule – all strings are initialized to an empty string and all interfaces to nil (which in both cases means that the fields holding strings/interfaces are initialized to 0). In addition to that, the compiler will free memory allocated for string fields and destroy interfaces (well, decrease the reference count) when the record goes out of scope. If the record is declared inside a method, this will happen when the method exits and if it is declared as a class field, the cleanup will occur when the class is destroyed. In any case, you can be sure that the compiler will take care of strings and interfaces.
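If you’d like to see this cleanup happen, here is a tiny sketch. TNoisy, THolder and TOwner are made-up names for this example only. The record holds an interface field, the record lives inside a class, and nobody ever releases the interface explicitly.

```pascal
program RecordCleanupSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Hypothetical object that announces its own destruction.
  TNoisy = class(TInterfacedObject)
  public
    destructor Destroy; override;
  end;

  THolder = record
    Guard: IInterface; // managed field: starts as nil, released automatically
  end;

  TOwner = class
  public
    Holder: THolder;
  end;

destructor TNoisy.Destroy;
begin
  Writeln('TNoisy destroyed');
  inherited;
end;

var
  owner: TOwner;
begin
  owner := TOwner.Create;
  Writeln('Guard is nil at start: ', owner.Holder.Guard = nil);
  owner.Holder.Guard := TNoisy.Create;
  Writeln('Freeing owner');
  owner.Free; // the compiler-generated cleanup releases Holder.Guard here
  Writeln('Owner freed');
end.
```

The ‘TNoisy destroyed’ line appears between ‘Freeing owner’ and ‘Owner freed’, although the program never touches Guard again after assigning it.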

So we already know something – TOmniCS record will contain an interface field and an instance of the object implementing this interface will do the actual critical section allocation and access.

  IOmniCriticalSection = interface ['{AA92906B-B92E-4C54-922C-7B87C23DABA9}']
procedure Acquire;
procedure Release;
function GetSyncObj: TSynchroObject;
end; { IOmniCriticalSection }

TOmniCS = record
private
ocsSync: IOmniCriticalSection;
function GetSyncObj: TSynchroObject;
public
procedure Initialize;
procedure Acquire; inline;
procedure Release; inline;
property SyncObj: TSynchroObject read GetSyncObj;
end; { TOmniCS }

The implementation of the IOmniCriticalSection interface is trivial.

  TOmniCriticalSection = class(TInterfacedObject, IOmniCriticalSection)
strict private
ocsCritSect: TSynchroObject;
public
constructor Create;
destructor Destroy; override;
procedure Acquire; inline;
function GetSyncObj: TSynchroObject;
procedure Release; inline;
end; { TOmniCriticalSection }

constructor TOmniCriticalSection.Create;
begin
ocsCritSect := TCriticalSection.Create;
end; { TOmniCriticalSection.Create }

destructor TOmniCriticalSection.Destroy;
begin
FreeAndNil(ocsCritSect);
end; { TOmniCriticalSection.Destroy }

procedure TOmniCriticalSection.Acquire;
begin
ocsCritSect.Acquire;
end; { TOmniCriticalSection.Acquire }

function TOmniCriticalSection.GetSyncObj: TSynchroObject;
begin
Result := ocsCritSect;
end; { TOmniCriticalSection.GetSyncObj }

procedure TOmniCriticalSection.Release;
begin
ocsCritSect.Release;
end; { TOmniCriticalSection.Release }

function CreateOmniCriticalSection: IOmniCriticalSection;
begin
Result := TOmniCriticalSection.Create;
end; { CreateOmniCriticalSection }

Creation

The destruction part was trivial (once you know the trick, of course), but the creation is not. Delphi only guarantees that the ocsSync interface will be initialized to nil (or 0, if you prefer), nothing more than that.

The TOmniCS record offloads all hard work to the Initialize method. It is called from Acquire and GetSyncObj (a method that returns underlying critical section), but not from Release and that’s for a reason. If you call Release before first calling Acquire, it is clearly a programming error and program should crash – and it will because the ocsSync will be nil.

procedure TOmniCS.Acquire;
begin
Initialize;
ocsSync.Acquire;
end; { TOmniCS.Acquire }

function TOmniCS.GetSyncObj: TSynchroObject;
begin
Initialize;
Result := ocsSync.GetSyncObj;
end; { TOmniCS.GetSyncObj }

procedure TOmniCS.Release;
begin
ocsSync.Release;
end; { TOmniCS.Release }

Let’s finally solve the hard work. Before the critical section can be used, ocsSync interface must be initialized. In a single-threaded world we would just create a TOmniCriticalSection object and store it in the ocsSync field. In the multi-threaded world this is not possible.

Let’s think about what can happen if two Acquire calls are made at the same time from two threads. Thread 1 checks if ocsSync is initialized, finds that it’s not and loses its CPU slice. Thread 2 checks if ocsSync is initialized, finds that it’s not, initializes it, calls Acquire and loses its CPU slice. Thread 1 creates another TOmniCriticalSection object, stores it in the ocsSync field (overwriting the previous value, which will get its reference count decremented, which will destroy the implementing object) and calls Acquire. Because this Acquire will be using a critical section different from the Acquire in thread 2, it will succeed and both threads will have access to the protected data. Bad!

The trick is to store TOmniCriticalSection in the ocsSync field with an atomic operation that will succeed if and only if the ocsSync is empty (nil, zero). And that’s a job for the InterlockedCompareExchange (ICE in short).

ICE takes three parameters – the first is the address of the memory area we are trying to modify, the second is the new value and the third is the value we expect to find in that memory area. If the memory holds the expected value, ICE atomically replaces it with the new value; if it does not, ICE does nothing. In both cases the function returns the value that was stored in the memory area before the call.
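A two-call sketch makes the semantics easier to follow. It assumes the integer flavour of InterlockedCompareExchange (the one declared in the Windows unit of Delphi 2009-era compilers, or the System one under Free Pascal); the variable names are invented for the example.

```pascal
program IceSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  {$IFDEF FPC}SysUtils{$ELSE}Windows{$ENDIF};

var
  previous: integer;
  slot: integer;
begin
  slot := 0;

  // slot holds the expected 0, so 42 is stored; the old value (0) comes back.
  previous := InterlockedCompareExchange(slot, 42, 0);
  Writeln(previous, ' ', slot); // 0 42

  // slot is now 42, not the expected 0, so nothing is stored; 42 comes back.
  previous := InterlockedCompareExchange(slot, 99, 0);
  Writeln(previous, ' ', slot); // 42 42
end.
```

A return value equal to the expected value therefore means ‘my value went in’; anything else means somebody else got there first.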

Quite a mouthful, I know. That’s how it is used in practice:

procedure TOmniCS.Initialize;
var
syncIntf: IOmniCriticalSection;
begin
Assert(cardinal(@ocsSync) mod 4 = 0, 'TOmniCS.Initialize: ocsSync is not 4-aligned!');
while not assigned(ocsSync) do begin
syncIntf := CreateOmniCriticalSection;
if InterlockedCompareExchange(PInteger(@ocsSync)^, integer(syncIntf), 0) = 0 then begin
pointer(syncIntf) := nil;
Exit;
end;
DSiYield;
end;
end; { TOmniCS.Initialize }

Initialize checks if ocsSync is allocated. If not, it will create a new instance of the IOmniCriticalSection interface and store it in the local variable. Then it tries to store it in the ocsSync field with a call to the ICE. The third parameter tells the ICE that we expect ocsSync to contain all zeroes. If this is so, interface will be stored in the ocsSync and ICE will return 0 (otherwise, it will return current value of the ocsSync field). If ICE succeeded, we have to clear the local variable without decrementing interface reference count and we can exit. If ICE failed, we’ll give the other thread a time slice (after all, the other thread just created the critical section, therefore we can assume it will Acquire it, therefore the current thread would not be able to Acquire it and it can sleep a little) and retry.

And that’s how you get a hassle-free critical section. Ugly, I know, but it works.

Just a word of warning – don’t try to pass a TOmniCS record around. Eventually you’ll do an assignment somewhere (newCS := oldCS) and that would screw things up. Just pass the critical section (TOmniCS.SyncObj) and all will be fine.


OmniThreadLibrary 1.02

If you haven’t noticed already – OmniThreadLibrary 1.02 was released a few days ago.

The main new feature is a reimplemented thread pool. The previous implementation was never meant to be a permanent solution anyway. The new thread pool uses OTL as an internal implementation mechanism. Talk about recursion! :) A longer article on the new implementation will follow.

There’s a new Enforced decorator which you can apply to the IOmniTask or IOmniTaskControl. In short – OTL always tries to execute your task. If you call taskControl.Terminate before the task has even started, OTL will set the termination signal and start executing task. This is not a good idea if the task was waiting in the thread pool queue and threadPool.CancelAll or threadPool.Terminate was executed. To bypass this auto-execute behaviour, you can call .Enforced(false).

I’ve implemented a critical section which you only have to declare and start using. No need for .Create or .Initialize or similar. An article will follow …

There’s a new demo that shows how to use OTL for background file scanning.

A few small and not so small bugs were fixed and 3rd party units were sync’d to fresh releases.

As usual, you can get it via SVN or as a ZIP archive.


Saturday, December 20, 2008

Christmas mystery

We were tracking down quite an interesting problem in the past few days …

We have this service that accepts connections from multiple computers and reports some status back. And it does that via SOAP. And the SOAP implementation is our own, internal one.

So far so good – we are using this approach in quite some programs deployed in many places. Everything is working well.

Everything, except one installation.

That customer was reporting weird things. Mysterious things. Sometimes the client on one computer would for a moment display the same data as another computer. And then the display would flip back to the correct data. Mysterious.

As expected, the guy who wrote the service pointed his finger at the guy who wrote the SOAP layer (me). And, as expected, I did the reverse. We were both pretty sure that the problem was caused by the buggy code written by the other guy. We were both wrong.

Of course we first spent about a working day logging various parts of the service and SOAP server. You know the drill – add logging, compile, connect VPN, upload the source to the customer, start two clients on two machines on the remote site via RDP, wait for the problem, download logs, analyze. Booooring. At the end, we were none the wiser. We only knew that the server always returns correct data to all clients.

Then we switched our attention to the client. Who’d say, the client always received the correct data. Except that it sometimes displayed wrong data. But that data was never received via the SOAP layer. As I said before – mystery.

Of course, once we got to that point we knew that the problem lies inside the client software so we only have to dig in that direction. And, of course, we got the answer. And boy was it a surprising one!

You see, our client is somewhat baroque. Layers upon layers of code that were developed over the years. It’s the fate of all successful applications and this one is quite successful, at least in the vertical market we are working in. It is therefore not very surprising that the client is using temporary files in a somewhat strange arrangement. Instead of creating temporary files with unique names, it creates a temporary folder inside the %temp% and stores files there. This folder is guaranteed to be unique on the system as it uses a global counter as a part of the folder name. IOW, the first client on the computer stores data in %temp%\data_1, the second in %temp%\data_2. So the client on the first computer stored remote data (returned from the service) in %temp%\data_1\list.txt and the client on the second computer also used %temp%\data_1\list.txt. They were both using the ‘data_1’ subfolder as each of them was the first (and only) client instance on its machine. A recipe for disaster? Not really, as the %temp% folder is local to the computer.

Except that it was not.

Somebody at the customer’s site got a brilliant idea and configured temp folders for all domain clients to point to the domain controller. Don’t know who and surely don’t know why, but as a result %temp% pointed to the same folder (on the domain controller computer) on all client computers in the domain. So the first client downloaded its data to the %temp% (on the domain server), the second client downloaded its data to the same location and then the first client displayed that data – data that was already modified and which belonged to the second client. A typical race condition if I ever saw one.

The solution is, of course, to always use GetTempFileName. But this time we surely (re)learnt it in an interesting way.
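For illustration, here is a minimal sketch of how the Windows GetTempPath and GetTempFileName calls could be wrapped. The helper name CreateUniqueTempFile is made up for this example and is not part of the client code discussed above.

```pascal
uses
  Windows, SysUtils;

// Hypothetical helper: creates an empty, uniquely named file in the temp
// folder and returns its full path. The caller should delete it when done.
function CreateUniqueTempFile(const prefix: string): string;
var
  tempPath: array [0..MAX_PATH] of char;
  tempFile: array [0..MAX_PATH] of char;
begin
  if GetTempPath(MAX_PATH, tempPath) = 0 then
    RaiseLastOSError;
  // uUnique = 0: Windows picks a unique name and creates the file to
  // reserve it; only the first three characters of the prefix are used.
  if GetTempFileName(tempPath, PChar(prefix), 0, tempFile) = 0 then
    RaiseLastOSError;
  Result := tempFile;
end;
```

Because the file is created at the moment the name is chosen, two clients writing to the same folder should end up with different file names even when %temp% is shared.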

Thursday, November 27, 2008

Advanced enumerators

Issue #4 of the Blaise Pascal magazine is out. Inside you can find the second part of my “enumerators” series (first page).

Monday, November 03, 2008

Background file scanning with OmniThreadLibrary

Yesterday, a reader asked if I can create a demo for the background file searcher, so here it is. To get the demo source, update your SVN copy or download this file.

The app has a simple interface.

[Screenshot: demo 23 - initial]

Enter the path to be searched and the file mask in the edit field and click Scan.

[Screenshot: demo 23 - scanning]

The program starts scanning and displays the current folder and the number of files found so far. If you click the X button during the scan, the scan will be aborted and the program will close.

When background scanning completes, the list of found files is displayed in the listbox.

[Screenshot: demo 23 - final]

During the scanning, the main thread is fully active. You can move the program around, resize it, minimize, maximize and so on.

Implementation

Let’s take a look at the application in design mode.

[Screenshot: demo 23 - design]

Besides the components that are visible at runtime, there is also a TOmniEventMonitor component (named OTLMonitor) and a TTimer (tmrDisplayStatus) on the form.

When the user clicks the Scan button, a background task is created.

procedure TfrmBackgroundFileSearchDemo.btnScanClick(Sender: TObject);
begin
  FFileList := TStringList.Create;
  btnScan.Enabled := false;
  tmrDisplayStatus.Enabled := true;
  FScanTask := CreateTask(ScanFolders, 'ScanFolders')
    .MonitorWith(OTLMonitor)
    .SetParameter('FolderMask', inpFolderMask.Text)
    .Run;
end;

ScanFolders is the method that will do the scanning (in a background thread). We’ll return to it later. The task will be monitored with the OTLMonitor component so that we will receive task messages. OTLMonitor will also tell us when the task terminates. The input folder and mask are sent to the task as the FolderMask parameter and the task is started.

The FFileList field is a TStringList that will contain a list of all found files.

Let’s ignore the scanner details for the moment and skip to the end of the scanning process. When the task has completed its job, OTLMonitor.OnTaskTerminated is called.

procedure TfrmBackgroundFileSearchDemo.OTLMonitorTaskTerminated(
  const task: IOmniTaskControl);
begin
  tmrDisplayStatus.Enabled := false;
  outScanning.Text := '';
  outFiles.Text := IntToStr(FFileList.Count);
  lbFiles.Clear;
  lbFiles.Items.AddStrings(FFileList);
  FreeAndNil(FFileList);
  FScanTask := nil;
  btnScan.Enabled := true;
end;

At that point, the number of found files is copied to the outFiles edit field and the complete list is assigned to the listbox. The task reference FScanTask is then cleared, which causes the task object to be destroyed, and the Scan button is re-enabled (it was disabled during the scanning process).

But what if the user closes the program by clicking the X button while the background scanner is active? Simple, we just handle the form’s OnCloseQuery event and tell the task to terminate.

procedure TfrmBackgroundFileSearchDemo.FormCloseQuery(Sender: TObject;
  var CanClose: boolean);
begin
  if assigned(FScanTask) then begin
    FScanTask.Terminate;
    FScanTask := nil;
    CanClose := true;
  end;
end;

The Terminate method will do two things – tell the task to terminate and then wait for its termination. After that, we simply have to clear the task reference and allow the program to terminate.

Scanner

Let’s move to the scanning part now. The ScanFolders method (which is the main task method, the one we passed to the CreateTask) splits the value of the FolderMask parameter into folder and mask parts and passes them to the main worker ScanFolder.

procedure ScanFolders(const task: IOmniTask);
var
  folder: string;
  mask  : string;
begin
  mask := task.ParamByName['FolderMask'];
  folder := ExtractFilePath(mask);
  Delete(mask, 1, Length(folder));
  if folder <> '' then
    folder := IncludeTrailingPathDelimiter(folder);
  ScanFolder(task, folder, mask);
end;

ScanFolder first finds all subfolders of the selected folder and calls itself recursively for each subfolder. That means that we’ll first process the deepest folders and then proceed to the top of the folder tree.

Then it sends a message MSG_SCAN_FOLDER to the main thread. As a parameter of this message it sends the name of the folder being processed. There’s nothing magical about this message – it is just an arbitrary numeric constant from range 0 .. 65534 (yes, number 65535 is reserved for internal use).

const
  MSG_SCAN_FOLDER  = 1;
  MSG_FOLDER_FILES = 2;

procedure ScanFolder(const task: IOmniTask; const folder, mask: string);
var
  err        : integer;
  folderFiles: TStringList;
  S          : TSearchRec;
begin
  // first pass: recurse into subfolders
  err := FindFirst(folder + '*.*', faDirectory, S);
  if err = 0 then try
    repeat
      if ((S.Attr and faDirectory) <> 0) and (S.Name <> '.') and (S.Name <> '..') then
        ScanFolder(task, folder + S.Name + '\', mask);
      err := FindNext(S);
    until task.Terminated or (err <> 0);
  finally FindClose(S); end;
  task.Comm.Send(MSG_SCAN_FOLDER, folder);
  // second pass: collect files matching the mask in this folder
  folderFiles := TStringList.Create;
  try
    err := FindFirst(folder + mask, 0, S);
    if err = 0 then try
      repeat
        folderFiles.Add(folder + S.Name);
        err := FindNext(S);
      until task.Terminated or (err <> 0);
    finally FindClose(S); end;
  finally task.Comm.Send(MSG_FOLDER_FILES, folderFiles); end;
end;

ScanFolder then runs the FindFirst/FindNext/FindClose loop for the second time to search for files in the folder. [BTW, if you want to first scan folders nearer to the root, just swap the two loops and scan for files first and for folders second.] Each file is added to an internal TStringList object which was created just a moment before. When the folder scan is completed, this object is sent to the main thread as a parameter of the MSG_FOLDER_FILES message.

This approach – sending data for one folder – is a compromise between returning the complete set (full scanned tree), which would not provide good feedback, and returning each file as we detect it, which would unnecessarily put a high load on the system.

Both Find loops test the status of the task.Terminated function and exit immediately if it is True. That allows us to terminate the background task when the user closes the application and the form’s OnCloseQuery handler is called.

Receiving messages

That’s all that has to be done in the background task but we still have to process the messages in the main thread. For that, we write a handler for the OTLMonitor’s OnTaskMessage event.

procedure TfrmBackgroundFileSearchDemo.OTLMonitorTaskMessage(
  const task: IOmniTaskControl);
var
  folderFiles: TStringList;
  msg        : TOmniMessage;
begin
  task.Comm.Receive(msg);
  if msg.MsgID = MSG_SCAN_FOLDER then
    FWaitingMessage := msg.MsgData
  else if msg.MsgID = MSG_FOLDER_FILES then begin
    folderFiles := TStringList(msg.MsgData.AsObject);
    FFileList.AddStrings(folderFiles);
    FreeAndNil(folderFiles);
    FWaitingCount := IntToStr(FFileList.Count);
  end;
end;

If the message is MSG_SCAN_FOLDER we just copy the folder name to a form field. If the message is MSG_FOLDER_FILES, we copy file names from the parameter (which is a TStringList) to the FFileList list and destroy the parameter. We also update a form field holding the number of currently found files.

So why don’t we directly update two edit fields on the form (one with the current folder and another with the number of found files)? Well, the background task can send many messages in one second (when processing folders with a small number of files) and there’s no point in displaying them all – the user will never see what was displayed anyway. And it would slow down the GUI because Windows controls would be updated hundreds of times per second, which is never a good idea.

Instead of that we just store the strings to be displayed in two form fields and display them from a timer which is triggered three times per second. That will not show all scanned folders and all intermediate file count results, but will still provide the user with sufficient feedback.

procedure TfrmBackgroundFileSearchDemo.tmrDisplayStatusTimer(Sender: TObject);
begin
  if FWaitingMessage <> '' then begin
    outScanning.Text := FWaitingMessage;
    FWaitingMessage := '';
  end;
  if FWaitingCount <> '' then begin
    outFiles.Text := FWaitingCount;
    FWaitingCount := '';
  end;
end;

And that’s all. Fully functional background file scanner that could easily be repackaged into a TComponent. And it’s all yours.

Sunday, November 02, 2008

OmniThreadLibrary 1.01

OmniThreadLibrary 1.01 was released yesterday. It is available via SVN (http://omnithreadlibrary.googlecode.com/svn/tags/release-1.01) or as a ZIP archive.

Changes since version 1.0a:

  • *** Breaking interface change ***
    • IOmniTask.Terminated renamed to IOmniTask.Stopped.
    • New IOmniTask.Terminated that checks whether the task has been requested to terminate. [demo 22]
  • [GJ] Redesigned stack container with better lock contention.
  • [GJ] Totally redesigned queue container, which is no longer based on stack and allows multiple readers.
  • Full D2009 support; D2009 packages, project files and Tests project group.
  • Invoke-by-name and invoke-by-address messaging implemented [http://17slon.com/blogs/gabr/2008/10/erlangenizing-omnithreadlibrary.html, http://17slon.com/blogs/gabr/2008/10/omnithreadlibrary-using-rtti-to-call.html, demos 18 and 19]
  • Implemented CreateTask(reference to function (task: ITaskControl)). [D2009 only, demo 21]
  • Implemented blocking wait (ReceiveWait). [demo 19]
  • Added enumerator to the IOmniTaskGroup interface.
  • Implemented IOmniTaskGroup.RegisterAllWithTask and .UnregisterAllFromTask.
  • Added automatic comm unregistration for IOmniTaskGroup.RegisterAllCommWith.
  • Implemented IOmniTaskGroup.SendToAll.
  • IOmniTaskControl.Terminate now kills the task after the timeout.
  • New/updated tests/demos:
    • 10_Containers
      • 2 -> 2, 1 -> 4 and 4 -> 4 tests for stacks and queues.
      • [1, 2, 4] -> [1, 2, 4] full tests.
      • Writes CSV file with cumulative test results.
    • 17_MsgWait: demo for the .MsgWait decorator.
    • 18_StringMsgDispatch: Invoke demo.
    • 19_StringMsgBenchmark: Invoke benchmark, ReceiveWait demo.
    • 20_QuickSort: Parallel quicksort demo.
    • 21_Anonymous_methods: Anonymous methods demo (D2009 only).
    • 22_TerminationTest: Task termination demo.
  • Message ID $FFFF is now reserved for internal purposes.
  • Better default queue length calculation that takes into account OtlContainers overhead and FastMM4 granulation.
  • Bug fixed: TOmniValue.Null was not really initialized to Null.
  • Bug fixed: Setting timer interval resets timer countdown.
  • Bug fixed: TOmniTaskControl.Schedule always scheduled task to the global thread pool.
  • Current versions of 3rd party units included.

Known problems:

  • Thread pool is not stable under high load.

Thursday, October 30, 2008

When one and one makes one

Some time ago I promised to write an article on the TGpJoinedStream class. The time has come to fulfill the promise.

TGpJoinedStream is a stream wrapper class – a class that wraps one or more existing streams and provides a TStream interface on the outside. My GpStreams unit contains several examples of such wrappers: TGpStreamWindow provides stream access to a part of another stream, TGpScatteredStream provides contiguous access to scattered data (hm, I never wrote an article about that one, did I?), and TGpBufferedStream adds data caching to any stream.

TGpJoinedStream’s role is simple. You pass it a bunch of streams and it provides stream access to concatenated data. IOW, if first stream contains numbers <1, 2, 3> and second stream contains <4, 5>, joined stream would function as if it contains <1, 2, 3, 4, 5>.

The implementation is pretty straightforward and I won’t describe it here. I’d rather show how TGpJoinedStream is used in practice and why I wrote it at all.
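To make the concatenation idea concrete, here is a deliberately tiny sketch. It is not the TGpJoinedStream code: TJoinedReader and JoinDemo are hypothetical names, the class is not a TStream descendant, and it only supports reading forward (no Seek, no Write). It just shows how a read request can spill over from one inner stream to the next.

```pascal
uses
  Classes, SysUtils;

type
  // Hypothetical, read-only, forward-only illustration.
  TJoinedReader = class
  private
    FCurrent: integer;
    FStreams: array of TStream;
  public
    constructor Create(const streams: array of TStream);
    function Read(var buffer; count: integer): integer;
  end;

constructor TJoinedReader.Create(const streams: array of TStream);
var
  i: integer;
begin
  inherited Create;
  SetLength(FStreams, Length(streams));
  for i := 0 to High(streams) do
    FStreams[i] := streams[i];
  FCurrent := 0;
end;

function TJoinedReader.Read(var buffer; count: integer): integer;
var
  got: integer;
  p  : PByte;
begin
  Result := 0;
  p := @buffer;
  while (count > 0) and (FCurrent <= High(FStreams)) do begin
    got := FStreams[FCurrent].Read(p^, count);
    if got = 0 then
      Inc(FCurrent) // this stream is exhausted, continue with the next one
    else begin
      Inc(p, got);
      Inc(Result, got);
      Dec(count, got);
    end;
  end;
end;

// Demo helper: joins two in-memory streams and returns everything read.
function JoinDemo(const first, second: AnsiString): AnsiString;
var
  got   : integer;
  reader: TJoinedReader;
  s1, s2: TMemoryStream;
begin
  s1 := TMemoryStream.Create;
  s2 := TMemoryStream.Create;
  try
    if first <> '' then s1.WriteBuffer(first[1], Length(first));
    if second <> '' then s2.WriteBuffer(second[1], Length(second));
    s1.Position := 0;
    s2.Position := 0;
    reader := TJoinedReader.Create([s1, s2]);
    try
      SetLength(Result, Length(first) + Length(second));
      if Length(Result) > 0 then begin
        got := reader.Read(Result[1], Length(Result));
        SetLength(Result, got);
      end;
    finally reader.Free; end;
  finally
    s2.Free;
    s1.Free;
  end;
end;
```

With this sketch, JoinDemo('123', '45') reads the two streams as if they were one containing '12345'. A real wrapper also has to track Position and Size across all inner streams, which is what makes a proper TStream descendant more work.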

We have an MPEG parser class that takes a TStream containing MPEG video data and extracts some information from it. It works fine when you need to extract information from a video file, but not so well if you want to use it inside a DirectX filter. From the viewpoint of a DirectX source or sink filter, video is not a stream but a bunch of buffers, which we must process one by one. The trouble is that buffers can almost never be fully processed because MPEG sequences don’t contain data length.

In MPEG, each sequence starts with the byte signature 00 00 01, followed by a byte that tells you what kind of data you’re processing, followed by some data. Unless you want to parse each and every sequence’s data and you know exactly how all well-formed and malformed sequences in existence are built, you only know that you reached the end of one sequence when you reach another 00 00 01 signature. That’s why the last sequence in the buffer can never be processed (unless this is the last buffer of them all) until we find 00 00 01 in the next buffer. Uff. I hope somebody understands this at all.

In short, we are processing data buffers. A variable-length part of each buffer will stay unprocessed and will have to be prepended to the next buffer before it is passed through the MPEG parser. And so on, until the end of data.
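As a rough illustration of where the unprocessed tail begins, the following sketch searches a buffer backwards for the last 00 00 01 signature. FindLastStartCode is a hypothetical helper, not part of the parser discussed here (the real parser finds this position by reading through the stream); everything from the returned offset to the end of the buffer would be the part that has to wait for the next buffer.

```pascal
// Hypothetical helper: returns the offset of the last 00 00 01 signature
// in the buffer, or -1 if the buffer contains no complete signature.
function FindLastStartCode(const data: array of byte): integer;
var
  i: integer;
begin
  Result := -1;
  // the last position where a three-byte signature can start is High(data) - 2
  for i := High(data) - 2 downto 0 do
    if (data[i] = 0) and (data[i + 1] = 0) and (data[i + 2] = 1) then begin
      Result := i;
      Exit;
    end;
end;
```

If the function returns -1, the whole buffer would have to be kept and joined with the next one.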

And that’s where TGpJoinedStream comes to help. The unprocessed part is stored in a memory stream FLeftovers, which is empty at the beginning. Buffer parser concatenates FLeftovers with the current data and passes the result through the stream parser. When that one returns, .Position in the combined stream indicates the first unprocessed byte. The unprocessed data is then copied into the FLeftovers stream so it can be used next time the buffer parser is called.

That’s how it looks in code (slightly simplified real code; I just removed some details that are not interesting for our story).

procedure TGpMPEGSequentialParser.ParseNext(buffer: pointer;
  bufferSize: integer);
var
  combinedStream: TGpJoinedStream;
  mpegParser    : TGpMPEGParser;
  newLeftovers  : TMemoryStream;
  strBuffer     : TGpFixedMemoryStream;
begin
  strBuffer := TGpFixedMemoryStream.Create(buffer^, bufferSize);
  try
    newLeftovers := TMemoryStream.Create;
    try
      combinedStream := TGpJoinedStream.Create([FLeftovers, strBuffer]);
      try
        mpegParser := TGpMPEGParser.Create;
        try
          mpegParser.MPEGStream := combinedStream;
          mpegParser.Parse(HandleMpegSequence);
          // keep whatever the parser could not process for the next call
          if combinedStream.Position < combinedStream.Size then
            newLeftovers.CopyFrom(combinedStream,
              combinedStream.Size - combinedStream.Position);
        finally FreeAndNil(mpegParser); end;
      finally FreeAndNil(combinedStream); end;
    finally
      FreeAndNil(FLeftovers);
      FLeftovers := newLeftovers;
    end;
  finally FreeAndNil(strBuffer); end;
end;


Hope you like it!

Tuesday, October 21, 2008

Internet, as predicted in 1946

“You know the logics setup. You got a logic in your house. It looks like a vision receiver used to, only it's got keys instead of dials and you punch the keys for what you wanna get. It's hooked in to the tank, which has the Carson Circuit all fixed up with relays. Say you punch "Station SNAFU" on your logic. Relays in the tank take over an' whatever vision-program SNAFU is telecastin' comes on your logic's screen. Or you punch "Sally Hancock's Phone" an' the screen blinks an' sputters an' you're hooked up with the logic in her house an' if somebody answers you got a vision-phone connection. But besides that, if you punch for the weather forecast or who won today's race at Hialeah or who was mistress of the White House durin' Garfield's administration or what is PDQ and R sellin' for today, that comes on the screen too. The relays in the tank do it. The tank is a big buildin' full of all the facts in creation an' all the recorded telecasts that ever was made—an' it's hooked in with all the other tanks all over the country—an' everything you wanna know or see or hear, you punch for it an' you get it. Very convenient. Also it does math for you, an' keeps books, an' acts as consultin' chemist, physicist, astronomer, an' tea-leaf reader, with a "Advice to the Lovelorn" thrown in. The only thing it won't do is tell you exactly what your wife meant when she said, "Oh, you think so, do you?" in that peculiar kinda voice. Logics don't work good on women. Only on things that make sense.”

- Murray Leinster, A Logic Named Joe

For Sci-Fi fans: Baen Free Library.

Tuesday, October 14, 2008

TDM Rerun #11: Shared Pools

Sometimes we run into problems because of a Windows feature, and most of the time this is a very good feature, that a file mapping (an essential part of my shared memory implementation) cannot exist on its own. A file mapping, like mutexes, events, and other Windows primitives, must have an owner. If all processes associated with a given file mapping die, the file mapping will be destroyed. Because in my shared memory implementation this file mapping is backed with a page file, its contents won’t be preserved on an accessible part of the disk.

- Shared Pools, The Delphi Magazine 95, July 2003

[Figure: Shared pool architecture]

The shared pool was one of my more baroque creations. In fact, it was so complicated that it was never used in a deployed application. Basically, the article described an architecture to implement a pool of shared memory objects, which multiple Writers could use to send data to one Reader (typically sitting in another process). The system also handled cleanup when a Reader task died and other management details.

You really should not be playing with this code. There are better solutions.

Links: article (PDF, 193 KB), source code (ZIP, 1.9 MB)

Tuesday, October 07, 2008

OmniThreadLibrary: Using RTTI to call task methods

Yesterday I wrote an article on by-name and by-address invocations in the new OmniThreadLibrary (development version) but I didn’t finish the description of the OTL internal magic that makes those new calls work. Let’s fix that …

I ended the story right at the point where TOmniTaskExecutor.Asy_DispatchMessages calls DispatchOmniMessage.

Most of the magic happens inside this method, so it’s only fair to display it in its full glory.

procedure TOmniTaskExecutor.DispatchOmniMessage(msg: TOmniMessage);
var
  methodAddr     : pointer;
  methodInfoObj  : TObject;
  methodInfo     : TOmniInvokeInfo absolute methodInfoObj;
  methodName     : string;
  methodSignature: TOmniInvokeType;
  msgData        : TOmniValue;
  obj            : TObject;
begin
  if msg.MsgID = COtlReservedMsgID then begin
    Assert(assigned(WorkerIntf));
    GetMethodNameFromInternalMessage(msg, methodName, msgData);
    if methodName = '' then
      raise Exception.Create('TOmniTaskExecutor.DispatchOmniMessage: Method name not set');
    if not assigned(oteMethodHash) then
      oteMethodHash := TGpStringObjectHash.Create(17, true); //usually there won't be many methods
    if not oteMethodHash.Find(methodName, methodInfoObj) then begin
      GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
      methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
      oteMethodHash.Add(methodName, methodInfo);
    end;
    case methodInfo.Signature of
      itSelf:
        TOmniInvokeSignature_Self(methodInfo.Address)(WorkerIntf.Implementor);
      itSelfAndOmniValue:
        TOmniInvokeSignature_Self_OmniValue(methodInfo.Address)(WorkerIntf.Implementor, msgData);
      itSelfAndObject:
        begin
          obj := msgData.AsObject;
          TOmniInvokeSignature_Self_Object(methodInfo.Address)(WorkerIntf.Implementor, obj);
        end
      else
        RaiseInvalidSignature(methodName);
    end; //case methodSignature
  end
  else
    WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchMessage }

Now let’s dissect it. The code first checks if the message ID contains the magic value. If not, the message is dispatched as before by using Delphi’s Dispatch.

  if msg.MsgID = COtlReservedMsgID then begin
    //...
  end
  else
    WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchMessage }

Not interesting. Let’s take a look at the other path – when msg.MsgID is an internal message ID.

In this case, DispatchOmniMessage extracts the method name from the message. If the caller passed a method name to Invoke, then the job is simple – the name only has to be unpacked from the TOmniInternalStringMsg object. If, on the other hand, a method pointer was used, the code unpacks it from the TOmniInternalAddressMsg object and calls MethodName to convert it to a (who would guess?) method name.

procedure TOmniTaskExecutor.GetMethodNameFromInternalMessage(const msg: TOmniMessage; var
  msgName: string; var msgData: TOmniValue);
var
  internalType: TOmniInternalMessageType;
  method      : pointer;
begin
  internalType := TOmniInternalMessage.InternalType(msg);
  case internalType of
    imtStringMsg:
      TOmniInternalStringMsg.UnpackMessage(msg, msgName, msgData);
    imtAddressMsg:
      begin
        TOmniInternalAddressMsg.UnpackMessage(msg, method, msgData);
        msgName := WorkerIntf.Implementor.MethodName(method);
        if msgName = '' then
          raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
            'Cannot find method name for method %p', [method]);
      end
    else
      raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
        'Internal message type %s is not supported',
        [GetEnumName(TypeInfo(TOmniInternalMessageType), Ord(internalType))]);
  end; //case internalType
end; { TOmniTaskExecutor.GetMethodNameFromInternalMessage }

Next, the code looks into an internal hash table and tries to fetch information on that method name. If there’s no such information (when some method is called for the first time), GetMethodAddrAndSignature is called to convert the name to the method’s address and signature. Then this information is stored in the hash table so it will be immediately ready next time.

  if not oteMethodHash.Find(methodName, methodInfoObj) then begin
    GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
    methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
    oteMethodHash.Add(methodName, methodInfo);
  end;

Method signature is then used to select the type of call that will be performed. Only three signatures are supported: (Self: TObject), (Self: TObject; const msgData: TOmniValue) and (Self: TObject; var obj: TObject). Demo 18 demonstrates the use of all three. What I find especially convenient is that the third option allows you to put any TObject descendant in the parameter list. In other words, you can do:

type
  TAsyncHello = class(TOmniWorker)
  published
    procedure TheAnswer(var sl: TStringList);
  end;

procedure TfrmTestStringMsgDispatch.btnSendObjectClick(Sender: TObject);
var
  sl: TStringList;
begin
  sl := TStringList.Create;
  sl.Text := '42';
  FHelloTask.Invoke(@TAsyncHello.TheAnswer, sl);
end;

procedure TAsyncHello.TheAnswer(var sl: TStringList);
begin
  FreeAndNil(sl);
end;

Convenient, huh?

RTTI

The above description of the DispatchOmniMessage looks very much like the famous Sidney Harris cartoon. In step one a method name is retrieved from the message. In step three the method is invoked using the right signature and method pointer. In step two, well …

[Image: Sidney Harris cartoon]

Let’s be more explicit, then. We have a method name and we want to find the address for that method and some information about its parameters. Sounds like a job for RTTI, yes? Well, basic RTTI (the one that you enable with {$M+} or {$TYPEINFO ON} or simply by declaring a published property) only handles published properties, not methods. For methods, we need to use extended RTTI, which is enabled with {$METHODINFO ON}. I won’t describe it here as David Glassborow and Hallvard Vassbotn already did the job better than I could. If you’re interested in extended RTTI, I suggest that your research starts at Hallvard’s David Glassborow on extended RTTI article.

Great thanks for publishing all that info, guys, it helped a lot!

GetMethodAddrAndSignature first uses Delphi’s ObjAuto unit to extract method information header. Then it uses TObject’s MethodAddress to convert method name into address. Of course, it doesn’t use TObject directly, as it has no idea where the methods belonging to the task object are stored – it must call MethodAddress on your task object directly.

  methodInfoHeader := ObjAuto.GetMethodInfo(WorkerIntf.Implementor, methodName);
  methodAddress := WorkerIntf.Implementor.MethodAddress(methodName);
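The MethodAddress half of this can be tried in isolation. The sketch below is not OTL code: TGreeter, InvokeByName and CountCallsDemo are hypothetical names, and it only handles parameterless published methods. It looks up a published method by name on a concrete object and calls it through a TMethod record.

```pascal
type
  TSimpleProc = procedure of object;

  {$M+}
  // Hypothetical class; Hello must be published so MethodAddress can find it.
  TGreeter = class
  private
    FCalls: integer;
  public
    property Calls: integer read FCalls;
  published
    procedure Hello;
  end;
  {$M-}

procedure TGreeter.Hello;
begin
  Inc(FCalls);
end;

// Calls a parameterless published method by name.
// Returns False if the object has no published method with that name.
function InvokeByName(obj: TObject; const methodName: string): boolean;
var
  proc: TSimpleProc;
begin
  TMethod(proc).Code := obj.MethodAddress(methodName);
  TMethod(proc).Data := obj;
  Result := assigned(TMethod(proc).Code);
  if Result then
    proc();
end;

// Demo helper: one valid and one invalid name; returns the number of calls made.
function CountCallsDemo: integer;
var
  greeter: TGreeter;
begin
  greeter := TGreeter.Create;
  try
    InvokeByName(greeter, 'Hello');
    InvokeByName(greeter, 'NoSuchMethod');
    Result := greeter.Calls;
  finally greeter.Free; end;
end;
```

MethodAddress alone says nothing about the parameters the method expects, which is why the real code also needs the extended RTTI header to verify the signature before making the call.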

After that, some sanity checks are run, which I won’t reprint here.

Then the method signature is checked. First we want to ensure that we’re dealing with a procedure, not a function.

  if assigned(methodInfoHeader.ReturnInfo.ReturnType) then
    raise Exception.CreateFmt('TOmniTaskExecutor.DispatchMessage: ' +
      'Method %s.%s must not return result',
      [WorkerIntf.Implementor.ClassName, methodName]);

At the end, a messy fragment of code walks over the parameter description list for this method and checks that the first parameter is a class reference (because this method is a part of a class, it has a hidden Self parameter at the beginning) and that the second parameter, if present at all, is either const data: TOmniValue or var obj: TObject (or a descendant of TObject). The detected signature is then returned in the methodSignature parameter.

  // only limited subset of method signatures is allowed:
  // (Self), (Self, const TOmniValue), (Self, var TObject)
  headerEnd := cardinal(methodInfoHeader) + methodInfoHeader^.Len;
  params := PParamInfo(cardinal(methodInfoHeader) + SizeOf(methodInfoHeader^)
    - CShortLen + SizeOf(TReturnInfo) + Length(methodInfoHeader^.Name));
  paramNum := 0;
  methodSignature := itUnknown;
  // Loop over the parameters
  while cardinal(params) < headerEnd do begin
    Inc(paramNum);
    paramType := params.ParamType^;
    if paramNum = 1 then
      if (params^.Flags <> []) or (paramType^.Kind <> tkClass) then
        RaiseInvalidSignature(methodName)
      else
        methodSignature := itSelf
    else if paramNum = 2 then
      //code says 'const' but GetMethodInfo says 'pfVar' :(
      if (params^.Flags * [pfConst, pfVar] <> []) and (paramType^.Kind = tkRecord) and
         (SameText(paramType^.Name, 'TOmniValue'))
      then
        methodSignature := itSelfAndOmniValue
      else if (params^.Flags = [pfVar]) and (paramType^.Kind = tkClass) then
        methodSignature := itSelfAndObject
      else
        RaiseInvalidSignature(methodName)
    else
      RaiseInvalidSignature(methodName);
    params := params.NextParam;
  end;

It looks messy and it is messy, but it does the job. If you have problems understanding this code, I’d recommend stepping over it with the debugger.

Benchmarks

In test 19 I implemented some benchmarking code to find out how much slower this approach is than the standard Comm.Send(msg, data). It turned out that not very much, thanks to the built-in caching.

[Image: benchmark results]

Integer-based dispatching is about twice as fast as the string/pointer dispatching. That’s a completely acceptable speed for something that is executed only a few (thousand) times in the program’s lifetime. If your program model is based on sending millions and millions of “do computation” messages to the worker thread then it is doomed from the beginning anyway.

That would be enough for today, hope you liked it and stay well until next time. Bye!

Monday, October 06, 2008

Erlangenizing the OmniThreadLibrary

A few days ago I “discovered” Erlang. [Such things happen when you read StackOverflow obsessively (sigh).] I started with Wikipedia and proceeded with the Pragmatic Programmer book – but that’s not really important. I wanted to talk about Erlang’s built-in concurrency support.

Some things are very similar to the OTL (if we take into account that Erlang is a functional language and Delphi is not) and some are different but one immediately caught my attention. In Erlang, the message recipient doesn’t use a numeric code to discover what message it has received; instead of that, the whole message is matched to a programmer-provided pattern (or patterns). The interesting thing is that usually (by convention) the first element in the message is an atom (a name, a sequence of characters, if you want). That got me thinking … Why do we send integer messages in the OTL, anyway?

The Windows Way

From the very beginning, OmniThreadLibrary tried to make the programmer’s life simple. Well, at least simpler. One of the helping hands it offered was simplified message processing. In traditional Windows thread programming you have to wait for various objects to become signaled with WaitForMultipleObjects and then proceed accordingly. In practice that means that the thread’s main logic is centralized in one very big method that handles all those events, mutexes and other stuff that can be waited upon.

OTL helps by implementing this logic internally (at least for workers that implement the IOmniWorker interface). Instead of using kernel primitives, the task owner sends messages to the task’s message queue. Messages are processed somewhere inside the OTL (specifically OtlTaskControl.pas/TOmniTaskExecutor.Asy_DispatchMessages) and are converted into method calls with Delphi’s Dispatch mechanism. That’s the same mechanism that makes sure that Windows messages are “converted” into Delphi methods and it requires that the first two bytes of the dispatched message contain the message ID. That’s why (until now) the recommended way to send a message to a task was:

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_SEND_MESSAGE = 2;

type
  TAsyncHello = class(TOmniWorker)
  strict private
    aiMessage: string;
  public
    function Initialize: boolean; override;
    procedure OMChangeMessage(var msg: TOmniMessage); message MSG_CHANGE_MESSAGE;
    procedure OMSendMessage(var msg: TOmniMessage); message MSG_SEND_MESSAGE;
  end;

FHelloTask: IOmniTaskControl;

FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Random ' + IntToStr(Random(1234)));

[You can read more about this approach in OmniThreadLibrary Example #4: Bidirectional communication, the OTL way.]
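For readers who haven’t used it directly, the following sketch shows Delphi’s Dispatch mechanism in isolation. TDemoMessage and TDemoWorker are hypothetical stand-ins (the real code uses TOmniMessage and TOmniWorker); the only thing Dispatch relies on is that the first two bytes of the record hold the message ID.

```delphi
program DispatchDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils;

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_SEND_MESSAGE   = 2;

type
  // Hypothetical message record; the first field (two bytes) is the message ID.
  TDemoMessage = record
    MsgID: Word;
    Text : string;
  end;

  // Hypothetical worker; handlers are bound to IDs with the 'message' directive.
  TDemoWorker = class
  public
    procedure OMChangeMessage(var msg: TDemoMessage); message MSG_CHANGE_MESSAGE;
    procedure OMSendMessage(var msg: TDemoMessage); message MSG_SEND_MESSAGE;
    procedure DefaultHandler(var msg); override;
  end;

procedure TDemoWorker.OMChangeMessage(var msg: TDemoMessage);
begin
  Writeln('OMChangeMessage: ', msg.Text);
end;

procedure TDemoWorker.OMSendMessage(var msg: TDemoMessage);
begin
  Writeln('OMSendMessage: ', msg.Text);
end;

procedure TDemoWorker.DefaultHandler(var msg);
begin
  // Called by Dispatch when no handler is declared for the message ID.
  Writeln('No handler for message ID ', Word(msg));
end;

var
  worker: TDemoWorker;
  msg   : TDemoMessage;
begin
  worker := TDemoWorker.Create;
  try
    msg.MsgID := MSG_CHANGE_MESSAGE;
    msg.Text := 'Random 42';
    worker.Dispatch(msg); // routed to OMChangeMessage

    msg.MsgID := 99;      // no handler declared for this ID
    worker.Dispatch(msg); // routed to DefaultHandler
  finally
    worker.Free;
  end;
end.
```

A message whose ID has no matching handler ends up in DefaultHandler, so sender and receiver have to agree on the numeric constants in advance.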

The Erlang Way

This approach simplifies writing threaded code – at least code that doesn’t depend heavily on shared data structures. But there’s still some room for improvement. For example, do we really have to use numeric messages, which have to be declared in advance? Why couldn’t the task controller just tell the task to execute the OMChangeMessage method?

To cut a long story short – this is now possible. Yesterday I committed a set of OTL modifications that allow you to do this:

type
  TAsyncHello = class(TOmniWorker)
  published
    procedure Change(const data: TOmniValue);
  end;

FHelloTask: IOmniTaskControl;

FHelloTask.Invoke('Change', 'Random ' + IntToStr(Random(1234)));

Yes, the Change method is published here. This is important.

Simple, huh? There’s a small problem, though – there are no compile-time checks. The code sends a string and the compiler can do nothing to verify the validity of this string. If the name is mistyped, you’ll only notice it at run time.

To fix this problem, OTL allows another form of method invocation which uses a method address instead of the name.

FHelloTask.Invoke(@TAsyncHello.Change, 'Random ' + IntToStr(Random(1234)));

In this case the compiler can check your typing, but it still won’t catch all problems – for example, the following code will compile and then raise an exception during execution.

procedure TfrmTestStringMsgDispatch.btnTestInvalidMsgClick(Sender: TObject);
begin
  if cbStringMessages.Checked then
    // will fail, FooBar method is not defined
    FHelloTask.Invoke('FooBar')
  else
    // will fail, can only invoke methods from the task's class
    FHelloTask.Invoke(@Self.btnTestInvalidMsg);
end;

[All new functionality is exposed in new demo 18_StringMsgDispatch.]
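Why does Change have to be published? One plausible explanation, sketched below, is that the RTL can look up published methods by name (TObject.MethodAddress) and by address (TObject.MethodName). This is a standalone illustration and not the OTL implementation: TDemoWorker, InvokeByName and InvokeByAddress are hypothetical, and the payload is a plain string instead of a TOmniValue.

```delphi
program InvokeDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Signature assumed for every invokable method in this sketch.
  TInvokableMethod = procedure(const data: string) of object;

  {$M+} // generate RTTI so that published methods can be found at run time
  TDemoWorker = class
  published
    procedure Change(const data: string);
  end;
  {$M-}

procedure TDemoWorker.Change(const data: string);
begin
  Writeln('Change called with: ', data);
end;

// Hypothetical helper: finds a published method by name and calls it.
procedure InvokeByName(worker: TObject; const methodName, data: string);
var
  method: TMethod;
begin
  method.Code := worker.MethodAddress(methodName); // nil if not published/not found
  if not assigned(method.Code) then
    raise Exception.CreateFmt('Method %s.%s not found',
      [worker.ClassName, methodName]);
  method.Data := worker;
  TInvokableMethod(method)(data);
end;

// Hypothetical helper: accepts an address only if it belongs to a
// published method of the worker's class.
procedure InvokeByAddress(worker: TObject; methodAddr: pointer; const data: string);
var
  method: TMethod;
begin
  if worker.MethodName(methodAddr) = '' then
    raise Exception.Create('Address is not a published method of ' +
      worker.ClassName);
  method.Code := methodAddr;
  method.Data := worker;
  TInvokableMethod(method)(data);
end;

var
  worker: TDemoWorker;
begin
  worker := TDemoWorker.Create;
  try
    InvokeByName(worker, 'Change', 'Random 42');
    InvokeByAddress(worker, @TDemoWorker.Change, 'Random 43');
    try
      InvokeByName(worker, 'FooBar', ''); // mistyped name, caught only at run time
    except
      on E: Exception do
        Writeln('Failed as expected: ', E.Message);
    end;
  finally
    worker.Free;
  end;
end.
```

In both variants the check happens at run time; the address form only moves the spelling check to the compiler.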

Implementation

To understand how Invoke is implemented, it’s best to trace one such call. First we see that Invoke gets converted into a normal message.

procedure TOmniTaskControl.Invoke(const msgMethod: pointer; msgData: TOmniValue);
begin
  Comm.Send(TOmniInternalAddressMsg.CreateMessage(msgMethod, msgData));
end; { TOmniTaskControl.Invoke }

class function TOmniInternalAddressMsg.CreateMessage(const msgMethod: pointer;
  msgData: TOmniValue): TOmniMessage;
begin
  Result := TOmniMessage.Create(COtlReservedMsgID,
    TOmniInternalAddressMsg.Create(msgMethod, msgData));
end; { TOmniInternalAddressMsg.CreateMessage }

This message has message ID COtlReservedMsgID (which is equal to $FFFF, so from now on please don’t use this message ID for your own purposes). The message data field contains an object that wraps the method address and the message data that was passed to Invoke. Similar code is executed when Invoke is called with a method name instead of a method pointer.

OK, so that’s how the method name (or address) travels from the task controller to the task itself over the standard communication channel. But that is only half of the story … the simpler part!

On the receiving side, TOmniTaskExecutor.Asy_DispatchMessages detects a new message and calls DispatchOmniMessage to process it.

if awaited = idxFirstMessage then
  gotMsg := task.Comm.Receive(msg)
else begin
  oteInternalLock.Acquire;
  try
    gotMsg := (oteCommList[awaited - idxFirstMessage - 1] as
      IOmniCommunicationEndpoint).Receive(msg);
  finally oteInternalLock.Release; end;
end;
if gotMsg and assigned(WorkerIntf) then
  DispatchOmniMessage(msg);

Now that’s where things start to get really interesting, as we have to use RTTI and even extended RTTI to call the appropriate method. It is also the point where I’ll cut the story short. This article is already very long, maybe even too long, and I have much more to say on the subject. Expect part II to be published in a few days. [Of course, if you’re curious you can just look into the code to see how DispatchOmniMessage is implemented!]

Test it!

The newest OmniThreadLibrary code is only available in the repository. No snapshots this time.

I’d still be immensely grateful to anybody who tests the new functionality and shares their thoughts on this approach.

Friday, October 03, 2008

Bulk update

Delphi 2009 has been released, so now it’s time to publish updates to my various units … In this article I’m just publishing a short changelog; you can expect more details on some enhancements (TGpJoinedStream, 4- and 8-aligned integers, Unicode support in GpStructuredStorage) in the near future.

DSiWin32 1.41

  • Compatible with Delphi 2009.
  • Created DSiInterlocked*64 family of functions by copying the code from http://qc.borland.com/wc/qcmain.aspx?d=6212. Functions were written by Will DeWitt Jr and are included with permission.
  • New functions DSiCopyFileAnimated, DSiConnectToNetworkResource, DSiIsHtmlFormatOnClipboard, DSiGetHtmlFormatFromClipboard, DSiCopyHtmlFormatToClipboard, DSiYield.
  • Added constants FILE_LIST_DIRECTORY, FILE_SHARE_FULL, FILE_ACTION_ADDED, FILE_ACTION_REMOVED, FILE_ACTION_MODIFIED, FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME.
  • Forced {$T-} as the code doesn't compile in {$T+} state.
  • Bug fixed: It was not possible to use DSiTimeGetTime64 in parallel from multiple threads.

GpHugeFile 5.05a

  • Optimization: Under some circumstances, lots of unnecessary SetFilePointer calls were made.

GpLists 1.41

  • Works with Delphi 2009.

GpStreams 1.25

  • Added TGpJoinedStream class.
  • Span-storing class can now be modified via TGpScatteredStream.SpanClass.
  • TGpScatteredStream's AddSpan and AddSpanOS now return span offset in the span list.
  • Added a bunch of BE_ overloads to the TGpStreamEnhancer class.
  • Small optimization in KeepStreamPositionWrapper destructor.

GpStructuredStorage 2.0

  • Works with Delphi 2009.
  • Added Unicode support to the underlying storage. File and folder names are stored in UTF-16, as are attribute names and values. API is still based on Delphi's 'string' type - meaning that values are converted to Unicode and back on the fly using the current locale.
  • Existing structured storage files will be upgraded automatically unless they are opened for read-only access. Applications compiled with GpStructuredStorage 1.x will not be able to read new/upgraded files. Version is incremented to 2.0.0.0 when a 1.x file is opened, unless it is opened in read-only mode. Newly created storages have version 2.0.0.0.

GpStuff 1.13

  • Implemented 4-aligned integer, TGp4AlignedInt.
  • Implemented 8-aligned integer, TGp8AlignedInt.
  • Added function OpenArrayToVarArray, written by Thomas Schubbauer.
  • ReverseWord/ReverseDWord rewritten in assembler (by GJ).
  • Declared MaxInt64 constant.

GpSync 1.21

  • Added optional external message counter to the message queue.

GpTextFile 4.01

  • Added TGpTextFile.Write(ws: WideString) and TGpTextFile.Writeln(s: string) overloads.
  • TGpTextFile.Write[ln] string parameters made 'const'.
  • Bug fixed [found by AKi]: TGpTextFile.Write was not working when using CP_UTF8 codepage.

GpTextStream 1.06

  • Works with Delphi 2009.
  • Exported StringToWideString, WideStringToString, and GetDefaultAnsiCodepage.

GpVersion 2.02

  • Added another CreateVersion overload.
  • Extended IVersion interface with IsHigherThan and IsLowerThan.