Thursday, February 04, 2010

Three steps to the blocking collection: [2] Dynamically allocated queue

[Step one: Inverse semaphore.]

When I started thinking about the blocking collection internals (see step one), two facts became fairly obvious:

  • The underlying data storage should be some kind of a queue. The blocking collection only needs the data storage to support enqueue (Add) and dequeue (Take).
  • The underlying data storage should be allocated dynamically. Users will be using the blocking collection on structures for which the size cannot be determined quickly (trees, for example), and therefore the code cannot preallocate a »just big enough« data storage.

I was all set on using a locking queue for the data storage, but then I had an idea about how to implement a dynamically allocated queue with microlocking. Instead of locking the whole queue, each thread would lock only one element, and only for such a short time that competing threads would simply busy-wait (spin in a tight loop).

Then the usual thing happened. I ran into the ABA problem. And I solved it – in a way. The queue works, behaves well and is extremely useful. It’s just not as perfect as I thought it would be.

So here it is – a microlocking, (mostly) O(1) insert/remove, dynamically allocated queue with a garbage collector. All yours for a measly 16 bytes per unit of data. Hey, you have to pay the price at some point!

[I “discovered” this approach all by myself. That doesn’t mean that this is original work; most probably it is just a variation of some well-known method. If you know of any similar approach, practical or only theoretical, please post a link in the comments.]

Tagged elements

The basic queue element is made of two parts, a tag and a value. As this implementation is to be used in the OmniThreadLibrary framework, the value is represented by a TOmniValue record. This record can handle almost anything from a byte to an int64, and can also store interfaces. The only downside is that it uses 13 bytes of memory.

As the tag uses only one byte and 1+13 = 14, which is not an elegant value, a queue element also contains two unused bytes. That rounds its size to a pretty 16 bytes. [I’m joking, of course. There is a very good reason why the size must be divisible by 4. I’ll come back to that later.]

type
  TOmniQueueTag = (tagFree, tagAllocating, tagAllocated, tagRemoving,
    tagEndOfList, tagExtending, tagBlockPointer, tagDestroying);

  TOmniTaggedValue = packed record
    Tag     : TOmniQueueTag;
    Stuffing: word;
    Value   : TOmniValue;
  end;

In the following text, I’ll use the shorthand [tag|value] to represent an instance of the TOmniTaggedValue record.

Queue data is managed in blocks. The size of a block is 64 KB. Divide this by 16 and you’ll find that a block contains 4096 elements. Upon allocation, each block is formatted as

[Free|0] [Free|0] … [Free|0] [EndOfList|0]

In other words, the block is mostly initialized to zero (as Ord(tagFree) = 0). When the queue object is created, one block is allocated and both the tail and the head pointer point to the first element. (In the diagrams below, H marks the head, where Enqueue works, and T marks the tail, where Dequeue works.)

H:T:[Free|0] [Free|0] … [Free|0] [EndOfList|0]

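To make the formatting step concrete, here is a minimal sketch of how a block could be allocated and initialized. The names CBlockSize, CNumSlots, TOmniTaggedValueBlock, POmniTaggedValue and AllocateBlock are illustrative, not the actual OTL identifiers; only TOmniTaggedValue and the tags come from the declarations above.

const
  CBlockSize = 65536;                                   // 64 KB per block
  CNumSlots  = CBlockSize div SizeOf(TOmniTaggedValue); // = 4096 slots

type
  POmniTaggedValue = ^TOmniTaggedValue;
  TOmniTaggedValueBlock = array [0..CNumSlots - 1] of TOmniTaggedValue;
  POmniTaggedValueBlock = ^TOmniTaggedValueBlock;

// Allocate a block and format it as [Free|0] [Free|0] … [Free|0] [EndOfList|0].
function AllocateBlock: POmniTaggedValueBlock;
begin
  GetMem(Result, SizeOf(TOmniTaggedValueBlock));
  FillChar(Result^, SizeOf(TOmniTaggedValueBlock), 0); // every slot becomes [Free|0]
  Result^[CNumSlots - 1].Tag := tagEndOfList;          // the last slot ends the block
end;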

Enqueue

The first thing Enqueue does is lock the head element. To do this, it first checks whether the head points to a tagFree or a tagEndOfList element. If that’s not the case, another thread has just locked this element and the current thread must wait a little and retry.

Then it (atomically!) swaps the current tag value with either tagAllocating (if the previous value was tagFree) or tagExtending (if it was tagEndOfList). If this atomic swap fails, another thread has overtaken this one and the thread has to retry from the beginning.

But let’s assume that the tag was properly swapped. We now have the following situation:

H:T:[Allocating|0] [Free|0] … [Free|0] [EndOfList|0]

The thread then increments the head pointer to the next slot …

T:[Allocating|0] H:[Free|0] … [Free|0] [EndOfList|0]

… and stores  [Allocated|value] in the slot it has previously locked.

T:[Allocated|value] H:[Free|0] … [Free|0] [EndOfList|0]

That completes the Enqueue.

In pseudocode:

repeat
    fetch tag from current head
    if tag = tagFree and CAS(tag, tagAllocating) then
        break
    if tag = tagEndOfList and CAS(tag, tagExtending) then
        break
    yield 
forever 
if tag = tagFree then 
    increment head 
    store (tagAllocated, value) into locked slot 
else 
    // ignore this for a moment

Let’s think about possible problems.


  1. Two (or more) threads can simultaneously find out that head^.tag = tagFree and try to swap in tagAllocating. As this is implemented using an atomic compare-and-swap, one thread will succeed and the others will fail and retry. No problem here.

  2. The thread increments the head pointer. At that moment it is suspended and another thread calls Enqueue. The second thread finds that the head points to a free element and continues with execution. It’s possible that the second thread will finish its operation before the first thread is resumed and that, for a short time, the queue storage will look like this:
        T:[Allocating|0] [Allocated|value] H:[Free|0] … [Free|0] [EndOfList|0]
    Again, no problem here. Dequeue will take care of this situation.

Another interesting situation occurs when the head is pointing to the last element in the block, the one with the EndOfList tag. In this case, a new block is allocated and the current element is changed so that it points to the new block.

if tag = tagFree then
    // we covered that already
else // tag = tagEndOfList
    allocate and initialize new block
    set head to new block's slot 1
    store (tagAllocated, value) into new block's slot 0
    store (tagBlockPointer, pointer to new block) into locked slot

After that, memory is laid out as follows:

T:[Allocated|value] [Allocated|value] … [Allocated|value] [BlockPointer|B2]
B2:[Allocated|value] H:[Free|0] … [Free|0] [EndOfList|0]
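In Delphi, the locking loop at the top of Enqueue could look roughly like the sketch below. It is not the actual OTL source: obcHeadPointer and LockHeadSlot are assumed names, CASTag is the helper method explained in the next section, and DSiYield (from DSiWin32) just gives up the rest of the time slice.

var
  obcHeadPointer: POmniTaggedValue; // shared enqueue position (assumed name)

// Spin until this thread owns the slot pointed to by the head; on exit, head
// points to the locked slot and tag holds its original value (tagFree or tagEndOfList).
procedure LockHeadSlot(var head: POmniTaggedValue; var tag: TOmniQueueTag);
begin
  repeat
    head := obcHeadPointer;
    tag := head^.Tag;
    if (tag = tagFree) and head^.CASTag(tagFree, tagAllocating) then
      break  // we own a free slot and may store a value into it
    else if (tag = tagEndOfList) and head^.CASTag(tagEndOfList, tagExtending) then
      break  // we own the last slot and must extend the queue with a new block
    else
      DSiYield; // the slot is owned by another thread; wait a little and retry
  until false;
end;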

Compare and Swap

In the pseudocode above I’ve used the CAS operation as if it were something that everybody in this world knows and loves, but it probably deserves some explanation.

CAS, or Compare-and-Swap, is an atomic function that compares a memory location with an expected value and stores a new value into that location if they were equal. Otherwise, it does nothing. And the best thing is that all of this executes atomically. In other words – if two threads are attempting to CAS the same destination at the same time, only one of them will succeed.

In plain Delphi, CAS could be written as

function CAS32(const oldValue, newValue: cardinal; var destination: cardinal): boolean;
begin
  EnterCriticalSection(cs); // cs: some critical section protecting all CAS32 calls
  Result := (destination = oldValue);
  if Result then
    destination := newValue;
  LeaveCriticalSection(cs);
end;

However, that is quite slow, so in reality we go down to the hardware and use the lock cmpxchg instruction that was designed exactly for this purpose.

function CAS32(const oldValue, newValue: cardinal; var destination): boolean;
asm
  { register calling convention: oldValue in EAX, newValue in EDX, @destination in ECX }
  lock cmpxchg dword ptr [destination], newValue
  setz al
end;

The Win32 function InterlockedCompareExchange implements the same behaviour, except that it is slower than the assembler version.
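For completeness, here is roughly how the same function could be expressed with that API call (CAS32_API is my name for this illustrative variant, not something from OTL):

function CAS32_API(const oldValue, newValue: cardinal; var destination): boolean;
begin
  // InterlockedCompareExchange returns the previous content of destination;
  // the swap happened only if that content was equal to oldValue.
  Result := cardinal(InterlockedCompareExchange(PInteger(@destination)^,
    integer(newValue), integer(oldValue))) = oldValue;
end;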

The Tag field of the TOmniTaggedValue record uses only one byte of storage. The CAS32 function requires four bytes to be compared and swapped. [What’s more, those 4 bytes must be 4-aligned. That’s why SizeOf(TOmniTaggedValue) is 16 – so that each Tag falls on a memory location whose address is evenly divisible by 4.]

Therefore, the TOmniTaggedValue record implements the CASTag function, which does some bit-fiddling to work around the problem.

function TOmniTaggedValue.CASTag(oldTag, newTag: TOmniQueueTag): boolean;
var
  newValue: DWORD;
  oldValue: DWORD;
begin
  oldValue := PDWORD(@Tag)^ AND $FFFFFF00 OR DWORD(Ord(oldTag));
  newValue := oldValue AND $FFFFFF00 OR DWORD(Ord(newTag));
  Result := CAS32(oldValue, newValue, Tag);
end; { TOmniTaggedValue.CASTag }

First, we need an “old” 4-byte value. It is constructed by taking the oldTag parameter and OR-ing it with bytes 2, 3 and 4 of the record (the upper three bytes of the DWORD that starts at the Tag field).

Then we need a “new” 4-byte value, which is constructed in a similar way.

Only then can we call the CAS32 function to compare-and-swap “old” value with the “new” one.

Dequeue

Dequeue is not much different from Enqueue. First it locks the tail element by swapping its tag from tagAllocated to tagRemoving (a normal element) or from tagBlockPointer to tagDestroying (a pointer to the next block). If the tag is tagFree, the queue is empty and Dequeue returns immediately. In all other cases, it loops and retries.

If tagAllocated was found, Dequeue can remove the current element in two easy steps. First it increments the tail pointer, which unlocks the queue tail. Only then does it fetch the value from the queue. The tag is left set to tagRemoving.

In pseudocode:

repeat
    fetch tag from current tail
    if tag = tagFree then
        return Empty
    if tag = tagAllocated and CAS(tag, tagRemoving) then
        break
    if tag = tagBlockPointer and CAS(tag, tagDestroying) then
        break
    yield
forever
if tag = tagAllocated then
    increment tail
    get value
else
    // ignore this for a moment

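The fast path can be sketched in Delphi as follows. Again, obcTailPointer and DequeueAllocatedSlot are assumed names and this is not the actual OTL source; the point is only the order of operations – release the tail first, read the value afterwards.

var
  obcTailPointer: POmniTaggedValue; // shared dequeue position (assumed name)

// Called after the slot's tag was successfully CAS-ed from tagAllocated to tagRemoving.
procedure DequeueAllocatedSlot(tail: POmniTaggedValue; var value: TOmniValue);
begin
  // Move the tail to the next slot first; this unlocks the queue for other dequeuers.
  obcTailPointer := POmniTaggedValue(NativeUInt(tail) + SizeOf(TOmniTaggedValue));
  // Only then copy the value out of the slot; its tag stays tagRemoving.
  value := tail^.Value;
end;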
Let’s look at a simple example. Assume that there are two elements in the queue originally and that Dequeue was just called.

T:[Allocated|value1] [Allocated|value2] H:[Free|0] … [EndOfList|0]

Dequeue first swaps the tail tag.

T:[Removing|value1] [Allocated|value2] H:[Free|0] … [EndOfList|0]

Then it increments the tail pointer.

[Removing|value1] T:[Allocated|value2] H:[Free|0] … [EndOfList|0]

At this moment, another thread may drop in and start dequeueing.

[Removing|value1] T:[Removing|value2] H:[Free|0] … [EndOfList|0]

It is entirely possible that the second thread will finish before the first one. The queue is empty now although the first thread has not yet completed its dequeue.

[Removing|value1] [Removing|value2] H:T:[Free|0] … [EndOfList|0]

The first thread then continues execution, fetches the value from the slot and exits.

End-of-block pointer handling is only slightly more complicated.

  if tag = tagAllocated then 
      // we covered that already
  else
      // we know that the first slot in new block is allocated 
      set tail to new block's slot 1
      get value

Assume the following situation:

[Removing|value] [Removing|value] … [Removing|value] T:[BlockPointer|B2]
B2:[Allocated|value] H:[Free|0] … [Free|0] [EndOfList|0]

Dequeue first swaps tagBlockPointer with tagDestroying.

[Removing|value] [Removing|value] … [Removing|value] T:[Destroying|B2]
B2:[Allocated|value] H:[Free|0] … [Free|0] [EndOfList|0]

Then it follows the pointer to the next block. It knows that the first slot in this block is allocated (because that’s how Enqueue is implemented) and moves the tail pointer directly to the second slot. By doing this, the tail pointer is released. This is entirely safe, as no other thread could have locked the tail slot in the meantime because it contains the tagDestroying tag.

[Removing|value] [Removing|value] … [Removing|value] [Destroying|B2]
B2:[Allocated|value] H:T:[Free|0] … [Free|0] [EndOfList|0]

Last, the Dequeue fetches the value and exits.

That’s all – the tail pointer was safely moved to the new block and element was fetched (and marked as such).

But … is that really it? A careful reader may have noticed that something was not yet done. The first block is still allocated although no thread is referencing it anymore. Somebody has to release it – but who?

ABA Strikes

The first idea is to simply release the block at this point. After all, no other dequeuers are doing anything with this block, as all its tags are known to be marked tagRemoving or tagDestroying. So we can safely release the memory, no?

Actually, we can’t. And the reason for that is the ABA problem – and a tough one. It took me many days to find the reason behind the constant crashes I was experiencing when testing that initial approach.

Assume the following situation:

B1:[Removing|value] T:[Allocated|value] … [BlockPointer|B2]
B2:[Allocated|value] H:[Free|0] … [EndOfList|0]

Thread 1 starts executing the Dequeue method. It reads the tag from the tail pointer and is suspended before it can CAS the tagRemoving tag into the tail slot.

      fetch tag from current tail
      if tag = tagFree then // <- here we stop
          return Empty
      if tag = tagAllocated and CAS(tag, tagRemoving) then
          break

Now the fun begins. We have a suspended thread whose remembered copy of the tail pointer points to the second slot of block B1. I’ll mark this pointer with S: (for Suspended).

B1:[Removing|value] S:T:[Allocated|value] … [BlockPointer|B2]
B2:[Allocated|value] H:[Free|0] … [EndOfList|0]

Another thread takes over and initiates Dequeue. As the suspended thread was not yet able to change the tag, the second thread succeeds in dequeueing from the second slot, then from the third one and so on, up to the end of the block.

B1:[Removing|value] S:[Removing|value] … T:[BlockPointer|B2]
B2:[Allocated|value] H:[Free|0] … [EndOfList|0]

During the next dequeue, second thread destroys block B1.

B1:[Removing|value] S:[Removing|value] … [Destroying|B2]
B2:[Removing|value] T:H:[Free|0] … [EndOfList|0]

Then a third thread writes into all free elements of block B2.

B1:[Removing|value] S:[Removing|value] … [Destroying|B2]
B2:[Removing|value] T:[Allocated|value] … H:[EndOfList|0]

During the next write, a new memory block is allocated. It may happen (with a high probability, because the FastMM memory manager tries to reuse recently released memory blocks) that this block will be located at address B1. The block is reinitialized during allocation, but the suspended thread’s copy of the tail pointer still points into it.

B1:[Allocated|value] H:S:[Free|0] … [EndOfList|0]
B2:[Removing|value] T:[Allocated|value] … [BlockPointer|B1]

Then another value gets enqueued.

B1:[Allocated|value] S:[Allocated|value] H: … [EndOfList|0]
B2:[Removing|value] T:[Allocated|value] … [BlockPointer|B1]

At that point, the original thread is resumed. It continues the execution with

  if tag = tagAllocated and CAS(tag, tagRemoving) then 
      break 
  if tag = tagAllocated then
      increment tail
      get value

The tag is still tagAllocated (well, it is again set to tagAllocated, but our poor thread doesn’t know that), so the thread swaps it with tagRemoving. That is weird, as we now have a tagRemoving slot in the middle of tagAllocated ones, but that’s maybe something we could live with. The biggest problem lies in the next line, which sets the tail pointer to the next slot. And by that I don’t mean the slot following the current tail, but the slot following the stored tail pointer! In other words, the third slot of block B1.

B1:[Allocated|value] [Removing|value] T:H: … [EndOfList|0]
B2:[Removing|value] [Allocated|value] … [BlockPointer|B1]

And now we have a total mess of a memory layout. From this point onwards, nothing works.

Even worse, that is not the only problem. For instance, the B1 block may have been reallocated by another thread for another purpose, and the CAS may still succeed if the right bytes happen to be at that location. Fat chance, I know, but as Pratchett likes to say, million-to-one chances crop up nine times out of ten.

The same scenario can happen during the Enqueue.

    fetch tag from current head
    //thread pauses here
    if tag = tagFree and CAS(tag, tagAllocating) then

The problem is more likely to occur at this point because the CAS will expect the old value to be $00000000. It is entirely possible that another thread allocates this block, clears it for further use, and just a moment after that our suspended thread kicks in and a) corrupts this block by CAS-ing tagAllocating into it and b) points the head pointer into that block. Utter disaster.

Garbage Collector

There is just one thing that can be done – never release a memory block while any thread is still using it. In a way, we need a garbage collector.

It is hard to answer the question: “Is any thread using this memory block?” [Not impossible, I must add. Just very hard. And any solution would be totally impractical.] We have to be satisfied with less. Another way to look at the problem is: “When is it safe to release a memory block?” That, at least, we can answer: “When Enqueue and Dequeue are not executing. At all. In any thread.” That is the solution I’ve implemented.

We must allow many Enqueue/Dequeue calls to execute at the same time, but we want only one thread to be releasing memory, and while it does so no Enqueue/Dequeue may execute. Does this remind you of anything? Of course – a Multiple-Readers-Exclusive-Writer (MREW) lock!

Enqueue and Dequeue acquire the read lock for the duration of their execution. The garbage collector acquires the write lock, releases the memory, and then releases the lock. Simple.

The garbage collector is very simple and is implemented inline (as opposed to running in a separate thread). The thread that finds the tagBlockPointer is responsible for freeing the memory block.

Enqueue is simply wrapped in the read lock.

acquire read access to GC
// do the Enqueue
release read access to GC

Dequeue is slightly more complicated. If a tagBlockPointer is found, the code releases the read lock, acquires the write lock and releases the memory block. In other words, Dequeue switches from dequeueing mode (by releasing the read lock) into garbage collecting mode (by acquiring the write lock).

acquire read access to GC
// do the Dequeue
release read access to GC
if block has to be released
    acquire write access to GC
    release the block
    release write access to GC
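Put together, the Dequeue wrapper could look something like the sketch below, where TryDequeue, DequeueInternal and blockToFree are illustrative names, not the actual OTL code (the EnterReader/LeaveReader/EnterWriter/LeaveWriter methods are shown a bit further down):

function TOmniBaseQueue.TryDequeue(var value: TOmniValue): boolean;
var
  blockToFree: pointer;
begin
  blockToFree := nil;
  EnterReader;
  try
    // DequeueInternal is an assumed helper; it performs the tag dance described
    // above and reports the obsolete block if it ran into a tagBlockPointer slot.
    Result := DequeueInternal(value, blockToFree);
  finally
    LeaveReader;
  end;
  if assigned(blockToFree) then begin
    EnterWriter;            // wait until no Enqueue/Dequeue is executing anywhere
    try
      FreeMem(blockToFree); // nobody can hold a pointer into the old block anymore
    finally
      LeaveWriter;
    end;
  end;
end;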

The MREW implementation is very simple and could theoretically lead to starvation. However, practical tests confirmed that this does not happen.

One number is used for locking. If it is greater than zero, there are readers active. Each reader increments the number on enter and decrements it on exit.

The writer waits until this number is 0 (no readers) and then sets it to –1. When exiting, it just sets the number back to 0.

Of course, all those increments and decrements are done atomically.

procedure TOmniBaseQueue.EnterReader;
var
  value: integer;
begin
  repeat
    value := obcRemoveCount.Value;
    if (value >= 0) and obcRemoveCount.CAS(value, value + 1) then
      break
    else
      DSiYield; // a writer (the GC) is active or the CAS failed; let it do its work
  until false;
end; { TOmniBaseQueue.EnterReader }

procedure TOmniBaseQueue.EnterWriter;
begin
  while not ((obcRemoveCount.Value = 0) and (obcRemoveCount.CAS(0, -1))) do
    asm pause; end; // retry after slight pause
end; { TOmniBaseQueue.EnterWriter }

procedure TOmniBaseQueue.LeaveReader;
begin
  obcRemoveCount.Decrement;
end; { TOmniBaseQueue.LeaveReader }

procedure TOmniBaseQueue.LeaveWriter;
begin
  obcRemoveCount.Value := 0;
end; { TOmniBaseQueue.LeaveWriter }

This implementation of the queue has passed a 24-hour stress test in which millions of messages were enqueued and dequeued every second and in which 1 to 8 threads functioned as writers and another 1 to 8 as readers. Every four million messages, the threads were stopped and the contents of the queues were checked for validity. No problems were found.

What About Performance?

To test the workings of the queue and to measure its performance I wrote a simple test, located in folder 32_Queue in the Tests branch of the OTL tree.

The test framework sets up the following data path:

source queue –> N threads –> channel queue –> M threads –> destination queue

The source queue is filled with numbers from 1 to 1,000,000. Then 1 to 8 threads are set up to read from the source queue and write into the channel queue, and another 1 to 8 threads are set up to read from the channel queue and write into the destination queue. The application then starts the clock and starts all threads. When all numbers have been moved to the destination queue, the clock is stopped and the contents of the destination queue are verified. Thread creation time is not included in the measured time.
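Each of those threads runs a tight forwarding loop. The sketch below shows the idea only; the method name TryDequeue and the end-of-data handling are assumptions, not the code of the actual test project.

// Forward values from one queue to another until an (assumed) sentinel arrives.
procedure ForwardValues(const inQueue, outQueue: TOmniBaseQueue; sentinel: integer);
var
  value: TOmniValue;
begin
  repeat
    if inQueue.TryDequeue(value) then begin
      if value.AsInteger = sentinel then
        break;
      outQueue.Enqueue(value);
    end;
  until false;
end;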

All in all, this results in 2 million reads and 2 million writes distributed over three queues – 4 million queue operations in total. The tests are very brutal, as all threads are just hammering on the queues, doing nothing else. The table below contains the average, minimum and maximum times of 5 runs on a 2.67 GHz computer with two 4-core CPUs.

                average [min-max]    millions of queue
                (milliseconds)       operations per second
N = 1, M = 1     707 [566-834]       5.66
N = 2, M = 2     996 [950-1031]      4.02
N = 3, M = 3    1065 [1055-1074]     3.76
N = 4, M = 4    1313 [1247-1358]     3.04
N = 8, M = 8    1520 [1482-1574]     2.63
N = 1, M = 7    3880 [3559-4152]     1.03
N = 7, M = 1    1314 [1299-1358]     3.04

The queue performs well even when there are twice as many threads as cores in the computer. The only anomalous data point is the N=1, M=7 row, where there were only eight threads but the performance was quite low. It looks like the single writer was not able to put enough data into the channel queue for the seven readers, and that caused excessive looping in the MREW lock. But I have no proof of that.

What Is It Good For?

Absolutely nothing – everything! Of course, this queue was designed as a backing storage for the blocking collection, but it is also useful in many other multi-threaded (and even single-threaded) scenarios. Just don’t use it in situations where you can’t control the growth of the data.

For example, all internal messaging queues in the OTL will still use bounded (fixed-size) queues. That way, a message recipient that blocks at some point only causes the queue to fill up, which then triggers an exception. If a dynamic queue were used for messaging, it could fill up all the virtual memory on the computer and only then crash the program (or some other thread would run out of memory and you’d have no idea where the cause of the problem lies).

Use it sparingly, use it wisely.

8 comments:

  1. Anonymous, 14:56

    Very, very interesting article!!

  2. Thank you. I know it's a tad long but I didn't want to split it into 3-5 smaller articles.

  3. Javier Santo Domingo, 16:57

    Great article!
    This made me reread the "LOCKED ATOMIC OPERATIONS" chapter of Intel's manual. Just in case someone else wants to do that too, it's section 8.1 of http://www.intel.com/assets/PDF/manual/252046.pdf, but it is 7.1 in the 2007 printed version I have.

  4. Super... :)

    What about CAS8 instead of function TOmniTaggedValue.CASTag?

    function CAS8(const oldValue, newValue: byte; var destination): boolean;
    asm
    lock cmpxchg byte ptr [destination], newValue
    setz al
    end; { CAS8 }

    LP GJ

  5. 'Cause I didn't know that this is also possible to do :(( My assembler skills are not that great.

    Thanks for the hint, I'll retest the code with CAS8 and report back.

  6. Xepol, 19:58

    I'm still digging through this article, but my first instinct is to say it looks over complicated.

    By that I mean that the tag for the values is perhaps doing too much. The lock for each unique item could be "modifying/not modifying", and then another list based lock for actions that actually affect the lists' structure on the whole.

    If you need to go as far as to protect out of band read access, then an interlocked inc/dec for a read pointer that counts how many people are reading the record.


    Record destruction could then look like:

    spin until not modifying, set to modifying
    spin until read count=0
    spin until list locked
    destroy object
    update list
    unlock list

    Then a few rules such as read locks fail while the record is being modified, modifying fails if the read counter isn't 0. Head, Tail, any next/previous pointers would all be protected by the list wide lock.

    I could be over-simplifying, but as I said, I am still digging through the article - these are just first impressions.

  7. Interestingly, there is very little difference between code using CAS8 and CAS32. But as the code is simpler with CAS8, I'll do the change.

  8. @Xepol: A number of states could definitely be reduced, but as I had one whole byte of space to use, I was not trying to minimize them. They are defined so that the concept is easy to understand. If you go down to the 'modifying' bit, then you're working on a different solution (or so I believe).

    A block read counter is not possible to implement (I believe), at least not without locking.

    The destruction could possibly be implemented your way but I don't see any significant advantage.

    The best you can do: Take my code and modify it your way. Test it. Run the bench. Then report back. In the worst case you'll learn something and that will be good for you. In the best case you'll wildly improve my approach and that will be good for the community.
