Sunday, July 27, 2008

OmniThreadLibrary alpha

OmniThreadLibrary is slowly progressing to the alpha stage. Most of the interfaces have been defined and most of the functionality that I want to see in the 1.0 release is present and tested. I have not yet decided what to do with the messaging (the current Variant-based solution can be rough around the edges sometimes) but the rest of the stuff is mostly completed.

Most important changes since the last release:

  • First of all, big thanks to fellow Slovenian programmer Lee_Nover (the guy who wrote the SpinLock unit, BTW) who did a major cleanup job of the OTL code:
    • Added const prefix to parameters in many places (mostly when interfaces were passed - I tend to miss that). OTL should be slightly faster because of that.
    • Removed CreateTask(TOmniWorker) as CreateTask(IOmniWorker) is good enough for all occasions.
    • Cleaned the tests\* tree and fixed tests to compile with new Otl* units.
    • Removed IOmniTaskControl.FreeOnTerminate as it's no longer needed.
  • Created project group containing all test projects. To make it work, all projects in the tests tree were renamed.
  • *** IMPORTANT *** OtlTaskEvents unit was renamed to OtlEventMonitor and TOtlTaskEventDispatcher component was renamed to TOmniEventMonitor. Recompile the package!
  • *** IMPORTANT *** If you're working with a snapshot, you should delete the OmniThreadLibrary folder before unpacking new version as many files were renamed since the last snapshot!
  • Task UniqueID is now int64 (was integer). Somehow I wasn't happy with the fact that the unique ID would roll over after merely 4.3 billion tasks ...
  • Thread priority can be controlled with IOmniTaskControl.SetPriority.
  • Exceptions are trapped and mapped into EXIT_EXCEPTION predefined exit code (OtlCommon.pas). Test in 13_Exceptions.
  • IOmniTaskControl.WithLock provides a simple way to share one lock (critical section, spinlock) between task owner and task worker. Lock can be accessed via the Lock property, implemented in IOmniTaskControl and IOmniTask interfaces. See the 12_Lock test for the example.
  • Eye candy - instead of calling omniTaskEventDispatch.Monitor(task) you can now use task.MonitorWith(omniTaskEventDispatch). The old way is still available. An example from the (new) 8_RegisterComm test:
    procedure TfrmTestOtlComm.FormCreate(Sender: TObject);
    begin
      FCommChannel := CreateTwoWayChannel(1024);
      FClient1 := CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))
        .MonitorWith(OmniTED)
        .Run;
      FClient2 := CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))
        .MonitorWith(OmniTED)
        .Run;
    end;
  • IOmniTaskControl.TerminateWhen enables you to add additional termination event to the task. One event can be shared between multiple tasks, allowing you to stop them all at once. Example in the 14_TerminateWhen.
  • Basic support for task groups has been added. At the moment you can start/stop multiple tasks and not much more. To add a task to a group, use group.Add(task) or task.Join(group) syntax. Demo in 15_TaskGroup.
    IOmniTaskGroup = interface ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
      function Remove(taskControl: IOmniTaskControl): IOmniTaskGroup;
      function Add(taskControl: IOmniTaskControl): IOmniTaskGroup;
      function RunAll: IOmniTaskGroup;
      function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
      function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
    end; { IOmniTaskGroup }
  • Added support for task chaining. When one task is completed, next task is started. An excerpt from the 16_ChainTo demo:
    procedure TfrmTestTaskGroup.btnStartTasksClick(Sender: TObject);
    var
      task1: IOmniTaskControl;
      task2: IOmniTaskControl;
      task3: IOmniTaskControl;
      taskA: IOmniTaskControl;
    begin
      lbLog.Clear;
      task3 := CreateTask(BgTask, '3').MonitorWith(OmniTED);
      task2 := CreateTask(BgTask, '2').MonitorWith(OmniTED).ChainTo(task3);
      task1 := CreateTask(BgTask, '1').MonitorWith(OmniTED).ChainTo(task2);
      taskA := CreateTask(BgTask, 'A').MonitorWith(OmniTED).ChainTo(
        CreateTask(BgTask, 'B').MonitorWith(OmniTED).ChainTo(
          CreateTask(BgTask, 'C').MonitorWith(OmniTED)));
      task1.Run; taskA.Run;
    end;
  • A thread pool has been implemented. Demo in 11_ThreadPool. I'll write a longer article on using it, but for now the demo should do.

As usual, code is available in the repository and as a snapshot.

OmniThreadLibrary internals - OtlContainers

The time has come to bring the OtlContainers subthread to the end. In previous parts (1, 2, 3, 4) I obsessively described the underlying lock-free structures but haven't said a word about the higher-level classes that are actually used for message transfer inside the OmniThreadLibrary.

Let's take a quick look at all OtlContainers classes. TOmniBaseContainer is an abstract parent of the whole container hierarchy. Its basic function is to provide memory allocation and lock-free algorithms for all descendants. Most of the code I already described in previous installments and the rest is trivial.

  TOmniBaseContainer = class abstract(TInterfacedObject)
  strict protected
    obcBuffer      : pointer;
    obcElementSize : integer;
    obcNumElements : integer;
    obcPublicChain : TOmniHeadAndSpin;
    obcRecycleChain: TOmniHeadAndSpin;
    class function  InvertOrder(chainHead: POmniLinkedData): POmniLinkedData; static;
    class function  PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
    class procedure PushLink(const link: POmniLinkedData; var chain: TOmniHeadAndSpin); static;
    class function  UnlinkAll(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
  public
    destructor Destroy; override;
    procedure Empty; virtual;
    procedure Initialize(numElements, elementSize: integer); virtual;
    function  IsEmpty: boolean; virtual;
    function  IsFull: boolean; virtual;
    property ElementSize: integer read obcElementSize;
    property NumElements: integer read obcNumElements;
  end; { TOmniBaseContainer }

Then there is the base stack implementation, which only adds Push and Pop to the base container. We've seen those two already.

  TOmniBaseStack = class(TOmniBaseContainer)
  public
    function Pop(var value): boolean; virtual;
    function Push(const value): boolean; virtual;
  end; { TOmniBaseStack }

The queue is only slightly more complex. It provides Enqueue and Dequeue, which we already know, and overrides Empty and IsEmpty to take the dequeued messages chain into account.

  TOmniBaseQueue = class(TOmniBaseContainer)
  strict protected
    obqDequeuedMessages: TOmniHeadAndSpin;
  public
    constructor Create;
    function  Dequeue(var value): boolean; virtual;
    procedure Empty; override;
    function  Enqueue(const value): boolean; virtual;
    function  IsEmpty: boolean; override;
  end; { TOmniBaseQueue }

Now we come to the interesting part. Basic stack and queue operations are also described with the interfaces IOmniStack and IOmniQueue - just in case you'd like to use them in your programs. OTL itself uses the class representation of the queue directly.

  IOmniStack = interface ['{F4C57327-18A0-44D6-B95D-2D51A0EF32B4}']
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer);
    function  Pop(var value): boolean;
    function  Push(const value): boolean;
    function  IsEmpty: boolean;
    function  IsFull: boolean;
  end; { IOmniStack }

  IOmniQueue = interface ['{AE6454A2-CDB4-43EE-9F1B-5A7307593EE9}']
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer);
    function  Enqueue(const value): boolean;
    function  Dequeue(var value): boolean;
    function  IsEmpty: boolean;
    function  IsFull: boolean;
  end; { IOmniQueue }

Actual implementation of the higher-level structures is exposed with classes TOmniStack and TOmniQueue. Besides implementing IOmniStack and IOmniQueue (respectively), they both implement IOmniNotifySupport and IOmniMonitorSupport.

  TOmniStack = class(TOmniBaseStack, IOmniStack, IOmniNotifySupport, IOmniMonitorSupport)
  TOmniQueue = class(TOmniBaseQueue, IOmniQueue, IOmniNotifySupport, IOmniMonitorSupport)

IOmniMonitorSupport describes a container with monitoring support (it notifies the attached monitor whenever a new message is sent). IOmniNotifySupport describes a container that notifies interested parties (by setting an event) that a new message has been sent. Both are used in the OTL: notification support is needed to implement IOmniCommunicationEndpoint.NewMessageEvent in OtlComm, and monitoring support is needed to support IOmniTaskControl.MonitorWith in the OtlTaskControl unit.

  IOmniMonitorSupport = interface ['{6D5F1191-9E4A-4DD5-99D8-694C95B0DE90}']
    function  GetMonitor: IOmniMonitorParams;
  //
    procedure Notify;
    procedure RemoveMonitor;
    procedure SetMonitor(monitor: IOmniMonitorParams);
    property Monitor: IOmniMonitorParams read GetMonitor;
  end; { IOmniMonitorSupport }

  IOmniNotifySupport = interface ['{E5FFC739-669A-4931-B0DC-C5005A94A08B}']
    function  GetNewDataEvent: THandle;
  //
    procedure Signal;
    property NewDataEvent: THandle read GetNewDataEvent;
  end; { IOmniNotifySupport }

The following excerpt from the TOmniQueue code demonstrates the implementation of those interfaces. TOmniStack is implemented in a similar manner.

The constructor takes an options parameter with which you can enable monitoring and/or notification support.

  type
    TOmniContainerOption = (coEnableMonitor, coEnableNotify);
    TOmniContainerOptions = set of TOmniContainerOption;

  constructor TOmniQueue.Create(numElements, elementSize: integer;
    options: TOmniContainerOptions);
  begin
    inherited Create;
    Initialize(numElements, elementSize);
    orbOptions := options;
    if coEnableMonitor in Options then
      orbMonitorSupport := TOmniMonitorSupport.Create;
    if coEnableNotify in Options then
      orbNotifySupport := TOmniNotifySupport.Create;
  end; { TOmniQueue.Create }

  function TOmniQueue.Dequeue(var value): boolean;
  begin
    Result := inherited Dequeue(value);
    if Result then
      if coEnableNotify in Options then
        orbNotifySupport.Signal;
  end; { TOmniQueue.Dequeue }

  function TOmniQueue.Enqueue(const value): boolean;
  begin
    Result := inherited Enqueue(value);
    if Result then begin
      if coEnableNotify in Options then
        orbNotifySupport.Signal;
      if coEnableMonitor in Options then
        orbMonitorSupport.Notify;
    end;
  end; { TOmniQueue.Enqueue }

Enqueue calls the original enqueueing code and then signals notification (if enabled) and notifies the monitor (if enabled). Dequeue is similar, except that it only triggers the notification so that the reader rechecks the input queue.

Stop. Enough words have been said about OtlContainers. Next time, I'll discuss something completely different.

Thursday, July 24, 2008

A lock-free queue, finally!

It was a long and winding road (1, 2, 3) but finally it brought us to the really interesting part - the lock-free queue. Admit it, a stack is fine but it is not really suitable for sending data from point A to point B. When forwarding data inside the application we typically want it to arrive in the same order as it was transmitted and that's what queues are for.

Still, there was a good reason for that lengthy introduction. The lock-free stack we all know and love by now ;) is the basis for the lock-free queue. Even more, most of the stack code is reused and the queue's Enqueue/Dequeue methods are incredibly similar to the stack's Push/Pop.

Let's take a look at the method that inserts new element at the end of the queue. The Enqueue code is not just similar to Push. They are identical. [Compare: Push]

  function TOmniBaseQueue.Enqueue(const value): boolean;
  var
    linkedData: POmniLinkedData;
  begin
    linkedData := PopLink(obcRecycleChain);
    Result := assigned(linkedData);
    if not Result then
      Exit;
    Move(value, linkedData.Data, ElementSize);
    PushLink(linkedData, obcPublicChain);
  end; { TOmniBaseQueue.Enqueue }

OK, so enqueueing an item is identical to pushing it. How do we remove items from the head of the queue, then? Dequeue must return not the top element of the stack but the bottom one - that's the element that was enqueued (pushed) first.

We could traverse the stack and somehow remove the last element but removing elements from a lock-free structure is very problematic, as you've already seen. [Where was the problem in TOmniBaseStack? In PopLink. There you go!] Removing elements from the queue is even harder (if not impossible) so let's try another approach.

  function TOmniBaseQueue.Dequeue(var value): boolean;
  var
    linkedData: POmniLinkedData;
  begin
    if obqDequeuedMessages.Head = nil then
      obqDequeuedMessages.Head := InvertOrder(UnlinkAll(obcPublicChain));
    linkedData := PopLink(obqDequeuedMessages);
    Result := assigned(linkedData);
    if not Result then
      Exit;
    Move(linkedData.Data, value, ElementSize);
    PushLink(linkedData, obcRecycleChain);
  end; { TOmniBaseQueue.Dequeue }

If you compare it with the Pop method, you'll see that they are very similar. Dequeue does some magic in the first two lines and uses a different chain header in the PopLink call, but from that point onward they are identical.

  function TOmniBaseStack.Pop(var value): boolean;
  var
    linkedData: POmniLinkedData;
  begin
    linkedData := PopLink(obcPublicChain);
    Result := assigned(linkedData);
    if not Result then
      Exit;
    Move(linkedData.Data, value, ElementSize);
    PushLink(linkedData, obcRecycleChain);
  end; { TOmniBaseStack.Pop }

The big question is, what happens in InvertOrder(UnlinkAll(obcPublicChain)) to make the old Pop code work as a queue? Well, let's draw some pretty pictures. I just luuuv pretty pictures.

Let's say we have a queue with two elements. Element 1 was enqueued first, followed by element 2. Our queue behaves like a stack when enqueueing so it's no wonder that I could just copy the old picture from the stack introduction article to present this state.

[Figure: queue with two elements]

We have two empty nodes in the recycle chain and two allocated nodes in the public chain. Public chain header points to element 2, which points to element 1, which points to nil.

Dequeue is called next and it executes UnlinkAll. This method merely removes all nodes from the chain that was passed as an argument. In our case, all nodes are removed from the public chain. This is done atomically. [How? I'll show you later.]

[Figure: after UnlinkAll]

Now we have a public chain header that points to an empty chain (nil, in other words) and an unnamed pointer that points to the top element in the stack. This unnamed pointer is returned as the UnlinkAll result.

InvertOrder is called next. It walks over the chain that was passed as an argument and reverses each and every pointer. This is done in a simple, non-atomic way, as the chain was already removed from the shared storage so concurrency is not a problem.

[Figure: after InvertOrder]

InvertOrder returns a pointer to the (new) chain head. In our case, this pointer points to node 1, which points to node 2, which points to nil. Result of the InvertOrder call is stored into the class field obqDequeuedMessages (also known as dequeue chain header).

The rest of the Dequeue method simply treats obqDequeuedMessages as a stack chain and pops the top element from it. As the chain was reversed, the top element is the one we need - the one that was enqueued first. The atomic version of PopLink is used for simplicity, but simpler code could be used instead as there can be no inter-thread conflicts.

[Figure: after Dequeue]

On the return from Dequeue, the dequeued chain points to node 2, the recycle chain points to node 1 (its data was copied from the node to Dequeue's value parameter and the node was recycled) and the public chain is nil.

Let's assume that a writer enqueues data 3 into the queue. As we've already seen, this is a normal Push operation and we already know how it works.

[Figure: Enqueue after Dequeue]

Dequeued chain still points to node 2, public chain points to former node 1 (which now holds the value 3) and recycle chain points to the first empty node.

The reader now executes Dequeue again. As the dequeued chain is not nil, UnlinkAll and InvertOrder are not called. A node is simply popped from the dequeued chain.

[Figure: after second Dequeue]

The dequeued chain is now nil. The next call to Dequeue will call UnlinkAll to remove the public chain and InvertOrder to invert it, the result will be stored in the dequeued chain and the dequeue operation will continue in an already known fashion.
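The walk-through above can be replayed in plain Delphi. The following is only a single-threaded sketch, not the OTL code: the names PNode, TNode, PublicChain and DequeuedChain are made up for the illustration, nodes come from New/Dispose instead of a recycle chain, and nothing here is atomic. It merely shows that "push, unlink all, invert, pop" delivers elements in FIFO order.

```pascal
program QueueViaTwoChains;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$ASSERTIONS ON}

type
  PNode = ^TNode;          // hypothetical stand-in for POmniLinkedData
  TNode = record
    Next: PNode;
    Data: integer;
  end;

var
  PublicChain  : PNode = nil;  // writers push here (stack order)
  DequeuedChain: PNode = nil;  // reader-private, already in FIFO order

procedure PushLink(link: PNode; var chain: PNode);
begin
  link^.Next := chain;
  chain := link;
end;

function PopLink(var chain: PNode): PNode;
begin
  Result := chain;
  if assigned(Result) then
    chain := Result^.Next;
end;

function UnlinkAll(var chain: PNode): PNode;
begin
  Result := chain;   // the real code performs this swap atomically
  chain := nil;
end;

function InvertOrder(chainHead: PNode): PNode;
var
  nextNode: PNode;
begin
  Result := nil;
  while assigned(chainHead) do begin
    nextNode := chainHead^.Next;
    chainHead^.Next := Result;   // turn the link around
    Result := chainHead;
    chainHead := nextNode;
  end;
end;

procedure Enqueue(value: integer);
var
  node: PNode;
begin
  New(node);   // the real code pops a node from the recycle chain
  node^.Data := value;
  PushLink(node, PublicChain);
end;

function Dequeue(var value: integer): boolean;
var
  node: PNode;
begin
  if DequeuedChain = nil then
    DequeuedChain := InvertOrder(UnlinkAll(PublicChain));
  node := PopLink(DequeuedChain);
  Result := assigned(node);
  if Result then begin
    value := node^.Data;
    Dispose(node);
  end;
end;

var
  v : integer;
  ok: boolean;
begin
  Enqueue(1); Enqueue(2);
  ok := Dequeue(v); Assert(ok and (v = 1));  // first in, first out
  Enqueue(3);                                // lands in the public chain
  ok := Dequeue(v); Assert(ok and (v = 2));  // still served from the dequeued chain
  ok := Dequeue(v); Assert(ok and (v = 3));  // public chain unlinked and inverted again
  ok := Dequeue(v); Assert(not ok);          // queue is empty
  WriteLn('FIFO order preserved');
end.
```

The main block follows the same sequence as the pictures: two enqueues, one dequeue, a third enqueue, and then dequeues until the queue is empty.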

There is one important consequence of this approach. The lock-free queue (at least our implementation) can only have one reader. The problem lies in this non-atomic fragment:

  if obqDequeuedMessages.Head = nil then
    obqDequeuedMessages.Head := InvertOrder(UnlinkAll(obcPublicChain));

If there were two readers, the following could happen:

  • writer pushes one element into the queue
  • reader 1 starts the Dequeue, sees that Head is nil and is stopped just before it calls UnlinkAll
  • reader 2 starts the Dequeue, sees that Head is nil and proceeds with InvertOrder(UnlinkAll())
  • reader 2 updates the Head field of the dequeued chain
  • reader 1 continues its execution and passes the (now empty) public chain into UnlinkAll, which passes nil to InvertOrder, which returns nil; reader 1 then stores that nil into the Head field
  • the element that reader 2 has just moved into the dequeued chain is lost, because reader 1 overwrote the Head field with nil

Multiple writers are supported, though, just like in the lock-free stack.
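The two-reader scenario can be replayed deterministically. This is a single-threaded sketch with hypothetical names (PNode, TNode, DequeuedHead, the reader1SawNil/reader2SawNil flags); the "threads" are just statements executed in the unlucky order from the list above.

```pascal
program TwoReaderRace;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$ASSERTIONS ON}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Data: integer;
  end;

function UnlinkAll(var chain: PNode): PNode;
begin
  Result := chain;   // atomic in the real code; that is not the issue here
  chain := nil;
end;

function InvertOrder(chainHead: PNode): PNode;
var
  nextNode: PNode;
begin
  Result := nil;
  while assigned(chainHead) do begin
    nextNode := chainHead^.Next;
    chainHead^.Next := Result;
    Result := chainHead;
    chainHead := nextNode;
  end;
end;

var
  PublicChain  : PNode = nil;
  DequeuedHead : PNode = nil;
  node         : TNode;
  reader1SawNil: boolean;
  reader2SawNil: boolean;
begin
  // writer pushes one element into the queue
  node.Data := 42;
  node.Next := nil;
  PublicChain := @node;

  // reader 1 tests the Head field and is stopped just before UnlinkAll
  reader1SawNil := (DequeuedHead = nil);

  // reader 2 runs the whole fragment and updates the Head field
  reader2SawNil := (DequeuedHead = nil);
  if reader2SawNil then
    DequeuedHead := InvertOrder(UnlinkAll(PublicChain));
  Assert(DequeuedHead = @node);

  // reader 1 resumes and acts on its stale test result
  if reader1SawNil then
    DequeuedHead := InvertOrder(UnlinkAll(PublicChain));

  // the node is now reachable from neither chain
  Assert(DequeuedHead = nil);
  Assert(PublicChain = nil);
  WriteLn('element ', node.Data, ' was lost');
end.
```

Making UnlinkAll atomic does not help, because the damage is done by the separate test and assignment around it.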

Let's take a quick look at the assembler code. UnlinkAll is quite simple. First it clears the ecx register (sets up the nil pointer) and loads chain.Head into eax. Then it uses lock cmpxchg to atomically replace chain.Head with ecx (i.e., with nil), retrying if another thread has changed the head in the meantime. The old chain.Head is returned in the Result (eax), which is exactly what we want.

  class function TOmniBaseContainer.UnlinkAll(var chain: TOmniHeadAndSpin): POmniLinkedData;
  asm
    xor   ecx, ecx
    mov   edx, eax
    mov   eax, [edx]
  @Spin:
    lock cmpxchg [edx], ecx                 //Cut Chain.Head
    jnz   @Spin
  end; { TOmniBaseContainer.UnlinkAll }

InvertOrder is slightly longer but much simpler as there are no lock operations involved. Code merely walks over the pointer chain and inverts it.

  class function TOmniBaseContainer.InvertOrder(chainHead: POmniLinkedData): POmniLinkedData;
  asm
    test  eax, eax
    jz    @Exit
    xor   ecx, ecx
  @Walk:
    xchg  [eax], ecx                        //Turn links
    and   ecx, ecx
    jz    @Exit
    xchg  [ecx], eax
    and   eax, eax
    jnz   @Walk
    mov   eax, ecx
  @Exit:
  end; { TOmniBaseContainer.InvertOrder }

That's all folks. You now know how both lock-free OTL structures work and what their limitations are. Now I only have one more article on OtlContainers to write, one that will explain the higher-level TOmniStack and TOmniQueue.

Wednesday, July 23, 2008

A working lock-free stack implementation

This is the third post in the mini-series on a lock-free stack, included in the OmniThreadLibrary. You've learned how the lock-free stack was implemented in the first place and which problem was hidden in that implementation. I've also mentioned that the problem was fixed in the meantime and that the working implementation is already available in the repository and as a snapshot, but I haven't said a word about the new implementation. This will now be rectified.

In the initial implementation lay an excellent example of the ABA problem. You can read more about it on Wikipedia, where you'll also learn that the ABA problem is commonly solved by adding ‘tag’ bits. That's exactly what was done in the OTL, except that we didn't use just a few measly bits, but a 4-byte counter.

Instead of containing only a pointer to the first node, each chain head is now composed of two parts — a pointer to the first node and a 4-byte spin count. This second part will make sure that the ABA problem cannot occur.

  type
    TOmniHeadAndSpin = packed record
      Head: POmniLinkedData;
      Spin: cardinal;
    end; { TOmniHeadAndSpin }

As the problem lay in the PopLink method, we'll start today's exploration just there. This is the new version in all its glory. (Of course, assembler is still provided by the ‘GJ’, I had nothing to do with it.)

  class function TOmniBaseContainer.PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData;
  asm
    push  edi
    push  ebx
    mov   edi, eax                          //edi = @chain
  @Spin:
    mov   ecx, 1                            //Increment spin reference by 1
    lock xadd [edi + 4], ecx                //Get old spin reference to ecx
    mov   eax, [edi]                        //eax := chain.Head
    mov   edx, [edi + 4]                    //edx := chain.Spin
    test  eax, eax
    jz    @Exit                             //Is Empty?
    inc   ecx                               //Now we are ready to cmpxchg8b
    cmp   edx, ecx                          //Is reference the same?
    jnz   @Spin
    mov   ebx, [eax]                        //ebx := Result.Next
    lock cmpxchg8b [edi]                    //Now try to xchg
    jnz   @Spin                             //Do spin ???
  @Exit:
    pop   ebx
    pop   edi
  end;

You may have noticed that this is now a class function (a class static function, to be more precise). That made more sense as it needs no information from the class — it only works on a chain, which is passed as a parameter. Also, the code is much longer and not very easy to understand, so I'll try the trick that helped in the previous installment. I'll write an approximation of the PopLink in Delphi.

  class function TOmniBaseContainer.PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData;
  label
    Spin;
  var
    chainSpin: cardinal;
    nextNode : POmniLinkedData;
    spinRef  : cardinal;
  begin
    // push edi
    // push ebx
    // mov edi, eax                         //edi = @chain
  Spin:
    // mov ecx, 1                           //Increment spin reference by 1
    // lock xadd [edi + 4], ecx             //Get old spin reference to ecx
    {.$ATOMIC}
    spinRef := chain.Spin;
    chain.Spin := chain.Spin + 1;
    {.$END ATOMIC}
    // mov eax, [edi]                       //eax := chain.Head
    // mov edx, [edi + 4]                   //edx := chain.Spin
    Result := chain.Head;
    chainSpin := chain.Spin;
    // test eax, eax
    // jz @Exit                             //Is Empty?
    if not assigned(Result) then
      Exit;
    // inc ecx                              //Now we are ready to cmpxchg8b
    Inc(spinRef);
    // cmp edx, ecx                         //Is reference the same?
    // jnz @Spin
    if spinRef <> chainSpin then
      goto Spin;
    // mov ebx, [eax]                       //ebx := Result.Next
    // nextNode = ebx
    nextNode := Result.Next;
    // lock cmpxchg8b [edi]                 //Now try to xchg
    {.$ATOMIC}
    if (Result = chain.Head) and (chainSpin = chain.Spin) then begin
      chain.Head := nextNode;
      chain.Spin := spinRef;
    end
    else begin
      chainSpin := chain.Spin;
      Result := chain.Head;
    {.$END ATOMIC}
      // jnz @Spin                          //Do spin ???
      goto Spin;
    end;
  //@Exit:
    // pop ebx
    // pop edi
  end;

In the beginning, a little housekeeping is done (two registers are stored away because the Delphi compiler expects us not to modify them) and the address of the chain record is loaded into the edi register.

With a little help from a locked xadd, the current version of the header spin value is loaded into spinRef (the ecx register) and is incremented in the header.

  // mov ecx, 1                             //Increment spin reference by 1
  // lock xadd [edi + 4], ecx               //Get old spin reference to ecx
  {.$ATOMIC}
  spinRef := chain.Spin;
  chain.Spin := chain.Spin + 1;
  {.$END ATOMIC}

The xadd instruction stores the first operand ([edi + 4], which is chain.Spin) into the second operand (ecx), and the sum of both operands (the original value of the second operand is used for the summation) into the first operand.
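To make the operand shuffle concrete, here is a tiny non-atomic model of xadd in Delphi. ExchangeAdd is a hypothetical helper written for this illustration only; the real instruction does both steps as one indivisible operation when prefixed with lock.

```pascal
program XaddSemantics;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$ASSERTIONS ON}
{$Q-}{$R-}   // let the counter wrap around silently, like the CPU does

// Non-atomic model of "lock xadd [dest], reg":
// returns the old value of dest and stores dest + addend back into dest.
function ExchangeAdd(var dest: cardinal; addend: cardinal): cardinal;
begin
  Result := dest;
  dest := dest + addend;
end;

var
  chainSpin: cardinal;
  spinRef  : cardinal;
begin
  chainSpin := 7;
  spinRef := ExchangeAdd(chainSpin, 1);
  Assert(spinRef = 7);     // ecx receives the old spin value
  Assert(chainSpin = 8);   // [edi + 4] receives the sum

  chainSpin := $FFFFFFFF;  // the counter simply wraps to 0
  spinRef := ExchangeAdd(chainSpin, 1);
  Assert(spinRef = $FFFFFFFF);
  Assert(chainSpin = 0);
  WriteLn('xadd model behaves as described');
end.
```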

Then we fetch current chain.Head and chain.Spin and store them into the Result (eax) and chainSpin (edx), respectively. This is a non-atomic fetch and doesn't guarantee that both values really belong to the same state of the chain record. Another thread may have been invoked between those two movs and it could call PopLink on the same chain and modify the header.

  // mov eax, [edi]                         //eax := chain.Head
  // mov edx, [edi + 4]                     //edx := chain.Spin
  Result := chain.Head;
  chainSpin := chain.Spin;
  // test eax, eax
  // jz @Exit                               //Is Empty?
  if not assigned(Result) then
    Exit;

For the same reason there is no guarantee that chainSpin is indeed equal to spinRef+1, so this is now checked for.

  // inc ecx                                //Now we are ready to cmpxchg8b
  Inc(spinRef);
  // cmp edx, ecx                           //Is reference the same?
  // jnz @Spin
  if spinRef <> chainSpin then
    goto Spin;

Now we're getting ready for the pointer swap. First we need the address of the second node in the ebx register.

  // mov ebx, [eax]                         //ebx := Result.Next
  nextNode := Result.Next;

Finally, we can use the cmpxchg8b instruction to do the swap. Cmpxchg8b is a very special beast. It is similar to cmpxchg, but it compares and swaps a full 8 bytes in one go!

  // lock cmpxchg8b [edi]                   //Now try to xchg
  {.$ATOMIC}
  if (Result = chain.Head) and (chainSpin = chain.Spin) then begin
    chain.Head := nextNode;
    chain.Spin := spinRef;
  end
  else begin
    chainSpin := chain.Spin;
    Result := chain.Head;
  {.$END ATOMIC}
    // jnz @Spin                            //Do spin ???
    goto Spin;
  end;

Cmpxchg8b compares the combination of the edx and eax registers with its operand [edi]. In our case, edi is pointing to the chain (an 8-byte value in which the first 4 bytes are a node pointer and the second 4 a spin count), and eax and edx were loaded from the same structure just a few instructions back. In other words, just like in the initial implementation we're checking if our internal values are still relevant. If that is not true, cmpxchg8b reloads edx and eax from [edi] and the code then jumps back to the Spin label.

If the values match, registers ecx and ebx are copied into the operand [edi]. Here, the value in the ecx register is equal to edx (we just verified that) and ebx contains the pointer to the next node. In other words, the chain head is modified to point to the second node in the chain.

There's not much difference between this implementation and the original one. The biggest and most important change is the introduction of the spin counter, which makes sure that the ABA problem, mentioned in the previous post, cannot occur.
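To see the spin counter at work, the failure scenario from the previous post can be replayed against a tagged head. This is a single-threaded Delphi sketch, not the OTL code: THeadAndSpin, TPopState, BeginPop and CommitPop are names invented here, PopLink is split in two so that a "thread" can be paused just before the cmpxchg8b, and the spinRef+1 check is left out because it always holds without real concurrency.

```pascal
program TaggedHeadReplay;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$ASSERTIONS ON}
{$Q-}{$R-}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
  end;
  THeadAndSpin = record
    Head: PNode;
    Spin: cardinal;
  end;
  // What one thread keeps in registers between the fetch and cmpxchg8b.
  TPopState = record
    Expected: THeadAndSpin;  // edx:eax
    NextNode: PNode;         // ebx
  end;

procedure PushLink(link: PNode; var chain: THeadAndSpin);
begin
  link^.Next := chain.Head;  // PushLink does not touch the spin counter
  chain.Head := link;
end;

// First half of PopLink: bump the spin, fetch head and spin, read Next.
function BeginPop(var chain: THeadAndSpin; var state: TPopState): boolean;
begin
  Inc(chain.Spin);
  state.Expected := chain;
  Result := assigned(chain.Head);
  if Result then
    state.NextNode := chain.Head^.Next;
end;

// Second half: non-atomic stand-in for lock cmpxchg8b.
function CommitPop(var chain: THeadAndSpin; const state: TPopState): boolean;
begin
  Result := (chain.Head = state.Expected.Head) and
            (chain.Spin = state.Expected.Spin);
  if Result then
    chain.Head := state.NextNode;  // the spin is rewritten with the same value
end;

var
  chain  : THeadAndSpin;
  first  : TNode;
  second : TNode;
  newNode: TNode;
  threadA: TPopState;
  threadB: TPopState;
  ok     : boolean;
begin
  chain.Head := nil;
  chain.Spin := 0;
  PushLink(@second, chain);
  PushLink(@first, chain);          // chain -> first -> second

  ok := BeginPop(chain, threadA);   // thread A pauses before cmpxchg8b
  Assert(ok);

  ok := BeginPop(chain, threadB) and CommitPop(chain, threadB);
  Assert(ok and (chain.Head = @second));  // thread B popped 'first'

  PushLink(@newNode, chain);        // chain -> newnode -> second
  PushLink(@first, chain);          // chain -> first -> newnode -> second

  Assert(chain.Head = threadA.Expected.Head);  // same head pointer as before...
  ok := CommitPop(chain, threadA);
  Assert(not ok);                   // ...but the spin differs, so the swap is refused
  Assert(chain.Head = @first);
  Assert(first.Next = @newNode);    // chain is intact
  WriteLn('stale pop rejected');
end.
```

In the real code a refused swap sends the thread back to the Spin label, where it starts over with fresh values.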

[To be completely frank, the ABA situation can still occur, at least in theory. To experience a problem with the new code, one thread would have to stop just before cmpxchg8b and wait there for a very long time. So long, in fact, that other threads would be able to wrap the spin counter from $FFFFFFFF to 0 and back to the original value. As the spin counter is only incremented in PopLink, a little less than 4.3 billion PopLink calls would have to be made. If the original thread is then awakened, and the spin counter contains the exact same value as it had before the thread was paused, the ABA problem would occur. That will never happen in practice, I'm totally sure.]

Besides the PopLink and introduction of spin counters, nothing much has changed. The PushLink method was slightly modified, as it was also changed from a normal method to a class static one, but it still works exactly as before.

  class procedure TOmniBaseContainer.PushLink(const link: POmniLinkedData;
    var chain: TOmniHeadAndSpin);
  asm
    mov   ecx, eax
    mov   eax, [edx]                        //eax := chain.Head
  @Spin:
    mov   [ecx], eax                        //link.Next := chain.Head
    lock cmpxchg [edx], ecx                 //chain.Head := link
    jnz   @Spin
  end; { TOmniBaseContainer.PushLink }

That's it. This version of the lock-free stack has successfully passed a 4-hour stress test, which makes me believe that it really works. ;)

Next time on the agenda: How the lock-free queue is made. This time I'll really blog about it. Nothing more will distract me. Promise!

Monday, July 21, 2008

TDM Rerun #10: Synchronisation Toolkit Revisited

The plan for this article was to present an implementation of a shared memory pool: a mechanism that allows multiple data producers to send data to one data manipulator. In fact, I already had all the code and half the article completed when I found out that my solution doesn't always work. Everything was OK when both ends of the pool (producer and manipulator) were implemented in the same application or in two 'normal' applications. But if I tried to change one of those apps into an NT service, the shared pool stopped working.

- Synchronisation Toolkit Revisited, The Delphi Magazine 91, March 2003

My 10th TDM article returned to my pet theme - synchronisation and communication. The first part described some aspects of the NT security model and the second implemented a shared memory-based queue, capable of transporting messages from a GUI application to a service and back.

It looks like I'll have to return to that topic again - I got some reports recently that my approach doesn't (always?) work on Vista :(

Links: article (PDF, 54KB), source code (ZIP, 2.2 MB), current GpSync unit

Lock-free stack problem

In my last treatise on OmniThreadLibrary internals you could read about the lock-free stack structure, included in the OmniThreadLibrary. The code was relatively simple to understand (after studying it for hours ;) but alas, it didn't work. The problem only appeared when multiple readers or multiple writers were operating on the same structure and that's why the initial tests, which used only one reader and one writer, failed to detect it.

I had no idea what could go wrong (at that moment I thought I understood the lock-free stuff in OtlContainers) but luckily the author of the stack code was smarter and located the problem. He explained it to me so I can explain it to you ...

Remember the PopLink code from the previous article? Probably not, so I'll reprint it here in a slightly modified form. [For the original code and comments, see the Implementing lock-free stack article.]

  function TOmniBaseContainer.PopLink(var chainHead: POmniLinkedData): POmniLinkedData;
  label
    Spin;
  var
    second: POmniLinkedData;
  begin
    // mov eax, [edx]
    Result := chainHead;
  Spin:
    // test eax, eax
    // jz @Exit
    if not assigned(Result) then
      Exit;
    // mov ecx, [eax]
    second := Result.Next;
    // lock cmpxchg [edx], ecx
    {.$ATOMIC}
    if Result = chainHead then
      chainHead := second
    else begin
      Result := chainHead; // cmpxchg reloads eax when the comparison fails
    {.$END ATOMIC}
      // jnz @Spin
      goto Spin;
    end;
  end; { TOmniBaseContainer.PopLink }

I must emphasize again that this is not working code. It might even compile (yes, Pascal/Delphi does have labels and goto!) but it surely won't work as the part between {.$ATOMIC} and {.$END ATOMIC} won't be executed atomically. Still, this is a nice approximation that allows us to understand the problem that also occurs with the assembler version.

The code is (now that it's coded in Delphi) pretty simple: it accesses first element in the chain, follows its Next pointer to the second element, and then atomically checks whether the chain head still points to the first element and if that is true, changes the chain head to point to the second element.

What can go wrong there? After all, we're checking the chain head and making the switch atomically! When we have only one reader and one writer, nothing much. Problems occur when there are multiple readers or multiple writers working at the same time.

The single point of failure is the lock cmpxchg instruction (the atomic part in Delphi code). There is a problem with the value that is stored in the chain head - when lock cmpxchg is executed, we cannot be sure that the ecx register (or the second variable) still points to a node that is indeed second in the chain.

Let's examine one of many possible failure scenarios:

  • A thread executes the code above up to the atomic part. Immediately after second := Result.Next is executed (Result points to the node first), the thread is stopped and another thread gets the CPU slice.

At that point we have following data structure: chainHead -> first -> second -> ...

  • Another thread executes the same PopLink code and pops node first from the chain.

The data structure now looks like this: chainHead -> second -> ...

  • Yet another thread pushes a completely different node (let's call it newnode) into the chain.

chainHead -> newnode -> second -> ...

  • The first node is pushed back into the chain.

chainHead -> first -> newnode -> second -> ...

  • Now the first thread continues execution with the lock cmpxchg. The condition still holds - chainHead indeed points to the first node. Therefore, chainHead is modified to point to the second node. But this is not the node chainHead should be pointing to! It should point to the newnode node!

And so everything falls apart. :(
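The failure scenario can be replayed step by step in a single thread. The sketch below uses hypothetical names (PNode, TNode, CompareAndSwap, seenFirst, seenSecond) and a non-atomic stand-in for lock cmpxchg; it only shows that the final comparison succeeds although the remembered second pointer is stale.

```pascal
program AbaReplay;
{$APPTYPE CONSOLE}
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$ASSERTIONS ON}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
  end;

procedure PushLink(link: PNode; var chainHead: PNode);
begin
  link^.Next := chainHead;
  chainHead := link;
end;

// Uninterrupted pop, used for the "other" threads.
function PopLink(var chainHead: PNode): PNode;
begin
  Result := chainHead;
  if assigned(Result) then
    chainHead := Result^.Next;
end;

// Non-atomic stand-in for lock cmpxchg on the chain head.
function CompareAndSwap(var chainHead: PNode; expected, newHead: PNode): boolean;
begin
  Result := (chainHead = expected);
  if Result then
    chainHead := newHead;
end;

var
  chainHead : PNode = nil;
  first     : TNode;
  second    : TNode;
  newNode   : TNode;
  seenFirst : PNode;
  seenSecond: PNode;
  popped    : PNode;
  ok        : boolean;
begin
  PushLink(@second, chainHead);
  PushLink(@first, chainHead);       // chainHead -> first -> second

  // Thread 1 reads the head and its Next, then is stopped.
  seenFirst := chainHead;
  seenSecond := seenFirst^.Next;

  // Another thread pops 'first'.
  popped := PopLink(chainHead);      // chainHead -> second
  Assert(popped = @first);

  // 'newnode' is pushed, then 'first' is pushed back.
  PushLink(@newNode, chainHead);     // chainHead -> newnode -> second
  PushLink(popped, chainHead);       // chainHead -> first -> newnode -> second

  // Thread 1 resumes: the head looks unchanged, so the swap goes through.
  ok := CompareAndSwap(chainHead, seenFirst, seenSecond);
  Assert(ok);
  Assert(chainHead = @second);       // wrong: newnode has been skipped
  Assert(first.Next = @newNode);     // this is where the head should point
  WriteLn('newnode fell out of the chain');
end.
```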

Can the code be fixed? Yes, and the fix is already in the repository and is included in the latest snapshot. Next time I'll explain the new approach (and maybe even the operation of the lock-free queue). Stay tuned!

Thursday, July 17, 2008

OmniThreadLibrary progress report

Just a short notice on what we're working on. I'm too tired to write a coherent article longer than 100 words.

  • Lock-free structures are now really, really, really working.
  • Stress tests in tests\10_Containers have been significantly enhanced.
  • New Counter support.

The last item is really interesting and deserves a longer post. In short, it allows you to do:

counter := CreateCounter(2);
CreateTask(worker).WithCounter(counter).Run;
CreateTask(worker).WithCounter(counter).Run;

In worker code:

if Task.Counter.Decrement = 0 then
  Task.Comm.Send(MSG_ALL_DONE);

Decrement is interlocked, of course.
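To show why the interlocked decrement matters, here is a minimal sketch that uses the Windows API directly instead of the OTL counter. The names pending, notifications and TWorker are made up for the illustration, and the unit names assume a Windows target (recent Delphi releases call them Winapi.Windows and System.Classes). Because InterlockedDecrement returns the new value atomically, exactly one worker can observe 0.

```pascal
program CounterSketch;
{$APPTYPE CONSOLE}

uses
  Windows, Classes;

var
  pending: Integer = 2;        // shared countdown, one unit per worker
  notifications: Integer = 0;  // how many workers saw the counter hit zero

type
  TWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TWorker.Execute;
begin
  // ... the real work would go here ...
  // Only the worker that finishes last gets 0 back from the decrement.
  if InterlockedDecrement(pending) = 0 then
    InterlockedIncrement(notifications);
end;

var
  w1, w2: TWorker;
begin
  w1 := TWorker.Create(False);
  w2 := TWorker.Create(False);
  w1.WaitFor;
  w2.WaitFor;
  w1.Free;
  w2.Free;
  Writeln('pending = ', pending, ', notifications = ', notifications);
end.
```

With a plain Dec(pending) followed by a separate test, both workers could see 0 (or neither), which is exactly the race the interlocked version avoids.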

 

All that is available in the repository and as a snapshot.

Tuesday, July 15, 2008

Implementing lock-free stack

[I'm cheating. The title should be “OmniThreadLibrary internals - OtlContainers”, but I wanted to attract more readers.]

Today I'll be describing the OtlContainers unit. Part of that unit, actually, as there is a lot to tell and a lot to show. I'll focus on the lock-free stack today and describe other stuff from the same unit in a day or two.

To give the credit where it is due — most of the code you'll see today was written by a Slovenian developer ‘GJ’. His is the implementation of the lock-free stack and all the assembler parts. I only wrapped them in higher-level structures. GJ, thanks for your contribution!

If you want to follow this article with the OtlContainers loaded in IDE, please make sure that you have the latest source [quick links: snapshot, repository, OtlContainers.pas].

The topic of today's presentation is TOmniBaseContainer. This is a base class that implements a lock-free, size-limited stack. Multiple simultaneous writers are supported, as are multiple readers. Although this is a fully functional class, you would usually want to use the more feature-rich descendant, TOmniStack.

A note regarding the limited size. It is much simpler to implement a size-limited lock-free structure than to deal with dynamic allocation. In most cases, dynamic allocation would lead to possible locks inside the memory manager and some of the advantage gained by the lock-free algorithm would go away. What's more, you usually don't want the lock-free structure to grow without any control. Sure, the OTL implementation won't be the right solution for all your needs, but we do believe that it is good enough for many usage scenarios.

Back to the story ... TOmniBaseContainer stores data in TOmniLinkedData nodes. The first four bytes in the node point to the next node and the other bytes (as many as the application requested) contain application data. Only the first of those bytes is actually included in the TOmniLinkedData structure; it is only used as a placeholder for address calculation in Push and Pop operations.

type
  POmniLinkedData = ^TOmniLinkedData;
  TOmniLinkedData = packed record
    Next: POmniLinkedData;
    Data: byte; //user data, variable size
  end; { TOmniLinkedData }

In the following diagrams I'll use this symbol to represent one TOmniLinkedData node.

data node

Nodes are connected into chains. The address of the first node is stored in a chain header. The Next field of the last node in the chain contains nil.

data chain

As the TOmniBaseContainer implementation is size-limited, it can preallocate all nodes in one go. The Initialize method first allocates one big buffer to store all nodes and connects them into one chain, pointed to by the recycle chain header. This chain contains unused nodes. The other chain, public chain, contains nodes that are in use and is initialized to nil.

You'll see that the initialization code rounds up the node size (the Next pointer plus the node data) to the next multiple of 4. This is required because operations that modify node pointers (the Next field) require them to be aligned on addresses that are divisible by 4.

  // calculate element size, round up to next 4-aligned value
  bufferElementSize := ((SizeOf(POmniLinkedData) + elementSize) + 3) AND NOT 3;
  GetMem(obcBuffer, bufferElementSize * cardinal(numElements));
  Assert(cardinal(obcBuffer) AND 3 = 0);
  //Format buffer to recycleChain, init obcRecycleChain and obcPublicChain.
  //At the beginning, all elements are linked into the recycle chain.
  obcRecycleChain := obcBuffer;
  nextElement := nil; // to remove compiler warning in nextElement.Next := nil assignment below
  currElement := obcRecycleChain;
  for iElement := 0 to obcNumElements - 2 do begin
    nextElement := POmniLinkedData(cardinal(currElement) + bufferElementSize);
    currElement.Next := nextElement;
    currElement := nextElement;
  end;
  nextElement.Next := nil; // terminate the chain
  obcPublicChain := nil;

The structure created by the initialization code is depicted below. The recycle chain points to the first node, which points to the next node and so on. The last node in the chain has nil stored in the Next pointer. The public chain is empty.
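The rounding and the chain formatting can be tried out in isolation. The sketch below is not the OTL code: RoundUp4 and BuildAndCount are hypothetical helpers, and the pointer arithmetic goes through PAnsiChar rather than a cardinal cast so that it does not depend on the pointer size. It formats a raw buffer into one chain and then walks the chain to count the nodes.

```pascal
program InitChainSketch;
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = packed record
    Next: PNode;
    Data: Byte; // placeholder for the first byte of user data
  end;

// Rounds size up to the next multiple of 4; sizes that are already
// divisible by 4 are returned unchanged.
function RoundUp4(size: Integer): Integer;
begin
  Result := (size + 3) and not 3;
end;

// Links numNodes (at least 1) equally sized slots of one raw buffer into
// a chain and returns the number of nodes reachable from the first slot.
function BuildAndCount(elementSize, numNodes: Integer): Integer;
var
  buffer: Pointer;
  slotSize, i: Integer;
  node: PNode;
begin
  slotSize := RoundUp4(SizeOf(PNode) + elementSize);
  GetMem(buffer, slotSize * numNodes);
  try
    node := buffer;
    for i := 1 to numNodes - 1 do begin
      node^.Next := PNode(PAnsiChar(node) + slotSize);
      node := node^.Next;
    end;
    node^.Next := nil; // terminate the chain
    Result := 0;
    node := buffer;
    while node <> nil do begin
      Inc(Result);
      node := node^.Next;
    end;
  finally
    FreeMem(buffer);
  end;
end;

begin
  Writeln(RoundUp4(5), ' ', RoundUp4(8), ' ', RoundUp4(9)); // 8 8 12
  Writeln(BuildAndCount(5, 4));                             // 4
end.
```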

empty stack

Let's see what happens when code pushes new data onto the stack.

function TOmniBaseContainer.Push(const value): boolean;
var
  linkedData: POmniLinkedData;
begin
  linkedData := PopLink(obcRecycleChain);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(value, linkedData.Data, ElementSize);
  PushLink(linkedData, obcPublicChain);
end; { TOmniBaseContainer.Push }

PopLink removes first node from the recycle chain. Chain header is updated to point to the next node in the chain. Pointer to the removed node is stored in the linkedData variable. If recycle chain is empty, linkedData contains nil and Push returns False.

The code next moves application data from the value parameter to the preallocated buffer. linkedData.Data conveniently addresses first data byte in the buffer.

At the end, the code inserts linkedData node at the beginning of the public chain and updates the public chain header to point to the linkedData node.
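The combination of an untyped parameter, a one-byte placeholder field and Move is easy to overlook, so here is a small standalone sketch of just that part. AllocNode, StoreValue and LoadValue are names invented for the example; the node is allocated with room for the payload behind the Next field, and Data merely marks where the payload starts.

```pascal
program NodeDataSketch;
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = packed record
    Next: PNode;
    Data: Byte; // first byte of the payload; the rest follows in memory
  end;

// Allocates a node with room for payloadSize bytes after the Next field.
function AllocNode(payloadSize: Integer): PNode;
begin
  GetMem(Result, SizeOf(PNode) + payloadSize);
  Result^.Next := nil;
end;

// Untyped parameters accept any variable; size tells how many bytes to copy.
procedure StoreValue(node: PNode; const value; size: Integer);
begin
  Move(value, node^.Data, size);
end;

procedure LoadValue(node: PNode; var value; size: Integer);
begin
  Move(node^.Data, value, size);
end;

var
  node: PNode;
  written, readBack: Double;
begin
  node := AllocNode(SizeOf(Double));
  written := 3.25;
  readBack := 0;
  StoreValue(node, written, SizeOf(written));
  LoadValue(node, readBack, SizeOf(readBack));
  Writeln(readBack = written);
  FreeMem(node);
end.
```

The compiler cannot check the size of an untyped parameter, so the caller is responsible for passing a variable that is at least as large as the element size.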

one pushed

The real workhorses here are PopLink and PushLink. The former pops the first node from a chain and the latter inserts a node at the head of a chain. The trick here is that they are written with multithreading in mind: they both expect that data structures may change at any time because another thread may be accessing the structure simultaneously.

Let's take a look at the PopLink first.

function TOmniBaseContainer.PopLink(var chainHead: POmniLinkedData): POmniLinkedData;
//nil << Link.Next << Link.Next << ... << Link.Next
//FILO buffer logic ^------ < chainHead
asm
  mov eax, [edx]            //Result := chainHead
@Spin:
  test eax, eax
  jz @Exit
  mov ecx, [eax]            //ecx := Result.Next
  lock cmpxchg [edx], ecx   //chainHead := Result.Next
  jnz @Spin                 //Do spin ???
@Exit:
end; { TOmniBaseContainer.PopLink }

At the beginning, the edx register contains the address of the chainHead parameter. mov eax, [edx] moves chainHead, i.e. the address of the node that chainHead is pointing to, into eax. The chain may be empty, in which case chainHead is nil or, in assembler terms, eax is 0. test eax, eax checks for this condition.

If there's at least one node in the chain, we can continue. mov ecx, [eax] moves the address of the next node into the ecx register. Remember that eax points to the node and [eax] is the same as accessing the node's Next field.

Now we have an address of the first node in the eax register, address of the second node in the ecx register (there may be no second node, in which case the ecx is 0) and chainHead in the edx register. And now we can do the heavy magic.

lock cmpxchg [edx], ecx does a few things, all at once. Well, not exactly at once, but the processor (and the level one cache and all the weird hardware wrapping the CPUs) makes sure that this operation won't be interrupted by another core or CPU attempting to do the same thing at the same time. Firstly, cmpxchg compares eax with [edx]. Remember, we loaded eax from [edx], so those two values should be the same, yes? Well, no. Between mov eax, [edx] and lock cmpxchg, the thread may be interrupted and stopped, and another thread may have modified the chain header by executing PopLink. That's why we have to recheck.

If eax and [edx] are still the same, ecx is loaded into [edx]. In other words, address of the second node (which may be nil) is stored into the chainHead. Because of the lock prefix, all that (testing and assignment) happens atomically. Uninterruptible. In other words, another thread can not step on our (digital) toes.

If eax and [edx] are not the same, cmpxchg loads eax from [edx]. In other words, eax is refreshed from the chainHead. If that happens, jnz @Spin instruction will jump to the @Spin label and repeat the whole procedure.

At the end we have address of the second node stored in chainHead and address of the first node stored in eax, which is fine as pointer Results are returned in eax.

You can read more on cmpxchg here.
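For readers who prefer Pascal to assembler, the same retry loop can be sketched with InterlockedCompareExchange. This is an illustration, not the OTL implementation: it uses array indexes instead of pointers (nextOf and NIL_INDEX are hypothetical names) so that the Integer-based declaration from the Windows unit applies, which assumes a reasonably recent Delphi on Windows. One small difference: the assembler reuses the value that cmpxchg leaves in eax after a failed compare, while the sketch simply rereads the head.

```pascal
program PopLinkSketch;
{$APPTYPE CONSOLE}

uses
  Windows;

const
  NIL_INDEX = -1; // plays the role of nil

var
  nextOf: array [0..2] of Integer; // nextOf[i] is the "Next field" of node i
  chainHead: Integer;

// Read the head, read the head's Next, then compare-and-swap the head.
// If another thread changed the head in between, start over.
function PopLink(var head: Integer): Integer;
var
  second: Integer;
begin
  repeat
    Result := head;               // mov eax, [edx]
    if Result = NIL_INDEX then    // test eax, eax / jz @Exit
      Exit;
    second := nextOf[Result];     // mov ecx, [eax]
    // lock cmpxchg [edx], ecx: returns the value head had before the call
  until InterlockedCompareExchange(head, second, Result) = Result;
end;

begin
  // chainHead -> 0 -> 1 -> 2 -> nil
  nextOf[0] := 1;
  nextOf[1] := 2;
  nextOf[2] := NIL_INDEX;
  chainHead := 0;
  Writeln(PopLink(chainHead)); // 0
  Writeln(PopLink(chainHead)); // 1
  Writeln(PopLink(chainHead)); // 2
  Writeln(PopLink(chainHead)); // -1, the chain is empty
end.
```

Like the assembler it mirrors, the loop only verifies that the head value is unchanged; it knows nothing about what happened to the chain between the read and the swap.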

PushLink is much simpler.

procedure TOmniBaseContainer.PushLink(const link: POmniLinkedData;
  var chainHead: POmniLinkedData);
asm
  mov eax, [ecx]            //eax := chainHead (ecx = @chainHead)
@Spin:
  mov [edx], eax            //link.Next := chainHead
  lock cmpxchg [ecx], edx   //chainHead := link
  jnz @Spin
end; { TOmniBaseContainer.PushLink }

At the beginning, edx contains the value of the link parameter and ecx contains the address of the chainHead parameter.

The code first loads the address of the first node in the chain into the eax register. Nil pointers are fine in this case as we will not be following (dereferencing) them.

Then the code loads this same value into the [edx]. Remember, edx contains the value of the link, therefore [edx] represents link.Next. The mov [edx], eax line sets link.Next to point to the first node in the chain. If the chain is empty, link.Next will be set to nil, which is exactly the correct thing to do.

lock cmpxchg [ecx], edx then compares eax and [ecx] to ensure that underlying data haven't changed since PushLink started its execution. If values are not equal, eax is reloaded from [ecx] and code execution continues from the @Spin label. If values are equal, edx is loaded into [ecx]. At that moment, edx still contains the value stored in the link parameter and ecx contains the address of the chainHead. In other words, chainHead is set to link. As link.Next was set to old chainHead in the previous line, we have successfully linked link at the beginning of the chain.
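The push side can be sketched in Pascal in the same spirit. Again this is a hedged illustration and not the library code: nodes are identified by array index (nextOf and NIL_INDEX are invented names) so that the Integer-based InterlockedCompareExchange from the Windows unit can be used, which assumes a reasonably recent Delphi on Windows.

```pascal
program PushLinkSketch;
{$APPTYPE CONSOLE}

uses
  Windows;

const
  NIL_INDEX = -1; // plays the role of nil

var
  nextOf: array [0..2] of Integer;  // nextOf[i] is the "Next field" of node i
  chainHead: Integer = NIL_INDEX;   // empty chain

// Point the new node at the current head, then try to swap the head to
// the new node. Retry if another thread changed the head in between.
procedure PushLink(link: Integer; var head: Integer);
var
  oldHead: Integer;
begin
  repeat
    oldHead := head;           // mov eax, [ecx]
    nextOf[link] := oldHead;   // mov [edx], eax  (link.Next := old head)
    // lock cmpxchg [ecx], edx: returns the value head had before the call
  until InterlockedCompareExchange(head, link, oldHead) = oldHead;
end;

begin
  PushLink(2, chainHead); // chainHead -> 2 -> nil
  PushLink(0, chainHead); // chainHead -> 0 -> 2 -> nil
  Writeln(chainHead, ' ', nextOf[0], ' ', nextOf[2]); // 0 2 -1
end.
```

Writing link.Next before the swap is safe because the node is not yet visible to other threads; it becomes reachable only when the compare-and-swap succeeds.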

That's all, the hard part is over. If you understand PopLink and PushLink, everything else is simple.

When we push the second value into the stack, it is inserted at the beginning of the public chain and recycle chain points to the next free node.

two pushed

The process continues until all nodes are linked into the public chain and recycle chain is nil.

four pushed

To pop a value from the stack, TOmniBaseContainer.Pop is used. It first gets a topmost allocated node (atomically, of course). Then it moves node data into the method parameter and pushes node into the recycle chain (again, atomically).

function TOmniBaseContainer.Pop(var value): boolean;
var
  linkedData: POmniLinkedData;
begin
  linkedData := PopLink(obcPublicChain);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(linkedData.Data, value, ElementSize);
  PushLink(linkedData, obcRecycleChain);
end; { TOmniBaseContainer.Pop }

One of the important things to note is that nodes are not moved around. They are allocated at the beginning and for the whole lifetime of the TOmniBaseContainer they stay immovable. Only Next fields are modified (and of course the Data is copied) and that's why this lock-free stack implementation is extremely fast. First results indicate that it can move about 800.000 integers per second between two threads on a 1,67 GHz Core2 Duo (T2300) machine.

base container test

Next post will discuss the lock-free ring buffer that is built on top of the lock-free stack. Stay tuned!

Monday, July 14, 2008

OmniThreadLibrary progress report

  • new unit OtlContainers with lock-free stack and lock-free ring buffer (both with additional limitations; details coming soon)
  • OtlComm redesigned around OtlContainers
  • new test 10_Containers, which performs basic unit test for the OtlContainers unit
  • package split to designtime/runtime; OtlComm unit test split into separate unit OtlCommBufferTest; component registration moved from OtlTaskEvents to new unit OtlRegister [all thanks to Lee_Nover]
  • fixed memory leaks in tests 8 and 9
  • added demonstration of how to send an object over the communication channel to the demo 8
  • new SpinLock unit which fixes reentrancy problems is included
  • new DSiWin32 unit which fixes problems when DSiTimeGetTime64 was called from more than one thread

All that is available in the repository and as a snapshot.

Friday, July 11, 2008

OmniThreadLibrary goes lock-free

Well, not the complete OTL, just the messaging subsystem, but even that is quite some achievement. The ring buffer inside the OtlComm unit is now implemented with the help of a lock-free stack. The lock-free buffer is now the default. If you want to compile OtlComm with the locking buffer, define the OTL_LockingBuffer symbol.

Lock-free buffer did not improve communication subsystem speed much at the moment as the limiting factor is Delphi's Variant implementation but that will change. The collective mind behind the OTL is working on some interesting messaging ideas that will be much faster but will still keep most of the Variant flexibility and simplicity.

What I can promise even now is that there will be lock-free stack and lock-free ring buffer included in the OTL as standalone reusable data structures. Sounds useful?

Wednesday, July 09, 2008

OmniThreadLibrary Example #5: Registering additional communication channels

I thought I would be documenting the TOmniTaskEventDispatch component today, but I was working on something interesting yesterday and today and I want to show it to the public. The functionality I'll be talking about is demoed in the tests\8_RegisterComm project, available in the repository and in today's snapshot. [All new functionality was uploaded, not just this demo, of course.]

I was working on the OtlComm testing and benchmarking code. I wanted to find out if the communication code is working correctly (by stress-testing it) and if the lock-free buffer implementation is faster than the locking one. So I set up two threaded clients (using OTL, of course!) and started to write code that would establish a direct communication channel between them. I wanted to use the standard Comm channel for test control and reporting, but not for running tests.

At first, the task seemed quite simple. I set up a new communication channel of type IOmniTwoWayChannel and created two tasks, based on the TOmniWorker class. Each task received one endpoint of the new communication channel as a parameter.

  FCommChannel := CreateTwoWayChannel(1024);
  FClient1 := OmniTaskEventDispatch1.Monitor(
    CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))).Run;
  FClient2 := OmniTaskEventDispatch1.Monitor(
    CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))).Run;

An image is worth more than a thousand words.

additional communication channel


When I had this infrastructure ready, I started to think about the message loop in my tasks. At that moment (yesterday), the internal message loop only handled the default communication channel. Writing a whole message loop to support one measly communication channel didn't seem like the right decision. What to do? Extend OTL, of course!


Today's snapshot contains support for additional communication channels. You only have to call RegisterComm and internal message dispatcher will handle everything for you.


The 8_RegisterComm demo will help you understand. There are two buttons on the form, first sends a random number to task 1 and second sends a random number to task 2.


8_RegisterComm


The code for the first button's OnClick event should suffice. Code for the second button is almost the same.

procedure TfrmTestOtlComm.btnSendTo1Click(Sender: TObject);
var
  value: integer;
begin
  value := Random(100);
  Log(Format('Sending %d to task 1', [value]));
  FClient1.Comm.Send(MSG_FORWARD, value);
end;

The TCommTester class implements message handler for the MSG_FORWARD message. The code in this method firstly notifies the owner that MSG_FORWARD message was received and secondly sends MSG_FORWARDING message to the task-to-task communication channel.

type
  TCommTester = class(TOmniWorker)
  strict private
    ctComm    : IOmniCommunicationEndpoint;
    ctCommSize: integer;
  public
    constructor Create(commEndpoint: IOmniCommunicationEndpoint; commBufferSize: integer);
    function Initialize: boolean; override;
    procedure OMForward(var msg: TOmniMessage); message MSG_FORWARD;
    procedure OMForwarding(var msg: TOmniMessage); message MSG_FORWARDING;
  end; { TCommTester }

constructor TCommTester.Create(commEndpoint: IOmniCommunicationEndpoint;
  commBufferSize: integer);
begin
  inherited Create;
  ctComm := commEndpoint;
  ctCommSize := commBufferSize;
end;

function TCommTester.Initialize: boolean;
begin
  Task.RegisterComm(ctComm);
  Result := true;
end;

procedure TCommTester.OMForward(var msg: TOmniMessage);
begin
  Task.Comm.Send(MSG_NOTIFY_FORWARD, msg.MsgData);
  ctComm.Send(MSG_FORWARDING, msg.MsgData);
end;

procedure TCommTester.OMForwarding(var msg: TOmniMessage);
begin
  Task.Comm.Send(MSG_NOTIFY_RECEPTION, msg.MsgData);
end;

The MSG_FORWARDING handler (OMForwarding) just notifies the owner that the message was received from another task.
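The message directive used in TCommTester is the standard Delphi mechanism behind TObject.Dispatch, which picks the handler based on the first Word of the record it is given. The sketch below shows only that language feature; TDemoMessage, TDemoWorker and the message ID values are hypothetical, and how OTL itself invokes the handlers is not shown in this post.

```pascal
program DispatchSketch;
{$APPTYPE CONSOLE}

const
  MSG_FORWARD    = 1; // hypothetical values, the real IDs are not shown here
  MSG_FORWARDING = 2;

type
  // Dispatch reads the message ID from the first Word of the record.
  TDemoMessage = record
    MsgID  : Word;
    MsgData: Integer;
  end;

  TDemoWorker = class
  public
    Log: string;
    procedure OMForward(var msg: TDemoMessage); message MSG_FORWARD;
    procedure OMForwarding(var msg: TDemoMessage); message MSG_FORWARDING;
    procedure DefaultHandler(var Message); override;
  end;

procedure TDemoWorker.OMForward(var msg: TDemoMessage);
begin
  Log := Log + 'forward;';
end;

procedure TDemoWorker.OMForwarding(var msg: TDemoMessage);
begin
  Log := Log + 'forwarding;';
end;

// Called by Dispatch when no message method matches the ID.
procedure TDemoWorker.DefaultHandler(var Message);
begin
  Log := Log + 'unhandled;';
end;

var
  worker: TDemoWorker;
  msg: TDemoMessage;
begin
  worker := TDemoWorker.Create;
  try
    msg.MsgData := 42;
    msg.MsgID := MSG_FORWARDING;
    worker.Dispatch(msg);
    msg.MsgID := MSG_FORWARD;
    worker.Dispatch(msg);
    msg.MsgID := 99; // no handler declared for this ID
    worker.Dispatch(msg);
    Writeln(worker.Log); // forwarding;forward;unhandled;
  finally
    worker.Free;
  end;
end.
```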


In the screenshot above, you can see how value 0 traveled from task 1 to task 2 and how value 3 traveled from task 2 to task 1.


The really magical part happens in TCommTester.Initialize. The code fragment Task.RegisterComm(ctComm) registers the communication endpoint as an additional message source. From that point onwards, messages that arrive on the ctComm channel are treated the same way as messages arriving on the Comm channel.
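One plausible building block for serving several message sources from one loop is the Windows API call WaitForMultipleObjects, which blocks on a whole array of handles and reports which one fired. Whether the OTL dispatcher is written this way is an assumption; the sketch only demonstrates the API, with two plain events standing in for the new-message events of the default and the registered channel.

```pascal
program MultiSourceWaitSketch;
{$APPTYPE CONSOLE}

uses
  Windows;

var
  handles: TWOHandleArray; // array type expected by WaitForMultipleObjects
  waitResult: DWORD;
begin
  // One auto-reset event per message source: index 0 stands for the default
  // channel, index 1 for an additionally registered endpoint.
  handles[0] := CreateEvent(nil, False, False, nil);
  handles[1] := CreateEvent(nil, False, False, nil);
  try
    SetEvent(handles[1]); // pretend a message arrived on the extra channel

    // Wait for any of the two handles, for at most one second.
    waitResult := WaitForMultipleObjects(2, @handles, False, 1000);
    if waitResult = WAIT_OBJECT_0 + 1 then
      Writeln('message pending on the registered channel')
    else if waitResult = WAIT_OBJECT_0 then
      Writeln('message pending on the default channel')
    else
      Writeln('nothing arrived (timeout or error)');
  finally
    CloseHandle(handles[0]);
    CloseHandle(handles[1]);
  end;
end.
```

A dispatcher built on this idea would map the returned index back to the matching endpoint and drain its queue before waiting again.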


The road to this deceptively simple solution was quite long. I had to do major refactoring in the OtlTask unit. Quite some code was moved from the TOmniTaskControl class to the TOmniTaskExecutor class (which was a record before). I think that the new code works fine (I've run all test cases), but only time will tell ...


If you feel the need to check how additional communication channel support is implemented, check TOmniTaskExecutor.Asy_DispatchMessages and TOmniTaskExecutor.Asy_RegisterComm (OtlTask unit).

Tuesday, July 08, 2008

OmniThreadLibrary internals - OtlComm


Today I'll describe the inner workings of the OmniThreadLibrary communication subsystem. It lives in the OtlComm unit and is used extensively by the task control/task worker interfaces (described in the previous installment). Its use is not limited to the OTL as it has no knowledge of tasks and threads. Feel free to use it in your own non-OTL-related code.

At the surface, messaging subsystem looks deceptively simple.

image

IOmniTaskControl interface exposes property Comm: IOmniCommunicationEndpoint.

  IOmniCommunicationEndpoint = interface ['{910D329C-D049-48B9-B0C0-9434D2E57870}']
    function GetNewMessageEvent: THandle;
  //
    procedure RemoveMonitor;
    procedure Send(msgID: word; msgData: TOmniValue); overload;
    procedure Send(msgID: word; msgData: array of const); overload;
    procedure Send(const msg: TOmniMessage); overload;
    procedure SetMonitor(hWindow: THandle; messageWParam, messageLParam: integer);
    function Receive(var msgID: word; var msgData: TOmniValue): boolean; overload;
    function Receive(var msg: TOmniMessage): boolean; overload;
    property NewMessageEvent: THandle read GetNewMessageEvent;
  end; { IOmniTaskCommunication }

This simple interface allows the owner to send messages, receive messages and wait on a new message. A property with the same name lives on the worker side, too (in the IOmniTask interface). Both endpoints are connected so that IOmniTaskControl.Comm.Send sends a message to IOmniTask.Comm and vice versa. Simple.


But when you look under the surface ...


Here be dragons!


This is the real picture of objects and interfaces living inside the OtlComm unit. Although the surface view may give you an impression that the IOmniTaskCommunication is the most important part of the communication system, in reality everything revolves around the IOmniTwoWayChannel.


image 


To understand this picture, it's best to start at the bottom left (IOW, in the rectangle just above this line). The IOmniTaskControl interface is the one that is returned from the CreateTask procedure.


IOmniTaskControl is implemented by the TOmniTaskControl object, which owns an IOmniTwoWayChannel interface, created during TOmniTaskControl initialization.

procedure TOmniTaskControl.Initialize;
begin
  //...
  otcCommChannel := CreateTwoWayChannel;
  //...
end; { TOmniTaskControl.Initialize }

This interface is also passed to the TOmniTask object (which implements IOmniTask interface) when task is run.

function TOmniTaskControl.Run: IOmniTaskControl;
var
  task: IOmniTask;
begin
  //...
  task := TOmniTask.Create(..., otcCommChannel, ...);
  //...
end; { TOmniTaskControl.Run }

TOmniTwoWayChannel owns two ring buffers of type TOmniRingBuffer (either the locking version, which is the default at the moment, or lock-free version if you compile the unit with /dOTL_LockFreBuffer). Those buffers are used as unidirectional message queues that store TOmniMessage records.

  TOmniMessage = record
    MsgID  : word;
    MsgData: TOmniValue;
  end; { TOmniMessage }

TOmniTwoWayChannel also owns two IOmniCommunicationEndpoint interfaces (implemented by the TOmniCommunicationEndpoint class). One of them is exposed via the Endpoint1 property and the other via Endpoint2.

  IOmniTwoWayChannel = interface ['{3ED1AB88-4209-4E01-AA79-A577AD719520}']
    function Endpoint1: IOmniCommunicationEndpoint;
    function Endpoint2: IOmniCommunicationEndpoint;
  end; { IOmniTwoWayChannel }

Both endpoints are connected to both ring buffers. If we name those ring buffers A and B, endpoint 1 writes to A and reads from B while endpoint 2 writes to B and reads from A.
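The crossed wiring is easier to see in a toy model. The following sketch is single-threaded and uses hypothetical classes (TDemoQueue in place of the ring buffer, TDemoEndpoint in place of the real endpoint, integers in place of messages); it only illustrates that each endpoint writes to one queue and reads from the other.

```pascal
program TwoWayChannelSketch;
{$APPTYPE CONSOLE}

type
  // Tiny single-threaded FIFO standing in for the ring buffer.
  TDemoQueue = class
  private
    FItems: array [0..7] of Integer;
    FHead, FCount: Integer;
  public
    function Enqueue(value: Integer): Boolean;
    function Dequeue(var value: Integer): Boolean;
  end;

  // An endpoint writes to one queue and reads from the other.
  TDemoEndpoint = class
  private
    FWriter, FReader: TDemoQueue;
  public
    constructor Create(writer, reader: TDemoQueue);
    function Send(value: Integer): Boolean;
    function Receive(var value: Integer): Boolean;
  end;

function TDemoQueue.Enqueue(value: Integer): Boolean;
begin
  Result := FCount < Length(FItems); // False when the queue is full
  if Result then begin
    FItems[(FHead + FCount) mod Length(FItems)] := value;
    Inc(FCount);
  end;
end;

function TDemoQueue.Dequeue(var value: Integer): Boolean;
begin
  Result := FCount > 0; // False when the queue is empty
  if Result then begin
    value := FItems[FHead];
    FHead := (FHead + 1) mod Length(FItems);
    Dec(FCount);
  end;
end;

constructor TDemoEndpoint.Create(writer, reader: TDemoQueue);
begin
  inherited Create;
  FWriter := writer;
  FReader := reader;
end;

function TDemoEndpoint.Send(value: Integer): Boolean;
begin
  Result := FWriter.Enqueue(value);
end;

function TDemoEndpoint.Receive(var value: Integer): Boolean;
begin
  Result := FReader.Dequeue(value);
end;

var
  queueA, queueB: TDemoQueue;
  endpoint1, endpoint2: TDemoEndpoint;
  received: Integer;
begin
  queueA := TDemoQueue.Create;
  queueB := TDemoQueue.Create;
  endpoint1 := TDemoEndpoint.Create(queueA, queueB); // writes A, reads B
  endpoint2 := TDemoEndpoint.Create(queueB, queueA); // writes B, reads A
  try
    endpoint1.Send(17);
    if endpoint2.Receive(received) then
      Writeln('endpoint 2 got ', received);
    if not endpoint1.Receive(received) then
      Writeln('endpoint 1 has nothing to read');
  finally
    endpoint1.Free;
    endpoint2.Free;
    queueA.Free;
    queueB.Free;
  end;
end.
```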


TOmniTaskControl.Comm and TOmniTask.Comm are simple mappers that return different endpoints of the same IOmniTwoWayChannel interface.

function TOmniTaskControl.GetComm: IOmniCommunicationEndpoint;
begin
  Result := otcCommChannel.Endpoint1;
end; { TOmniTaskControl.GetComm }

function TOmniTask.GetComm: IOmniCommunicationEndpoint;
begin
  Result := otCommChannel.Endpoint2;
end; { TOmniTask.GetComm }

And that's all folks ...


Autovivification


Well, that's almost all. The last trick in OtlComm is the creation of the communication infrastructure on an as-needed basis. As you can see from the diagram, the whole system is quite "heavy". If you're running simple fire-and-forget tasks, you may not need it at all. That's why the ring buffers and communication endpoints are not created until they are used.


The TOmniTwoWayChannel object is created whenever a task is created (see TOmniTaskControl.Initialize, above), but it does not create other parts of the infrastructure. This is only done in the endpoint accessor.

procedure TOmniTwoWayChannel.CreateBuffers;
begin
  if twcUnidirQueue[1] = nil then
    twcUnidirQueue[1] := TOmniRingBuffer.Create(twcMessageQueueSize);
  if twcUnidirQueue[2] = nil then
    twcUnidirQueue[2] := TOmniRingBuffer.Create(twcMessageQueueSize);
end; { TOmniTwoWayChannel.CreateBuffers }

function TOmniTwoWayChannel.Endpoint1: IOmniCommunicationEndpoint;
begin
  Assert((cardinal(@twcEndpoint[1]) AND 3) = 0);
  if twcEndpoint[1] = nil then begin
    twcLock.Acquire;
    try
      if twcEndpoint[1] = nil then begin
        CreateBuffers;
        twcEndpoint[1] := TOmniCommunicationEndpoint.Create(twcUnidirQueue[1], twcUnidirQueue[2]);
      end;
    finally twcLock.Release; end;
  end;
  Result := twcEndpoint[1];
end; { TOmniTwoWayChannel.Endpoint1 }

The code to create Endpoint2 is similar except that it uses twcEndpoint[2] and reverses parameters passed to the TOmniCommunicationEndpoint.Create.


The Endpoint1 method uses some tricks that may not be obvious.



  • Testing whether the interfaces have already been initialized is optimistic. The code first tests if the endpoint is nil and if that is true (which happens very rarely, only the first time), it locks access to the internal structures and then retests the same condition before creating the buffers and the endpoint.
  • The Assert checks that the two least significant bits of the endpoint variable's address are 0, i.e. that the variable is DWORD-aligned (its address is divisible by 4). Aligned 32-bit reads and writes are atomic on the Intel architecture (the only one that concerns us). In other words, even if another thread is modifying the same variable on another CPU (or core), we know that we will read either the old value or the new value and not some mixture of both.
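The test, lock, retest pattern can be reduced to a few lines. The sketch below is a generic illustration with a hypothetical TLazyHolder class and a plain TObject as the lazily created resource; it uses TCriticalSection from SyncObjs and relies on the same assumption as the text, namely that an aligned pointer-sized read on x86 returns either nil or the fully assigned value.

```pascal
program LazyCreateSketch;
{$APPTYPE CONSOLE}

uses
  SyncObjs;

type
  TLazyHolder = class
  private
    FLock: TCriticalSection;
    FResource: TObject;    // stays nil until somebody asks for it
    FCreateCount: Integer; // counts how often the resource was created
  public
    constructor Create;
    destructor Destroy; override;
    function Resource: TObject;
    property CreateCount: Integer read FCreateCount;
  end;

constructor TLazyHolder.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
end;

destructor TLazyHolder.Destroy;
begin
  FResource.Free;
  FLock.Free;
  inherited;
end;

function TLazyHolder.Resource: TObject;
begin
  if FResource = nil then begin       // optimistic test, no lock taken
    FLock.Acquire;
    try
      if FResource = nil then begin   // retest while holding the lock
        FResource := TObject.Create;
        Inc(FCreateCount);
      end;
    finally
      FLock.Release;
    end;
  end;
  Result := FResource;
end;

var
  holder: TLazyHolder;
  first, second: TObject;
begin
  holder := TLazyHolder.Create;
  try
    first := holder.Resource;
    second := holder.Resource;
    Writeln('same instance: ', first = second);
    Writeln('created ', holder.CreateCount, ' time(s)');
  finally
    holder.Free;
  end;
end.
```

After the first call the lock is never touched again, which is the whole point of the optimistic test.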

That concludes the OtlComm tour. The only remaining part (until the thread pool is implemented) is the TOmniTaskEventDispatch component, which I'll cover in a day or two.