Tuesday, March 16, 2010

Speed comparison: Variant, TValue, and TOmniValue

When I read TValue is very slow! at the TURBU Tech blog earlier today, I immediately wondered how fast TOmniValue (the basic data-exchange type in the OmniThreadLibrary) is compared to Variant and TValue. What else could I do but write a benchmark?!

I chose to test the performance in a way that is slightly different from Mason’s approach. My test measures not only the store operation but also the load and (in some instances) add. The framework is also slightly different and decouples the time-management code from the benchmark.

const
  CBenchResult = 100*1000*1000; //100 million

procedure TfrmBenchmark.Benchmark(const benchName: string;
  benchProc: TBenchProc);
var
  benchRes : integer;
  stopwatch: TStopWatch;
begin
  stopwatch := TStopWatch.StartNew;
  benchProc(benchRes);
  stopwatch.Stop;
  Assert(benchRes = CBenchResult);
  lbLog.Items.Add(Format('%s: %d ms',
    [benchName, stopwatch.ElapsedMilliseconds]));
  lbLog.Update;
end;

procedure TfrmBenchmark.btnBenchmarkClick(Sender: TObject);
begin
  Benchmark('Variant', TestVariant);
  Benchmark('TValue', TestTValue);
  Benchmark('TOmniValue', TestTOmniValue);
end;

procedure TfrmBenchmark.TestTOmniValue(var benchRes: integer);
var
  counter: TOmniValue;
  i      : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do
    counter := counter.AsInteger + 1;
  benchRes := counter;
end;

procedure TfrmBenchmark.TestTValue(var benchRes: integer);
var
  counter: TValue;
  i      : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do
    counter := counter.AsInteger + 1;
  benchRes := counter.AsInteger;
end;

procedure TfrmBenchmark.TestVariant(var benchRes: integer);
var
  counter: Variant;
  i      : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do
    counter := counter + 1;
  benchRes := counter;
end;

As you can see, all three tests are fairly similar. They count from 0 to 100.000.000 and the counter is stored in a Variant/TValue/TOmniValue. The Variant test follows the same semantics as if the counter variable were declared as an integer, while the TValue and TOmniValue tests require some help from the programmer to determine how the counter should be interpreted (AsInteger).

The results were interesting. TValue is about 5x slower than the Variant, which is 7x slower than the TOmniValue.

[Figure: benchmark results chart]

Of course, I was interested in where this speed difference comes from, so I looked at the assembler code.

Digging into the assembler

Variant

Unit32.pas.87: counter := counter + 1;
004B1232 8D55F0           lea edx,[ebp-$10]
004B1235 8D45E0           lea eax,[ebp-$20]
004B1238 E817AAF6FF       call @VarCopy
004B123D 8D45D0           lea eax,[ebp-$30]
004B1240 BA01000000       mov edx,$00000001
004B1245 B101             mov cl,$01
004B1247 E8DCF8F6FF       call @VarFromInt
004B124C 8D55D0           lea edx,[ebp-$30]
004B124F 8D45E0           lea eax,[ebp-$20]
004B1252 E8F523F7FF       call @VarAdd
004B1257 8D55E0           lea edx,[ebp-$20]
004B125A 8D45F0           lea eax,[ebp-$10]
004B125D E8F2A9F6FF       call @VarCopy

Very straightforward code. The Variant is copied into a temporary location, the number 1 is converted into a Variant, the two Variants are added, and the result is stored back into the counter variable. As you can see, Variant calculations are really clumsy. It would be much faster to convert the Variant to an integer, add one, and convert the result back. Like this.

procedure TfrmBenchmark.TestVariant2(var benchRes: integer);
var
  counter: Variant;
  i, j   : integer;
begin
  counter := 0;
  for i := 1 to CBenchResult do begin
    j := counter;
    counter := j + 1;
  end;
  benchRes := counter;
end;

This modified version generates much faster code.

Unit32.pas.100: j := counter;
004B1355 8D45F0           lea eax,[ebp-$10]
004B1358 E863B2F6FF       call @VarToInteger
004B135D 8BF0             mov esi,eax
Unit32.pas.101: counter := j + 1;
004B135F 8D45F0           lea eax,[ebp-$10]
004B1362 8D5601           lea edx,[esi+$01]
004B1365 B1FC             mov cl,$fc
004B1367 E8BCF7F6FF       call @VarFromInt

Benchmarking proves my theory. The optimized version needed only 1220 ms to complete the test, which makes it almost 5x faster than the original Variant code.

TValue

Unit32.pas.76: counter := counter.AsInteger + 1;
004B11A1 8D45E8           lea eax,[ebp-$18]
004B11A4 E86B96FFFF       call TValue.AsInteger
004B11A9 40               inc eax
004B11AA 8D55D0           lea edx,[ebp-$30]
004B11AD E8A695FFFF       call TValue.&op_Implicit
004B11B2 8D55D0           lea edx,[ebp-$30]
004B11B5 8D45E8           lea eax,[ebp-$18]
004B11B8 8B0D4C9F4A00     mov ecx,[$004a9f4c]
004B11BE E8D567F5FF       call @CopyRecord

The TValue code is quite neat. The counter is converted to an integer, one is added, the result is converted into a temporary TValue, and this temporary TValue is copied back into the counter. Why, then, is the TValue version so much slower? We’ll have to look into the implementation to find the answer. But first, let’s find out why TOmniValue is so fast.

TOmniValue

Unit32.pas.65: counter := counter.AsInteger + 1;
004B10AA 8D45F3           lea eax,[ebp-$0d]
004B10AD E8FAF3FFFF       call TOmniValue.IsInteger
004B10B2 84C0             test al,al
004B10B4 740E             jz $004b10c4
004B10B6 8B45F3           mov eax,[ebp-$0d]
004B10B9 8945E8           mov [ebp-$18],eax
004B10BC 8B45F7           mov eax,[ebp-$09]
004B10BF 8945EC           mov [ebp-$14],eax
004B10C2 EB32             jmp $004b10f6
004B10C4 8D45F3           lea eax,[ebp-$0d]
004B10C7 E8D8F3FFFF       call TOmniValue.IsEmpty
004B10CC 84C0             test al,al
004B10CE 7410             jz $004b10e0
004B10D0 C745E800000000   mov [ebp-$18],$00000000
004B10D7 C745EC00000000   mov [ebp-$14],$00000000
004B10DE EB16             jmp $004b10f6
004B10E0 B94C114B00       mov ecx,$004b114c
004B10E5 B201             mov dl,$01
004B10E7 A16CD14000       mov eax,[$0040d16c]
004B10EC E82747F6FF       call Exception.Create
004B10F1 E8D247F5FF       call @RaiseExcept
004B10F6 8B45E8           mov eax,[ebp-$18]
004B10F9 8BF0             mov esi,eax
004B10FB 8D55F3           lea edx,[ebp-$0d]
004B10FE 8D4601           lea eax,[esi+$01]
004B1101 E8AEF3FFFF       call TOmniValue.&op_Implicit

Weird stuff, huh?  The counter is converted to an integer, then a bunch of funny code is executed and the result is converted back to a TOmniValue. The beginning and the end are easy to understand, but what’s going on in-between?

The answer is – inlining. Much of the TOmniValue implementation is marked inline and what we are seeing here is the internal implementation of the AsInteger property.

I’ll return to this later, but first let’s check what happens if all these inline modifiers are removed.

Unit32.pas.65: counter := counter.AsInteger + 1;
004B10EF 8D45F3           lea eax,[ebp-$0d]
004B10F2 E865F4FFFF       call TOmniValue.GetAsInteger
004B10F7 40               inc eax
004B10F8 8D55E0           lea edx,[ebp-$20]
004B10FB E8A4F4FFFF       call TOmniValue.&op_Implicit
004B1100 8D55E0           lea edx,[ebp-$20]
004B1103 8D45F3           lea eax,[ebp-$0d]
004B1106 8B0D5CF84A00     mov ecx,[$004af85c]
004B110C E88768F5FF       call @CopyRecord

The generated code is now almost the same as in the TValue case; only the stack offsets are different. It is also much slower: instead of 839 ms, the code took 3119 ms to execute and was only twice as fast as the original Variant code (and much slower than the modified Variant code). Inlining AsInteger alone couldn’t make such a big difference. It looks like CopyRecord is the culprit for the slowdown. I didn’t verify this by measurement, but if you look at the _CopyRecord implementation in System.pas, it is obvious that record copying cannot be very fast.

The Delphi compiler team would do much good if, in future versions, the compiler generated custom copying code adapted to each record type.

Use the source, Luke!

What’s left for me is to determine the reason for the big speed difference between TValue and TOmniValue. To find it, I had to dig into the implementation of both records. Of biggest interest to me were the AsInteger getter and the Implicit(from: integer) operator.

TOmniValue

TOmniValue lives in OtlCommon.pas. The AsInteger getter, GetAsInteger, just remaps the call to the GetAsInt64 method. Similarly, Implicit maps to SetAsInt64.

type
  TOmniValue = packed record // relevant private fields only
    ovData: int64;
    ovType: (ovtNull, ovtBoolean, ovtInteger, ovtDouble, ovtExtended,
             ovtString, ovtObject, ovtInterface, ovtVariant,
             ovtWideString, ovtPointer);
    ...
  end;

function TOmniValue.GetAsInt64: int64;
begin
  if IsInteger then
    Result := ovData
  else if IsEmpty then
    Result := 0
  else
    raise Exception.Create('TOmniValue cannot be converted to int64');
end; { TOmniValue.GetAsInt64 }

procedure TOmniValue.SetAsInt64(const value: int64);
begin
  ovData := value;
  ovType := ovtInteger;
end; { TOmniValue.SetAsInt64 }

The code is quite straightforward. Some error checking is done in the getter and the value is just stored away in the setter. Now the assembler code from the first TOmniValue example makes some sense – we were simply looking at the inlined implementation of GetAsInt64. (The Implicit operator was not inlined.)
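For reference, the remapping layer mentioned above is just a pair of one-liners. This is a sketch of its shape rather than a verbatim copy of OtlCommon.pas (where both methods are additionally marked inline):

function TOmniValue.GetAsInteger: integer;
begin
  Result := GetAsInt64;
end; { TOmniValue.GetAsInteger }

class operator TOmniValue.Implicit(const a: integer): TOmniValue;
begin
  Result.SetAsInt64(a);
end; { TOmniValue.Implicit }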

TValue

The TValue record lives in RTTI.pas. The AsInteger getter is remapped to the generic version AsType<Integer>, which calls TryAsType<T>. In a slightly less roundabout manner, Implicit calls From<Integer>.

function TValue.TryAsType<T>(out AResult: T): Boolean;
var
  val: TValue;
begin
  Result := TryCast(System.TypeInfo(T), val);
  if Result then
    val.Get<T>(AResult);
end;

class function TValue.From<T>(const Value: T): TValue;
begin
  Make(@Value, System.TypeInfo(T), Result);
end;

It’s quite obvious that the TValue internals are not optimized for speed. Everything is mapped to generics and the RTTI system, which is fast, but not fast enough to be used in computationally-intensive code.

Conclusion

  1. Don’t use TValue for counting. Heck, don’t even use Variant or TOmniValue for counting – they were not designed for that purpose!
  2. TValue may look slow but in fact it is not. It is able to count from 1 to over three million in one second. That’s not slow. It’s just not as fast as a register-based counter. But that’s OK, as you should always remember rule 1.
  3. TValue is incredibly powerful. Just look at its implementation. Therefore, it can afford to be a tad slower than other multi-purpose storage mechanisms.
  4. TOmniValue is very fast, but most of its speed (compared to the Variant) comes from inlining and from the compiler being smart enough not to call CopyRecord in this case.
  5. The Delphi compiler should really be improved to generate a custom CopyRecord for each record type.
  6. Assembler code tells a lot. Source code tells even more.

P.S.

Using OtlCommon won’t bring in any other parts of the OTL library. It requires the following units to compile: DSiWin32, GpStuff, and GpStringHash. Nothing from those units will be linked in, as the TOmniValue implementation doesn’t depend on them. The simplest way to get them all is to download the latest stable OmniThreadLibrary release.
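As an illustration, this minimal console program (my own example, not part of the OTL) compiles against OtlCommon alone, provided those three units are on the search path:

program OmniValueOnly;
{$APPTYPE CONSOLE}

uses
  OtlCommon; // pulls in DSiWin32, GpStuff and GpStringHash at compile time

var
  v: TOmniValue;

begin
  v := 42;              // Implicit integer -> TOmniValue conversion
  Writeln(v.AsInteger); // writes 42
end.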


Friday, March 05, 2010

TDM Rerun #15: Many Faces Of An Application

That all sounds easy, but how can we combine the windows (forms-based) aspect of an application with something completely different, for example an SvCom-based service application? The problem here is that the GUI part of an application uses forms while the SvCom service is based on another Application object, based on the SvCom_NTService unit. How can we combine the GUI Application.Initialize (where Application is an object in the Forms unit) with a service Application.Initialize (where Application is an object in the SvCom_NTService unit)? By fully qualifying each object, of course.

- Many Faces Of An Application, The Delphi Magazine 107, July 2004

In the July 2004 issue I described an approach that allows the programmer to put multiple application front-ends inside one .exe file by manually tweaking the project’s .dpr file. This is a technique I’m still using in my programs. For example, most of the services I write can be configured by starting the exe with the /config switch.

Links: article (PDF, 126 KB), source code (ZIP, 1 MB).


Monday, February 22, 2010

Three steps to the blocking collection: [3] Blocking collection

About two months ago I started working on a Delphi clone of the .NET 4 BlockingCollection. The initial release was completed just before the end of 2009 and I started writing a series of articles on TOmniBlockingCollection in early January, but then I got stuck in the dynamic lock-free queue implementation. Instead of writing articles, I spent most of my free time working on that code.

Now it is (finally) time to complete the journey. Everything that had to be said about the infrastructure has been said, and I only have to show you the internal workings of the blocking collection itself.

[Step 1: Three steps to the blocking collection: [1] Inverse semaphore]

[Step 2: Dynamic lock-free queue – doing it right]

The blocking collection is exposed as an interface that lives in the OtlCollections unit.

  IOmniBlockingCollection = interface(IGpTraceable)
    ['{208EFA15-1F8F-4885-A509-B00191145D38}']
    procedure Add(const value: TOmniValue);
    procedure CompleteAdding;
    function  GetEnumerator: IOmniValueEnumerator;
    function  IsCompleted: boolean;
    function  Take(var value: TOmniValue): boolean;
    function  TryAdd(const value: TOmniValue): boolean;
    function  TryTake(var value: TOmniValue; timeout_ms: cardinal = 0): boolean;
  end; { IOmniBlockingCollection }

There’s also a class TOmniBlockingCollection which implements this interface. This class is public and can be used or reused in your code.

The blocking collection works in the following way:

  • Add adds a new value to the collection (which is internally implemented as a FIFO (first in, first out) queue).
  • CompleteAdding tells the collection that all data is in the queue. From then on, calling Add will raise an exception.
  • TryAdd is the same as Add except that it doesn’t raise an exception but returns False if the value can’t be added.
  • IsCompleted returns True after CompleteAdding has been called.
  • Take reads the next value from the collection. If there’s no data in the collection, Take will block until the next value is available. If, however, any other thread calls CompleteAdding while Take is blocked, Take will unblock and return False.
  • TryTake is the same as Take except that it has a timeout parameter specifying the maximum time the call is allowed to wait for the next value.
  • The enumerator calls Take in its MoveNext method and returns that value. The enumerator will therefore block when there is no data in the collection. The usual way to stop the enumeration is to call CompleteAdding, which will unblock all pending MoveNext calls. [For another approach see the example at the end of this article.]
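To make those semantics concrete, here is a minimal single-threaded sketch of my own (in real use, of course, producers and consumers would run in different threads – that’s the whole point of the blocking behaviour):

uses
  OtlCommon, OtlCollections;

procedure SingleThreadedDemo;
var
  coll : IOmniBlockingCollection;
  value: TOmniValue;
begin
  coll := TOmniBlockingCollection.Create(0); // 0 = no resource counting
  coll.Add(1);
  coll.Add(2);
  coll.CompleteAdding;      // no more data; Add would now raise an exception
  while coll.Take(value) do // returns False once the collection is drained
    Writeln(value.AsInteger);
end;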

The trivial parts

Most of the blocking collection code is fairly trivial.

Add just calls TryAdd and raises an exception if TryAdd fails.

procedure TOmniBlockingCollection.Add(const value: TOmniValue);
begin
  if not TryAdd(value) then
    raise ECollectionCompleted.Create('Adding to completed collection');
end; { TOmniBlockingCollection.Add }

CompleteAdding sets two “completed” flags – a boolean flag and a Windows event. The former is used for speed in non-blocking tests, while the latter is used when TryTake has to block.

procedure TOmniBlockingCollection.CompleteAdding;
begin
  if not obcCompleted then begin
    obcCompleted := true;
    Win32Check(SetEvent(obcCompletedSignal));
  end;
end; { TOmniBlockingCollection.CompleteAdding }

Take calls TryTake with an INFINITE timeout.

function TOmniBlockingCollection.Take(var value: TOmniValue): boolean;
begin
  Result := TryTake(value, INFINITE);
end; { TOmniBlockingCollection.Take }

TryAdd checks if CompleteAdding has been called. If not, the value is stored in the dynamic queue.

There’s a potential problem hiding in TryAdd – between the time the completed flag is checked and the time the value is enqueued, another thread may call CompleteAdding. Strictly speaking, TryAdd should not succeed in that case. However, I cannot foresee a parallel algorithm where this could cause a problem.

function TOmniBlockingCollection.TryAdd(const value: TOmniValue): boolean;
begin
  // CompleteAdding and TryAdd are not synchronised
  Result := not obcCompleted;
  if Result then
    obcCollection.Enqueue(value);
end; { TOmniBlockingCollection.TryAdd }

Easy peasy.

The not so trivial part

And now for something completely different …

TryTake is a whole different beast. It must:

  • retrieve the data
  • observe IsCompleted
  • block when there’s no data and the collection is not yet completed
  • observe the timeout limitations

Not so easy.

In addition to obcCompletedSignal (the completed event) and obcCollection (the dynamic data queue), it also uses obcObserver (a queue change notification mechanism used inside the OTL) and obcResourceCount, an instance of TOmniResourceCount (the inverse semaphore introduced in Part 1). All of these are created in the constructor:

constructor TOmniBlockingCollection.Create(numProducersConsumers: integer);
begin
  inherited Create;
  if numProducersConsumers > 0 then
    obcResourceCount := TOmniResourceCount.Create(numProducersConsumers);
  obcCollection := TOmniQueue.Create;
  obcCompletedSignal := CreateEvent(nil, true, false, nil);
  obcObserver := CreateContainerWindowsEventObserver;
  obcSingleThreaded := (Environment.Process.Affinity.Count = 1);
  if obcSingleThreaded then
    obcCollection.ContainerSubject.Attach(obcObserver, coiNotifyOnAllInserts);
end; { TOmniBlockingCollection.Create }

TryTake is pretty long so I’ve split it into two parts. Let’s take a look at the non-blocking part first.

First, the code tries to retrieve data from the dynamic queue. If there’s data available, it is returned. End of story.

Otherwise, the completed flag is checked. If CompleteAdding has been called, TryTake returns immediately. It also returns immediately if the timeout is 0.

Otherwise, the code prepares for the blocking wait. A resource counter is allocated (the reason for this will be explained later) and an observer is attached to the blocking collection. This observer will wake the blocked code when a new value is stored in the collection.

[In the code below you can see a small optimization – if the code is running on a single core, the observer is attached in the TOmniBlockingCollection constructor and detached in the destructor. Before this optimization was introduced, Attach and Detach spent far too much time in busy-wait code (on a single-core computer).]

After all that is set up, the code waits for the value (see the next code block), the observer is detached from the queue, and the resource counter is released.

function TOmniBlockingCollection.TryTake(var value: TOmniValue;
  timeout_ms: cardinal): boolean;
var
  awaited    : DWORD;
  startTime  : int64;
  waitHandles: array [0..2] of THandle;
begin
  if obcCollection.TryDequeue(value) then
    Result := true
  else if IsCompleted or (timeout_ms = 0) then
    Result := false
  else begin
    if assigned(obcResourceCount) then
      obcResourceCount.Allocate;
    try
      if not obcSingleThreaded then
        obcCollection.ContainerSubject.Attach(obcObserver, coiNotifyOnAllInserts);
      try
        //wait for the value, see the next code block below
      finally
        if not obcSingleThreaded then
          obcCollection.ContainerSubject.Detach(obcObserver, coiNotifyOnAllInserts);
      end;
    finally
      if assigned(obcResourceCount) then
        obcResourceCount.Release;
    end;
  end;
end; { TOmniBlockingCollection.TryTake }

The blocking part starts by storing the current time (the millisecond-accurate timeGetTime, wrapped in DSiTimeGetTime64, is used) and preparing the wait handles. Then it enters a loop which repeats until CompleteAdding has been called, the timeout has elapsed (tested by the Elapsed function, which I’m not showing here for the sake of simplicity; see the source), or a value has been dequeued.

In the loop, the code again tries to dequeue a value from the dynamic queue and exits the loop if the dequeue succeeds. Otherwise, WaitForMultipleObjects is called. This wait waits for one of three conditions:

  • Completed event. If this event is signalled, CompleteAdding has been called and TryTake must exit.
  • Observer event. If this event is signalled, new value was enqueued into the dynamic queue and code must try to dequeue this value.
  • Resource count event. If this event is signalled, all resources are used and the code must exit (more on that later).
  startTime := DSiTimeGetTime64;
  waitHandles[0] := obcCompletedSignal;
  waitHandles[1] := obcObserver.GetEvent;
  if assigned(obcResourceCount) then
    waitHandles[2] := obcResourceCount.Handle;
  Result := false;
  while not (IsCompleted or Elapsed) do begin
    if obcCollection.TryDequeue(value) then begin
      Result := true;
      break; //while
    end;
    awaited := WaitForMultipleObjects(IFF(assigned(obcResourceCount), 3, 2),
      @waitHandles, false, TimeLeft_ms);
    if awaited <> WAIT_OBJECT_1 then begin
      if awaited = WAIT_OBJECT_2 then
        CompleteAdding;
      Result := false;
      break; //while
    end;
  end;

If a new value was enqueued into the dynamic queue, TryDequeue is called again. It is entirely possible that another thread calls that function first and removes the value, causing TryDequeue to fail and WaitForMultipleObjects to be called again. Such is life in the multithreaded world.
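In the listing above, Elapsed and TimeLeft_ms are local helpers that I left out. The following is my guess at their shape, reconstructed from the description above and written as if nested inside TryTake (so that startTime and timeout_ms are in scope); consult OtlCollections.pas for the real code:

  function Elapsed: boolean;
  begin
    Result := (timeout_ms <> INFINITE) and
              ((startTime + timeout_ms) <= DSiTimeGetTime64);
  end;

  function TimeLeft_ms: DWORD;
  var
    remaining: int64;
  begin
    if timeout_ms = INFINITE then
      Result := INFINITE
    else begin
      remaining := (startTime + timeout_ms) - DSiTimeGetTime64;
      if remaining < 0 then
        Result := 0
      else
        Result := remaining;
    end;
  end;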

Enumerating the blocking collection

The TOmniBlockingCollection enumerator is slightly more powerful than the usual Delphi enumerator. In addition to the usual methods, it contains the function Take, which is required by the Parallel architecture (see Parallel.For and Parallel.ForEach.Aggregate for more information).

  IOmniValueEnumerator = interface ['{F60EBBD8-2F87-4ACD-A014-452F296F4699}']
    function  GetCurrent: TOmniValue;
    function  MoveNext: boolean;
    function  Take(var value: TOmniValue): boolean;
    property Current: TOmniValue read GetCurrent;
  end; { IOmniValueEnumerator }

  TOmniBlockingCollectionEnumerator = class(TInterfacedObject,
    IOmniValueEnumerator)
  strict private
    obceCollection_ref: TOmniBlockingCollection;
    obceValue         : TOmniValue;
  public
    constructor Create(collection: TOmniBlockingCollection);
    function  GetCurrent: TOmniValue; inline;
    function  MoveNext: boolean; inline;
    function  Take(var value: TOmniValue): boolean;
    property Current: TOmniValue read GetCurrent;
  end; { TOmniBlockingCollectionEnumerator }

The implementation is trivial.

constructor TOmniBlockingCollectionEnumerator.Create(collection: TOmniBlockingCollection);
begin
  obceCollection_ref := collection;
end; { TOmniBlockingCollectionEnumerator.Create }

function TOmniBlockingCollectionEnumerator.GetCurrent: TOmniValue;
begin
  Result := obceValue;
end; { TOmniBlockingCollectionEnumerator.GetCurrent }

function TOmniBlockingCollectionEnumerator.MoveNext: boolean;
begin
  Result := obceCollection_ref.Take(obceValue);
end; { TOmniBlockingCollectionEnumerator.MoveNext }

function TOmniBlockingCollectionEnumerator.Take(var value: TOmniValue): boolean;
begin
  Result := MoveNext;
  if Result then
    value := obceValue;
end; { TOmniBlockingCollectionEnumerator.Take }
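Thanks to this enumerator, a consumer thread body can be written as a plain for..in loop. An illustrative sketch (ProcessValue is a hypothetical handler, not part of the OTL):

procedure ConsumeAll(const coll: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  // MoveNext calls Take, so the loop blocks while the collection is empty
  // and exits only when the collection is completed and drained.
  for value in coll do
    ProcessValue(value);
end;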

Example

A not-so-simple example of using the blocking collection can be seen in the demo 34_TreeScan. It uses the blocking collection to scan a tree with multiple parallel threads. This demo works in Delphi 2007 and newer.

A better example of using the blocking collection is in the demo 35_ParallelFor. It actually uses the same approach as demo 34 to scan the tree, except that the code is implemented as an anonymous method, which makes it much simpler than the D2007 version. Of course, this demo works only in Delphi 2009 and above.

This is the full parallel scanner from the 35_ParallelFor demo:

function TfrmParallelForDemo.ParaScan(rootNode: TNode; value: integer): TNode;
var
  cancelToken: IOmniCancellationToken;
  nodeQueue  : IOmniBlockingCollection;
  nodeResult : TNode;
  numTasks   : integer;
begin
  nodeResult := nil;
  cancelToken := CreateOmniCancellationToken;
  numTasks := Environment.Process.Affinity.Count;
  nodeQueue := TOmniBlockingCollection.Create(numTasks);
  nodeQueue.Add(rootNode);
  Parallel.ForEach(nodeQueue as IOmniValueEnumerable)
    .NumTasks(numTasks) // must be the same number of tasks as used in
                        // nodeQueue's resource counter, to ensure stopping
    .CancelWith(cancelToken)
    .Execute(
      procedure (const elem: TOmniValue)
      var
        childNode: TNode;
        node     : TNode;
      begin
        node := TNode(elem.AsObject);
        if node.Value = value then begin
          nodeResult := node;
          nodeQueue.CompleteAdding;
          cancelToken.Signal;
        end
        else for childNode in node.Children do
          nodeQueue.TryAdd(childNode);
      end);
  Result := nodeResult;
end; { TfrmParallelForDemo.ParaScan }

The code first creates a cancellation token which will be used to stop the Parallel.ForEach loop. The number of tasks is set to the number of cores accessible from the process, and a blocking collection is created. The resource count for this collection is initialized to the number of tasks (the parameter to TOmniBlockingCollection.Create). The root node of the tree is added to the blocking collection.

Then Parallel.ForEach is called. The IOmniValueEnumerable aspect of the blocking collection is passed to ForEach. Currently, this is the only way to provide ForEach with data. This interface just tells ForEach how to create an enumerator for each worker thread. [At the moment, each worker requires a separate enumerator. This may change in the future.]

  IOmniValueEnumerable = interface ['{50C1C176-C61F-41F5-AA0B-6FD215E5159F}']
    function GetEnumerator: IOmniValueEnumerator;
  end; { IOmniValueEnumerable }

The code also passes cancellation token to the ForEach loop and starts the parallel execution (call to Execute). In each parallel task, the following code is executed (this code is copied from the full ParaScan example above):

      procedure (const elem: TOmniValue)
      var
        childNode: TNode;
        node     : TNode;
      begin
        node := TNode(elem.AsObject);
        if node.Value = value then begin
          nodeResult := node;
          nodeQueue.CompleteAdding;
          cancelToken.Signal;
        end
        else for childNode in node.Children do
          nodeQueue.TryAdd(childNode);
      end

The code is provided with one element from the blocking collection (ForEach takes care of that). If the node’s Value field is the value we’re searching for, nodeResult is set, the blocking collection is put into the CompleteAdding state (so that enumerators in other tasks will terminate their blocking wait, if any) and ForEach is cancelled.

Otherwise (not the value we’re looking for), all children of the current node are added to the blocking collection. TryAdd is used (and its return value ignored) because another thread may call CompleteAdding while the for childNode loop is being executed.

That’s all! There is a blocking collection into which nodes are put (via the for childNode loop) and from which they are removed (via the ForEach infrastructure). If child nodes are not provided fast enough, the blocking collection will block in Take and one or more tasks may sleep for some time until new values appear. Only when the value is found are the blocking collection and the ForEach loop completed/cancelled.

This is very similar to the code that was my inspiration for writing the blocking collection:

var targetNode = …;
var bc = new BlockingCollection<Node>(startingNodes);
// since we expect GetConsumingEnumerable to block, limit parallelism to the number of
// procs, avoiding too much thread injection
var parOpts = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(bc.GetConsumingEnumerable(), parOpts, (node,loop) =>
{
    if (node == targetNode)
    {
        Console.WriteLine("hooray!");
        bc.CompleteAdding();
        loop.Stop();
    }
    else
    {
        foreach(var neighbor in node.Neighbors) bc.Add(neighbor);
    }
});

However, this C# code exhibits a small problem. If the value is not found in the tree, the code never stops. Why? All tasks eventually block in the Take method (because the complete tree has been scanned) and nobody calls CompleteAdding and loop.Stop. Does the Delphi code contain the very same problem?

Definitely not! That’s exactly why the resource counter was added to the blocking collection!

If the blocking collection is initialized with a number of resources greater than zero, it allocates a resource counter in the constructor. This resource counter is allocated just before a thread blocks in TryTake and released after that. Each blocking wait in TryTake also waits for this resource counter to become signalled. If all threads try to execute the blocking wait at the same time, the resource counter drops to zero, signals itself, and unblocks all TryTake calls!

This elegant solution has only one problem – the resource counter must be initialized to the number of threads that will be reading from the blocking collection. That’s why, in the code above (ParaScan), the same number is passed to the blocking collection constructor (resource counter initialization) and to the ForEach.NumTasks method (number of parallel threads).

Download

TOmniBlockingCollection will be available in OmniThreadLibrary 1.05, which will be released in a few days.

For the impatient, there is the OTL 1.05 Release Candidate. The only code that will change between the 1.05 RC and the release are possible bug fixes.


Thursday, February 18, 2010

Dynamic lock-free queue – doing it right

Some history required …

First there was a good idea with a somewhat patchy implementation: Three steps to the blocking collection: [2] Dynamically allocated queue.

Then there was a partial solution, depending on me being able to solve another problem. Still, it was a good solution: Releasing queue memory without the MREW lock.

In the end, the final (actually, the original) problem was also solved: Bypassing the ABA problem.

And now to the results …

This article describes a lock-free, (nearly) O(1) insert/remove, dynamically allocated queue that doesn’t require a garbage collector. It can be implemented on any hardware that supports an 8-byte compare-and-swap operation (in the Intel world, that means at least a Pentium). The code uses an 8-byte atomic move in some parts, but those can easily be changed into an 8-byte CAS in case the platform doesn’t support such an operation. In the current implementation, the Move64 (8-byte move) function uses SSE2 instructions and therefore requires a Pentium 4. The code, however, can be conditionally compiled with CAS64 instead of Move64, enabling it to run on Pentiums 1 to 3. (See the notes in the code for more information.) The code requires a memory manager that allows memory to be released in a different thread from the one where it was allocated. [Obviously, Windows on the Intel platform satisfies all conditions.]

Although the dynamic queue has been designed with the OmniThreadLibrary (OTL for short) in mind, there’s also a small sample implementation that doesn’t depend on the OTL: GpLockFreeQueue.pas. This implementation can store int64 elements only (or anything you can cast into 8 bytes) while the OTL implementation in OtlContainers stores TOmniValue data. [The latter is a kind of variant record used inside the OTL to store “anything” from a byte to a string/wide string/object/interface.] Because of that, the GpLockFreeQueue implementation is smaller and faster, but slightly more limited. Both are released under the BSD license.

Memory layout

Data is stored in slots. Each slot uses 16 bytes and contains a byte-sized tag, a word-sized offset and up to 13 bytes of data. The implementation in OtlContainers uses all of those 13 bytes to store a TOmniValue, while the implementation in GpLockFreeQueue uses only 8 bytes and keeps the rest unused.

The following notation is used to represent a slot: [tag|offset|value].

In reality, the value field comes first in the record because it must be 4-aligned. The reason for that will be revealed in a moment. In GpLockFreeQueue, a slot is defined as:

  TGpLFQueueTaggedValue = packed record
    Value   : int64;
    Tag     : TGpLFQueueTag;
    Offset  : word;
    Stuffing: array [1..5] of byte;
  end; { TGpLFQueueTaggedValue }

Slots do not stand by themselves; they are allocated in blocks. The default block size is 64 KB (4096 slots) but it can be varied from 64 bytes (four slots) to 1 MB (65536 slots). In this article, I’ll be using 5-slot blocks, as they are big enough to demonstrate all the nooks and crannies of the algorithm and small enough to fit in one line of text.

During the allocation, each block is formatted as follows:

[Header|0|4] [Sentinel|1|0] [Free|2|0] [Free|3|0] [EndOfList|4|0]

The first slot is marked as a Header and has the value field initialized to “number of slots in the block minus one”. [The highest value that can be stored in the Header’s value field is 65535; therefore the maximum number of slots in a block is 65536.] This value is atomically decremented each time a slot is dequeued. When the number drops to zero, the block can be released. (More on that in: Releasing queue memory without the MREW lock.) InterlockedDecrement, which is used to decrement this value, requires its argument to be 4-aligned, and that’s the reason for the value field being stored first in the slot.

The second slot is a Sentinel. Slots from the third onwards are tagged Free and are used to store data. The last slot is tagged EndOfList and is used to link two blocks. All slots have the offset field initialized to the sequence number of the slot – in the Header this value is 0, in the Sentinel 1, and so on up to the EndOfList, where the value is 4 (number of slots in the block minus 1). This value is used in Dequeue to calculate the address of the Header slot just before the Header’s value is decremented.
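In code, that address calculation is plain pointer arithmetic over the offset field. A two-line sketch (HeaderOf is my own illustrative name; Dequeue, listed later, inlines the same two lines):

function HeaderOf(slot: PGpLFQueueTaggedValue): PGpLFQueueTaggedValue;
begin
  Result := slot;
  Dec(Result, slot.Offset); // step back 'Offset' slots to slot 0, the Header
end;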

In addition to dynamically allocated (and released) memory blocks, the queue uses head and tail tagged pointers. Both are 8-byte values, consisting of two 4-byte fields – slot and tag. The following notation is used to represent a tagged pointer: [slot|tag].

The slot field contains the address of the current head/tail slot while the tag field contains the tag of the current slot. The motivation behind this scheme is explained in the Bypassing the ABA problem post.

Tail and head pointers are modified using 8-byte CAS and Move commands and must therefore be 8-aligned.
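In a sketch (field names are mine; the real declarations live in GpLockFreeQueue.pas and OtlContainers.pas), such a tagged pointer looks like this:

type
  TGpLFQueueTaggedPtr = packed record
    Slot    : PGpLFQueueTaggedValue; // address of the current head/tail slot
    Tag     : TGpLFQueueTag;         // tag of that slot
    Stuffing: array [1..3] of byte;  // pad to exactly 8 bytes
  end; // variables of this type must be 8-aligned for the 8-byte CAS/move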

By putting all that together, we get a snapshot of the queue state. This is the initial state of a queue with five-slot blocks:

Head:[B1:2|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Free|2|0] [Free|3|0] [EndOfList|4|0]

The memory block begins at address B1 and contains five slots, initialized as described before. The tail pointer points to the second slot of block B1 (B1:1; I’m using the form address:offset), which is tagged Sentinel, and the head pointer points to the third slot (B1:2), the first Free slot. Here we see the sole reason for the Sentinel – it stands between the tail and the head when the queue is empty.

Enqueue

In theory, the enqueue operation is simple. The element is stored in the next available slot and the queue head is advanced. In practice, however, multithreading makes things much more complicated.

To prevent thread conflicts, each enqueueing thread must first take ownership of the head. It does this by swapping the queue head tag from Free to Allocating, or from EndOfList to Extending. To prevent ABA problems, the head pointer and the head tag are swapped with the same head pointer and the new tag in one atomic 8-byte compare-and-swap.

Enqueue then does its work and at the end swaps (head pointer, tag) to (next head pointer, Free|EndOfList), which allows other threads to proceed with their enqueue operations.
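The primitive behind this protocol is an 8-byte compare-and-swap over the (slot, tag) pair. The sketch below shows the idea using LOCK CMPXCHG8B; it is close to, but not necessarily identical with, the CAS64 called in the listings later in this article:

function CAS64(const oldData: pointer; oldTag: longword; newData: pointer;
  newTag: longword; var destination): boolean;
asm
  // register convention: oldData arrives in EAX and oldTag in EDX - exactly
  // the EDX:EAX comparand expected by CMPXCHG8B; newData arrives in ECX
  push  edi
  push  ebx
  mov   ebx, newData    // new value, low dword (slot pointer)
  mov   ecx, newTag     // new value, high dword (tag)
  mov   edi, destination
  lock cmpxchg8b [edi]  // swap only if destination still holds oldTag:oldData
  pop   ebx
  pop   edi
  setz  al              // ZF = 1 -> the swap succeeded
end;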

Let’s start with the empty list.

Head:[B1:2|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Free|2|0] [Free|3|0] [EndOfList|4|0]

Enqueue first swaps [B1:2|Free] with [B1:2|Allocating].

Head:[B1:2|Allocating]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Free|2|0] [Free|3|0] [EndOfList|4|0]

The green colour indicates an atomic change.

Only the head tag has changed; the data in the B1 memory block is not modified. The head still points to a slot tagged Free (slot B1:2). This is fine, as enqueueing threads don’t take any interest in this tag at all.

Data is then stored in the slot and its tag is changed to Allocated. This again means nothing to other enqueuers, as the head pointer has not been updated yet. It also doesn’t allow a dequeue operation on this slot to proceed, because the head is adjacent to the tail, which points to a Sentinel – and in this case Dequeue treats the queue as empty (as we’ll see later).

Head:[B1:2|Allocating]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Allocated|2|42] [Free|3|0] [EndOfList|4|0]

Red colour marks “unsafe” modification.

At the end, the head is unlocked by storing the address of the next slot (the first free slot, B1:3) and the next slot’s tag (Free).

Head:[B1:3|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] H:[Free|3|0] [EndOfList|4|0]

Teal colour marks an atomic 8-byte move used to move new data into the head pointer. If the target platform doesn’t support such move, an 8-byte CAS could be used instead.

After those changes, head is pointing to the next free slot and data is stored in the queue.

Let’s assume that another Enqueue is called and stores number 17 in the queue. Nothing new happens here.

Head:[B1:4|EndOfList]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]

The next Enqueue must do something new, as there are no free slots in the current block. To extend the queue, the thread first swaps the EndOfList tag with the Extending tag. By doing this, the thread takes ownership of the queue head.

Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]

A new block gets allocated and initialized (see chapter on memory management, below).

Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
B2:[Header|0|4] [Sentinel|1|0] [Free|2|0] [Free|3|0] [EndOfList|4|0]

Data is stored in the first free slot of the block B2.

Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] [Free|3|0] [EndOfList|4|0]

The last slot of block B1 is modified to point to the second slot of the next block (the Sentinel, B2:1), and the BlockPointer tag is stored into that slot.

Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] [Free|3|0] [EndOfList|4|0]

At the end, the head is updated to point to the first free slot (B2:3).

Head:[B2:3|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]

That completes the Enqueue. List head is now unlocked.

The actual code is not more complicated than this description (code taken from GpLockFreeQueue).

procedure TGpLockFreeQueue.Enqueue(const value: int64);
var
  extension: PGpLFQueueTaggedValue;
  next     : PGpLFQueueTaggedValue;
  head     : PGpLFQueueTaggedValue;
begin
  repeat
    head := obcHeadPointer.Slot;
    if (obcHeadPointer.Tag = tagFree)
       and CAS64(head, Ord(tagFree), head, Ord(tagAllocating), obcHeadPointer^)
    then
      break //repeat
    else if (obcHeadPointer.Tag = tagEndOfList)
            and CAS64(head, Ord(tagEndOfList), head, Ord(tagExtending), obcHeadPointer^)
    then
      break //repeat
    else // very temporary condition, retry quickly
      asm pause; end;
  until false;
  if obcHeadPointer.Tag = tagAllocating then begin // enqueueing
    next := NextSlot(head);
    head.Value := value;
    head.Tag := tagAllocated;
    Move64(next, Ord(next.Tag), obcHeadPointer^); // release the lock
  end
  else begin // allocating memory
    extension := AllocateBlock; // returns pointer to the header
    Inc(extension, 2); // move over header and sentinel to the first data slot
    extension.Tag := tagAllocated;
    extension.Value := value;
    Dec(extension); // forward reference points to the sentinel
    head.Value := int64(extension);
    head.Tag := tagBlockPointer;
    Inc(extension, 2); // get to the first free slot
    Move64(extension, Ord(extension.Tag), obcHeadPointer^); // release the lock
    PreallocateMemory; // preallocate memory block
  end;
end; { TGpLockFreeQueue.Enqueue }

Dequeue

Enqueue is simple but Dequeue is a whole new bag of problems. It has to handle the Sentinel slot and because of that there are five possible scenarios:

  1. Skip the sentinel.
  2. Read the data (tail doesn’t catch the head).
  3. Read the data (tail does catch the head).
  4. The queue is empty.
  5. Follow the BlockPointer tag.

To prevent thread conflicts, a dequeueing thread takes ownership of the tail. It does this by swapping the tail tag from Allocated or Sentinel to Removing, or from BlockPointer to Destroying. Again, those changes are done atomically by swapping both the tail pointer and the tail tag in one go.

Let’s walk through all five scenarios now.

1 – Skip the sentinel

Let’s start with a queue state where two slots are allocated and head points to the EndOfList slot.

Head:[B1:4|EndOfList]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]

The code first locks the tail.

Head:[B1:4|EndOfList]
Tail:[B1:1|Removing]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]

As there is no data in the Sentinel slot, the tail is immediately updated to point to the next slot.

Head:[B1:4|EndOfList]
Tail:[B1:2|Allocated]
B1:[Header|0|4] [Sentinel|1|0] T:[Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]

There’s no need to update the tag in slot 1, as no other thread can reach it again. Because the slot is now unreachable, the code decrements the count in B1’s Header slot (from 4 to 3).

Head:[B1:4|EndOfList]
Tail:[B1:2|Allocated]
B1:[Header|0|3] [Sentinel|1|0] T:[Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]

Because the original tag was Sentinel, the code immediately retries from the beginning. The queue is now in scenario 2 (data available, the tail is not immediately before the head).

2 - Read the data (tail doesn’t catch the head)

Again, the tail is locked.

Head:[B1:4|EndOfList]
Tail:[B1:2|Removing]
B1:[Header|0|3] [Sentinel|1|0] T:[Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]

The code then reads the value from the slot (42) and advances the tail to the slot B1:3.

Head:[B1:4|EndOfList]
Tail:[B1:3|Allocated]
B1:[Header|0|3] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]

Again, there is no need to change the slot tag. Slot 2 is now unreachable and the Header count is decremented.

Head:[B1:4|EndOfList]
Tail:[B1:3|Allocated]
B1:[Header|0|2] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]

The code has retrieved the data and can now return from the Dequeue method.

3 - Read the data (tail does catch the head)

If Dequeue is now called a second time, we have scenario 3 – there is data in the queue, but the head pointer is next to the tail pointer. Because of that, the tail cannot be incremented. Instead, the code replaces the tail slot tag with the Sentinel tag.

It is entirely possible that the head will change the very next moment, which means that the Sentinel would not really be needed. Luckily, that doesn’t hurt much – the next Dequeue will skip the Sentinel, retry, and fetch the next element from the queue.

The code starts in a well-known manner, by taking ownership of the tail.

Head:[B1:4|EndOfList]
Tail:[B1:3|Removing]
B1:[Header|0|2] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]

The code then reads the value from the slot, but because the head was next to the tail when Dequeue was called, the code doesn’t increment the tail and doesn’t decrement the Header counter. Instead, the Sentinel tag is put into the tail pointer’s tag.

Head:[B1:4|EndOfList]
Tail:[B1:3|Sentinel]
B1:[Header|0|2] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]

It doesn’t matter that the slot tag is still Allocated as no-one will read it again.

4 - The queue is empty

If Dequeue were called now, it would return immediately with the status “empty”, because the tail tag is Sentinel and the tail has caught the head.

5 - Follow the BlockPointer tag

In the last scenario, the tail is pointing to a BlockPointer.

Head:[B2:3|Free]
Tail:[B1:4|EndOfList]
B1:[Header|0|1] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] T:[BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]

As expected, the code first takes the ownership of the tail.

Head:[B2:3|Free]
Tail:[B1:4|Destroying]
B1:[Header|0|1] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] T:[BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]

We know that the first slot in the next block is a Sentinel. We also know that the head is not pointing to this slot, because that’s how Enqueue works (when a new block is allocated, the head points to the first slot after the Sentinel). Therefore, it is safe to update the tail to point to the Sentinel slot of block B2.

Head:[B2:3|Free]
Tail:[B2:1|Sentinel]
B1:[Header|0|1] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1]
B2:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]

By doing the swap, the ownership of the tail is released.

The Header count is then decremented.

Head:[B2:3|Free]
Tail:[B2:1|Sentinel]
B1:[Header|0|0] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1]
B2:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]

Because the count is now zero, the code destroys the B1 block. (Note that the Header count decrement is atomic and only one thread can actually reach zero.) While the block is being destroyed, other threads may be calling Dequeue.

Head:[B2:3|Free]
Tail:[B2:1|Sentinel]
B1:[Header|0|0] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1]
B2:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]

Because the tail tag was originally BlockPointer, the code retries immediately and continues with scenario 1.

The actual code is tricky, because some code paths are shared between scenarios (code taken from GpLockFreeQueue).

function TGpLockFreeQueue.Dequeue(var value: int64): boolean;
var
  caughtTheHead: boolean;
  tail         : PGpLFQueueTaggedValue;
  header       : PGpLFQueueTaggedValue;
  next         : PGpLFQueueTaggedValue;
  tag          : TGpLFQueueTag;
begin
  tag := tagSentinel;
  Result := true;
  while Result and (tag = tagSentinel) do begin
    repeat
      tail := obcTailPointer.Slot;
      caughtTheHead := NextSlot(obcTailPointer.Slot) = obcHeadPointer.Slot;
      if (obcTailPointer.Tag = tagAllocated)
         and CAS64(tail, Ord(tagAllocated), tail, Ord(tagRemoving), obcTailPointer^) then
      begin
        tag := tagAllocated;
        break; //repeat
      end
      else if (obcTailPointer.Tag = tagSentinel) then begin
        if caughtTheHead then begin
          Result := false;
          break; //repeat
        end
        else if CAS64(tail, Ord(tagSentinel), tail, Ord(tagRemoving), obcTailPointer^) then begin
          tag := tagSentinel;
          break; //repeat
        end
      end
      else if (obcTailPointer.Tag = tagBlockPointer)
              and CAS64(tail, Ord(tagBlockPointer), tail, Ord(tagDestroying), obcTailPointer^) then
      begin
        tag := tagBlockPointer;
        break; //repeat
      end
      else
        asm pause; end;
    until false;
    if Result then begin // dequeueing
      header := tail;
      Dec(header, header.Offset);
      if tag in [tagSentinel, tagAllocated] then begin
        next := NextSlot(tail);
        if tag = tagAllocated then // sentinel doesn't contain any useful value
          value := tail.Value;
        if caughtTheHead then begin // release the lock; as this is the last element, don't move forward
          Move64(tail, Ord(tagSentinel), obcTailPointer^);
          header := nil; // do NOT decrement the counter; this slot will be retagged again
        end
        else
          Move64(next, Ord(next.Tag), obcTailPointer^); // release the lock
      end
      else begin // releasing memory
        next := PGpLFQueueTaggedValue(tail.Value); // next points to the sentinel
        Move64(next, Ord(tagSentinel), obcTailPointer^); // release the lock
        tag := tagSentinel; // retry
      end;
      if assigned(header) and (InterlockedDecrement(PInteger(header)^) = 0) then
        ReleaseBlock(header);
    end;
  end; //while Result and (tag = tagSentinel)
end; { TGpLockFreeQueue.Dequeue }

Memory management

In the dynamic queue described above, special consideration goes to memory allocation and deallocation because most of the time that will be the slowest part of the enqueue/dequeue.

Memory is always released after the queue tail is unlocked. That way, other threads may dequeue from the same queue while the thread is releasing the memory.

Allocation is trickier, because Enqueue only knows that it will need memory after the head is locked. The trick here is to use one preallocated memory block which is reused inside Enqueue. This is much faster than calling the allocator. After the head is unlocked, Enqueue preallocates the next block of memory. This slows down the current thread, but does not block other threads from enqueueing into the same queue.

Dequeue also tries to help with that. If the preallocated block is not present when a block must be released, Dequeue will store the released block away for the next Enqueue to use.

Also, one such block is preallocated when the queue is initially created.

If this explanation is unclear, look at the program flow below. It describes the code flow through an Enqueue that has to allocate a memory block and through a Dequeue that has to release a memory block. Identifiers in parentheses represent the methods listed below.

Enqueue:

  • lock the head
  • detect EndOfList
  • use the cached block if available, otherwise allocate a new block (AllocateBlock)
  • unlock the head
  • if there is no cached block, allocate a new block and store it away (PreallocateMemory)

Dequeue:

  • lock the tail
  • process last slot in the block
  • unlock the tail
  • decrement the header count
  • if the header count has dropped to zero:
    • if the cached block is empty, store this one away (ReleaseBlock)
    • otherwise release the block

All manipulations with the cached block are done atomically. All allocations are optimistic – if the preallocated block is empty, a new memory block is allocated and partitioned, and only then does the code try to swap it into the preallocated block variable. If the compare-and-swap fails at this point, another thread went through the same routine just slightly faster, and the allocated (and partitioned) block is thrown away. It looks like quite some work may be done in vain, but in reality the preallocated block is rarely thrown away.

I tested other, more complicated schemes (for example, a small 4-slot stack) but they invariably behaved worse than this simple approach.

function TGpLockFreeQueue.AllocateBlock: PGpLFQueueTaggedValue;
var
  cached: PGpLFQueueTaggedValue;
begin
  cached := obcCachedBlock;
  if assigned(cached) and CAS32(cached, nil, obcCachedBlock) then
    Result := cached
  else begin
    Result := AllocMem(obcBlockSize);
    PartitionMemory(Result);
  end;
end; { TGpLockFreeQueue.AllocateBlock }

procedure TGpLockFreeQueue.PreallocateMemory;
var
  memory: PGpLFQueueTaggedValue;
begin
  if not assigned(obcCachedBlock) then begin
    memory := AllocMem(obcBlockSize);
    PartitionMemory(memory);
    if not CAS32(nil, memory, obcCachedBlock) then
      FreeMem(memory);
  end;
end; { TGpLockFreeQueue.PreallocateMemory }

procedure TGpLockFreeQueue.ReleaseBlock(firstSlot: PGpLFQueueTaggedValue; forceFree: boolean);
begin
  if forceFree or assigned(obcCachedBlock) then
    FreeMem(firstSlot)
  else begin
    ZeroMemory(firstSlot, obcBlockSize);
    PartitionMemory(firstSlot);
    if not CAS32(nil, firstSlot, obcCachedBlock) then
      FreeMem(firstSlot);
  end;
end; { TGpLockFreeQueue.ReleaseBlock }

As you can see in the code fragments above, memory is also initialized (formatted into slots) when it is allocated. This, too, helps with the overall performance.
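PartitionMemory itself is not shown above, so here is a reconstruction written from the “Memory layout” section (the real code is in GpLockFreeQueue.pas and may differ in details):

procedure TGpLockFreeQueue.PartitionMemory(memory: PGpLFQueueTaggedValue);
var
  iSlot   : integer;
  numSlots: integer;
  slot    : PGpLFQueueTaggedValue;
begin
  numSlots := obcBlockSize div SizeOf(TGpLFQueueTaggedValue);
  slot := memory;
  for iSlot := 0 to numSlots - 1 do begin
    slot.Offset := iSlot; // each slot knows its distance from the Header
    slot.Tag := tagFree;
    Inc(slot);
  end;
  memory.Tag := tagHeader;
  memory.Value := numSlots - 1;        // block is released when this drops to 0
  NextSlot(memory).Tag := tagSentinel; // second slot guards the empty queue
  Dec(slot);                           // last slot in the block
  slot.Tag := tagEndOfList;            // will later link to the next block
end; { TGpLockFreeQueue.PartitionMemory }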

Performance

Tests were again performed using the 32_Queue project in the Tests branch of the OTL tree.

The test framework sets up the following data path:

source queue –> N threads –> channel queue –> M threads –> destination queue

The source queue is filled with numbers from 1 to 1.000.000. Then 1 to 8 threads are set up to read from the source queue and write into the channel queue, and another 1 to 8 threads are set up to read from the channel queue and write to the destination queue. The application then starts the clock and starts all threads. When all numbers have been moved to the destination queue, the clock is stopped and the contents of the destination queue are verified. Thread creation time is not included in the measured time.

All in all, this results in 2 million reads and 2 million writes distributed over three queues. The tests are very brutal, as all threads are just hammering on the queues, doing nothing else. The table below contains the average, minimum and maximum times of 5 runs on a 2.67 GHz computer with two 4-core CPUs. Data from the current implementation (“new code”) is compared to the original implementation (“old code”). Best times are marked with an asterisk.

               New code                     Old code
               average [min–max]   Mops/s   average [min–max]   Mops/s
N = 1, M = 1    590 [559–682] *     6.78     707 [566–834]       5.66
N = 2, M = 2    838 [758–910] *     4.77     996 [950–1031]      4.02
N = 3, M = 3   1095 [1054–1173]     3.65    1065 [1055–1074] *   3.76
N = 4, M = 4   1439 [1294–1535]     2.78    1313 [1247–1358] *   3.04
N = 8, M = 8   1674 [1303–2217]     2.39    1520 [1482–1574] *   2.63
N = 1, M = 7   1619 [1528–1822] *   2.47    3880 [3559–4152]     1.03
N = 7, M = 1   1525 [1262–1724]     2.62    1314 [1299–1358] *   3.04

(All times are in milliseconds; Mops/s = millions of queue operations per second.)

The new implementation is faster when fewer threads are used and slightly slower when the number of threads increases. The best thing is that there is no weird speed drop in the N = 1, M = 7 case. The small slowdown with a higher number of threads doesn't bother me much, as this test case really stresses the queue. In any practical application there will be much more code that does real work, and the queue load will drop rapidly.

If your code depends on accessing a shared queue from many threads that enqueue/dequeue most of the time, there's a simple solution – change the code! I believe that multithreaded code should not fight over each datum, but cooperate. A possible solution is to split the data into packets and schedule the packets to the shared queue. Each thread would then dequeue one packet and process all the data stored within, as sketched below.
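To sketch that idea with the GpLockFreeQueue variant (everything below is my own illustration; it leans on the fact, stated earlier, that the queue stores int64 values or anything you can cast into 8 bytes):

type
  PPacket = ^TPacket;
  TPacket = record
    Count: integer;
    Data : array [0..1023] of int64;
  end;

procedure EnqueuePacket(queue: TGpLockFreeQueue; packet: PPacket);
begin
  queue.Enqueue(int64(packet)); // one queue operation per 1024 items
end;

function DequeuePacket(queue: TGpLockFreeQueue; var packet: PPacket): boolean;
var
  value: int64;
begin
  Result := queue.Dequeue(value);
  if Result then
    packet := PPacket(value); // the consumer then processes packet.Data[0..Count-1]
                              // locally, without touching the shared queue
end;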

Wrapup

The code will be released in OmniThreadLibrary 1.5 (but you can use it already if you fetch the HEAD from the SVN). It has passed a very rigorous stress test and I believe it is working. If you find any problems, please let me know. I’m also interested in any ports to different languages (a C version would be nice).
