Sunday, July 27, 2008

OmniThreadLibrary alpha

OmniThreadLibrary is slowly progressing to the alpha stage. Most of the interfaces have been defined and most of the functionality that I want to see in the 1.0 release is present and tested. I have not yet decided what to do with the messaging (the current Variant-based solution can be rough around the edges sometimes) but the rest of the stuff is mostly completed.

Most important changes since the last release:

  • First of all, big thanks to fellow Slovenian programmer Lee_Nover (the guy who wrote the SpinLock unit, BTW) who did a major cleanup job of the OTL code:
    • Added const prefix to parameters in many places (mostly where interfaces are passed - I tend to miss that). OTL should be slightly faster because of that.
    • Removed CreateTask(TOmniWorker) as CreateTask(IOmniWorker) is good enough for all occasions.
    • Cleaned the tests\* tree and fixed tests to compile with new Otl* units.
    • Removed IOmniTaskControl.FreeOnTerminate as it's no longer needed.
  • Created project group containing all test projects. To make it work, all projects in the tests tree were renamed.
  • *** IMPORTANT *** OtlTaskEvents unit was renamed to OtlEventMonitor and TOtlTaskEventDispatcher component was renamed to TOmniEventMonitor. Recompile the package!
  • *** IMPORTANT *** If you're working with a snapshot, you should delete the OmniThreadLibrary folder before unpacking new version as many files were renamed since the last snapshot!
  • Task UniqueID is now int64 (was integer). Somehow I wasn't happy with the fact that the unique ID would roll over after merely 4.3 billion tasks ...
  • Thread priority can be controlled with IOmniTaskControl.SetPriority.
  • Exceptions are trapped and mapped into EXIT_EXCEPTION predefined exit code (OtlCommon.pas). Test in 13_Exceptions.
  • IOmniTaskControl.WithLock provides a simple way to share one lock (critical section, spinlock) between task owner and task worker. Lock can be accessed via the Lock property, implemented in IOmniTaskControl and IOmniTask interfaces. See the 12_Lock test for the example.
  • Eye candy - instead of calling omniTaskEventDispatch.Monitor(task) you can now use task.MonitorWith(omniTaskEventDispatch). The old way is still available. An example from the (new) 8_RegisterComm test:
    procedure TfrmTestOtlComm.FormCreate(Sender: TObject);
    begin
      FCommChannel := CreateTwoWayChannel(1024);
      FClient1 := CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))
        .MonitorWith(OmniTED)
        .Run;
      FClient2 := CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))
        .MonitorWith(OmniTED)
        .Run;
    end;
  • IOmniTaskControl.TerminateWhen enables you to add additional termination event to the task. One event can be shared between multiple tasks, allowing you to stop them all at once. Example in the 14_TerminateWhen.
  • Basic support for task groups has been added. At the moment you can start/stop multiple tasks and not much more. To add a task to a group, use group.Add(task) or task.Join(group) syntax. Demo in 15_TaskGroup.
    IOmniTaskGroup = interface ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
      function Remove(taskControl: IOmniTaskControl): IOmniTaskGroup;
      function Add(taskControl: IOmniTaskControl): IOmniTaskGroup;
      function RunAll: IOmniTaskGroup;
      function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
      function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
    end; { IOmniTaskGroup }
  • Added support for task chaining. When one task is completed, the next task is started. An excerpt from the 16_ChainTo demo:
    procedure TfrmTestTaskGroup.btnStartTasksClick(Sender: TObject);
    var
      task1: IOmniTaskControl;
      task2: IOmniTaskControl;
      task3: IOmniTaskControl;
      taskA: IOmniTaskControl;
    begin
      lbLog.Clear;
      task3 := CreateTask(BgTask, '3').MonitorWith(OmniTED);
      task2 := CreateTask(BgTask, '2').MonitorWith(OmniTED).ChainTo(task3);
      task1 := CreateTask(BgTask, '1').MonitorWith(OmniTED).ChainTo(task2);
      taskA := CreateTask(BgTask, 'A').MonitorWith(OmniTED).ChainTo(
        CreateTask(BgTask, 'B').MonitorWith(OmniTED).ChainTo(
          CreateTask(BgTask, 'C').MonitorWith(OmniTED)));
      task1.Run; taskA.Run;
    end;
  • A thread pool has been implemented.  Demo in 11_ThreadPool. I'll write a longer article on using it, but for now the demo should do.

As usual, code is available in the repository and as a snapshot.


OmniThreadLibrary internals - OtlContainers


The time has come to bring the OtlContainers subthread to an end. In previous parts (1, 2, 3, 4) I obsessively described the underlying lock-free structures but haven't said a word about the higher-level classes that are actually used for message transfer inside the OmniThreadLibrary.

Let's take a quick look at all OtlContainers classes. TOmniBaseContainer is an abstract parent of the whole container hierarchy. Its basic function is to provide memory allocation and lock-free algorithms for all descendants. I have already described most of the code in previous installments and the rest is trivial.

  TOmniBaseContainer = class abstract(TInterfacedObject)
  strict protected
    obcBuffer      : pointer;
    obcElementSize : integer;
    obcNumElements : integer;
    obcPublicChain : TOmniHeadAndSpin;
    obcRecycleChain: TOmniHeadAndSpin;
    class function  InvertOrder(chainHead: POmniLinkedData): POmniLinkedData; static;
    class function  PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
    class procedure PushLink(const link: POmniLinkedData; var chain: TOmniHeadAndSpin); static;
    class function  UnlinkAll(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
  public
    destructor Destroy; override;
    procedure Empty; virtual;
    procedure Initialize(numElements, elementSize: integer); virtual;
    function  IsEmpty: boolean; virtual;
    function  IsFull: boolean; virtual;
    property ElementSize: integer read obcElementSize;
    property NumElements: integer read obcNumElements;
  end; { TOmniBaseContainer }

Then there is the base stack implementation, which only adds Push and Pop to the base container. We've seen those two already.

  TOmniBaseStack = class(TOmniBaseContainer)
  public
    function Pop(var value): boolean; virtual;
    function Push(const value): boolean; virtual;
  end; { TOmniBaseStack }

The queue is only slightly more complex. It provides Enqueue and Dequeue, which we already know, and overrides Empty and IsEmpty to take the dequeued messages chain into account.

  TOmniBaseQueue = class(TOmniBaseContainer)
  strict protected
    obqDequeuedMessages: TOmniHeadAndSpin;
  public
    constructor Create;
    function  Dequeue(var value): boolean; virtual;
    procedure Empty; override;
    function  Enqueue(const value): boolean; virtual;
    function  IsEmpty: boolean; override;
  end; { TOmniBaseQueue }

Now we come to the interesting part. Basic stack and queue operations are also described with interfaces IOmniStack and IOmniQueue - just in case you'd like to use them in your programs. OTL uses class representation of the queue directly.

  IOmniStack = interface ['{F4C57327-18A0-44D6-B95D-2D51A0EF32B4}']
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer);
    function  Pop(var value): boolean;
    function  Push(const value): boolean;
    function  IsEmpty: boolean;
    function  IsFull: boolean;
  end; { IOmniStack }

  IOmniQueue = interface ['{AE6454A2-CDB4-43EE-9F1B-5A7307593EE9}']
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer);
    function  Enqueue(const value): boolean;
    function  Dequeue(var value): boolean;
    function  IsEmpty: boolean;
    function  IsFull: boolean;
  end; { IOmniQueue }

Actual implementation of the higher-level structures is exposed with classes TOmniStack and TOmniQueue. Besides implementing IOmniStack and IOmniQueue (respectively), they both implement IOmniNotifySupport and IOmniMonitorSupport.

  TOmniStack = class(TOmniBaseStack, IOmniStack, IOmniNotifySupport, IOmniMonitorSupport)
  TOmniQueue = class(TOmniBaseQueue, IOmniQueue, IOmniNotifySupport, IOmniMonitorSupport)

IOmniMonitorSupport describes a container with monitoring support (it notifies the attached monitor whenever a new message is sent). IOmniNotifySupport describes a container that notifies interested parties (by setting an event) that a new message has been sent. Both are used in the OTL - notification support is needed to implement IOmniCommunicationEndpoint.NewMessageEvent in the OtlComm unit and monitoring support is needed to support IOmniTaskControl.MonitorWith in the OtlTaskControl unit.

  IOmniMonitorSupport = interface ['{6D5F1191-9E4A-4DD5-99D8-694C95B0DE90}']
    function  GetMonitor: IOmniMonitorParams;
  //
    procedure Notify;
    procedure RemoveMonitor;
    procedure SetMonitor(monitor: IOmniMonitorParams);
    property Monitor: IOmniMonitorParams read GetMonitor;
  end; { IOmniMonitorSupport }

  IOmniNotifySupport = interface ['{E5FFC739-669A-4931-B0DC-C5005A94A08B}']
    function  GetNewDataEvent: THandle;
  //
    procedure Signal;
    property NewDataEvent: THandle read GetNewDataEvent;
  end; { IOmniNotifySupport }

The following excerpt from the TOmniQueue code demonstrates the implementation of those interfaces. TOmniStack is implemented in a similar manner.

The constructor takes the options parameter, with which you can enable monitoring and/or notification support.

  type
    TOmniContainerOption = (coEnableMonitor, coEnableNotify);
    TOmniContainerOptions = set of TOmniContainerOption;

  constructor TOmniQueue.Create(numElements, elementSize: integer;
    options: TOmniContainerOptions);
  begin
    inherited Create;
    Initialize(numElements, elementSize);
    orbOptions := options;
    if coEnableMonitor in Options then
      orbMonitorSupport := TOmniMonitorSupport.Create;
    if coEnableNotify in Options then
      orbNotifySupport := TOmniNotifySupport.Create;
  end; { TOmniQueue.Create }

  function TOmniQueue.Dequeue(var value): boolean;
  begin
    Result := inherited Dequeue(value);
    if Result then
      if coEnableNotify in Options then
        orbNotifySupport.Signal;
  end; { TOmniQueue.Dequeue }

  function TOmniQueue.Enqueue(const value): boolean;
  begin
    Result := inherited Enqueue(value);
    if Result then begin
      if coEnableNotify in Options then
        orbNotifySupport.Signal;
      if coEnableMonitor in Options then
        orbMonitorSupport.Notify;
    end;
  end; { TOmniQueue.Enqueue }

Enqueue calls original enqueueing code and then signals notification (if enabled) and notifies the monitor (if enabled). Dequeue is similar, except that it only triggers notification so that the reader rechecks the input queue.

Stop. Enough words have been said about the OtlContainers. Next time, I'll discuss something completely different.


Thursday, July 24, 2008

A lock-free queue, finally!

It was a long and winding road (1, 2, 3) but finally it brought us to the really interesting part - the lock-free queue. Admit it, a stack is fine but it is not really suitable for sending data from point A to point B. When forwarding data inside the application we typically want it to arrive in the same order as it was transmitted and that's what queues are for.

Still, there was a good reason for that lengthy introduction. The lock-free stack we all know and love by now ;) is the basis for the lock-free queue. Even more, most of the stack code is reused and the queue's Enqueue/Dequeue methods are incredibly similar to the stack's Push/Pop.

Let's take a look at the method that inserts new element at the end of the queue. The Enqueue code is not just similar to Push. They are identical. [Compare: Push]

function TOmniBaseQueue.Enqueue(const value): boolean;
var
  linkedData: POmniLinkedData;
begin
  linkedData := PopLink(obcRecycleChain);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(value, linkedData.Data, ElementSize);
  PushLink(linkedData, obcPublicChain);
end; { TOmniBaseQueue.Enqueue }

OK, so enqueueing an item is identical to pushing it. How do we remove items from the head of the queue, then? Dequeue must return not the top element of the stack but the bottom one - that's the element that was enqueued (pushed) first.

We could traverse the stack and somehow remove the last element but removing elements from a lock-free structure is very problematic, as you've already seen. [Where was the problem in TOmniBaseStack? In PopLink. There you go!] Removing elements from the queue is even harder (if not impossible) so let's try another approach.

function TOmniBaseQueue.Dequeue(var value): boolean;
var
  linkedData: POmniLinkedData;
begin
  if obqDequeuedMessages.Head = nil then
    obqDequeuedMessages.Head := InvertOrder(UnlinkAll(obcPublicChain));
  linkedData := PopLink(obqDequeuedMessages);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(linkedData.Data, value, ElementSize);
  PushLink(linkedData, obcRecycleChain);
end; { TOmniBaseQueue.Dequeue }

If you compare it with the Pop method, you'll see that they are very similar. Dequeue does some magic in first two lines, uses a different chain header in the PopLink, but from that point onward they are identical.

function TOmniBaseStack.Pop(var value): boolean;
var
  linkedData: POmniLinkedData;
begin
  linkedData := PopLink(obcPublicChain);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(linkedData.Data, value, ElementSize);
  PushLink(linkedData, obcRecycleChain);
end; { TOmniBaseStack.Pop }

The big question is, what happens in InvertOrder(UnlinkAll(obcPublicChain)) to make the old Pop code work as a queue? Well, let's draw some pretty pictures. I just luuuv pretty pictures.

Let's say we have a queue with two elements. Element 1 was enqueued first, followed by element 2. Our queue behaves like a stack when enqueueing so it's no wonder that I could just copy old picture from the stack introduction article to present this state.

[Figure: queue with two elements]

We have two empty nodes in the recycle chain and two allocated nodes in the public chain. Public chain header points to element 2, which points to element 1, which points to nil.

Dequeue is called next and it executes UnlinkAll. This method merely removes all nodes from the chain that was passed as an argument. In our case, all nodes are removed from the public chain. This is done atomically. [How? I'll show you later.]

[Figure: after UnlinkAll]

Now we have public chain header that points to an empty chain (nil, in other words) and unnamed pointer that points to the top element in the stack. This unnamed pointer is returned as the UnlinkAll result.

InvertOrder is called next. It walks over the chain that was passed as an argument and reverses each and every pointer. This is done in a simple, non-atomical way, as the chain was already removed from the shared storage so concurrency is not a problem.

[Figure: after InvertOrder]

InvertOrder returns a pointer to the (new) chain head. In our case, this pointer points to node 1, which points to node 2, which points to nil. Result of the InvertOrder call is stored into the class field obqDequeuedMessages (also known as dequeue chain header).

The rest of the Dequeue method simply treats obqDequeuedMessages as a stack chain and pops the top element from it. As the chain was reversed, the top element is the one we need - the one that was enqueued first. The atomic version of PopLink is used for simplicity, but simpler code could be used instead as there can be no inter-thread conflicts.

[Figure: after Dequeue]

On return from Dequeue, the dequeued chain points to node 2, the recycle chain points to node 1 (its data was copied from the node to Dequeue's value parameter and the node was recycled) and the public chain is nil.

Let's assume that a writer enqueues data 3 into the queue. As we've already seen, this is a normal Push operation and we already know how it works.

[Figure: Enqueue after Dequeue]

Dequeued chain still points to node 2, public chain points to former node 1 (which now holds the value 3) and recycle chain points to the first empty node.

The reader now executes Dequeue again. As the dequeued chain is not nil, UnlinkAll and InvertOrder are not called. A node is simply popped from the dequeued chain.

[Figure: after second Dequeue]

Dequeued chain is now nil. Next call to Dequeue will call UnlinkAll to remove public chain, InvertOrder to invert it, result will be stored in the dequeued chain and dequeue operation will continue in an already known fashion.
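
The whole walk-through can be replayed with a small single-threaded model. This is only a sketch under my own assumptions: the node type, the global chain variables and the use of New/Dispose in place of the recycle chain are made up for the illustration, and nothing in it is atomic, so it shows the ordering logic and not the real OtlContainers code.

```pascal
program QueueModelSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Data: Integer;
  end;

var
  PublicChain  : PNode = nil;  // newest element first, exactly like a stack
  DequeuedChain: PNode = nil;  // oldest element first, touched by the reader only

// Non-atomic stand-in for UnlinkAll: detach the whole chain in one step.
function UnlinkAll(var chain: PNode): PNode;
begin
  Result := chain;
  chain := nil;
end;

// Walk the chain, turn every Next pointer around and return the new head.
function InvertOrder(chainHead: PNode): PNode;
var
  following: PNode;
begin
  Result := nil;
  while chainHead <> nil do begin
    following := chainHead^.Next;
    chainHead^.Next := Result;
    Result := chainHead;
    chainHead := following;
  end;
end;

procedure Enqueue(value: Integer);
var
  node: PNode;
begin
  New(node);                   // assumption: the real code pops a node from the recycle chain
  node^.Data := value;
  node^.Next := PublicChain;   // plain push onto the public chain
  PublicChain := node;
end;

function Dequeue(var value: Integer): Boolean;
var
  node: PNode;
begin
  if DequeuedChain = nil then
    DequeuedChain := InvertOrder(UnlinkAll(PublicChain));
  node := DequeuedChain;
  Result := node <> nil;
  if not Result then
    Exit;
  DequeuedChain := node^.Next;
  value := node^.Data;
  Dispose(node);               // assumption: the real code pushes the node to the recycle chain
end;

var
  v: Integer;
begin
  Enqueue(1);
  Enqueue(2);
  Dequeue(v); Writeln(v);      // prints 1
  Enqueue(3);
  while Dequeue(v) do
    Writeln(v);                // prints 2, then 3
end.
```

The second Dequeue finds node 2 still waiting in the dequeued chain and only the third one has to unlink and invert the public chain again, which matches the pictures above.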

There is one important consequence of this approach. The lock-free queue (at least our implementation) can only have one reader. The problem lies in this non-atomic fragment:

  if obqDequeuedMessages.Head = nil then
    obqDequeuedMessages.Head := InvertOrder(UnlinkAll(obcPublicChain));

If there were two readers, the following could happen:

  • writer pushes one element into the queue
  • reader 1 starts the Dequeue, sees that Head is nil and is stopped just before it calls UnlinkAll
  • reader 2 starts the Dequeue, sees that Head is nil and proceeds with InvertOrder(UnlinkAll())
  • reader 2 updates the Head field of the dequeued chain
  • reader 1 continues its execution and passes the (now empty) public chain into UnlinkAll, which passes nil to InvertOrder, which returns nil, and that nil is stored into the Head field
  • the element that was dequeued by the reader 2 is lost

Multiple writers are supported, though, just like in the lock-free stack.
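
The two-reader scenario can also be replayed step by step in a single thread. The sketch below is an illustration under my own assumptions (hypothetical variable names, one queued element, a non-atomic UnlinkAll stand-in); with a single node InvertOrder would return the chain unchanged, so it is left out.

```pascal
program TwoReaderRaceSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Data: Integer;
  end;

// Non-atomic stand-in for UnlinkAll; good enough for a single-threaded replay.
function UnlinkAll(var chain: PNode): PNode;
begin
  Result := chain;
  chain := nil;
end;

var
  element      : TNode;
  publicChain  : PNode = nil;
  dequeuedChain: PNode = nil;
  reader1SawNil: Boolean;
  reader2SawNil: Boolean;
begin
  // The writer pushes one element into the queue.
  element.Data := 42;
  element.Next := nil;
  publicChain := @element;

  // Reader 1 tests the head, sees nil and is suspended just before UnlinkAll.
  reader1SawNil := dequeuedChain = nil;

  // Reader 2 tests the head, sees nil as well and completes the assignment.
  reader2SawNil := dequeuedChain = nil;
  if reader2SawNil then
    dequeuedChain := UnlinkAll(publicChain);
  Writeln('after reader 2, element reachable: ', dequeuedChain = @element);  // TRUE

  // Reader 1 resumes. The public chain is empty by now, so it stores nil.
  if reader1SawNil then
    dequeuedChain := UnlinkAll(publicChain);
  Writeln('after reader 1, element reachable: ', dequeuedChain = @element);  // FALSE
end.
```

After the last step neither chain points to the element any more, which is the lost message described in the list above.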

Let's take a quick look at the assembler code. UnlinkAll is quite simple. First it clears the ecx register (sets up the nil pointer), loads chain.Head into eax and atomically swaps chain.Head with ecx (i.e., with nil). The old chain.Head is returned in the Result (eax), which is exactly what we want.

class function TOmniBaseContainer.UnlinkAll(var chain: TOmniHeadAndSpin): POmniLinkedData;
asm
  xor   ecx, ecx
  mov   edx, eax
  mov   eax, [edx]
@Spin:
  lock cmpxchg [edx], ecx               //Cut Chain.Head
  jnz   @Spin
end; { TOmniBaseContainer.UnlinkAll }

InvertOrder is slightly longer but much simpler as there are no lock operations involved. The code merely walks over the pointer chain and inverts it.

class function TOmniBaseContainer.InvertOrder(chainHead: POmniLinkedData): POmniLinkedData;
asm
  test  eax, eax
  jz    @Exit
  xor   ecx, ecx
@Walk:
  xchg  [eax], ecx                      //Turn links
  and   ecx, ecx
  jz    @Exit
  xchg  [ecx], eax
  and   eax, eax
  jnz   @Walk
  mov   eax, ecx
@Exit:
end; { TOmniBaseContainer.InvertOrder }

That's all folks. You now know how both lock-free OTL structures work and what their limitations are. Now I only have one more article on OtlContainers to write, one that will explain the higher-level TOmniStack and TOmniQueue.


Wednesday, July 23, 2008

A working lock-free stack implementation


This is the third post in the mini-series on a lock-free stack, included in the OmniThreadLibrary. You've learned how the lock-free stack was implemented in the first place and which problem was hidden in that implementation. I've also mentioned that the problem was fixed in the meantime and that the working implementation is already available in the repository and as a snapshot, but I haven't said a word about the new implementation. This will now be rectified.

In the initial implementation lay an excellent example of the ABA problem. You can read more about it on Wikipedia, where you'll also learn that the ABA problem is commonly solved by adding ‘tag’ bits. That's exactly what was done in the OTL, except that we didn't use just a few measly bits, but a 4-byte counter.

Instead of containing only a pointer to the first node, each chain head is now composed of two parts — a pointer to the first node and a 4-byte spin count. This second part will make sure that the ABA problem cannot occur.

type
  TOmniHeadAndSpin = packed record
    Head: POmniLinkedData;
    Spin: cardinal;
  end; { TOmniHeadAndSpin }

As the problem lay in the PopLink method, we'll start today's exploration just there. This is the new version in all its glory. (Of course, the assembler is still provided by ‘GJ’, I had nothing to do with it.)

class function TOmniBaseContainer.PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData;
asm
  push  edi
  push  ebx
  mov   edi, eax                        //edi = @chain
@Spin:
  mov   ecx, 1                          //Increment spin reference by 1
  lock xadd [edi + 4], ecx              //Get old spin reference to ecx
  mov   eax, [edi]                      //eax := chain.Head
  mov   edx, [edi + 4]                  //edx := chain.Spin
  test  eax, eax
  jz    @Exit                           //Is Empty?
  inc   ecx                             //Now we are ready to cmpxchg8b
  cmp   edx, ecx                        //Is reference the same?
  jnz   @Spin
  mov   ebx, [eax]                      //ebx := Result.Next
  lock cmpxchg8b [edi]                  //Now try to xchg
  jnz   @Spin                           //Do spin ???
@Exit:
  pop   ebx
  pop   edi
end; { TOmniBaseContainer.PopLink }

You may have noticed that this is now a class function (a class static function, to be more precise). That made more sense as it needs no information from the class — it only works on a chain, which is passed as a parameter. Also, the code is much longer and not very easy to understand, so I'll try the trick that helped in the previous installment. I'll write an approximation of the PopLink in Delphi.

class function TOmniBaseContainer.PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData;
label
  Spin;
var
  chainSpin: cardinal;
  nextNode : POmniLinkedData;
  spinRef  : cardinal;
begin
  //  push  edi
  //  push  ebx
  //  mov   edi, eax                    //edi = @chain
Spin:
  //  mov   ecx, 1                      //Increment spin reference by 1
  //  lock xadd [edi + 4], ecx          //Get old spin reference to ecx
  {.$ATOMIC}
  spinRef := chain.Spin;
  chain.Spin := chain.Spin + 1;
  {.$END ATOMIC}
  //  mov   eax, [edi]                  //eax := chain.Head
  //  mov   edx, [edi + 4]              //edx := chain.Spin
  Result := chain.Head;
  chainSpin := chain.Spin;
  //  test  eax, eax
  //  jz    @Exit                       //Is Empty?
  if not assigned(Result) then
    Exit;
  //  inc   ecx                         //Now we are ready to cmpxchg8b
  Inc(spinRef);
  //  cmp   edx, ecx                    //Is reference the same?
  //  jnz   @Spin
  if spinRef <> chainSpin then
    goto Spin;
  //  mov   ebx, [eax]                  //ebx := Result.Next
  //  nextNode = ebx
  nextNode := Result.Next;
  //  lock cmpxchg8b [edi]              //Now try to xchg
  {.$ATOMIC}
  if (Result = chain.Head) and (chainSpin = chain.Spin) then begin
    chain.Head := nextNode;
    chain.Spin := spinRef;
  end
  else begin
    chainSpin := chain.Spin;
    Result := chain.Head;
  {.$END ATOMIC}
    //  jnz   @Spin                     //Do spin ???
    goto Spin;
  end;
//@Exit:
  //  pop   ebx
  //  pop   edi
end;

In the beginning, a little housekeeping is done (two registers stored away because Delphi compiler expects us not to modify them) and address of the chain record is loaded into the edi register.

With a little help from the locked xadd, the current version of the header spin value is loaded into spinRef (the ecx register) and is incremented in the header.

  //  mov   ecx, 1                      //Increment spin reference by 1
  //  lock xadd [edi + 4], ecx          //Get old spin reference to ecx
  {.$ATOMIC}
  spinRef := chain.Spin;
  chain.Spin := chain.Spin + 1;
  {.$END ATOMIC}

The xadd instruction stores the first operand ([edi + 4], which is chain.Spin) into the second operand (ecx) and the sum of both operands (the original value of the second operand is used for the summation) into the first operand.
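
If the operand shuffle is hard to picture, here is a tiny non-atomic model of the instruction. It is just a sketch: the XAdd procedure and the starting value 7 are made up for the illustration, and the real instruction does all of this in one locked step.

```pascal
program XaddModelSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$Q-}{$R-}  // let the 32-bit counter wrap silently, as it does in the CPU

// Non-atomic model of "xadd dest, src":
// src receives the old dest, dest receives the sum of both.
procedure XAdd(var dest, src: Cardinal);
var
  oldDest: Cardinal;
begin
  oldDest := dest;
  dest := dest + src;
  src := oldDest;
end;

var
  chainSpin: Cardinal;
  spinRef  : Cardinal;
begin
  chainSpin := 7;                      // hypothetical current value of chain.Spin
  spinRef := 1;                        // mov ecx, 1
  XAdd(chainSpin, spinRef);            // lock xadd [edi + 4], ecx
  Writeln(chainSpin, ' ', spinRef);    // prints 8 7
end.
```

So after the xadd the header holds the incremented spin count and ecx holds the value it had before the increment.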

Then we fetch the current chain.Head and chain.Spin and store them into the Result (eax) and chainSpin (edx), respectively. This is a non-atomic fetch and doesn't guarantee that both values really belong to the same state of the chain record. Another thread may have been scheduled between those two movs and it could call PopLink on the same chain and modify the header.

  //  mov   eax, [edi]                  //eax := chain.Head
  //  mov   edx, [edi + 4]              //edx := chain.Spin
  Result := chain.Head;
  chainSpin := chain.Spin;
  //  test  eax, eax
  //  jz    @Exit                       //Is Empty?
  if not assigned(Result) then
    Exit;

For the same reason there is no guarantee that chainSpin is indeed equal to spinRef+1, so this is now checked for.

  //  inc   ecx                         //Now we are ready to cmpxchg8b
  Inc(spinRef);
  //  cmp   edx, ecx                    //Is reference the same?
  //  jnz   @Spin
  if spinRef <> chainSpin then
    goto Spin;

Now we're getting ready for the pointer swap. First we need the address of the second node in the ebx register.

  //  mov   ebx, [eax]                  //ebx := Result.Next
  nextNode := Result.Next;

Finally, we can use the cmpxchg8b instruction to do the swap. Cmpxchg8b is a very special beast. It is similar to cmpxchg, but it compares and swaps a full 8 bytes in one go!

  //  lock cmpxchg8b [edi]              //Now try to xchg
  {.$ATOMIC}
  if (Result = chain.Head) and (chainSpin = chain.Spin) then begin
    chain.Head := nextNode;
    chain.Spin := spinRef;
  end
  else begin
    chainSpin := chain.Spin;
    Result := chain.Head;
  {.$END ATOMIC}
    //  jnz   @Spin                     //Do spin ???
    goto Spin;
  end;

Cmpxchg8b compares the combination of the edx and eax registers with its operand [edi]. In our case, edi is pointing to the chain (an 8-byte value in which the first 4 bytes are a node pointer and the second 4 a spin count), and eax and edx were loaded from the same structure just a few instructions back. In other words, just like in the initial implementation we're checking if our internal values are still relevant. If that is not true, cmpxchg8b reloads edx and eax from [edi] and the code then jumps back to the Spin label.

If values match, registers ecx and ebx are copied into the operand [edi]. Here, value in the ecx register is equal to the edx (we just verified that) and ebx contains the pointer to the next node. In other words, chain head is modified to point to the second node in chain.
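
The compare-and-swap on the full head-plus-spin pair can be modelled in plain Delphi as well. Again this is only a non-atomic sketch: the THeadAndSpin record, the CompareExchangeHeadAndSpin helper and the spin value 5 are my own illustration, not OTL code. The second call shows why the counter matters: the head pointer matches, but a stale spin count still makes the exchange fail.

```pascal
program Cmpxchg8bModelSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
  end;
  THeadAndSpin = record
    Head: PNode;
    Spin: Cardinal;
  end;

// Non-atomic model of "lock cmpxchg8b [target]".
// expected plays the role of edx:eax, desired the role of ecx:ebx.
function CompareExchangeHeadAndSpin(var target, expected: THeadAndSpin;
  const desired: THeadAndSpin): Boolean;
begin
  Result := (target.Head = expected.Head) and (target.Spin = expected.Spin);
  if Result then
    target := desired
  else
    expected := target;  // on failure the instruction reloads edx:eax
end;

var
  node1, node2: TNode;
  chain, seen, wanted: THeadAndSpin;
begin
  // chain -> node1 -> node2, spin count 5 (hypothetical)
  node2.Next := nil;
  node1.Next := @node2;
  chain.Head := @node1;
  chain.Spin := 5;

  // Fresh snapshot: both parts match, so the head moves to node2.
  seen := chain;
  wanted.Head := seen.Head^.Next;
  wanted.Spin := seen.Spin;
  Writeln(CompareExchangeHeadAndSpin(chain, seen, wanted));  // TRUE
  Writeln(chain.Head = @node2);                              // TRUE

  // Stale snapshot: same head pointer, older spin count.
  seen.Head := chain.Head;
  seen.Spin := chain.Spin - 1;
  Writeln(CompareExchangeHeadAndSpin(chain, seen, wanted));  // FALSE
  Writeln(seen.Spin = chain.Spin);                           // TRUE, snapshot was reloaded
end.
```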

There's not much difference between this implementation and the original one. The biggest and most important change is introduction of spin counter, which makes sure that the ABA problem, mentioned in the previous post, cannot occur.

[To be completely frank, the ABA situation can still occur, at least in theory. To experience a problem with the new code, one thread would have to stop just before cmpxchg8b and wait there for a very long time. So long, in fact, that other threads would be able to wrap the spin counter from $FFFFFFFF to 0 and back to the original value. As the counter is incremented in PopLink, a little less than 4.3 billion PopLink calls would have to be made. If the original thread is then awakened, and the spin counter contains the exact same value as it had before the thread was paused, the ABA problem would occur. That will never happen in practice, I'm totally sure.]

Besides the PopLink and introduction of spin counters, nothing much has changed. The PushLink method was slightly modified, as it was also changed from a normal method to a class static one, but it still works exactly as before.

class procedure TOmniBaseContainer.PushLink(const link: POmniLinkedData;
  var chain: TOmniHeadAndSpin);
asm
  mov   ecx, eax                        //ecx = link
  mov   eax, [edx]                      //eax := chain.Head (edx = @chain)
@Spin:
  mov   [ecx], eax                      //link.Next := chain.Head
  lock cmpxchg [edx], ecx               //chain.Head := link
  jnz   @Spin
end; { TOmniBaseContainer.PushLink }

That's it. This version of the lock-free stack has successfully passed a 4-hour stress test, which makes me believe that it really works. ;)

Next time on the agenda: How the lock-free queue is made. This time I'll really blog about it. Nothing more will distract me. Promise!


Monday, July 21, 2008

TDM Rerun #10: Synchronisation Toolkit Revisited

The plan for this article was to present an implementation of a shared memory pool: a mechanism that allows multiple data producers to send data to one data manipulator. In fact, I already had all the code and half the article completed when I found out that my solution doesn't always work. Everything was OK when both ends of the pool (producer and manipulator) were implemented in the same application or in two 'normal' applications. But if I tried to change one of those apps into an NT service, the shared pool stopped working.

- Synchronisation Toolkit Revisited, The Delphi Magazine 91, March 2003

My 10th TDM article returned to my pet theme - synchronisation and communication. The first part described some aspects of the NT security model and the second implemented a shared memory-based queue, capable of transporting messages from a GUI application to a service and back.

It looks like I'll have to return to that topic again - I got some reports recently that my approach doesn't (always?) work on Vista :(

Links: article (PDF, 54KB), source code (ZIP, 2.2 MB), current GpSync unit


Lock-free stack problem

In my last treatise on OmniThreadLibrary internals you could read about the lock-free stack structure, included in the OmniThreadLibrary. The code was relatively simple to understand (after studying it for hours ;) but alas, it didn't work. The problem only appeared when multiple readers or multiple writers were operating on the same structure and that's why the initial tests, which used only one reader and one writer, failed to detect it.

I had no idea what could go wrong (at the time I thought I understood the lock-free stuff in the OtlContainers) but luckily the author of the stack code was smarter and located the problem. He explained it to me so I can explain it to you ...

Remember the PopLink code from the previous article? Probably not, so I'll reprint it here in a slightly modified form. [For the original code and comments, see the Implementing lock-free stack article.]

function TOmniBaseContainer.PopLink(var chainHead: POmniLinkedData): POmniLinkedData;
label
  Spin;
var
  second: POmniLinkedData;
begin
  //  mov   eax, [edx]
  Result := chainHead;
Spin:
  //  test  eax, eax
  //  jz    @Exit
  if not assigned(Result) then
    Exit;
  //  mov   ecx, [eax]
  second := Result.Next;
  //  lock cmpxchg [edx], ecx
  {.$ATOMIC}
  if Result = chainHead then
    chainHead := second
  else begin
    Result := chainHead; // on failure, cmpxchg reloads eax from [edx]
  {.$END ATOMIC}
    //  jnz   @Spin
    goto Spin;
  end;
end; { TOmniBaseContainer.PopLink }

I must emphasize again that this is not working code. It might even compile (yes, Pascal/Delphi does have labels and goto!) but it surely won't work as the part between {.$ATOMIC} and {.$END ATOMIC} won't be executed atomically. Still, this is a nice approximation that allows us to understand the problem that also occurs with the assembler version.

The code is (now that it's coded in Delphi) pretty simple: it accesses first element in the chain, follows its Next pointer to the second element, and then atomically checks whether the chain head still points to the first element and if that is true, changes the chain head to point to the second element.

What can go wrong there? After all, we're checking the chain head and making the switch atomically! When we have only one reader and one writer, nothing much. Problems occur when there are multiple readers or multiple writers working at the same time.

The single point of failure is the lock cmpxchg instruction (the atomic part in Delphi code). There is a problem with the value that is stored in the chain head - when lock cmpxchg is executed, we cannot be sure that the ecx register (or the second variable) still points to a node that is indeed second in the chain.

Let's examine one of many possible failure scenarios:

  • A thread executes the code above up to the atomic part. Immediately after second := Result.Next is executed (Result points to the node first at that moment), the thread is stopped and another thread gets the CPU slice.

At that point we have following data structure: chainHead -> first -> second -> ...

  • Another thread executes the same PopLink code and pops node first from the chain.

The data structure now looks like this: chainHead -> second -> ...

  • Yet another thread pushes a completely different node (let's call it newnode) into the chain.

chainHead -> newnode -> second -> ...

  • The first node is pushed back into the chain.

chainHead -> first -> newnode -> second -> ...

  • Now the first thread continues execution with the lock cmpxchg. The condition still holds - chainHead indeed points to the first node. Therefore, chainHead is modified to point to the second node. But this is not the node chainHead should be pointing to! It should point to the newnode node!

And so everything falls apart. :(
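
The scenario above can be replayed deterministically in one thread. This is a sketch under my own assumptions (a bare node type, hypothetical variable names, a plain if standing in for lock cmpxchg); it only demonstrates that the head comparison succeeds although the chain has changed in the meantime.

```pascal
program AbaReplaySketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
  end;

var
  first, second, newNode: TNode;
  chainHead: PNode;
  seenHead : PNode;  // what thread 1 holds in eax (Result)
  seenNext : PNode;  // what thread 1 holds in ecx (second)
begin
  // chainHead -> first -> second
  second.Next := nil;
  first.Next := @second;
  chainHead := @first;

  // Thread 1 runs PopLink up to the atomic part and is then suspended.
  seenHead := chainHead;
  seenNext := seenHead^.Next;

  // Another thread pops first: chainHead -> second
  chainHead := first.Next;

  // Yet another thread pushes newNode: chainHead -> newNode -> second
  newNode.Next := chainHead;
  chainHead := @newNode;

  // first is pushed back: chainHead -> first -> newNode -> second
  first.Next := chainHead;
  chainHead := @first;

  // Thread 1 resumes with its compare-and-swap. The comparison succeeds.
  if chainHead = seenHead then
    chainHead := seenNext;

  Writeln('head points to second : ', chainHead = @second);   // TRUE
  Writeln('head points to newNode: ', chainHead = @newNode);  // FALSE
end.
```

The node newNode is no longer reachable from the chain head, although nobody has popped it.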

Can the code be fixed? Yes, and the fix is already in the repository and is included in the latest snapshot. Next time I'll explain the new approach (and maybe even the operation of the lock-free queue). Stay tuned!


Thursday, July 17, 2008

OmniThreadLibrary progress report

Just a short notice on what we're working on. I'm too tired to write a coherent article longer than 100 words.

  • Lock-free structures are now really, really, really working.
  • Stress tests in tests\10_Containers have been significantly enhanced.
  • New Counter support.

The last item is really interesting and deserves a longer post. In short, it allows you to do:

counter := CreateCounter(2);
CreateTask(worker).WithCounter(counter).Run;
CreateTask(worker).WithCounter(counter).Run;

In worker code:

if Task.Counter.Decrement = 0 then
  Task.Comm.Send(MSG_ALL_DONE);

Decrement is interlocked, of course.

 

All of that is available in the repository and as a snapshot.


Tuesday, July 15, 2008

Implementing lock-free stack

[I'm cheating. The title should be “OmniThreadLibrary internals - OtlContainers”, but I wanted to attract more readers.]

Today I'll be describing the OtlContainers unit. Part of that unit, actually, as there is a lot to tell and a lot to show. I'll focus on the lock-free stack today and describe other stuff from the same unit in a day or two.

To give the credit where it is due — most of the code you'll see today was written by a Slovenian developer ‘GJ’. His is the implementation of the lock-free stack and all the assembler parts. I only wrapped them in higher-level structures. GJ, thanks for your contribution!

If you want to follow this article with the OtlContainers loaded in IDE, please make sure that you have the latest source [quick links: snapshot, repository, OtlContainers.pas].

The topic of today's presentation is TOmniBaseContainer. This is a base class that implements a lock-free, size-limited stack. Multiple simultaneous writers are supported, as are multiple readers. Although this is a fully functional class, you would usually want to use the more feature-rich descendant TOmniStack.

A note regarding the limited size. It is much simpler to implement a size-limited lock-free structure than to deal with dynamic allocation. In most cases, dynamic allocation would lead to possible locks inside the memory manager and some of the advantage gained by the lock-free algorithm would go away. What's more, you usually don't want a lock-free structure to grow without any control. Sure, the OTL implementation won't be the right solution to all your needs, but we do believe that it is good enough for many usage scenarios.

Back to the story ... TOmniBaseContainer stores data in TOmniLinkedData nodes. The first four bytes in the node point to the next node and the other bytes (as many as the application requested) contain application data. Only the first of those data bytes is actually included in the TOmniLinkedData structure. It is only used as a placeholder for address calculation in Push and Pop operations.

type
  POmniLinkedData = ^TOmniLinkedData;
  TOmniLinkedData = packed record
    Next: POmniLinkedData;
    Data: byte; //user data, variable size
  end; { TLinkedOmniData }

In the following diagrams I'll use this symbol to represent one TOmniLinkedData node.

data node

Nodes are connected into chains. The address of the first node is stored in a chain header. The Next field of the last node in the chain contains the value nil.

data chain

As the TOmniBaseContainer implementation is size-limited, it can preallocate all nodes in one go. The Initialize method first allocates one big buffer to store all nodes and connects them into one chain, pointed to by the recycle chain header. This chain contains unused nodes. The other chain, public chain, contains nodes that are in use and is initialized to nil.

You'll see that the initialization code rounds up the size of a node (the Next pointer plus the data) to the next multiple of 4. This is required because operations that modify node pointers (the Next field) require them to be aligned on addresses that are divisible by 4.

  // calculate element size, round up to next 4-aligned value
  bufferElementSize := ((SizeOf(POmniLinkedData) + elementSize) + 3) AND NOT 3;
  GetMem(obcBuffer, bufferElementSize * cardinal(numElements));
  Assert(cardinal(obcBuffer) AND 3 = 0);
  //Format buffer to recycleChain, init obcRecycleChain and obcPublicChain.
  //At the beginning, all elements are linked into the recycle chain.
  obcRecycleChain := obcBuffer;
  nextElement := nil; // to remove compiler warning in nextElement.Next := nil assignment below
  currElement := obcRecycleChain;
  for iElement := 0 to obcNumElements - 2 do begin
    nextElement := POmniLinkedData(cardinal(currElement) + bufferElementSize);
    currElement.Next := nextElement;
    currElement := nextElement;
  end;
  nextElement.Next := nil; // terminate the chain
  obcPublicChain := nil;
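The rounding expression is easier to follow with concrete numbers. Here is a small sketch that evaluates it for a few element sizes. NodeStride and CPointerSize are hypothetical names, and the pointer size of 4 assumes a 32-bit target, as in the text above.

```pascal
program NodeStrideDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

const
  // Assumption: 32-bit target, where the Next pointer takes 4 bytes.
  CPointerSize = 4;

// Hypothetical helper: bytes reserved per node for elementSize bytes of data.
// Adding 3 and clearing the two lowest bits rounds up to a multiple of 4.
function NodeStride(elementSize: cardinal): cardinal;
begin
  Result := (CPointerSize + elementSize + 3) and not cardinal(3);
end;

var
  size: cardinal;

begin
  for size := 1 to 9 do
    WriteLn('elementSize = ', size, ' -> node stride = ', NodeStride(size));
end.
```

Element sizes 1 to 4 give a stride of 8 bytes, sizes 5 to 8 give 12 bytes and size 9 gives 16 bytes, so every node in the buffer starts at an address divisible by 4 as long as the buffer itself does.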

The structure created by the initialization code is depicted below. The recycle chain points to the first node, which points to the next node and so on. The last node in the chain has nil stored in the Next pointer. The public chain is empty.

empty stack

Let's see what happens when code pushes new data onto the stack.

function TOmniBaseContainer.Push(const value): boolean;
var
  linkedData: POmniLinkedData;
begin
  linkedData := PopLink(obcRecycleChain);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(value, linkedData.Data, ElementSize);
  PushLink(linkedData, obcPublicChain);
end; { TOmniBaseContainer.Push }

PopLink removes the first node from the recycle chain. The chain header is updated to point to the next node in the chain. A pointer to the removed node is stored in the linkedData variable. If the recycle chain is empty, linkedData contains nil and Push returns False.

The code next moves application data from the value parameter to the preallocated buffer. linkedData.Data conveniently addresses first data byte in the buffer.

At the end, the code inserts linkedData node at the beginning of the public chain and updates the public chain header to point to the linkedData node.

one pushed

The real workhorses here are PopLink and PushLink. The former pops the first node from a chain and the latter inserts a node at the head of a chain. The trick here is that they are written with multithreading in mind: they both expect that data structures may change at any time because another thread may be accessing the structure simultaneously.

Let's take a look at the PopLink first.

function TOmniBaseContainer.PopLink(var chainHead: POmniLinkedData): POmniLinkedData;
//nil << Link.Next << Link.Next << ... << Link.Next
//FILO buffer logic ^------ < chainHead
asm
  mov eax, [edx] //Result := chainHead
@Spin:
  test eax, eax
  jz @Exit
  mov ecx, [eax] //ecx := Result.Next
  lock cmpxchg [edx], ecx //chainHead := Result.Next
  jnz @Spin //Do spin ???
@Exit:
end; { TOmniBaseContainer.PopLink }

At the beginning, the edx register contains the address of the chainHead parameter. mov eax, [edx] moves chainHead, i.e. the address of the node that chainHead is pointing to, into eax. The chain may be empty, in which case chainHead is nil, or in assembler terms, eax is 0. test eax, eax checks for this condition.

If there's at least one node in the chain, we can continue. mov ecx, [eax] moves the address of the next node into the ecx register. Remember that eax points to the node and [eax] is the same as accessing the node's Next field.

Now we have an address of the first node in the eax register, address of the second node in the ecx register (there may be no second node, in which case the ecx is 0) and chainHead in the edx register. And now we can do the heavy magic.

lock cmpxchg [edx], ecx does a few things, all at once. Well, not exactly at once, but the processor (and the level one cache and all the weird hardware wrapping the CPUs) makes sure that this operation won't be interrupted by another core or CPU attempting to do the same thing at the same time. Firstly, cmpxchg compares eax with [edx]. Remember, we loaded eax from [edx] so those two values should be the same, yes? Well, no. Between mov eax, [edx] and lock cmpxchg, the thread may be interrupted and stopped and another thread may have modified the chain header by executing PopLink. That's why we have to recheck.

If eax and [edx] are still the same, ecx is loaded into [edx]. In other words, address of the second node (which may be nil) is stored into the chainHead. Because of the lock prefix, all that (testing and assignment) happens atomically. Uninterruptible. In other words, another thread can not step on our (digital) toes.

If eax and [edx] are not the same, cmpxchg loads eax from [edx]. In other words, eax is refreshed from the chainHead. If that happens, jnz @Spin instruction will jump to the @Spin label and repeat the whole procedure.

At the end we have address of the second node stored in chainHead and address of the first node stored in eax, which is fine as pointer Results are returned in eax.
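The two outcomes of cmpxchg described above can be modelled in plain Pascal. This is a single-threaded sketch only: ModelCmpXchg is a hypothetical function, it is not atomic, and the numeric values merely stand in for node addresses.

```pascal
program CmpXchgModel;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

// Single-threaded model of "lock cmpxchg [dest], source" with eax as the
// accumulator. The real instruction is atomic; this function is not.
// Returns the zero flag: True when the store happened.
function ModelCmpXchg(var dest, accumulator: cardinal; source: cardinal): boolean;
begin
  Result := dest = accumulator;
  if Result then
    dest := source          // values matched: [dest] := source
  else
    accumulator := dest;    // mismatch: eax is refreshed from [dest]
end;

var
  chainHead, eax: cardinal;

begin
  // Case 1: nobody touched the head between the load and the cmpxchg.
  chainHead := 100;
  eax := chainHead;
  if ModelCmpXchg(chainHead, eax, 200) then
    WriteLn('case 1 stored:   chainHead = ', chainHead, ', eax = ', eax);

  // Case 2: another thread changed the head to 300 while eax still holds 200.
  chainHead := 300;
  eax := 200;
  if not ModelCmpXchg(chainHead, eax, 400) then
    WriteLn('case 2 retrying: chainHead = ', chainHead, ', eax = ', eax);
end.
```

In case 1 the head becomes 200 and eax keeps the old head value 100, which is what PopLink returns. In case 2 nothing is stored, eax is refreshed to 300, and the real code would jump back to @Spin with that fresh value.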

You can read more on cmpxchg here.

PushLink is much simpler.

procedure TOmniBaseContainer.PushLink(const link: POmniLinkedData; var chainHead: POmniLinkedData);
asm
  mov eax, [ecx] //ecx = chainHead
@Spin:
  mov [edx], eax //link.Next := chainHead
  lock cmpxchg [ecx], edx //chainHead := link
  jnz @Spin
end; { TOmniBaseContainer.PushLink }

At the beginning, edx contains the value of the link parameter and ecx contains the address of the chainHead parameter.

The code first loads the address of the first node in the chain into the eax register. Nil pointers are fine in this case as we will not be following (dereferencing) them.

Then the code loads this same value into the [edx]. Remember, edx contains the value of the link, therefore [edx] represents link.Next. The mov [edx], eax line sets link.Next to point to the first node in the chain. If the chain is empty, link.Next will be set to nil, which is exactly the correct thing to do.

lock cmpxchg [ecx], edx then compares eax and [ecx] to ensure that underlying data haven't changed since PushLink started its execution. If values are not equal, eax is reloaded from [ecx] and code execution continues from the @Spin label. If values are equal, edx is loaded into [ecx]. At that moment, edx still contains the value stored in the link parameter and ecx contains the address of the chainHead. In other words, chainHead is set to link. As link.Next was set to old chainHead in the previous line, we have successfully linked link at the beginning of the chain.

That's all, the hard part is over. If you understand PopLink and PushLink, everything else is simple.

When we push the second value into the stack, it is inserted at the beginning of the public chain and recycle chain points to the next free node.

two pushed

The process continues until all nodes are linked into the public chain and recycle chain is nil.

four pushed

To pop a value from the stack, TOmniBaseContainer.Pop is used. It first gets the topmost allocated node (atomically, of course). Then it moves the node data into the method parameter and pushes the node into the recycle chain (again, atomically).

function TOmniBaseContainer.Pop(var value): boolean;
var
  linkedData: POmniLinkedData;
begin
  linkedData := PopLink(obcPublicChain);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(linkedData.Data, value, ElementSize);
  PushLink(linkedData, obcRecycleChain);
end; { TOmniBaseContainer.Pop }

One of the important things to note is that nodes are not moved around. They are allocated at the beginning and stay immovable for the whole lifetime of the TOmniBaseContainer. Only the Next fields are modified (and of course the Data is copied) and that's why this lock-free stack implementation is extremely fast. First results indicate that it can move about 800,000 integers per second between two threads on a 1.67 GHz Core2 Duo (T2300) machine.
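To see how the recycle chain and the public chain cooperate, here is a single-threaded model of the whole container. It is a sketch under several assumptions: the payload is a plain integer instead of a variable-size buffer, CompareAndSwap is a hypothetical non-atomic stand-in for lock cmpxchg, and InitChains is a made-up name. It makes no claim about thread safety.

```pascal
program TwoChainModel;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

const
  CNumElements = 4;

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Data: integer; // fixed-size payload keeps the model simple
  end;

var
  buffer: array [0..CNumElements - 1] of TNode; // preallocated once, never moved
  recycleChain, publicChain: PNode;

// Non-atomic stand-in for lock cmpxchg (single-threaded model only).
function CompareAndSwap(var target: PNode; expected, newValue: PNode): boolean;
begin
  Result := target = expected;
  if Result then
    target := newValue;
end;

function PopLink(var chainHead: PNode): PNode;
begin
  repeat
    Result := chainHead;
    if Result = nil then
      Exit; // empty chain
  until CompareAndSwap(chainHead, Result, Result^.Next);
end;

procedure PushLink(link: PNode; var chainHead: PNode);
begin
  repeat
    link^.Next := chainHead;
  until CompareAndSwap(chainHead, link^.Next, link);
end;

procedure InitChains;
var
  i: integer;
begin
  // All nodes start in the recycle chain; the public chain is empty.
  for i := 0 to CNumElements - 2 do
    buffer[i].Next := @buffer[i + 1];
  buffer[CNumElements - 1].Next := nil;
  recycleChain := @buffer[0];
  publicChain := nil;
end;

function Push(value: integer): boolean;
var
  node: PNode;
begin
  node := PopLink(recycleChain);
  Result := node <> nil;
  if Result then begin
    node^.Data := value;
    PushLink(node, publicChain);
  end;
end;

function Pop(var value: integer): boolean;
var
  node: PNode;
begin
  node := PopLink(publicChain);
  Result := node <> nil;
  if Result then begin
    value := node^.Data;
    PushLink(node, recycleChain);
  end;
end;

var
  i, value: integer;

begin
  InitChains;
  for i := 1 to CNumElements + 1 do
    if not Push(i) then
      WriteLn('Push(', i, ') rejected, no free nodes');
  while Pop(value) do
    WriteLn('popped ', value);
end.
```

With four nodes, the fifth Push is rejected because the recycle chain is empty, and the values come back in the order 4, 3, 2, 1. After the last Pop all nodes are back in the recycle chain.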

base container test

Next post will discuss the lock-free ring buffer that is built on top of the lock-free stack. Stay tuned!


Monday, July 14, 2008

OmniThreadLibrary progress report

  • new unit OtlContainers with lock-free stack and lock-free ring buffer (both with additional limitations; details coming soon)
  • OtlComm redesigned around OtlContainers
  • new test 10_Containers, which performs basic unit test for the OtlContainers unit
  • package split to designtime/runtime; OtlComm unit test split into separate unit OtlCommBufferTest; component registration moved from OtlTaskEvents to new unit OtlRegister [all thanks to Lee_Nover]
  • fixed memory leaks in tests 8 and 9
  • added demonstration of how to send an object over the communication channel to the demo 8
  • new SpinLock unit which fixes reentrancy problems is included
  • new DSiWin32 unit which fixes problems when DSiTimeGetTime64 was called from more than one thread

All of that is available in the repository and as a snapshot.


Friday, July 11, 2008

OmniThreadLibrary goes lock-free

Well, not the complete OTL, just the messaging subsystem, but even that is quite some achievement. The ring buffer inside the OtlComm unit is now implemented with the help of a lock-free stack. The lock-free buffer is now the default. If you want to compile OtlComm with the locking buffer, define the OTL_LockingBuffer symbol.

Lock-free buffer did not improve communication subsystem speed much at the moment as the limiting factor is Delphi's Variant implementation but that will change. The collective mind behind the OTL is working on some interesting messaging ideas that will be much faster but will still keep most of the Variant flexibility and simplicity.

What I can promise even now is that there will be lock-free stack and lock-free ring buffer included in the OTL as standalone reusable data structures. Sounds useful?


Wednesday, July 09, 2008

OmniThreadLibrary Example #5: Registering additional communication channels

I thought I would be documenting the TOmniTaskEventDispatch component today, but I was working on something interesting yesterday and today and I want to show it to the public. The functionality I'll be talking about is demoed in the tests\8_RegisterComm project, available in the repository and in today's snapshot. [All new functionality was uploaded, not just this demo, of course.]

I was working on the OtlComm testing and benchmarking code. I wanted to find out if the communication code is working correctly (by stress-testing it) and if the lock-free buffer implementation is faster than the locking one. So I set up two threaded clients (using OTL, of course!) and started to write code that would establish a direct communication channel between them. I wanted to use the standard Comm channel for test control and reporting, but not for running tests.

At first, the task seemed quite simple. I set up a new communication channel of type IOmniTwoWayChannel and created two tasks, based on the TOmniWorker class. Each task received one endpoint of the new communication channel as a parameter.

  FCommChannel := CreateTwoWayChannel(1024);
  FClient1 := OmniTaskEventDispatch1.Monitor(
    CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))).Run;
  FClient2 := OmniTaskEventDispatch1.Monitor(
    CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))).Run;

An image is worth more than a thousand words.

additional communication channel


When I had this infrastructure ready, I started to think about the message loop in my tasks. At that moment (yesterday), the internal message loop only handled the default communication channel. Writing a whole message loop to support one measly communication channel didn't seem like the right decision. What to do? Extend OTL, of course!


Today's snapshot contains support for additional communication channels. You only have to call RegisterComm and internal message dispatcher will handle everything for you.


The 8_RegisterComm demo will help you understand. There are two buttons on the form, first sends a random number to task 1 and second sends a random number to task 2.


8_RegisterComm


The code for the first button's OnClick event should suffice. The code for the second button is almost the same.

procedure TfrmTestOtlComm.btnSendTo1Click(Sender: TObject);
var
  value: integer;
begin
  value := Random(100);
  Log(Format('Sending %d to task 1', [value]));
  FClient1.Comm.Send(MSG_FORWARD, value);
end;

The TCommTester class implements message handler for the MSG_FORWARD message. The code in this method firstly notifies the owner that MSG_FORWARD message was received and secondly sends MSG_FORWARDING message to the task-to-task communication channel.

type
  TCommTester = class(TOmniWorker)
  strict private
    ctComm    : IOmniCommunicationEndpoint;
    ctCommSize: integer;
  public
    constructor Create(commEndpoint: IOmniCommunicationEndpoint; commBufferSize: integer);
    function Initialize: boolean; override;
    procedure OMForward(var msg: TOmniMessage); message MSG_FORWARD;
    procedure OMForwarding(var msg: TOmniMessage); message MSG_FORWARDING;
  end; { TCommTester }

constructor TCommTester.Create(commEndpoint: IOmniCommunicationEndpoint; commBufferSize: integer);
begin
  inherited Create;
  ctComm := commEndpoint;
  ctCommSize := commBufferSize;
end;

function TCommTester.Initialize: boolean;
begin
  Task.RegisterComm(ctComm);
  Result := true;
end;

procedure TCommTester.OMForward(var msg: TOmniMessage);
begin
  Task.Comm.Send(MSG_NOTIFY_FORWARD, msg.MsgData);
  ctComm.Send(MSG_FORWARDING, msg.MsgData);
end;

procedure TCommTester.OMForwarding(var msg: TOmniMessage);
begin
  Task.Comm.Send(MSG_NOTIFY_RECEPTION, msg.MsgData);
end;

The MSG_FORWARDING handler (OMForwarding) just notifies the owner that the message was received from another task.


In the screenshot above, you can see how value 0 traveled from task 1 to task 2 and how value 3 traveled from task 2 to task 1.


The really magical part happens in TCommTester.Initialize. The code fragment Task.RegisterComm(ctComm) registers the communication endpoint as an additional message source. From that point onwards, messages that arrive on the ctComm channel are treated the same way as messages arriving on the Comm channel.


The road to this deceptively simple solution was quite long. I had to do major refactoring in the OtlTask unit. Quite some code was moved from the TOmniTaskControl class to the TOmniTaskExecutor class (which was a record before). I think that the new code works fine (I've run all test cases), but only time will tell ...


If you feel the need to check how additional communication channel support is implemented, check TOmniTaskExecutor.Asy_DispatchMessages and TOmniTaskExecutor.Asy_RegisterComm (OtlTask unit).


Tuesday, July 08, 2008

OmniThreadLibrary internals - OtlComm


Today I'll describe the inner workings of the OmniThreadLibrary communication subsystem. It lives in the OtlComm unit and is used extensively by the task control/task worker interfaces (described in the previous installment). Its use is not limited to the OTL as it has no knowledge of tasks and threads. Feel free to use it in your own non-OTL-related code.

On the surface, the messaging subsystem looks deceptively simple.

image

IOmniTaskControl interface exposes property Comm: IOmniCommunicationEndpoint.

  IOmniCommunicationEndpoint = interface ['{910D329C-D049-48B9-B0C0-9434D2E57870}']
    function GetNewMessageEvent: THandle;
  //
    procedure RemoveMonitor;
    procedure Send(msgID: word; msgData: TOmniValue); overload;
    procedure Send(msgID: word; msgData: array of const); overload;
    procedure Send(const msg: TOmniMessage); overload;
    procedure SetMonitor(hWindow: THandle; messageWParam, messageLParam: integer);
    function Receive(var msgID: word; var msgData: TOmniValue): boolean; overload;
    function Receive(var msg: TOmniMessage): boolean; overload;
    property NewMessageEvent: THandle read GetNewMessageEvent;
  end; { IOmniTaskCommunication }

This simple interface allows the owner to send messages, receive messages and wait on a new message. A property with the same name lives on the worker side, too (in the IOmniTask interface). Both endpoints are connected so that IOmniTaskControl.Comm.Send sends a message to IOmniTask.Comm and vice versa. Simple.


But when you look under the surface ...


Here be dragons!


This is the real picture of objects and interfaces living inside the OtlComm unit. Although the surface view may give you the impression that IOmniCommunicationEndpoint is the most important part of the communication system, in reality everything revolves around the IOmniTwoWayChannel.


image 


To understand this picture, it's best to start at the bottom left (IOW, in the rectangle just above this line). The IOmniTaskControl interface is the one that is returned from the CreateTask procedure.


IOmniTaskControl is implemented by the TOmniTaskControl object, which owns an IOmniTwoWayChannel interface, created during TOmniTaskControl initialization.

procedure TOmniTaskControl.Initialize;
begin
  //...
  otcCommChannel := CreateTwoWayChannel;
  //...
end; { TOmniTaskControl.Initialize }

This interface is also passed to the TOmniTask object (which implements IOmniTask interface) when task is run.

function TOmniTaskControl.Run: IOmniTaskControl;
var
  task: IOmniTask;
begin
  //...
  task := TOmniTask.Create(..., otcCommChannel, ...);
  //...
end; { TOmniTaskControl.Run }

TOmniTwoWayChannel owns two ring buffers of type TOmniRingBuffer (either the locking version, which is the default at the moment, or the lock-free version if you compile the unit with /dOTL_LockFreeBuffer). Those buffers are used as unidirectional message queues that store TOmniMessage records.

  TOmniMessage = record
    MsgID  : word;
    MsgData: TOmniValue;
  end; { TOmniMessage }

TOmniTwoWayChannel also owns two IOmniCommunicationEndpoint interfaces (implemented by the TOmniCommunicationEndpoint class). One of them is exposed via the Endpoint1 property and the other via Endpoint2.

  IOmniTwoWayChannel = interface ['{3ED1AB88-4209-4E01-AA79-A577AD719520}']
    function Endpoint1: IOmniCommunicationEndpoint;
    function Endpoint2: IOmniCommunicationEndpoint;
  end; { IOmniTwoWayChannel }

Both endpoints are connected to both ring buffers. If we name those ring buffers A and B, endpoint 1 writes to A and reads from B while endpoint 2 writes to B and reads from A.
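The cross-wiring of the two buffers is perhaps easiest to see in a tiny model. This is a sketch, not the OtlComm code: TModelEndpoint is a hypothetical class, messages are plain strings, and a TStringList stands in for each ring buffer.

```pascal
program TwoWayChannelModel;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  Classes;

type
  // Hypothetical model of one endpoint: it appends to one queue and takes
  // from the other. TStringList stands in for the ring buffer.
  TModelEndpoint = class
  private
    FWriteQueue: TStrings;
    FReadQueue: TStrings;
  public
    constructor Create(writeQueue, readQueue: TStrings);
    procedure Send(const msg: string);
    function Receive(var msg: string): boolean;
  end;

constructor TModelEndpoint.Create(writeQueue, readQueue: TStrings);
begin
  inherited Create;
  FWriteQueue := writeQueue;
  FReadQueue := readQueue;
end;

procedure TModelEndpoint.Send(const msg: string);
begin
  FWriteQueue.Add(msg);
end;

function TModelEndpoint.Receive(var msg: string): boolean;
begin
  Result := FReadQueue.Count > 0;
  if Result then begin
    msg := FReadQueue[0]; // oldest message first
    FReadQueue.Delete(0);
  end;
end;

var
  queueA, queueB: TStringList;
  endpoint1, endpoint2: TModelEndpoint;
  msg: string;

begin
  queueA := TStringList.Create;
  queueB := TStringList.Create;
  endpoint1 := TModelEndpoint.Create(queueA, queueB); // writes A, reads B
  endpoint2 := TModelEndpoint.Create(queueB, queueA); // writes B, reads A
  try
    endpoint1.Send('ping');
    if endpoint2.Receive(msg) then
      WriteLn('endpoint 2 received: ', msg);
    if not endpoint1.Receive(msg) then
      WriteLn('endpoint 1 has nothing to read yet');
    endpoint2.Send('pong');
    if endpoint1.Receive(msg) then
      WriteLn('endpoint 1 received: ', msg);
  finally
    endpoint1.Free;
    endpoint2.Free;
    queueA.Free;
    queueB.Free;
  end;
end.
```

Because each endpoint only ever writes to one queue and reads from the other, a message sent on one endpoint can never be received on that same endpoint.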


TOmniTaskControl.Comm and TOmniTask.Comm are simple mappers that return different endpoints of the same IOmniTwoWayChannel interface.

function TOmniTaskControl.GetComm: IOmniCommunicationEndpoint;
begin
  Result := otcCommChannel.Endpoint1;
end; { TOmniTaskControl.GetComm }

function TOmniTask.GetComm: IOmniCommunicationEndpoint;
begin
  Result := otCommChannel.Endpoint2;
end; { TOmniTask.GetComm }

And that's all folks ...


Autovivification


Well, that's almost all. The last trick in OtlComm is the creation of the communication infrastructure on an as-needed basis. As you can see from the diagram, the whole system is quite "heavy". If you're running simple fire-and-forget tasks, you may not need it at all. That's why the ring buffers and communication endpoints are not created until they are used.


The TOmniTwoWayChannel object is created whenever a task is created (see TOmniTaskControl.Initialize, above), but it does not create other parts of the infrastructure. This is only done in the endpoint accessor.

procedure TOmniTwoWayChannel.CreateBuffers;
begin
  if twcUnidirQueue[1] = nil then
    twcUnidirQueue[1] := TOmniRingBuffer.Create(twcMessageQueueSize);
  if twcUnidirQueue[2] = nil then
    twcUnidirQueue[2] := TOmniRingBuffer.Create(twcMessageQueueSize);
end; { TOmniTwoWayChannel.CreateBuffers }

function TOmniTwoWayChannel.Endpoint1: IOmniCommunicationEndpoint;
begin
  Assert((cardinal(@twcEndpoint[1]) AND 3) = 0);
  if twcEndpoint[1] = nil then begin
    twcLock.Acquire;
    try
      if twcEndpoint[1] = nil then begin
        CreateBuffers;
        twcEndpoint[1] := TOmniCommunicationEndpoint.Create(twcUnidirQueue[1], twcUnidirQueue[2]);
      end;
    finally twcLock.Release; end;
  end;
  Result := twcEndpoint[1];
end; { TOmniTwoWayChannel.Endpoint1 }

The code to create Endpoint2 is similar except that it uses twcEndpoint[2] and reverses parameters passed to the TOmniCommunicationEndpoint.Create.


The Endpoint1 method uses some tricks that may not be obvious.



  • Testing if interfaces have already been initialized is optimistic. The code first tests if the endpoint is nil and if that is true (which will happen very rarely, only the first time), it locks access to internal structures and then retests the same condition before creating the buffers and the endpoint.
  • The Assert checks if the two least significant bits of the endpoint address are 0. This makes the endpoint variable DWORD-aligned (its address is divisible by 4), which in turn causes reads/writes from/to that address to be atomic on the Intel architecture (the only one that concerns us). In other words, even if another thread is modifying the same variable on another CPU (or core), we know that we will read either the old value or the new value and not some mixture of both.
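The optimistic test followed by a retest under the lock can be isolated into a small sketch. TLazyHolder and its members are hypothetical names, the resource is a plain TObject rather than an endpoint, and the demo only calls the accessor from one thread, so it shows the shape of the pattern rather than proving its thread safety.

```pascal
program LazyCreationModel;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SyncObjs;

type
  // Hypothetical holder that creates its resource on first access.
  TLazyHolder = class
  private
    FLock: TCriticalSection;
    FResource: TObject;
    FCreateCount: integer;
  public
    constructor Create;
    destructor Destroy; override;
    function Resource: TObject;
    property CreateCount: integer read FCreateCount;
  end;

constructor TLazyHolder.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
end;

destructor TLazyHolder.Destroy;
begin
  FResource.Free;
  FLock.Free;
  inherited;
end;

function TLazyHolder.Resource: TObject;
begin
  if FResource = nil then begin        // optimistic test, no lock taken
    FLock.Acquire;
    try
      if FResource = nil then begin    // retest under the lock
        Inc(FCreateCount);
        FResource := TObject.Create;
      end;
    finally
      FLock.Release;
    end;
  end;
  Result := FResource;
end;

var
  holder: TLazyHolder;
  a, b: TObject;

begin
  holder := TLazyHolder.Create;
  try
    a := holder.Resource;
    b := holder.Resource;
    if (a = b) and (holder.CreateCount = 1) then
      WriteLn('resource created once and reused');
  finally
    holder.Free;
  end;
end.
```

Only the very first call pays for the lock; every later call returns after a single comparison, which is the reason for testing before locking.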

That concludes the OtlComm tour. The only remaining part (until the thread pool is implemented) is the TOmniTaskEventDispatch component, which I'll cover in a day or two.


Monday, July 07, 2008

OmniThreadLibrary lock-free communication

I've updated the OtlComm.pas unit on the OTL home page with a lock-free circular buffer, donated by a fellow Slovenian Delphi programmer who goes by the nick 'GJ'.

This code is doubly-work-in-progress! I haven't even tested it. It is disabled by default and you have to define the conditional symbol LockFreeBuffer to use it.

If you'll be making any comparisons to my original communication code and especially if you find any problem with new code, let me know.

UPDATE: I decided to use OTL_LockFreeBuffer instead of LockFreeBuffer. Repository has been updated, too.


Friday, July 04, 2008

OmniThreadLibrary internals - OtlTask

[Please note - new snapshot is available on the OTL home page.]

In the next few posts, I'll try to present a rough overview of the OTL internals.

The most important unit is OtlTask. It hosts interfaces describing a threaded task. Currently, it also provides a thread descendant to run those tasks, but that one will most probably be moved to the OtlThread unit.

The most important interface in OtlTask is IOmniTaskControl. This task control interface is returned from the CreateTask function. It is used to communicate with the task and to control its behavior. Four CreateTask functions map directly to four constructors, which store execution parameters into internal fields and initialize some events used for task control.

    constructor Create(worker: IOmniWorker; const taskName: string); overload;
    constructor Create(worker: TOmniWorker; const taskName: string); overload;
    constructor Create(executor: TOmniTaskMethod; const taskName: string); overload;
    constructor Create(executor: TOmniTaskProcedure; const taskName: string); overload;

SetParameter* methods are used to set parameters that are passed to the threaded worker.

    function  SetParameter(const paramName: string; paramValue: TOmniValue): IOmniTaskControl; overload;
    function  SetParameter(paramValue: TOmniValue): IOmniTaskControl; overload;
    function  SetParameters(parameters: array of TOmniValue): IOmniTaskControl;

Next four methods are used to control internal event/message loop behavior. They are only used when working with IOmniWorker or TOmniWorker tasks. Alertable causes message wait to be alertable and MsgWait causes message wait to process messages (see TOmniTaskControl.DispatchMessages for more information). SetTimer causes timerMessage message to be sent to the worker in specified intervals. If timerMessage is -1, worker's Timer method is called instead. FreeOnTerminate is only available when worker is a descendant of the TOmniWorker class and causes the worker to be destroyed after the task is completed.

    function  Alertable: IOmniTaskControl;
    function  FreeOnTerminate: IOmniTaskControl;
    function  MsgWait(wakeMask: DWORD = QS_ALLEVENTS): IOmniTaskControl;
    function  SetTimer(interval_ms: cardinal; timerMessage: integer = -1): IOmniTaskControl;

SetMonitor and RemoveMonitor are used to attach and detach an external message-watching monitor. Typically that would be the TOmniTaskEventDispatch component, and those TOmniTaskControl methods would be called from the component's Monitor and Detach functions.

    function  RemoveMonitor: IOmniTaskControl;
    function  SetMonitor(hWindow: THandle): IOmniTaskControl;

To start the task, you must execute either Run (creates a new thread and starts executing the task) or Schedule (schedules task to be executed in a thread pool; not implemented yet).

    function  Run: IOmniTaskControl;
    function  Schedule(threadPool: IOmniThreadPool = nil {default pool}): IOmniTaskControl;

There are four other methods related to the execution control. Terminate stops the task and waits the specified amount of time for the task to complete gracefully. At the moment it returns False if task does not terminate in the allotted time. In the future, it will maybe kill the thread. I'm not sure yet.


TerminateWhen is not implemented yet. It was supposed to assign another synchronization object to the thread. That object would cause the task to stop. The idea was to use one event to stop multiple tasks at once, but I'm not sure if that is useful at all.


WaitFor waits for the task to finish execution and returns True if task has stopped. WaitForInit waits for IOmniWorker/TOmniWorker worker to execute its Initialize method and returns initialization status (return value of the Initialize method).

    function  Terminate(maxWait_ms: cardinal = INFINITE): boolean;
    function  TerminateWhen(event: THandle): IOmniTaskControl;
    function  WaitFor(maxWait_ms: cardinal): boolean;
    function  WaitForInit: boolean;

The Comm property gives the owner access to the communication subsystem. This will be covered in a separate post.


ExitCode and ExitMessage will return task's exit code and message. Not implemented yet.


Name returns task's name, as set in the constructor and UniqueID returns task's unique ID. This identifier is unique inside the application.


Options gives you direct access to execution options, usually set with Alertable, MsgWait, and FreeOnTerminate.

    property Comm: IOmniCommunicationEndpoint read GetComm;
    property ExitCode: integer read GetExitCode;
    property ExitMessage: string read GetExitMessage;
    property Name: string read GetName;
    property Options: TOmniTaskControlOptions read otcOptions write otcOptions;
    property UniqueID: cardinal read GetUniqueID;

Execution


Run first makes parameters immutable, then creates thread-side task interface (IOmniTask), creates new thread, assigns that task to the thread and starts the thread.

function TOmniTaskControl.Run: IOmniTaskControl;
var
  task: IOmniTask;
begin
  otcParameters.Lock;
  task := TOmniTask.Create(otcExecutor, otcTaskName, otcParameters, otcCommChannel,
    otcUniqueID, otcTerminateEvent, otcTerminatedEvent, otcMonitorWindow);
  otcThread := TOmniThread.Create(task);
  otcThread.Resume;
  Result := Self;
end; { TOmniTaskControl.Run }

Thread's Execute method passes control to the TOmniTaskControl's Execute method.

procedure TOmniThread.Execute;
begin
  {$IFNDEF OTL_DontSetThreadName}
  SetThreadName(otTask.Name);
  {$ENDIF OTL_DontSetThreadName}
  (otTask as IOmniTaskExecutor).Execute;
end; { TOmniThread.Execute }

Execute calls either procedure or method worker. When working with IOmniWorker/TOmniWorker worker, Execute calls TOmniTaskControl.DispatchMessages. In all cases, IOmniTask is passed as a parameter.

procedure TOmniTaskExecutor.Execute(task: IOmniTask);
begin
  case ExecutorType of
    etMethod:
      Method(task);
    etProcedure:
      Proc(task);
    else
      raise Exception.Create('TOmniTaskExecutor.Execute: Executor is not set');
  end;
end; { TOmniTaskExecutor.Execute }

IOmniTask


IOmniTask interface allows the worker to communicate with the owner.

  IOmniTask = interface
    procedure SetExitStatus(exitCode: integer; const exitMessage: string);
    procedure Terminate;
    property Comm: IOmniCommunicationEndpoint read GetComm;
    property Name: string read GetName;
    property Param[idxParam: integer]: TOmniValue read GetParam;
    property ParamByName[const paramName: string]: TOmniValue read GetParamByName;
    property TerminateEvent: THandle read GetTerminateEvent;
    property UniqueID: cardinal read GetUniqueID;
  end; { IOmniTask }

SetExitStatus sets the exit code and message, which are then mapped to the task control's ExitCode and ExitMessage properties. Not implemented yet.


Terminate sets task's TerminateEvent. Thus the task can terminate itself. TerminateEvent is also set when code calls task control interface's Terminate method.
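
To illustrate the self-termination idea, here is a minimal sketch that uses only the Windows API and no OTL code at all. A plain event handle stands in for the task's TerminateEvent, and SetEvent stands in for the Terminate call. That the event is a manual-reset one is an assumption made for this example.

```pascal
program SelfTerminateDemo;

{$APPTYPE CONSOLE}

uses
  Windows;

var
  iteration     : integer;
  terminateEvent: THandle;
begin
  // Manual-reset event, initially unsignaled (assumed stand-in for TerminateEvent).
  terminateEvent := CreateEvent(nil, true, false, nil);
  try
    iteration := 0;
    // Keep working for as long as the wait times out, i.e. the event is not set.
    while WaitForSingleObject(terminateEvent, 10) = WAIT_TIMEOUT do begin
      Inc(iteration);
      Writeln('Working, iteration ', iteration);
      if iteration = 3 then
        SetEvent(terminateEvent); // the worker decides to stop itself
    end;
    Writeln('Terminated');
  finally
    CloseHandle(terminateEvent);
  end;
end.
```

The same loop exits just as well when another thread sets the event, which corresponds to the owner calling Terminate on the task control interface.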


Comm returns task's communication endpoint. More details will be provided in the next post.


Name returns the task name, as set in the CreateTask call. UniqueID returns task's unique ID.


Param and ParamByName can be used to access parameters passed to the task.


More information on task writing is available in examples (and already published posts on the OTL topic).


Thursday, July 03, 2008

OmniThreadLibrary Example #4a: Bidirectional communication, with objects

This is a variation of Example #4 that uses a TOmniWorker object instead of the IOmniWorker interface. The code is stored in the folder tests\6_TwoWayHello_with_object_worker.

I strongly suggest that you look at Example #4 first, as this post only lists the differences between those two examples.

Firstly, we don't need TAsyncHello.Initialize. We'll set the initial message text in the constructor. [Admittedly, this is something that can also be done when using the IOmniWorker approach.] The message handlers are identical to those in Example #4.

  TAsyncHello = class(TOmniWorker)
  strict private
    aiMessage: string;
  public
    constructor Create(const initialMessage: string);
    procedure OMChangeMessage(var msg: TOmniMessage); message MSG_CHANGE_MESSAGE;
    procedure OMSendMessage(var msg: TOmniMessage); message MSG_SEND_MESSAGE;
  end;

constructor TAsyncHello.Create(const initialMessage: string);
begin
  inherited Create; // run the ancestor's constructor before storing the text
  aiMessage := initialMessage;
end;

Secondly, we have to tweak the task creation.

procedure TfrmTestOTL.actStartHelloExecute(Sender: TObject);
begin
  FHelloTask :=
    OmniTaskEventDispatch1.Monitor(CreateTask(TAsyncHello.Create('Hello'), 'Hello')).
    SetIdle(1000, MSG_SEND_MESSAGE).
    SetParameter('Delay', 1000).
    FreeOnTerminate.
    Run;
end;

TAsyncHello object can now be passed directly to the CreateTask method. There is no need to set the Message parameter. Also new is the FreeOnTerminate call which tells the task manager to destroy the worker object when task is terminated. Alternatively, you can store away the worker object and destroy it "manually" after the FHelloTask is terminated.


The rest of the code does not differ from Example #4.


OmniThreadLibrary Example #4: Bidirectional communication, the OTL way

[Please note - today's snapshot is available on the OTL home page.]

Today's topic is writing message-driven tasks without having to write an event/message processing loop. Of course, the loop is still there, but it is hidden inside the OtlTask unit.

The demo is stored in folder tests\5_TwoWayHello_without_loop. OTL package is required to open the main form.

This time the worker is not a method or procedure, but a whole object. It must either implement IOmniWorker interface or be a descendant of the TOmniWorker class (which implements IOmniWorker).

The worker class will process two messages. When MSG_CHANGE_MESSAGE is received, the worker will change its internal message text. When MSG_SEND_MESSAGE is received, the worker will send this message text to the owner (just like in yesterday's example). We'll also override the Initialize method to set the initial message text. Initialize and its counterpart Cleanup are executed in the context of the background thread executing the task, so they can be used to allocate/deallocate thread-sensitive objects.

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_SEND_MESSAGE = 2;

type
  TAsyncHello = class(TOmniWorker)
  strict private
    aiMessage: string;
  public
    function Initialize: boolean; override;
    procedure OMChangeMessage(var msg: TOmniMessage); message MSG_CHANGE_MESSAGE;
    procedure OMSendMessage(var msg: TOmniMessage); message MSG_SEND_MESSAGE;
  end;

We can write those two message handlers in exactly the same way as we write Windows message handlers. Message IDs are arbitrary; I usually start with 1.

{ TAsyncHello }

function TAsyncHello.Initialize: boolean;
begin
  aiMessage := Task.ParamByName['Message'];
  Result := true;
end;

procedure TAsyncHello.OMChangeMessage(var msg: TOmniMessage);
begin
  aiMessage := msg.MsgData;
end;

procedure TAsyncHello.OMSendMessage(var msg: TOmniMessage);
begin
  Task.Comm.Send(0, aiMessage);
end;
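
For readers who have not used message methods outside of the VCL: the message directive works on any class, because TObject.Dispatch looks up the handler by the message ID stored in the first two bytes of whatever record it is given. The sketch below shows that mechanism in isolation. TDemoMessage and TDemoWorker are hypothetical stand-ins, and whether OTL's hidden loop calls Dispatch in precisely this way is an assumption on my side.

```pascal
program DispatchDemo;

{$APPTYPE CONSOLE}

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_SEND_MESSAGE   = 2;

type
  // Hypothetical stand-in for TOmniMessage. TObject.Dispatch reads the
  // message ID from the first two bytes, so the word field must come first.
  TDemoMessage = record
    MsgID  : word;
    MsgData: string;
  end;

  TDemoWorker = class
  strict private
    FText: string;
  public
    procedure OMChangeMessage(var msg: TDemoMessage); message MSG_CHANGE_MESSAGE;
    procedure OMSendMessage(var msg: TDemoMessage); message MSG_SEND_MESSAGE;
    procedure DefaultHandler(var msg); override;
  end;

procedure TDemoWorker.OMChangeMessage(var msg: TDemoMessage);
begin
  FText := msg.MsgData;
end;

procedure TDemoWorker.OMSendMessage(var msg: TDemoMessage);
begin
  Writeln('Sending: ', FText);
end;

procedure TDemoWorker.DefaultHandler(var msg);
begin
  // Dispatch ends up here when no message method matches the ID.
  Writeln('Unhandled message ', TDemoMessage(msg).MsgID);
end;

var
  msg   : TDemoMessage;
  worker: TDemoWorker;
begin
  worker := TDemoWorker.Create;
  try
    msg.MsgID := MSG_CHANGE_MESSAGE;
    msg.MsgData := 'Hello';
    worker.Dispatch(msg);  // routed to OMChangeMessage
    msg.MsgID := MSG_SEND_MESSAGE;
    worker.Dispatch(msg);  // routed to OMSendMessage
    msg.MsgID := 99;
    worker.Dispatch(msg);  // no handler, falls through to DefaultHandler
  finally
    worker.Free;
  end;
end.
```

This is why the handlers need no registration code: declaring a method with the message directive is enough for Dispatch to find it.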

Worker code (above) is trivial, task creation code only slightly less.

procedure TfrmTestOTL.actStartHelloExecute(Sender: TObject);
var
  worker: IOmniWorker;
begin
  worker := TAsyncHello.Create;
  FHelloTask :=
    OmniTaskEventDispatch1.Monitor(CreateTask(worker, 'Hello')).
    SetIdle(1000, MSG_SEND_MESSAGE).
    SetParameter('Delay', 1000).
    SetParameter('Message', 'Hello').
    Run;
end;

First we create the worker object and store it in an interface variable. CreateTask then takes this worker interface instead of a worker method/procedure. The SetParameter calls are the same as in the previous example (except that the parameter order has been changed to name, value in today's snapshot). The only mystery here is the SetIdle call. It tells the internal event/message loop to send MSG_SEND_MESSAGE every 1000 milliseconds. [The second parameter - message ID - is optional. When left out, the worker's Idle method is called instead.]


A word of warning: SetIdle will most probably be renamed to SetTimer in the next release.


What about MSG_CHANGE_MESSAGE? It is generated in exactly the same way as in the previous example.

procedure TfrmTestOTL.actChangeMessageExecute(Sender: TObject);
begin
  FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Random ' + IntToStr(Random(1234)));
end;

Same goes for the task termination - just call FHelloTask.Terminate and set FHelloTask to nil. Worker object will be destroyed automatically.


So, what do you say? Simple enough?


Wednesday, July 02, 2008

OmniThreadLibrary Example #3: Bidirectional communication

In the third installment of my introduction to the OmniThreadLibrary, I'll show how you can implement bidirectional communication with the threaded task. This time there will be more code to write, so you should probably just open the example in the tests\4_TwoWayHello_with_package folder (or tests\2_TwoWayHello if you don't want to install the OTL package).

For your convenience, I've uploaded a snapshot of the current repository state to the project's home address http://code.google.com/p/omnithreadlibrary/.

For the readers who are following my exposé in the browser, here is a screenshot of today's example in action.

[Screenshot of the example application]

There are three buttons - the first starts the threaded task, the second instructs the task to change the message and the third stops the task. There are also TOmniTaskEventDispatch and TActionList components on the form.

The actStartHello action is connected to the first button and starts the threaded task.

procedure TfrmTestOTL.actStartHelloExecute(Sender: TObject);
begin
  FHelloTask :=
    OmniTaskEventDispatch1.Monitor(CreateTask(RunHello, 'Hello')).
    SetParameter(1000, 'Delay').
    SetParameter('Hello', 'Message').
    Run;
end;

The task is created in an already familiar manner (CreateTask output passed to the Monitor method). Then, two named parameters are set. First parameter to the SetParameter method contains a Variant value and second (optional) contains parameter's name.


At the end, Run is called to start the task and task control interface is stored in a field.

strict private
  FHelloTask: IOmniTaskControl;

Stopping the task is trivial.

procedure TfrmTestOTL.actStopHelloExecute(Sender: TObject);
begin
  FHelloTask.Terminate;
  FHelloTask := nil;
end;

When you click the second button (Change message), standard communication channel (introduced in previous installment) is used to send a message containing new text.

procedure TfrmTestOTL.actChangeMessageExecute(Sender: TObject);
begin
  FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Random ' + IntToStr(Random(1234)));
end;

The OmniTaskEventDispatch1 event handlers are identical to those in the Hello, world! example.


The part that is new today is the worker code which must receive and process MSG_CHANGE_MESSAGE messages. Today, this code is a normal procedure and not a method (just to show that this option is available, too).

procedure RunHello(task: IOmniTask);
var
  msg    : string;
  msgData: TOmniValue;
  msgID  : word;
begin
  msg := task.ParamByName['Message'];
  repeat
    case DSiWaitForTwoObjects(task.TerminateEvent, task.Comm.NewMessageEvent,
           false, task.ParamByName['Delay'])
    of
      WAIT_OBJECT_1:
        begin
          task.Comm.Receive(msgID, msgData);
          if msgID = MSG_CHANGE_MESSAGE then
            msg := msgData;
        end;
      WAIT_TIMEOUT:
        task.Comm.Send(0, msg);
      else
        break; //repeat
    end;
  until false;
end;

RunHello first retrieves the value of the Message parameter. Then it enters an infinite loop (repeat .. until false) in which it waits for either task.TerminateEvent or task.Comm.NewMessageEvent. DSiWin32 is used for brevity only. A normal WaitForMultipleObjects API call could be used instead.
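
For those who would rather not depend on DSiWin32, this is roughly what the same two-handle wait looks like with the plain API. WaitForTwo is a hypothetical helper written for this post, not a DSiWin32 or OTL function, and the two events are simple stand-ins for task.TerminateEvent and task.Comm.NewMessageEvent.

```pascal
program WaitForTwoDemo;

{$APPTYPE CONSOLE}

uses
  Windows;

// Hypothetical helper: waits until either handle is signaled or the timeout expires.
function WaitForTwo(first, second: THandle; timeout_ms: DWORD): DWORD;
var
  handles: array [0..1] of THandle;
begin
  handles[0] := first;
  handles[1] := second;
  // false = return as soon as any one of the handles is signaled
  Result := WaitForMultipleObjects(2, @handles, false, timeout_ms);
end;

var
  messageEvent  : THandle;
  terminateEvent: THandle;
begin
  terminateEvent := CreateEvent(nil, true, false, nil);  // manual-reset
  messageEvent := CreateEvent(nil, false, false, nil);   // auto-reset
  try
    SetEvent(messageEvent); // pretend that a message has just arrived
    case WaitForTwo(terminateEvent, messageEvent, 1000) of
      WAIT_OBJECT_0:
        Writeln('Terminate requested');
      WAIT_OBJECT_0 + 1: // same value as WAIT_OBJECT_1 in the code above
        Writeln('Message waiting');
      WAIT_TIMEOUT:
        Writeln('Timeout');
      else
        Writeln('Wait failed');
    end;
  finally
    CloseHandle(messageEvent);
    CloseHandle(terminateEvent);
  end;
end.
```

Putting the terminate event first in the array matters: when both handles are signaled at the same time, WaitForMultipleObjects reports the one with the lowest index, so termination wins over a pending message.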


Task.TerminateEvent is signaled in the Terminate method (when user clicks the Stop "Hello" button). In this case, code simply breaks out of the repeat..until loop.


Task.Comm.NewMessageEvent is signaled when a message is waiting in the communication queue. In this case (WAIT_OBJECT_1), message is received and processed.


If nothing was signaled for <value of the 'Delay' parameter> milliseconds, the WAIT_TIMEOUT path is taken and message msg (whatever the current value may be) is sent to the owner where it is displayed.


If you have done any threaded programming then you certainly recognized this loop - this is a fairly standard approach when writing multithreaded code. Nothing special here except the messaging subsystem. Still, OTL doesn't stop here. As you'll see in the next example, it is possible to rewrite this code in a much simpler way.


OmniThreadLibrary Example #2: Hello, world!

Last time I showed how to write a trivial threaded Beep in two lines. Today I'll do something more complicated - a threaded Hello, world program that communicates with its owner. Communication will be unidirectional; I'll leave bidirectional communication for the next example.

First, compile and install the OmniThreadLibrary_D2007 package (from the packages\ subfolder). This will add one component to your palette - TOmniTaskEventDispatch. This component simplifies communication between a threaded task and primary thread.

As before, start with an empty form and drop a button on it. You'll also need a listbox to show "Hello, world!" messages, and you'll need to drop one TOmniTaskEventDispatch component on the form.

Write an OnClick handler for the button.

procedure TfrmTestOTL.btnHelloClick(Sender: TObject);
begin
  btnHello.Enabled := false;
  OmniTaskEventDispatch1.Monitor(CreateTask(RunHelloWorld, 'HelloWorld')).Run;
end;

The code is quite similar to the one you had to write in the first example, with one important modification - it passes the task interface, created in CreateTask, to the TOmniTaskEventDispatch component. That will tell the component to start monitoring messages from the threaded task.


As you can see, TOmniTaskEventDispatch.Monitor takes as a parameter the interface returned from CreateTask (I'll give more attention to this interface in later posts) and it also returns that same interface as its result so that you can immediately call the Run method.


The code above also disables the button. It will be re-enabled when the threaded task exits.


Let's write the code that does some work now. Method RunHelloWorld will be executed in the background thread.

procedure TfrmTestOTL.RunHelloWorld(task: IOmniTask);
begin
  task.Comm.Send(0, 'Hello, world!');
end;

The OTL automatically sets up a two-way channel between the task control interface (the one that is returned from CreateTask) and the threaded task (the IOmniTask interface passed to the worker method). Both expose a property named Comm that provides this communication. At this time, the communication channel is very simplistic - you can only send (word, Variant) pairs (message ID and data). In this case, I'm setting the message ID to 0 and the message data to 'Hello, world!'.


That is everything that has to be done on the threaded side. On the GUI side, the OmniTaskEventDispatch1 component is already set to monitor messages from the threaded task. We only have to write an OnTaskMessage handler.

procedure TfrmTestOTL.OmniTaskEventDispatch1TaskMessage(task: IOmniTaskControl);
var
  msgID  : word;
  msgData: TOmniValue;
begin
  task.Comm.Receive(msgID, msgData);
  lbLog.ItemIndex := lbLog.Items.Add(Format('[%d/%s] %d|%s', [task.UniqueID, task.Name, msgID, msgData]));
end;

The OnTaskMessage handler is called each time a message is available. The code reads this message and shows it in the listbox. As you can also see, each task is assigned a unique ID.


To enable the button when task is done, you have to write an OnTaskTerminated event handler.

procedure TfrmTestOTL.OmniTaskEventDispatch1TaskTerminated(task: IOmniTaskControl);
begin
  lbLog.ItemIndex := lbLog.Items.Add(Format('[%d/%s] Terminated', [task.UniqueID, task.Name]));
  btnHello.Enabled := true;
end;

The code doesn't check which task has completed as we can only have one task running.


Run it and press the button a few times. You should see that the threaded task is allocated a new unique ID every time it is run.


[Screenshot of the example application]


This example is available in the OTL repository in folder tests\3_HelloWorld_with_package.


If you don't want to install the package, you can create TOmniTaskEventDispatch component on the fly. An example using this approach is available in the repository in folder tests\1_HelloWorld.


Why is software third time lucky?

In the beginning, there is a Problem.

Customer knows that the Problem exists, but can't tell exactly what the Problem is.

So the Customer goes to a Programmer and asks him [or her, of course, but let me keep this simple]: "Can you build me a program that does That?"

[Here lies the original sin. Customer never asks: "Can you help me solve the Problem?" Because of that, Programmer is writing a program to do That instead of program that does This.]

Programmer says: "I can," because we programmers, by our nature, want to believe that we can code anything. So the Programmer goes to work and he works and works and although he has no idea what he is working on and how it should be built, he makes the Version One. It is merely a research project, a tool that allows the Programmer to understand the problem (not the Problem, just the problem of coding a program that does That), but the pressure of the modern world forces him to deliver it to the Customer.

Of course, Version One is unusable. After all, it was merely a sandbox where the Programmer tried to understand the problem. So the Customer pretends that he is using it and from time to time orders a small change, just to make the Programmer think that his software is being used.

After some time, Customer starts to think that he should really get something useful for his money, not just the unusable Version One, and thus commissions the Programmer to write a new, improved version. Mind you, the Customer still hasn't the faintest clue what the Problem is, and neither does the Programmer. Even more, Programmer still has no idea that he has to solve the Problem. His task is to write a program that does That.

Still, something has improved since day 1 - Programmer's understanding of how to write a program that does That. So he goes and codes a much better program, which is built on his previous experiences. And thus the Version Two is born.

Version Two is a much better version. It actually works. It doesn't look like a five year old has put it together in one afternoon and it doesn't crash every five minutes. And so, the Customer starts using it. And after some time, he sadly recognizes that the Programmer has indeed written wonderful software that does That, but that this, alas, doesn't help him solve the Problem at all.

But now the Customer can finally tell the Programmer what the Version Two should do differently. And the Programmer listens and finally says in awe: "You mean that it should do This instead of That?!? Why didn't you say that in the first place?"

And thus, the Version Three is born. It is written well. It solves the Problem. The Customer is happy. And the Programmer lives on to fight yet another battle.

This I believe.
