Thursday, April 30, 2009

TDM Rerun #13: Shared Events, Part 2: Redesign

Now we can already guess where the general sluggishness of the shared event system comes from. The trouble lies in the constant XML loading and saving. Most of the shared event system tables are quite static, but that doesn’t hold for the Event Queue table, where new entries are inserted, modified and deleted all the time.

- Shared Events, Part 2: Redesign, The Delphi Magazine 102, February 2004

My second article on the shared events architecture first addressed speed issues (the original code was quite slow), then discussed the internals of shared counters, shared linked lists and shared tables (all of which are used in the shared events system), and at the end returned to fine-tuning by fixing some remaining speed issues. As you can expect, the basis for the tuning was hard data from the profiler, not some hand-waving ideas about where the problem might lie.

Links: article (PDF, 99 KB), source code (ZIP, 1,9 MB), current version.


TDM Rerun #12: Shared Events

Shared event system, as I nicknamed this approach, is implemented as a shared set of in-memory tables, which are accessed and manipulated by producers and listeners. The important part is that there is no dedicated server: housekeeping is distributed between the producers and listeners.

- Shared Events, The Delphi Magazine 97, September 2003

The shared events mechanism was definitely the most complicated shared-memory architecture I ever put together. The system allowed multiple programs (running on the same computer) to cooperate using an event-based system. The first program would publish an event (or several events) and others would subscribe to those events. The first program would then broadcast the event, which would trigger notifications in all subscribed programs. The best trick was that there was no privileged part – no server, service or manager. Publishers and consumers shared all the work – tables were created as needed, housekeeping was done on both sides and so on.

The underlying architecture was largely redesigned after this article was published. The original source files are included only for historical reference.

Links: article (PDF, 496 KB), source code (ZIP, 1,9 MB), current version.


Thursday, April 02, 2009

Fluent XML [2]

Yesterday I described my approach to more fluent XML writing. Today I’ll describe the GpFluentXML unit where the ‘fluid’ implementation is stored. If you skipped yesterday’s post, you’re strongly encouraged to read it now.

Let’s start with the current version of the fluent XML builder interface, which is not completely identical to yesterday’s version.

Interface

uses
  OmniXML_Types,
  OmniXML;

type
  IGpFluentXmlBuilder = interface ['{91F596A3-F5E3-451C-A6B9-C5FF3F23ECCC}']
    function GetXml: IXmlDocument;
  //
    function AddChild(const name: XmlString): IGpFluentXmlBuilder;
    function AddComment(const comment: XmlString): IGpFluentXmlBuilder;
    function AddProcessingInstruction(const target, data: XmlString): IGpFluentXmlBuilder;
    function AddSibling(const name: XmlString): IGpFluentXmlBuilder;
    function Anchor(var node: IXMLNode): IGpFluentXmlBuilder;
    function Mark: IGpFluentXmlBuilder;
    function Return: IGpFluentXmlBuilder;
    function SetAttrib(const name, value: XmlString): IGpFluentXmlBuilder;
    function Up: IGpFluentXmlBuilder;
    property Attrib[const name, value: XmlString]: IGpFluentXmlBuilder
      read SetAttrib; default;
    property Xml: IXmlDocument read GetXml;
  end; { IGpFluentXmlBuilder }

function CreateFluentXml: IGpFluentXmlBuilder;

The fluent XML builder is designed around the concept of the active node, which represents the point where changes are made. When you call the factory function CreateFluentXml, it creates a new IXMLDocument interface and sets the active node to this interface (IXMLDocument is an IXMLNode, so that is not a problem). When you call other functions, the active node may or may not change, depending on the function.

AddProcessingInstruction and AddComment just create a processing instruction (the <?xml … ?> line at the beginning of the XML document) and a comment, respectively. Neither affects the active node.

AddChild creates a new XML node, makes it a child of the current active node and then makes the new node active.

Up sets the active node to the parent of the active node, unless the active node is already at the topmost level, in which case it raises an exception. In yesterday’s post this method was called Parent.

AddSibling creates a new XML node and makes it a child of the current active node’s parent. In other words, AddSibling is a shorter version of Up followed by AddChild.

SetAttrib, or its shorthand, the default property Attrib, sets the value of an attribute on the active node.

Mark and Return are always used in pairs. Mark pushes the active node onto the top of a stack that is internal to the XML builder. Return pops a node from the top of that stack and sets it as the active node. Yesterday this pair was named Here/Back.

Anchor copies the active node into its parameter. That allows you to generate the template code with the fluent XML builder and store a few nodes for later use. Then you can use those nodes to insert programmatically generated XML at those points.

At the end, there’s the Xml property, which returns the internal IXMLDocument interface, the one that was created in CreateFluentXml.
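To make the active-node rules a bit more concrete, here is a minimal sketch that uses only the methods listed in the interface. The element and attribute names (config, items, item, footer, id) are invented for illustration, and the sketch assumes the AppendNode and SetNodeAttrStr helpers from OmniXMLUtils behave the way they are used in the implementation section of this post.

```pascal
program FluentXmlSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  OmniXML,
  OmniXMLUtils,
  GpFluentXml;

var
  i        : integer;
  nodeItems: IXMLNode;
  xmlDoc   : IXMLDocument;

begin
  xmlDoc := CreateFluentXml
    .AddProcessingInstruction('xml', 'version="1.0" encoding="UTF-8"')
    .AddChild('config')['version', '1']  // active node: config
      .Mark                              // remember config on the internal stack
        .AddChild('items')               // active node: items
        .Anchor(nodeItems)               // keep a reference to items for later
      .Return                            // active node: config again
      .AddChild('footer')                // footer becomes a child of config
    .Xml;

  // Fill the anchored node programmatically, outside of the fluent chain.
  // AppendNode/SetNodeAttrStr are the OmniXMLUtils wrappers (assumed signatures).
  for i := 1 to 3 do
    SetNodeAttrStr(AppendNode(nodeItems, 'item'), 'id', IntToStr(i));
end.
```

If the builder behaves as described, the document should end up with a config element that contains items (holding three item children) followed by footer.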

And now let’s move to the implementation.

Implementation

Class TGpFluentXmlBuilder implements the IGpFluentXmlBuilder interface. In addition to the methods from this interface, it declares function ActiveNode and three fields – fxbActiveNode stores the active node, fxbMarkedNodes is a stack of nodes stored with the Mark method and fxbXmlDoc is the XML document.

type
  TGpFluentXmlBuilder = class(TInterfacedObject, IGpFluentXmlBuilder)
  strict private
    fxbActiveNode : IXMLNode;
    fxbMarkedNodes: IInterfaceList;
    fxbXmlDoc     : IXMLDocument;
  strict protected
    function ActiveNode: IXMLNode;
  protected
    function GetXml: IXmlDocument;
  public
    constructor Create;
    destructor  Destroy; override;
    function AddChild(const name: XmlString): IGpFluentXmlBuilder;
    function AddComment(const comment: XmlString): IGpFluentXmlBuilder;
    function AddProcessingInstruction(const target, data: XmlString): IGpFluentXmlBuilder;
    function AddSibling(const name: XmlString): IGpFluentXmlBuilder;
    function Anchor(var node: IXMLNode): IGpFluentXmlBuilder;
    function Mark: IGpFluentXmlBuilder;
    function Return: IGpFluentXmlBuilder;
    function SetAttrib(const name, value: XmlString): IGpFluentXmlBuilder;
    function Up: IGpFluentXmlBuilder;
  end; { TGpFluentXmlBuilder }

Some functions are pretty much trivial – one line to execute the action and another to return Self so another fluent XML action can be chained onto result of the function. Of course, some of those functions are simple because they use wrappers from the OmniXMLUtils unit, not from MS-compatible OmniXML.pas. [By the way, you can download OmniXML at www.omnixml.com.]

function TGpFluentXmlBuilder.AddChild(const name: XmlString): IGpFluentXmlBuilder;
begin
  fxbActiveNode := AppendNode(ActiveNode, name);
  Result := Self;
end; { TGpFluentXmlBuilder.AddChild }

function TGpFluentXmlBuilder.AddComment(const comment: XmlString): IGpFluentXmlBuilder;
begin
  ActiveNode.AppendChild(fxbXmlDoc.CreateComment(comment));
  Result := Self;
end; { TGpFluentXmlBuilder.AddComment }

function TGpFluentXmlBuilder.AddProcessingInstruction(const target, data: XmlString):
  IGpFluentXmlBuilder;
begin
  ActiveNode.AppendChild(fxbXmlDoc.CreateProcessingInstruction(target, data));
  Result := Self;
end; { TGpFluentXmlBuilder.AddProcessingInstruction }

function TGpFluentXmlBuilder.AddSibling(const name: XmlString): IGpFluentXmlBuilder;
begin
  Result := Up;
  fxbActiveNode := AppendNode(ActiveNode, name);
end; { TGpFluentXmlBuilder.AddSibling }

function TGpFluentXmlBuilder.GetXml: IXmlDocument;
begin
  Result := fxbXmlDoc;
end; { TGpFluentXmlBuilder.GetXml }

function TGpFluentXmlBuilder.Mark: IGpFluentXmlBuilder;
begin
  fxbMarkedNodes.Add(ActiveNode);
  Result := Self;
end; { TGpFluentXmlBuilder.Mark }

function TGpFluentXmlBuilder.Return: IGpFluentXmlBuilder;
begin
  fxbActiveNode := fxbMarkedNodes.Last as IXMLNode;
  fxbMarkedNodes.Delete(fxbMarkedNodes.Count - 1);
  Result := Self;
end; { TGpFluentXmlBuilder.Return }

function TGpFluentXmlBuilder.SetAttrib(const name, value: XmlString): IGpFluentXmlBuilder;
begin
  SetNodeAttrStr(ActiveNode, name, value);
  Result := Self;
end; { TGpFluentXmlBuilder.SetAttrib }

OK, Return has three lines, not two. That makes it medium complicated :)

In fact Up is also very simple, except that it checks validity of the active node before returning its parent.

function TGpFluentXmlBuilder.Up: IGpFluentXmlBuilder;
begin
  if not assigned(fxbActiveNode) then
    raise Exception.Create('Cannot access a parent at the root level')
  else if fxbActiveNode = DocumentElement(fxbXmlDoc) then
    raise Exception.Create('Cannot create a parent at the document element level')
  else
    fxbActiveNode := ActiveNode.ParentNode;
  Result := Self;
end; { TGpFluentXmlBuilder.Up }

A little more trickery is hidden inside the ActiveNode helper function. It returns the active node when it is set; if not, it returns the XML document’s document element, or the XML document itself if the document element is not set. I don’t think the second option (document element) can ever occur. That part is just there to future-proof the code.

function TGpFluentXmlBuilder.ActiveNode: IXMLNode;
begin
  if assigned(fxbActiveNode) then
    Result := fxbActiveNode
  else begin
    Result := DocumentElement(fxbXmlDoc);
    if not assigned(Result) then
      Result := fxbXmlDoc;
  end;
end; { TGpFluentXmlBuilder.ActiveNode }

Believe it or not, that’s all. The whole GpFluentXml unit with comments and everything is only 177 lines long.

Full GpFluentXML source is available at http://17slon.com/blogs/gabr/files/GpFluentXml.pas.


Wednesday, April 01, 2009

Fluent XML [1]

A few days ago I was writing a very boring piece of code that had to generate an XML document. It was full of function calls that created nodes in the XML document and set attributes. Boooooring stuff. But even worse than that – the structure of the XML document was totally lost in the code. It was hard to tell which node was a child of which and how it was all structured.

Then I did what every programmer does when he/she has to write some boring code – I wrote a tool to simplify the process. [That process usually takes more time than the original approach but at least it is interesting ;) .]

I started by writing the end code. In other words, I started by thinking about how I wanted to create this XML document in the first place. Quickly I decided on the fluent interface approach. I had already used it in the OmniThreadLibrary, where it proved to be quite useful.

That’s how the first draft looked (Actually, it was much longer but that’s the important part.):

xmlWsdl := CreateFluentXml
.AddProcessingInstruction('xml', 'version="1.0" encoding="UTF-8"')
.AddChild('definitions')
.SetAttr('xmlns', 'http://schemas.xmlsoap.org/wsdl/')
.SetAttr('xmlns:xs', 'http://www.w3.org/2001/XMLSchema')
.SetAttr('xmlns:soap', 'http://schemas.xmlsoap.org/wsdl/soap/')
.SetAttr('xmlns:soapenc', 'http://schemas.xmlsoap.org/soap/encoding/')
.SetAttr('xmlns:mime', 'http://schemas.xmlsoap.org/wsdl/mime/');

This short fragment looks quite nice but in the full version (about 50 lines) all those SetAttr calls visually merged together with AddChild calls and the result was still unreadable (although shorter than the original code with explicit calls to XML interface).

My first idea was to merge at least some SetAttr calls into the AddChild by introducing two versions – one which takes only a node name and another which takes node name, attribute name and attribute value – but that didn’t help the code at all. Even worse – it was hard to see which AddChild calls were setting attributes and which not :(

That got me started in a new direction. If the main problem is visual clutter, I had to do something to make setting attributes stand out. Briefly I considered a complicated scheme which would use smart records and operator overloading, but I couldn’t imagine XML-creating code which would use operators and be more readable than this, so I rejected this approach. [It may still be a valid approach – it’s just that I cannot make it work in my head.]

Then I thought about arrays. In “classical” code I could easily add array-like support to attributes so that I could write xmlNode[attrName] := 'some value', but how could I make this conform to my fluent architecture?

To get or not to get

In order to be able to chain anything after the [], the indexed property hiding behind must return Self, i.e. the same interface it is living in. And because I want to use attribute name/value pairs, this property has to have two indices.

property Attrib[const name, value: XmlString]: IGpFluentXmlBuilder 
read GetAttrib; default;

That would allow me to write such code:

.AddSibling('service')['name', serviceName]
.AddChild('port')
['name', portName]
['binding', 'fs:' + bindingName]
.AddChild('soap:address')['location', serviceLocation];

As you can see, attributes can be chained and I can write attribute assignment in the same line as node creation and it is still obvious which is which and who is who.

But … assignment? In a getter? Why not! You can do anything in the property getter. To make this more obvious, my code calls this ‘getter’ SetAttrib. As a nice side effect, SetAttrib is exactly the same as it was defined in the first draft and can even be used instead of the [] approach.
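The trick can be tried out in isolation. The following is a minimal, self-contained sketch and not part of GpFluentXml: IAttrBuilder and TAttrBuilder are hypothetical names invented for this example, and the “document” is just a string so that no XML library is needed.

```pascal
program GetterThatSets;

{$APPTYPE CONSOLE}

type
  // Hypothetical mini-builder, invented for this sketch only.
  IAttrBuilder = interface
    function GetAttrText: string;
    function SetAttrib(const name, value: string): IAttrBuilder;
    // Two indices and no setter: the 'getter' stores the pair and
    // returns the builder itself so that another [] can follow.
    property Attrib[const name, value: string]: IAttrBuilder
      read SetAttrib; default;
    property AttrText: string read GetAttrText;
  end;

  TAttrBuilder = class(TInterfacedObject, IAttrBuilder)
  strict private
    FAttrText: string;
  public
    function GetAttrText: string;
    function SetAttrib(const name, value: string): IAttrBuilder;
  end;

function TAttrBuilder.GetAttrText: string;
begin
  Result := FAttrText;
end;

function TAttrBuilder.SetAttrib(const name, value: string): IAttrBuilder;
begin
  FAttrText := FAttrText + ' ' + name + '="' + value + '"';
  Result := Self; // returning Self is what makes the chaining possible
end;

var
  attrs  : string;
  builder: IAttrBuilder;

begin
  builder := TAttrBuilder.Create;
  // The [] form and the explicit SetAttrib call end up in the same method.
  attrs := builder['name', 'demo']['binding', 'fs:demo']
    .SetAttrib('location', 'here')
    .AttrText;
  Writeln('<port' + attrs + '/>');
  // prints: <port name="demo" binding="fs:demo" location="here"/>
end.
```

Note that the chain is always used as part of an expression (here it is assigned to attrs); the sketch deliberately avoids using a bare property access as a statement.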

I’ll end today’s instalment with the complete ‘fluent XML builder’ interface and with sample code that uses this interface to build an XML document. Tomorrow I’ll wrap things up by describing the interface and its implementation in all boring detail.

type
IGpFluentXmlBuilder = interface ['{91F596A3-F5E3-451C-A6B9-C5FF3F23ECCC}']
function GetXml: IXmlDocument;
//
function Anchor(var node: IXMLNode): IGpFluentXmlBuilder;
function AddChild(const name: XmlString): IGpFluentXmlBuilder;
function AddComment(const comment: XmlString): IGpFluentXmlBuilder;
function AddSibling(const name: XmlString): IGpFluentXmlBuilder;
function AddProcessingInstruction(const target, data: XmlString): IGpFluentXmlBuilder;
function Back: IGpFluentXmlBuilder;
function Here: IGpFluentXmlBuilder;
function Parent: IGpFluentXmlBuilder;
function SetAttrib(const name, value: XmlString): IGpFluentXmlBuilder;
property Attrib[const name, value: XmlString]: IGpFluentXmlBuilder
read SetAttrib; default;
property Xml: IXmlDocument read GetXml;
end; { IGpFluentXmlBuilder }
 
  xmlWsdl := CreateFluentXml
.AddProcessingInstruction('xml', 'version="1.0" encoding="UTF-8"')
.AddChild('definitions')
['xmlns', 'http://schemas.xmlsoap.org/wsdl/']
['xmlns:xs', 'http://www.w3.org/2001/XMLSchema']
['xmlns:soap', 'http://schemas.xmlsoap.org/wsdl/soap/']
['xmlns:soapenc', 'http://schemas.xmlsoap.org/soap/encoding/']
['xmlns:mime', 'http://schemas.xmlsoap.org/wsdl/mime/']
['name', serviceName]
['xmlns:ns1', 'urn:' + intfName]
['xmlns:fs', 'http://fab-online.com/soap/']
['targetNamespace', 'http://fab-online.com/soap/']
.AddChild('message')['name', 'fs:' + baseName + 'Request'].Anchor(nodeRequest)
.AddSibling('message')['name', 'fs:' + baseName + 'Response'].Anchor(nodeResponse)
.AddSibling('portType')['name', baseName]
.Here
.AddChild('operation')['name', baseName]
.AddChild('input')['message', 'fs:' + baseName + 'Request']
.AddSibling('output')['message', 'fs:' + baseName + 'Response']
.Back
.AddSibling('binding')
.Here
['name', bindingName]
['type', 'fs:' + intfName]
.AddChild('soap:binding')
['style', 'rpc']
['transport', 'http://schemas.xmlsoap.org/soap/http']
.AddChild('operation')['name', baseName]
.AddChild('soap:operation')
['soapAction', 'urn:' + baseName]
['style', 'rpc']
.AddSibling('input')
.AddChild('soap:body')
['use', 'encoded']
['encodingStyle', 'http://schemas.xmlsoap.org/soap/encoding/']
['namespace', 'urn:' + intfName + '-' + baseName]
.Parent
.AddSibling('output')
.AddChild('soap:body')
['use', 'encoded']
['encodingStyle', 'http://schemas.xmlsoap.org/soap/encoding/']
['namespace', 'urn:' + intfName + '-' + baseName]
.Back
.AddSibling('service')['name', serviceName]
.AddChild('port')
['name', portName]
['binding', 'fs:' + bindingName]
.AddChild('soap:address')['location', serviceLocation];

What do you think? Does my approach make any sense?


Tuesday, February 10, 2009

OmniThreadLibrary 1.03

OmniThreadLibrary 1.03 was silently released two days ago. Even without the announcement, it was downloaded 133 times to this point. Awesome!

The main new feature is per-thread initialized data in the thread pool. That allows you to create a connection pool with OTL. There’s a simple demo included in the distribution (24_ConnectionPool). I wrote a few words about it yesterday.

No bugs were fixed so you don’t have to upgrade if you don’t need new thread pool functionality.

As usual, you can get it via SVN or as a ZIP archive.


Monday, February 09, 2009

Building a connection pool

Recently, an OTL user asked me in the forum how to build a connection pool with the OTL. The answer, at the time, was – not possible. There was a crucial component missing.

It turned out that implementing thread-global data was not really hard to do so here it is – a tutorial on how to build a connection pool with the OTL (also included in the latest release as a demo 24_ConnectionPool). To run this code you’ll need OTL 1.03.

Let’s say we want to build a pool of some entities that take some time to initialize (database connections, for example). In a traditional sense, one would build a list of objects managing those entities and would then allocate them to the threads running the code. In practice, we can run into a big problem if such entities expect to always run in the thread in which they were created. (I had such a problem once with TDBIB and Firebird Embedded.) To solve this, we would have to associate entities with threads and we would also have to monitor the thread lifecycle (to deallocate entities when a thread is terminated).

With OTL, the logic is reversed. Threads will be managed by a thread pool and there will be no need for us to create/destroy them. We’ll just create a task and submit it into a thread pool. Thread pool will initialize the pool entity (database connection), associate it with a thread and pass it to all tasks that will run in this thread so that they can use it.

Furthermore, this solution allows you to use all the functionality of the OTL thread pool. You can set the maximum number of concurrent tasks, the idle thread timeout, the maximum time a task will wait for execution and more.

So let’s see how we can code this in the OTL. All code was extracted from the demo 24_ConnectionPool.

Connection pool demo

In the OnCreate event the code creates a thread pool and assigns it a name and a thread data factory. The latter is a function that will create and initialize a new connection for each new thread. In the OnClose event the code terminates all waiting tasks (if any), allowing the application to shut down gracefully. FConnectionPool is an interface and its lifetime is managed automatically, so we don’t have to do anything explicit with it.

procedure TfrmConnectionPoolDemo.FormCreate(Sender: TObject);
begin
FConnectionPool := CreateThreadPool('Connection pool');
FConnectionPool.ThreadDataFactory := CreateThreadData;
end;

procedure TfrmConnectionPoolDemo.FormClose(Sender: TObject; var Action: TCloseAction);
begin
FConnectionPool.CancelAll;
end;

The magic CreateThreadData factory just creates a connection object (which would in a real program establish a database connection, for example).

function CreateThreadData: IInterface;
begin
Result := TConnectionPoolData.Create;
end;

There’s no black magic behind this connection object. It is an object which implements an interface. Any interface. This interface will be used only in your code. In this demo, TConnectionPoolData contains only one field – unique ID, which will help us follow the program execution.

type
IConnectionPoolData = interface ['{F604640D-6D4E-48B4-9A8C-483CA9635C71}']
function ConnectionID: integer;
end;

TConnectionPoolData = class(TInterfacedObject, IConnectionPoolData)
strict private
cpID: integer;
public
constructor Create;
destructor Destroy; override;
function ConnectionID: integer;
end; { TConnectionPoolData }

As this is not code from a real-world application, I didn’t bother connecting it to any specific database. The TConnectionPoolData constructor will just notify the main form that it has begun its job, generate a new ID and sleep for 5 seconds (to emulate establishing a slow connection). The destructor is even simpler; it just sends a notification to the main form.

constructor TConnectionPoolData.Create;
begin
PostToForm(WM_USER, MSG_CREATING_CONNECTION, integer(GetCurrentThreadID));
cpID := GConnPoolID.Increment;
Sleep(5000);
PostToForm(WM_USER, MSG_CREATED_CONNECTION, cpID);
end;

destructor TConnectionPoolData.Destroy;
begin
PostToForm(WM_USER, MSG_DESTROY_CONNECTION, cpID);
end;

Creating and running a task is really simple with the OTL:

procedure TfrmConnectionPoolDemo.btnScheduleClick(Sender: TObject);
begin
Log('Creating task');
CreateTask(TaskProc).MonitorWith(OTLMonitor).Schedule(FConnectionPool);
end;

We are monitoring the task with the TOmniEventMonitor component because a) we want to know when the task will terminate and b) otherwise we would have to keep reference to the IOmniTaskControl interface returned from the CreateTask.

The task worker procedure TaskProc is again really simple. First it pulls the connection data from the task interface (task.ThreadData as IConnectionPoolData), retrieves the connection ID and sends task and connection ID to the main form (for logging purposes) and then it sleeps for three seconds, indicating some heavy database activity.

procedure TaskProc(const task: IOmniTask);
begin
PostToForm(WM_USER + 1, task.UniqueID,
(task.ThreadData as IConnectionPoolData).ConnectionID);
Sleep(3000);
end;

Then … but wait! There’s no more! Believe it or not, that’s all. OK, there is some infrastructure code that is used only for logging but that you can look up by yourself.

There is also code assigned to the second button (“Schedule and wait”) but it only demonstrates how you can schedule a task and wait on its execution. That is useful if you’re running the task from a background thread (for example, an Indy thread, as specified by the author of the original question).

Running the demo

Let’s run the demo and click on the Schedule button.


What happened here?

  • Task was created.
  • Immediately, it was scheduled for execution and thread pool called our thread data factory.
  • Thread data waited for five seconds and returned.
  • Thread pool immediately started executing the task.
  • Task waited for three seconds and exited.

OK, nothing special. Let’s click the Schedule button again.


Now a new task was created (with ID 4), was scheduled for execution in the same thread as the previous task and reused the connection that was created when the first task was scheduled. There is no 5 second wait, just the 3 second wait implemented in the task worker procedure.

If you now leave the program running for 10 seconds, a message Destroying connection 1 will appear. The reason for this is that the default thread idle timeout in the OTL thread pool is 10 seconds. In other words, if a thread does nothing for 10 seconds, it will be stopped. You are, of course, free to set this value to any number or even to 0, which would disable the idle thread termination mechanism.

If you now click the Schedule button again, a new thread will be created in the thread pool and a new connection will be created in our factory function (spending 5 seconds doing nothing).


Let’s try something else. I was running the demo on my laptop with a dual-core CPU, which caused the OTL thread pool to limit the maximum number of concurrently executing threads to two. By default, the OTL thread pool uses as many threads as there are cores in the system, but again you can override the value. At the moment, you are limited to a maximum of 60 concurrent threads, which should not cause any problems in the next few years, I hope. (The 60 thread limit is not an arbitrary number but is caused by the Windows limitation of allowing only up to 64 handles in the WaitForMultipleObjects function.) Yes, you are allowed to set this limit to a value higher than the number of CPU cores in the system, but still, running 60 active concurrent threads is really not recommended.

To recap – when running the demo, the OTL thread pool was limited to two concurrent threads. When I clicked the Schedule button two times in quick succession, the first task was scheduled and the first connection started being established (translation: entered the Sleep function). Then the second task was created (as the connection is being established from the worker thread, the GUI is not blocked) and the second connection started being established in the second thread. Five seconds later, the connections were created and the tasks started running (and waited three seconds, and exited).


Then I clicked the Schedule button two more times. Two tasks were scheduled and they immediately started execution in two worker threads.


For the third demo, I restarted the app and clicked the Schedule button three times. Only two worker threads were created, two connections were established and two tasks started execution. The third task entered the thread pool queue and waited for the first task to terminate, after which it was immediately scheduled.


So here you have it – a very simple way to build a connection pool. Have fun!


Friday, February 06, 2009

Adding connection pool mechanism to OmniThreadLibrary

I have a problem.

I have this thread pool, which needs to be enhanced a little. And I don’t know how to do it.

Well, actually I know. I have at least three approaches. I just can’t tell which one is the best :(

I’m talking about the OmniThreadLibrary – I know you guessed that already. The problem is that the thread pool in OTL doesn’t allow for per-thread resource initialization, and that’s something you need when you’re implementing a connection pool (more background info here). I started adding this functionality but soon found out that I don’t have a good idea of how to implement it. Oh, I have a few ideas, they are just not very good :(

Current

At the moment, OTL thread pool functionality is exposed through an interface. This interface is pretty high-level and doesn’t allow the programmer to mess with the underlying thread management. All thread information is hidden in the implementation section. There’s a notification event that’s triggered when pool thread is created or destroyed, but it is only a notification and is triggered asynchronously (and possibly with a delay).

In short:

type
TOTPWorkerThread = class(TThread)
end;

TOmniThreadPool = class(TInterfacedObject, IOmniThreadPool)
end;

Event handlers

The first idea was to add OnThreadInitialization/OnThreadCleanup to the IOmniThreadPool. [Actually something similar already exists - OnWorkerThreadCreated_Asy and OnWorkerThreadDestroyed_Asy – but those events are part of the previous implementation and will be removed very soon.] Those two events would receive a TThread parameter and do the proper initialization there.

There are some big problems though. Let’s say you’ll be implementing a database connection pool. You’ll have to open a database connection in OnThreadInitialization. Where would you store that info then? In an external structure, indexed by the TThread? Ugly! Even worse – how would you access the database info from the task that will be executing in the thread pool? By accessing that same structure? Eugh!

Rejected.

Thread subclassing

A better idea is to implement a subclassed thread class in your own code and then tell the thread pool to use this thread class when creating new thread object. You’d then manage database connection in overridden Initialize/Cleanup methods.

type
TDBConnectionPoolThread = class(TOTPWorkerThread)
strict private
FDBConnection: TDBConnectionInfo;
protected
function Initialize: boolean; override;
procedure Cleanup; override;
end;

GlobalOmniThreadPool.ThreadClass := TDBConnectionPoolThread;

Looks much better, but again there’s a problem – I’d have to expose the TOTPWorkerThread object in the interface section and that’s just plain ugly. The worker thread mechanism should be hidden. Only a few people would ever be interested in it.

Thread data subclassing

An even better idea is to add an empty

TOTPWorkerThreadData = class
end;

definition to the interface section of the thread pool unit. IOmniThreadPool would contain a property ThreadDataClass which would point to this definition. And each worker thread would create/destroy an instance of this class in its Execute method.

You’d add database management as

type
TDBConnectionPoolThreadData = class(TOTPWorkerThreadData)
strict private
FDBConnection: TDBConnectionInfo;
protected
constructor Create;
destructor Destroy; override;
end;

GlobalOmniThreadPool.ThreadDataClass := TDBConnectionPoolThreadData;

[Maybe the constructor has to be virtual here? I never know until I try.]

There’s still a question of accessing this information from the task (and it goes the same for the previous attempt – I just skipped the issue then). I’d have to extend the IOmniTask interface with a method to access per-thread data.

Thread data with interfaces

While writing this mind dump a new idea crossed my mind – what if thread data were implemented as an interface, without the need for subclassing the thread or thread data? In a way it is the first idea, just reimplemented to remove all its problems.

Task interface would be extended with thread data access definitions, approximately like this:

type
IOtlThreadData = interface
end;

IOtlTask = interface
property ThreadData: IOtlThreadData;
end;

Thread pool would get a property containing a factory method.

type
TCreateThreadDataProc = function: IOtlThreadData;

IOmniThreadPool = interface
property ThreadDataFactory: TCreateThreadDataProc;
end;

This factory method would be called when thread is created to initialize thread data. Each task would get assigned that same interface into its ThreadData property just before starting its execution in a selected thread. Task would then access ThreadData property to retrieve this information.

In the database connection pool scenario, you’d have to write a connection interface, object and factory.

type
IDBConnectionPoolThreadData = interface(IOtlThreadData)
property ConnectionInfo: TDBConnectionInfo read GetConnectionInfo;
end;

TDBConnectionPoolThreadData = class(TInterfacedObject, IDBConnectionPoolThreadData )
strict private
FDBConnection: TDBConnectionInfo;
protected
constructor Create;
destructor Destroy; override;
end;

function CreateConnectionPoolThreadData: IDBConnectionPoolThreadData;
begin
Result := TDBConnectionPoolThreadData.Create;
end;

GlobalOmniThreadPool.ThreadDataFactory := CreateConnectionPoolThreadData;

This approach requires slightly more work from the programmer but I like it most as it somehow seems the cleanest of them all (plus it is implemented with interfaces which is pretty much the approach used in all OTL code).

So, dear reader, what do you think? If you have a better idea, or see a big problem with any of those implementations that I didn’t think of, please do tell in the comments!


Tuesday, February 03, 2009

Hassle-free critical section

While writing multithreaded code I sometimes need a fine-grained critical section that will synchronize access to some small (very small) piece of code. In the OmniThreadLibrary, for example, there’s a class TOmniTaskExecutor which has some of its internals (for example a set of Option flags) exposed to both the task controller and the task itself (and those two by definition live in two different threads). Access to those internal fields is serialized with a critical section.

Usually, I need two or more such critical sections. And because I’m lazy and I don’t want to write creation/destruction code every time I need a fine-grained lock, I usually create only one critical section and use it for all such accesses. In other words, when thread 1 is accessing field 1 (protected with that one critical section), thread 2 will be blocked from accessing field 2 (because it is protected with the same critical section). I can live with that, because the frequency of such accesses is very low (or I would not be reusing the same critical section).

Still, I was not happy with this status quo but I didn’t know what to do (except creating more critical sections, of course). Then, while developing the new OTL thread pool, I got a great idea – records! Records need no explicit .Create. Let’s make this new critical section a record!

Let’s start with the use scenario. I want to be able to declare the critical section object …

  TOmniTaskExecutor = class
  strict private
    oteInternalLock: TOmniCS;
    //...
  end;

… and then use it without any initialization.

  oteInternalLock.Acquire;
  try
    if not assigned(oteCommList) then
      oteCommList := TInterfaceList.Create;
    oteCommList.Add(comm);
    SetEvent(oteCommRebuildHandles);
  finally oteInternalLock.Release; end;

There are only two problems to be solved. I had to make sure that the critical section is created when the record is first used and destroyed when the owning object is destroyed. It turned out that this is quite a big “only”.

Destruction

Let’s start with the simpler problem – destruction. The solution to automatic record cleanup is well-documented (at least if you follow Delphi blogs where they talk about such things …). In general, the Delphi compiler doesn’t guarantee what the initial state of record fields will be, but there are two exceptions to this rule – all strings are initialized to an empty string and all interfaces to nil (which in both cases means that the fields holding strings/interfaces are initialized to 0). In addition to that, the compiler will free memory allocated for string fields and release interfaces (that is, decrement their reference count) when the record goes out of scope. If the record is declared inside a method, this will happen when the method exits, and if it is declared as a class field, the cleanup will occur when the owning object is destroyed. In any case, you can be sure that the compiler will take care of strings and interfaces.
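The cleanup guarantee is easy to observe in isolation. The following is a minimal sketch, not OTL code: TTracer, THolder, UseHolder and DestroyCount are hypothetical names made up for this illustration. It stores an interface in a local record and counts destructor calls; the counter should reach 1 as soon as the procedure returns, without any explicit cleanup.

```pascal
program RecordCleanupDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Hypothetical helper: counts how many instances were destroyed.
  TTracer = class(TInterfacedObject)
  public
    destructor Destroy; override;
  end;

  // Hypothetical record with one managed (interface) field, like TOmniCS.
  THolder = record
    Intf: IInterface;
  end;

var
  DestroyCount: integer;

destructor TTracer.Destroy;
begin
  Inc(DestroyCount);
  inherited;
end;

procedure UseHolder;
var
  holder: THolder;
begin
  // The compiler has already set holder.Intf to nil at this point.
  if holder.Intf = nil then
    holder.Intf := TTracer.Create;
end; // holder goes out of scope: the reference is released, TTracer is destroyed

begin
  DestroyCount := 0;
  UseHolder;
  WriteLn(DestroyCount); // 1
end.
```

The same mechanism fires when the record is a field of an object that is being destroyed, which is the case TOmniCS relies on.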

So we already know something – TOmniCS record will contain an interface field and an instance of the object implementing this interface will do the actual critical section allocation and access.

  IOmniCriticalSection = interface ['{AA92906B-B92E-4C54-922C-7B87C23DABA9}']
    procedure Acquire;
    procedure Release;
    function GetSyncObj: TSynchroObject;
  end; { IOmniCriticalSection }

  TOmniCS = record
  private
    ocsSync: IOmniCriticalSection;
    function GetSyncObj: TSynchroObject;
  public
    procedure Initialize;
    procedure Acquire; inline;
    procedure Release; inline;
    property SyncObj: TSynchroObject read GetSyncObj;
  end; { TOmniCS }

The implementation of the IOmniCriticalSection interface is trivial.

  TOmniCriticalSection = class(TInterfacedObject, IOmniCriticalSection)
  strict private
    ocsCritSect: TSynchroObject;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Acquire; inline;
    function GetSyncObj: TSynchroObject;
    procedure Release; inline;
  end; { TOmniCriticalSection }

constructor TOmniCriticalSection.Create;
begin
  inherited Create;
  ocsCritSect := TCriticalSection.Create;
end; { TOmniCriticalSection.Create }

destructor TOmniCriticalSection.Destroy;
begin
  FreeAndNil(ocsCritSect);
  inherited;
end; { TOmniCriticalSection.Destroy }

procedure TOmniCriticalSection.Acquire;
begin
  ocsCritSect.Acquire;
end; { TOmniCriticalSection.Acquire }

function TOmniCriticalSection.GetSyncObj: TSynchroObject;
begin
  Result := ocsCritSect;
end; { TOmniCriticalSection.GetSyncObj }

procedure TOmniCriticalSection.Release;
begin
  ocsCritSect.Release;
end; { TOmniCriticalSection.Release }

function CreateOmniCriticalSection: IOmniCriticalSection;
begin
  Result := TOmniCriticalSection.Create;
end; { CreateOmniCriticalSection }

Creation

The destruction part was trivial (once you know the trick, of course), but the creation is not. Delphi only guarantees that the ocsSync interface will be initialized to nil (or 0, if you prefer), nothing more than that.

The TOmniCS record offloads all hard work to the Initialize method. It is called from Acquire and GetSyncObj (a method that returns underlying critical section), but not from Release and that’s for a reason. If you call Release before first calling Acquire, it is clearly a programming error and program should crash – and it will because the ocsSync will be nil.

procedure TOmniCS.Acquire;
begin
  Initialize;
  ocsSync.Acquire;
end; { TOmniCS.Acquire }

function TOmniCS.GetSyncObj: TSynchroObject;
begin
  Initialize;
  Result := ocsSync.GetSyncObj;
end; { TOmniCS.GetSyncObj }

procedure TOmniCS.Release;
begin
  ocsSync.Release;
end; { TOmniCS.Release }

Let’s finally tackle the hard part. Before the critical section can be used, the ocsSync interface must be initialized. In a single-threaded world we would just create a TOmniCriticalSection object and store it in the ocsSync field. In the multi-threaded world this is not safe.

Let’s think about what can happen if two Acquire calls are made at the same time from two threads. Thread 1 checks if ocsSync is initialized, finds that it’s not and loses its CPU slice. Thread 2 checks if ocsSync is initialized, finds that it’s not, initializes it, calls Acquire and loses its CPU slice. Thread 1 creates another TOmniCriticalSection object, stores it in the ocsSync field (overwriting the previous value, which will get its reference count decremented, which will destroy the implementing object) and calls Acquire. Because this Acquire will be using a critical section different from the Acquire in thread 2, it will succeed and both threads will have access to the protected data. Bad!

The trick is to store TOmniCriticalSection in the ocsSync field with an atomic operation that will succeed if and only if the ocsSync is empty (nil, zero). And that’s a job for the InterlockedCompareExchange (ICE in short).

ICE takes three parameters – the first is the address of the memory area we are trying to modify, the second is the new value and the third is the value we expect to find in that memory area. The function always returns the value that was stored in the memory area before the call. If that value is not equal to the third parameter, then ICE does nothing.
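To make that contract concrete, here is a single-threaded model of it. This is only a sketch of the semantics: ModelCompareExchange is a hypothetical name, it is deliberately not atomic, and real code has to call InterlockedCompareExchange, which performs the same compare-and-store as one indivisible step.

```pascal
program CompareExchangeModel;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

// NOT atomic! A single-threaded model of the contract only. Real code must
// call InterlockedCompareExchange, which does all of this as one indivisible step.
function ModelCompareExchange(var destination: integer;
  exchange, comparand: integer): integer;
begin
  Result := destination;        // always report the value found on entry
  if destination = comparand then
    destination := exchange;    // store only when the expectation held
end;

var
  slot, found: integer;
begin
  slot := 0;

  // First caller expects 0 and finds 0: the new value is stored.
  found := ModelCompareExchange(slot, 111, 0);
  WriteLn(found, ' ', slot);    // 0 111

  // Second caller also expects 0 but finds 111: nothing is stored.
  found := ModelCompareExchange(slot, 222, 0);
  WriteLn(found, ' ', slot);    // 111 111
end.
```

The caller tells the two outcomes apart by the return value alone: a result equal to the expected value means the store happened, anything else means somebody else got there first.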

Quite a mouthful, I know. That’s how it is used in practice:

procedure TOmniCS.Initialize;
var
  syncIntf: IOmniCriticalSection;
begin
  Assert(cardinal(@ocsSync) mod 4 = 0, 'TOmniCS.Initialize: ocsSync is not 4-aligned!');
  while not assigned(ocsSync) do begin
    syncIntf := CreateOmniCriticalSection;
    if InterlockedCompareExchange(PInteger(@ocsSync)^, integer(syncIntf), 0) = 0 then begin
      // ocsSync now owns the reference; clear the local without decrementing the count
      pointer(syncIntf) := nil;
      Exit;
    end;
    // another thread won the race; syncIntf is released on the next assignment or on exit
    DSiYield;
  end;
end; { TOmniCS.Initialize }

Initialize checks if ocsSync is allocated. If not, it will create a new instance of the IOmniCriticalSection interface and store it in the local variable. Then it tries to store it in the ocsSync field with a call to the ICE. The third parameter tells the ICE that we expect ocsSync to contain all zeroes. If this is so, interface will be stored in the ocsSync and ICE will return 0 (otherwise, it will return current value of the ocsSync field). If ICE succeeded, we have to clear the local variable without decrementing interface reference count and we can exit. If ICE failed, we’ll give the other thread a time slice (after all, the other thread just created the critical section, therefore we can assume it will Acquire it, therefore the current thread would not be able to Acquire it and it can sleep a little) and retry.

And that’s how you get a hassle-free critical section. Ugly, I know, but it works.

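Just a word of warning – don’t try to pass a TOmniCS record around. Eventually you’ll do an assignment somewhere (newCS := oldCS) and that would screw things up: if the copy is made before the first Acquire, both records hold nil and each will later create its own critical section. Just pass the critical section (TOmniCS.SyncObj) and all will be fine.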



OmniThreadLibrary 1.02

If you haven’t noticed already – OmniThreadLibrary 1.02 was released a few days ago.

The main new feature is a reimplemented thread pool. The previous implementation was never meant to be a permanent solution anyway. The new thread pool uses OTL as an internal implementation mechanism. Talk about recursion! :) A longer article on the new implementation will follow.

There’s a new Enforced decorator which you can apply to the IOmniTask or IOmniTaskControl. In short – OTL always tries to execute your task. If you call taskControl.Terminate before the task has even started, OTL will set the termination signal and start executing task. This is not a good idea if the task was waiting in the thread pool queue and threadPool.CancelAll or threadPool.Terminate was executed. To bypass this auto-execute behaviour, you can call .Enforced(false).

I’ve implemented a critical section which you only have to declare and start using. No need for .Create or .Initialize or similar. An article will follow …

There’s a new demo that shows how to use OTL for background file scanning.

A few small and not so small bugs were fixed and 3rd party units were sync’d to fresh releases.

As usual, you can get it via SVN or as a ZIP archive.


Saturday, December 20, 2008

Christmas mystery

We were tracking down quite an interesting problem in the past few days …

We have this service that accepts connections from multiple computers and reports some status back. And it does that via SOAP. And the SOAP implementation is our own.

So far so good – we are using this approach in quite some programs deployed in many places. Everything is working well.

Everything, except one installation.

That customer was reporting weird things. Mysterious things. Sometimes the client on one computer would for a moment display the same data as another computer. And then the display would flip back to the correct data. Mysterious.

As expected, the guy who wrote the service pointed his finger to the guy that wrote the SOAP layer (me). And, as expected, I did the reverse. We were pretty sure that the problem was caused by the buggy code written by the other guy. We were both wrong.

Of course we first spent about a working day logging various parts of the service and SOAP server. You know the drill – add logging, compile, connect VPN, upload the source to the customer, start two clients at two machines on the remote site via RDP, wait for the problem, download logs, analyze. Booooring. In the end, we were none the wiser. We only knew that the server always returns correct data to all clients.

Then we switched our attention to the client. Who would have guessed, the client always received the correct data. Except that it sometimes displayed wrong data. But that data was never received via the SOAP layer. As I said before – mystery.

Of course, once we got to that point we knew that the problem lies inside the client software so we only have to dig in that direction. And, of course, we got the answer. And boy was it a surprising one!

You see, our client is somewhat baroque. Layers upon layers of code that were developed over the years. It’s the fate of all successful applications and this one is quite successful, at least in the vertical market we are working in. It is therefore not very surprising that the client is using temporary files in a somewhat strange arrangement. Instead of creating temporary files with unique names, it creates a temporary folder inside the %temp% and stores files there. This folder is guaranteed to be unique on the system as it uses a global counter as a part of the folder name. IOW, the first client on the computer stores data in %temp%\data_1, the second in %temp%\data_2. So the client on the first computer stored remote data (returned from the service) in %temp%\data_1\list.txt and the client on the second computer used %temp%\data_1\list.txt. They were both using the ‘data_1’ subfolder as they were both the first (and only) client instance on that machine. A recipe for disaster? Not really, as the %temp% folder is local to the computer.

Except that it was not.

Somebody at the customer’s site got a brilliant idea and configured temp folders for all domain clients to point to the domain controller. Don’t know who and surely don’t know why, but as a result %temp% pointed to the same folder (on the domain controller computer) on all client computers in the domain. So the first client downloaded its data to the %temp% (on the domain server), the second client downloaded its data to the same location and then the first client displayed that data – data that was already modified and which belonged to the second client. A typical race condition if I ever saw one.

The solution is, of course, to always use GetTempFileName. But this time we surely (re)learnt it in an interesting way.
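For reference, a minimal sketch of that approach follows. It is not the code from our client: CreateUniqueTempFile is a hypothetical helper, the unit names are the ones used by Delphi 2009 (newer versions may want Winapi.Windows and System.SysUtils), and the program only runs on Windows.

```pascal
program TempFileDemo;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Hypothetical helper: asks Windows for a new, uniquely named file in %temp%.
function CreateUniqueTempFile(const prefix: string): string;
var
  tempPath: array [0..MAX_PATH] of char;
  tempFile: array [0..MAX_PATH] of char;
begin
  if GetTempPath(MAX_PATH, tempPath) = 0 then
    RaiseLastOSError;
  // uUnique = 0: Windows picks the number and creates an empty file under that
  // name, so two callers are not handed the same name.
  if GetTempFileName(tempPath, PChar(prefix), 0, tempFile) = 0 then
    RaiseLastOSError;
  Result := tempFile;
end;

var
  fileName: string;
begin
  fileName := CreateUniqueTempFile('dat');
  try
    WriteLn(fileName);
  finally
    SysUtils.DeleteFile(fileName); // the file already exists, so clean it up
  end;
end.
```

Because the file is created as part of picking the name, the uniqueness holds per folder, which is what matters when %temp% turns out to be shared between machines.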


Thursday, November 27, 2008

Advanced enumerators

The Blaise Pascal magazine #4 is out. Inside you can find a second part of my “enumerators” series (first page).


Monday, November 03, 2008

Background file scanning with OmniThreadLibrary

Yesterday, a reader asked if I can create a demo for the background file searcher, so here it is. To get the demo source, update your SVN copy or download this file.

The app has a simple interface.

[Screenshot: demo 23 - initial]

Enter the path to be searched and the file mask in the edit field and click Scan.

[Screenshot: demo 23 - scanning]

The program starts scanning and displays the current folder and the number of found files during the process. If you click the X button during the scan, the scan will be aborted and the program will close.

When background scanning completes, list of found files is displayed in the listbox.

[Screenshot: demo 23 - final]

During the scanning, main thread is fully active. You can move the program around, resize it, minimize, maximize and so on.

Implementation

Let’s take a look at the application in design mode.

[Screenshot: demo 23 - design]

Besides the components that are visible at runtime, there is also a TOmniEventMonitor component (named OTLMonitor) and a TTimer (tmrDisplayStatus) on the form.

When the user clicks the Scan button, a background task is created.

procedure TfrmBackgroundFileSearchDemo.btnScanClick(Sender: TObject);
begin
  FFileList := TStringList.Create;
  btnScan.Enabled := false;
  tmrDisplayStatus.Enabled := true;
  FScanTask := CreateTask(ScanFolders, 'ScanFolders')
    .MonitorWith(OTLMonitor)
    .SetParameter('FolderMask', inpFolderMask.Text)
    .Run;
end;

ScanFolders is the method that will do the scanning (in a background thread). We’ll return to it later. The task will be monitored with the OTLMonitor component so that we will receive task messages. OTLMonitor will also tell us when the task terminates. The input folder and mask are sent to the task as the parameter FolderMask and the task is started.

The FFileList field is a TStringList that will contain a list of all found files.

Let’s ignore the scanner details for the moment and skip to the end of the scanning process. When task has completed its job, OTLMonitor.OnTaskTerminated is called.

procedure TfrmBackgroundFileSearchDemo.OTLMonitorTaskTerminated(
  const task: IOmniTaskControl);
begin
  tmrDisplayStatus.Enabled := false;
  outScanning.Text := '';
  outFiles.Text := IntToStr(FFileList.Count);
  lbFiles.Clear;
  lbFiles.Items.AddStrings(FFileList);
  FreeAndNil(FFileList);
  FScanTask := nil;
  btnScan.Enabled := true;
end;

At that point, number of found files is copied to the outFiles edit field and complete list is assigned to the listbox. Task reference FScanTask is then cleared, which causes the task object to be destroyed and Scan button is reenabled (it was disabled during the scanning process).

But what if the user closes the program by clicking the X button while the background scanner is active? Simple, we just catch the form’s OnCloseQuery event and tell the task to terminate.

procedure TfrmBackgroundFileSearchDemo.FormCloseQuery(Sender: TObject;
  var CanClose: boolean);
begin
  if assigned(FScanTask) then begin
    FScanTask.Terminate;
    FScanTask := nil;
    CanClose := true;
  end;
end;

The Terminate method will do two things – tell the task to terminate and then wait for its termination. After that, we simply have to clear the task reference and allow the program to terminate.

Scanner

Let’s move to the scanning part now. The ScanFolders method (which is the main task method, the one we passed to the CreateTask) splits the value of the FolderMask parameter into folder and mask parts and passes them to the main worker ScanFolder.

procedure ScanFolders(const task: IOmniTask);
var
  folder: string;
  mask  : string;
begin
  mask := task.ParamByName['FolderMask'];
  folder := ExtractFilePath(mask);
  Delete(mask, 1, Length(folder));
  if folder <> '' then
    folder := IncludeTrailingPathDelimiter(folder);
  ScanFolder(task, folder, mask);
end;
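To see what the split produces for a concrete input, here is a small standalone sketch of the same two steps. SplitFolderMask is a hypothetical helper written for this illustration, and the sample path is made up; the printed values are what I would expect on Windows, where the backslash is the path delimiter.

```pascal
program SplitMaskDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Hypothetical helper mirroring the first lines of ScanFolders.
procedure SplitFolderMask(const folderMask: string; out folder, mask: string);
begin
  mask := folderMask;
  folder := ExtractFilePath(mask);    // everything up to and including the last delimiter
  Delete(mask, 1, Length(folder));    // what remains is the file mask
end;

var
  folder, mask: string;
begin
  SplitFolderMask('c:\projects\*.pas', folder, mask);
  WriteLn(folder);  // c:\projects\   (on Windows)
  WriteLn(mask);    // *.pas
end.
```

Note that an input without a mask, such as c:\projects, would be split into the folder c:\ and the mask projects, so the edit field really has to contain both parts.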

ScanFolder first finds all subfolders of the selected folder and calls itself recursively for each subfolder. That means that we’ll first process deepest folders and then proceed to the top of the folder tree.

Then it sends a message MSG_SCAN_FOLDER to the main thread. As a parameter of this message it sends the name of the folder being processed. There’s nothing magical about this message – it is just an arbitrary numeric constant from range 0 .. 65534 (yes, number 65535 is reserved for internal use).

const
  MSG_SCAN_FOLDER  = 1;
  MSG_FOLDER_FILES = 2;

procedure ScanFolder(const task: IOmniTask; const folder, mask: string);
var
  err        : integer;
  folderFiles: TStringList;
  S          : TSearchRec;
begin
  // first pass: recurse into subfolders
  err := FindFirst(folder + '*.*', faDirectory, S);
  if err = 0 then try
    repeat
      if ((S.Attr and faDirectory) <> 0) and (S.Name <> '.') and (S.Name <> '..') then
        ScanFolder(task, folder + S.Name + '\', mask);
      err := FindNext(S);
    until task.Terminated or (err <> 0);
  finally FindClose(S); end;
  task.Comm.Send(MSG_SCAN_FOLDER, folder);
  // second pass: collect matching files in this folder
  folderFiles := TStringList.Create;
  try
    err := FindFirst(folder + mask, 0, S);
    if err = 0 then try
      repeat
        folderFiles.Add(folder + S.Name);
        err := FindNext(S);
      until task.Terminated or (err <> 0);
    finally FindClose(S); end;
  finally task.Comm.Send(MSG_FOLDER_FILES, folderFiles); end;
end;

ScanFolder then runs the FindFirst/FindNext/FindClose loop for the second time to search for files in the folder. [BTW, if you want to first scan folders nearer to the root, just change the two loops and scan for files first and for folders second.] Each file is added to an internal TStringList object which was created just a moment before. When folder scan is completed, this object is sent to the main thread as parameter of the MSG_FOLDER_FILES message.

This approach – sending data for one folder – is a compromise between returning the complete set (full scanned tree), which would not provide a good feedback, and returning each file as we detect it, which would unnecessarily put a high load on the system.

Both Find loops test the status of the task.Terminated function and exit immediately if it is True. That allows us to terminate the background task when the user closes the application and the form’s OnCloseQuery handler is called.

Receiving messages

That’s all that has to be done in the background task but we still have to process the messages in the main thread. For that, we write the OTLMonitor’s OnTaskMessage event.

procedure TfrmBackgroundFileSearchDemo.OTLMonitorTaskMessage(
  const task: IOmniTaskControl);
var
  folderFiles: TStringList;
  msg        : TOmniMessage;
begin
  task.Comm.Receive(msg);
  if msg.MsgID = MSG_SCAN_FOLDER then
    FWaitingMessage := msg.MsgData
  else if msg.MsgID = MSG_FOLDER_FILES then begin
    folderFiles := TStringList(msg.MsgData.AsObject);
    FFileList.AddStrings(folderFiles);
    FreeAndNil(folderFiles);
    FWaitingCount := IntToStr(FFileList.Count);
  end;
end;

If the message is MSG_SCAN_FOLDER we just copy the folder name to a form field. If the message is MSG_FOLDER_FILES, we copy file names from the parameter (which is a TStringList) to the FFileList list and destroy the parameter. We also update a form field holding the number of currently found files.

So why don’t we directly update two edit fields on the form (one with current folder and another with number of found files)? Well, the background task can send many messages in one second (when processing folders with a small number of files) and there’s no point in displaying them all – the user will never see what was displayed anyway. And it would slow down the GUI because Windows controls would be updated hundreds of times per second, which is never a good idea.

Instead of that we just store the strings to be displayed in two form fields and display them from a timer which is triggered three times per second. That will not show all scanned folders and all intermediate file count results, but will still provide the user with the sufficient feedback.

procedure TfrmBackgroundFileSearchDemo.tmrDisplayStatusTimer(Sender: TObject);
begin
  if FWaitingMessage <> '' then begin
    outScanning.Text := FWaitingMessage;
    FWaitingMessage := '';
  end;
  if FWaitingCount <> '' then begin
    outFiles.Text := FWaitingCount;
    FWaitingCount := '';
  end;
end;

And that’s all. Fully functional background file scanner that could easily be repackaged into a TComponent. And it’s all yours.


Sunday, November 02, 2008

OmniThreadLibrary 1.01

OmniThreadLibrary 1.01 was released yesterday. It is available via SVN (http://omnithreadlibrary.googlecode.com/svn/tags/release-1.01) or as a ZIP archive.

Changes since version 1.0a:

  • *** Breaking interface change ***
    • IOmniTask.Terminated renamed to IOmniTask.Stopped.
    • New IOmniTask.Terminated that checks whether the task has been requested to terminate. [demo 22]
  • [GJ] Redesigned stack container with better lock contention.
  • [GJ] Totally redesigned queue container, which is no longer based on stack and allows multiple readers.
  • Full D2009 support; D2009 packages, project files and Tests project group.
  • Invoke-by-name and invoke-by-address messaging implemented [http://17slon.com/blogs/gabr/2008/10/erlangenizing-omnithreadlibrary.html, http://17slon.com/blogs/gabr/2008/10/omnithreadlibrary-using-rtti-to-call.html, demos 18 and 19]
  • Implemented CreateTask(reference to function (task: ITaskControl)). [D2009 only, demo 21]
  • Implemented blocking wait (ReceiveWait). [demo 19]
  • Added enumerator to the IOmniTaskGroup interface.
  • Implemented IOmniTaskGroup.RegisterAllWithTask and .UnregisterAllFromTask.
  • Added automatic comm unregistration for IOmniTaskGroup.RegisterAllCommWith.
  • Implemented IOmniTaskGroup.SendToAll.
  • IOmniTaskControl.Terminate now kills the task after the timeout.
  • New/updated tests/demos:
    • 10_Containers
      • 2 -> 2, 1 -> 4 and 4 -> 4 tests for stacks and queues.
      • [1, 2, 4] -> [1, 2, 4] full tests.
      • Writes CSV file with cumulative test results.
    • 17_MsgWait: demo for the .MsgWait decorator.
    • 18_StringMsgDispatch: Invoke demo.
    • 19_StringMsgBenchmark: Invoke benchmark, ReceiveWait demo.
    • 20_QuickSort: Parallel quicksort demo.
    • 21_Anonymous_methods: Anonymous methods demo (D2009 only).
    • 22_TerminationTest: Task termination demo.
  • Message ID $FFFF is now reserved for internal purposes.
  • Better default queue length calculation that takes into account OtlContainers overhead and FastMM4 granulation.
  • Bug fixed: TOmniValue.Null was not really initialized to Null.
  • Bug fixed: Setting timer interval resets timer countdown.
  • Bug fixed: TOmniTaskControl.Schedule always scheduled task to the global thread pool.
  • Current versions of 3rd party units included.

Known problems:

  • Thread pool is not stable under high load.


Thursday, October 30, 2008

When one and one makes one

Some time ago I promised to write an article on the TGpJoinedStream class. The time has come to fulfill the promise.

TGpJoinedStream is a stream wrapper class – a class that wraps one or more existing streams and provides a TStream interface on the outside. My GpStreams unit contains several examples of such wrappers: TGpStreamWindow provides stream access to a part of another stream, TGpScatteredStream provides contiguous access to scattered data (hm, I never wrote an article about that one, did I?), and TGpBufferedStream adds data caching to any stream.

TGpJoinedStream’s role is simple. You pass it a bunch of streams and it provides stream access to concatenated data. IOW, if first stream contains numbers <1, 2, 3> and second stream contains <4, 5>, joined stream would function as if it contains <1, 2, 3, 4, 5>.

The implementation is pretty straightforward and I won’t describe it here. I'll rather show how TGpJoinedStream is used in practice and why I wrote it at all.

We have an MPEG parser class that takes a TStream containing MPEG video data and extracts some information from it. It works fine when you need to extract information from a video file, but not so well if you want to use it inside a DirectX filter. From the viewpoint of a DirectX source or sink filter, video is not a stream but a bunch of buffers, which we must process one by one. The trouble is that buffers can almost never be fully processed because MPEG sequences don’t contain data length.

In MPEG, each sequence starts with the byte signature 00 00 01, followed by a byte that tells you what kind of data you’re processing, followed by some data. Unless you want to parse each and every sequence’s data and you know exactly how all well-formed and malformed sequences in existence are built, you only know that you reached the end of one sequence when you reach another 00 00 01 signature. That’s why the last sequence in the buffer can never be processed (unless this is the last buffer of them all) until we find 00 00 01 in the next buffer. Uff. I hope somebody understands this at all.

In short, we are processing data buffers. A variable-length part of each buffer will stay unprocessed and will have to be prepended to the next buffer before it is passed through the MPEG parser. And so on, until the end of data.

And that’s where TGpJoinedStream comes to help. The unprocessed part is stored in a memory stream FLeftovers, which is empty at the beginning. Buffer parser concatenates FLeftovers with the current data and passes the result through the stream parser. When that one returns, .Position in the combined stream indicates the first unprocessed byte. The unprocessed data is then copied into the FLeftovers stream so it can be used next time the buffer parser is called.

That’s how it looks in code (slightly simplified real code; I just removed some details that are not interesting for our story).

procedure TGpMPEGSequentialParser.ParseNext(buffer: pointer;
  bufferSize: integer);
var
  combinedStream: TGpJoinedStream;
  mpegParser    : TGpMPEGParser;
  newLeftovers  : TMemoryStream;
  strBuffer     : TGpFixedMemoryStream;
begin
  strBuffer := TGpFixedMemoryStream.Create(buffer^, bufferSize);
  try
    newLeftovers := TMemoryStream.Create;
    try
      combinedStream := TGpJoinedStream.Create([FLeftovers, strBuffer]);
      try
        mpegParser := TGpMPEGParser.Create;
        try
          mpegParser.MPEGStream := combinedStream;

          mpegParser.Parse(HandleMpegSequence);

          if combinedStream.Position < combinedStream.Size then
            newLeftovers.CopyFrom(combinedStream,
              combinedStream.Size - combinedStream.Position);
        finally FreeAndNil(mpegParser); end;
      finally FreeAndNil(combinedStream); end;
    finally
      FreeAndNil(FLeftovers);
      FLeftovers := newLeftovers;
    end;
  finally FreeAndNil(strBuffer); end;
end;
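The leftover bookkeeping itself can be tried without GpStreams or an MPEG parser. The sketch below is an assumption-laden model, not the real parser: FindSignature, ParseNext, Leftovers and CompleteCount are hypothetical names, the data is copied into one byte array instead of being joined through a stream wrapper, and a sequence merely counts as complete once the next 00 00 01 signature shows up. The second buffer is chosen so that one signature straddles the buffer boundary.

```pascal
program LeftoversDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

var
  Leftovers    : TBytes;   // unprocessed tail carried over to the next call
  CompleteCount: integer;  // complete sequences seen so far

// Index of the first 00 00 01 signature at or after startAt, or -1 if none.
function FindSignature(const data: TBytes; startAt: integer): integer;
var
  i: integer;
begin
  Result := -1;
  for i := startAt to Length(data) - 3 do
    if (data[i] = 0) and (data[i + 1] = 0) and (data[i + 2] = 1) then begin
      Result := i;
      Exit;
    end;
end;

procedure ParseNext(const buffer: array of byte);
var
  combined : TBytes;
  i        : integer;
  seqStart : integer;
  nextStart: integer;
begin
  // Leftovers followed by the new buffer (copied here for simplicity).
  combined := Copy(Leftovers, 0, Length(Leftovers));
  SetLength(combined, Length(Leftovers) + Length(buffer));
  for i := 0 to High(buffer) do
    combined[Length(Leftovers) + i] := buffer[i];

  seqStart := FindSignature(combined, 0);
  if seqStart < 0 then
    seqStart := 0  // no signature yet: keep everything for the next call
  else begin
    nextStart := FindSignature(combined, seqStart + 3);
    while nextStart >= 0 do begin
      Inc(CompleteCount);  // bytes seqStart .. nextStart-1 form a complete sequence
      seqStart := nextStart;
      nextStart := FindSignature(combined, seqStart + 3);
    end;
  end;
  // The last (possibly incomplete) sequence waits for the next buffer.
  Leftovers := Copy(combined, seqStart, Length(combined) - seqStart);
end;

begin
  CompleteCount := 0;
  ParseNext([0, 0, 1, $B3, $0A, 0]);
  WriteLn(CompleteCount, ' ', Length(Leftovers));  // 0 6
  ParseNext([0, 1, $B8, $14, 0, 0, 1, $B7]);
  WriteLn(CompleteCount, ' ', Length(Leftovers));  // 2 4
end.
```

The trailing 00 of the first buffer and the leading 00 01 of the second only form a signature once the two are joined, which is exactly the situation the joined stream is there to handle.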


Hope you like it!


Tuesday, October 21, 2008

Internet, as predicted in 1946

“You know the logics setup. You got a logic in your house. It looks like a vision receiver used to, only it's got keys instead of dials and you punch the keys for what you wanna get. It's hooked in to the tank, which has the Carson Circuit all fixed up with relays. Say you punch "Station SNAFU" on your logic. Relays in the tank take over an' whatever vision-program SNAFU is telecastin' comes on your logic's screen. Or you punch "Sally Hancock's Phone" an' the screen blinks an' sputters an' you're hooked up with the logic in her house an' if somebody answers you got a vision-phone connection. But besides that, if you punch for the weather forecast or who won today's race at Hialeah or who was mistress of the White House durin' Garfield's administration or what is PDQ and R sellin' for today, that comes on the screen too. The relays in the tank do it. The tank is a big buildin' full of all the facts in creation an' all the recorded telecasts that ever was made—an' it's hooked in with all the other tanks all over the country—an' everything you wanna know or see or hear, you punch for it an' you get it. Very convenient. Also it does math for you, an' keeps books, an' acts as consultin' chemist, physicist, astronomer, an' tea-leaf reader, with a "Advice to the Lovelorn" thrown in. The only thing it won't do is tell you exactly what your wife meant when she said, "Oh, you think so, do you?" in that peculiar kinda voice. Logics don't work good on women. Only on things that make sense.”

- Murray Leinster, A Logic Named Joe

For Sci-Fi fans: Baen Free Library.


Tuesday, October 14, 2008

TDM Rerun #11: Shared Pools

Sometimes we run into problems because of a Windows feature, and most of the time this is a very good feature, that a file mapping (an essential part of my shared memory implementation) cannot exist on its own. A file mapping, like mutexes, events, and other Windows primitives, must have an owner. If all processes associated with a given file mapping die, the file mapping will be destroyed. Because in my shared memory implementation this file mapping is backed with a page file, its contents won’t be preserved on an accessible part of the disk.

- Shared Pools, The Delphi Magazine 95, July 2003

[Image: Shared pool architecture]

The shared pool was one of my more baroque creations. In fact, it was so complicated that it was never used in a deployed application. Basically, the article described an architecture to implement a pool of shared memory objects, which multiple Writers could use to send data to one Reader (typically sitting in another process). The system also handled cleanup when a Reader task died and other management details.

You really should not be playing with this code. There are better solutions.

Links: article (PDF, 193 KB), source code (ZIP, 1.9 MB)


Tuesday, October 07, 2008

OmniThreadLibrary: Using RTTI to call task methods

Yesterday I wrote an article on by-name and by-address invocations in the new OmniThreadLibrary (development version) but I didn’t finish the description of the OTL internal magic that makes those new calls work. Let’s fix that …

I ended the story right at the point where TOmniTaskExecutor.Asy_DispatchMessages calls DispatchOmniMessage.

Most of the magic happens inside this method, so it’s only fair to display it in its full glory.

procedure TOmniTaskExecutor.DispatchOmniMessage(msg: TOmniMessage);
var
  methodAddr     : pointer;
  methodInfoObj  : TObject;
  methodInfo     : TOmniInvokeInfo absolute methodInfoObj;
  methodName     : string;
  methodSignature: TOmniInvokeType;
  msgData        : TOmniValue;
  obj            : TObject;
begin
  if msg.MsgID = COtlReservedMsgID then begin
    Assert(assigned(WorkerIntf));
    GetMethodNameFromInternalMessage(msg, methodName, msgData);
    if methodName = '' then
      raise Exception.Create('TOmniTaskExecutor.DispatchOmniMessage: Method name not set');
    if not assigned(oteMethodHash) then
      oteMethodHash := TGpStringObjectHash.Create(17, true); //usually there won't be many methods
    if not oteMethodHash.Find(methodName, methodInfoObj) then begin
      GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
      methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
      oteMethodHash.Add(methodName, methodInfo);
    end;
    case methodInfo.Signature of
      itSelf:
        TOmniInvokeSignature_Self(methodInfo.Address)(WorkerIntf.Implementor);
      itSelfAndOmniValue:
        TOmniInvokeSignature_Self_OmniValue(methodInfo.Address)(WorkerIntf.Implementor, msgData);
      itSelfAndObject:
        begin
          obj := msgData.AsObject;
          TOmniInvokeSignature_Self_Object(methodInfo.Address)(WorkerIntf.Implementor, obj);
        end
      else
        RaiseInvalidSignature(methodName);
    end; //case methodInfo.Signature
  end
  else
    WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchOmniMessage }

Now let’s dissect it. The code first checks if the message ID contains the magic value. If not, the message is dispatched as before, using Delphi’s Dispatch.

  if msg.MsgID = COtlReservedMsgID then begin
    //...
  end
  else
    WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchMessage }

Not interesting. Let’s take a look at the other path – when msg.MsgID is an internal message ID.

In this case, DispatchOmniMessage extracts the method name from the message. If the caller passed a method name to Invoke, the job is simple – the name only has to be unpacked from the TOmniInternalStringMsg object. If, on the other hand, a method pointer was used, the code unpacks it from the TOmniInternalAddressMsg object and calls MethodName to convert it to a (who would guess?) method name.

procedure TOmniTaskExecutor.GetMethodNameFromInternalMessage(const msg: TOmniMessage; var
  msgName: string; var msgData: TOmniValue);
var
  internalType: TOmniInternalMessageType;
  method      : pointer;
begin
  internalType := TOmniInternalMessage.InternalType(msg);
  case internalType of
    imtStringMsg:
      TOmniInternalStringMsg.UnpackMessage(msg, msgName, msgData);
    imtAddressMsg:
      begin
        TOmniInternalAddressMsg.UnpackMessage(msg, method, msgData);
        msgName := WorkerIntf.Implementor.MethodName(method);
        if msgName = '' then
          raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
            'Cannot find method name for method %p', [method]);
      end
    else
      raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
        'Internal message type %s is not supported',
        [GetEnumName(TypeInfo(TOmniInternalMessageType), Ord(internalType))]);
  end; //case internalType
end; { TOmniTaskExecutor.GetMethodNameFromInternalMessage }
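The name/address conversion used above can be tried out in isolation. The following is only a minimal console sketch, not OTL code: TWorker and its Change method are made up for the illustration, while MethodAddress and MethodName are the standard TObject class functions.

```pascal
program MethodLookupDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  {$M+} // published methods are recorded in the class's method table
  TWorker = class // hypothetical worker class, not part of the OTL
  published
    procedure Change;
  end;
  {$M-}

procedure TWorker.Change;
begin
  Writeln('Change called');
end;

var
  addr  : pointer;
  worker: TWorker;
begin
  worker := TWorker.Create;
  try
    // name -> address; nil when no published method has that name
    addr := worker.MethodAddress('Change');
    Writeln('Change found: ', addr <> nil);
    // address -> name; empty string when the address is not a published method
    Writeln('Name for that address: ', worker.MethodName(addr));
    Writeln('FooBar missing: ', worker.MethodAddress('FooBar') = nil);
  finally
    worker.Free;
  end;
end.
```

Both lookups only see published methods, which is why a mistyped or non-published name shows up as a runtime failure and not as a compiler error.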

Next, the code looks into an internal hash table and tries to fetch information on that method name. If there’s no such information (which happens when a method is called for the first time), GetMethodAddrAndSignature is called to convert the name to the method’s address and signature. This information is then stored in the hash table so it will be immediately available next time.

  if not oteMethodHash.Find(methodName, methodInfoObj) then begin
    GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
    methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
    oteMethodHash.Add(methodName, methodInfo);
  end;
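The same look-up-once-then-cache idea can be sketched without the OTL. This sketch swaps TGpStringObjectHash for a sorted TStringList, which is an assumption made only to keep the example self-contained; TWorker and LookupMethod are hypothetical names.

```pascal
program MethodCacheDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  {$M+}
  TWorker = class // hypothetical worker class
  published
    procedure Change;
  end;
  {$M-}

procedure TWorker.Change;
begin
end;

var
  cache : TStringList; // method name -> address, kept in Objects[]
  worker: TWorker;

// Returns the address of methodName; RTTI is consulted only on a cache miss.
function LookupMethod(const methodName: string): pointer;
var
  idx: integer;
begin
  if cache.Find(methodName, idx) then
    Result := pointer(cache.Objects[idx])
  else begin
    Result := worker.MethodAddress(methodName);
    if Result = nil then
      raise Exception.CreateFmt('Method %s not found', [methodName]);
    cache.AddObject(methodName, TObject(Result));
  end;
end;

begin
  worker := TWorker.Create;
  cache := TStringList.Create;
  try
    cache.Sorted := true; // Find works only on a sorted list
    LookupMethod('Change'); // first call: miss, goes through MethodAddress
    LookupMethod('Change'); // second call: served from the cache
    Writeln('Cached entries: ', cache.Count);
  finally
    cache.Free;
    worker.Free;
  end;
end.
```

A string list is slower than a hash for large tables, but a worker rarely has more than a handful of invokable methods, so the difference should not matter here.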

Method signature is then used to select the type of call that will be performed. Only three signatures are supported: (Self: TObject), (Self: TObject; const msgData: TOmniValue) and (Self: TObject; var obj: TObject). Demo 18 demonstrates the use of all three. What I find especially convenient is that the third option allows you to put any TObject descendant in the parameter list. In other words, you can do:

type
  TAsyncHello = class(TOmniWorker)
  published
    procedure TheAnswer(var sl: TStringList);
  end;

procedure TfrmTestStringMsgDispatch.btnSendObjectClick(Sender: TObject);
var
  sl: TStringList;
begin
  sl := TStringList.Create;
  sl.Text := '42';
  FHelloTask.Invoke(@TAsyncHello.TheAnswer, sl);
end;

procedure TAsyncHello.TheAnswer(var sl: TStringList);
begin
  FreeAndNil(sl);
end;

Convenient, huh?
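The calls in the case statement work because, under Delphi's default register calling convention, a method is just a procedure with a hidden Self as its first parameter. Here is a minimal sketch of that trick outside the OTL. TInvokeSelf and TInvokeSelfObject are hypothetical stand-ins for the TOmniInvokeSignature_* types, and TWorker is made up for the example.

```pascal
program InvokeByAddressDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  // Hypothetical procedural types; the first parameter takes the place of Self
  TInvokeSelf       = procedure(instance: TObject);
  TInvokeSelfObject = procedure(instance: TObject; var obj: TObject);

  {$M+}
  TWorker = class // hypothetical worker class
  published
    procedure Ping;
    procedure TheAnswer(var sl: TStringList);
  end;
  {$M-}

procedure TWorker.Ping;
begin
  Writeln('Ping on ', ClassName);
end;

procedure TWorker.TheAnswer(var sl: TStringList);
begin
  Writeln('Received: ', Trim(sl.Text));
  FreeAndNil(sl); // the receiver takes ownership and clears the caller's reference
end;

var
  obj   : TObject;
  worker: TWorker;
begin
  worker := TWorker.Create;
  try
    // cast the raw code address to a procedural type and pass Self explicitly
    TInvokeSelf(worker.MethodAddress('Ping'))(worker);
    obj := TStringList.Create;
    TStringList(obj).Text := '42';
    TInvokeSelfObject(worker.MethodAddress('TheAnswer'))(worker, obj);
    Writeln('Object released by receiver: ', obj = nil);
  finally
    worker.Free;
  end;
end.
```

Nothing checks that the procedural type matches the real method signature, which is why the signature has to be verified through RTTI before such a call is made.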

RTTI

The above description of DispatchOmniMessage looks very much like the famous Sidney Harris cartoon. In step one, a method name is retrieved from the message. In step three, the method is invoked using the right signature and method pointer. In step two, well …

[Image: Sidney Harris cartoon]

Let’s be more explicit, then. We have a method name and we want to find the address of that method and some information about its parameters. Sounds like a job for RTTI, yes? Well, basic RTTI (the one that you enable with {$M+} or {$TYPEINFO ON} or simply by declaring a published property) only handles published properties, not methods. For methods, we need to use extended RTTI, which is enabled with {$METHODINFO ON}. I won’t describe it here as David Glassborow and Hallvard Vassbotn already did the job better than I could. If you’re interested in extended RTTI, I suggest starting your research with Hallvard’s article David Glassborow on extended RTTI.

Great thanks for publishing all that info, guys, it helped a lot!

GetMethodAddrAndSignature first uses Delphi’s ObjAuto unit to extract the method information header. Then it uses TObject’s MethodAddress to convert the method name into an address. Of course, it doesn’t use TObject directly, as TObject has no idea where the methods belonging to the task object are stored – it must call MethodAddress on your task object.

  methodInfoHeader := ObjAuto.GetMethodInfo(WorkerIntf.Implementor, methodName);
  methodAddress := WorkerIntf.Implementor.MethodAddress(methodName);

After that, some sanity checks are run, which I won’t reprint here.

Then the method signature is checked. First we want to ensure that we’re dealing with a procedure, not a function.

  if assigned(methodInfoHeader.ReturnInfo.ReturnType) then
    raise Exception.CreateFmt('TOmniTaskExecutor.DispatchMessage: ' +
      'Method %s.%s must not return result',
      [WorkerIntf.Implementor.ClassName, methodName]);

At the end, a messy fragment of code walks over the parameter description list for this method and checks that the first parameter is a class reference (because this method is a part of a class, it has a hidden Self parameter at the beginning) and that the second parameter, if present at all, is either const data: TOmniValue or var obj: TObject (or a descendant of TObject). The detected signature is then returned in the methodSignature parameter.

  // only limited subset of method signatures is allowed:
  // (Self), (Self, const TOmniValue), (Self, var TObject)
  headerEnd := cardinal(methodInfoHeader) + methodInfoHeader^.Len;
  params := PParamInfo(cardinal(methodInfoHeader) + SizeOf(methodInfoHeader^)
    - CShortLen + SizeOf(TReturnInfo) + Length(methodInfoHeader^.Name));
  paramNum := 0;
  methodSignature := itUnknown;
  // Loop over the parameters
  while cardinal(params) < headerEnd do begin
    Inc(paramNum);
    paramType := params.ParamType^;
    if paramNum = 1 then
      if (params^.Flags <> []) or (paramType^.Kind <> tkClass) then
        RaiseInvalidSignature(methodName)
      else
        methodSignature := itSelf
    else if paramNum = 2 then
      //code says 'const' but GetMethodInfo says 'pfVar' :(
      if (params^.Flags * [pfConst, pfVar] <> []) and (paramType^.Kind = tkRecord) and
         (SameText(paramType^.Name, 'TOmniValue'))
      then
        methodSignature := itSelfAndOmniValue
      else if (params^.Flags = [pfVar]) and (paramType^.Kind = tkClass) then
        methodSignature := itSelfAndObject
      else
        RaiseInvalidSignature(methodName)
    else
      RaiseInvalidSignature(methodName);
    params := params.NextParam;
  end;

It looks messy and it is messy, but it does the job. If you have problems understanding this code, I’d recommend stepping over it with the debugger.

Benchmarks

In test 19 I implemented some benchmarking code to find out how much slower this approach is than the standard Comm.Send(msg, data). It turned out that it is not much slower, thanks to the built-in caching.


Dispatching by integer message ID is about twice as fast as dispatching by string or pointer. That’s a completely acceptable speed for something that is executed only a few (thousand) times in the program’s lifetime. If your program model is based on sending millions and millions of “do computation” messages to the worker thread, then it is doomed from the beginning anyway.

That’s enough for today. I hope you liked it, and stay well until next time. Bye!


Monday, October 06, 2008

Erlangenizing the OmniThreadLibrary

A few days ago I “discovered” Erlang. [Such things happen when you read StackOverflow obsessively (sigh).] I started with Wikipedia and proceeded with the Pragmatic Programmer book – but that’s not really important. I wanted to talk about Erlang’s built-in concurrency support.

Some things are very similar to the OTL (if we take into account that Erlang is a functional language and Delphi is not) and some are different, but one difference immediately caught my attention. In Erlang, the message recipient doesn’t use a numeric code to discover what message it has received; instead, the whole message is matched against a programmer-provided pattern (or patterns). The interesting thing is that usually (by convention) the first element in the message is an atom (a name, a sequence of characters, if you want). That got me thinking … Why do we send integer messages in the OTL, anyway?

The Windows Way

From the very beginning, OmniThreadLibrary tried to make the programmer’s life simple. Well, at least simpler. One of the helping hands it offered was simplified message processing. In traditional Windows thread programming you have to wait for various objects to become signaled with WaitForMultipleObjects and then proceed accordingly. In practice that means that the thread’s main logic is centralized in one very big method that handles all those events, mutexes and other stuff that can be waited upon.

OTL helps by implementing this logic internally (at least for workers that implement the IOmniWorker interface). Instead of using kernel primitives, the task owner sends messages to the task’s message queue. Messages are processed somewhere inside the OTL (specifically in OtlTaskControl.pas/TOmniTaskExecutor.Asy_DispatchMessages) and are converted into method calls with Delphi’s Dispatch mechanism. That’s the same mechanism that makes sure that Windows messages are “converted” into Delphi methods, and it requires that the first two bytes of the dispatched message contain the message ID. That’s why (until now) the recommended way to send a message to a task was:

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_SEND_MESSAGE   = 2;

type
  TAsyncHello = class(TOmniWorker)
  strict private
    aiMessage: string;
  public
    function  Initialize: boolean; override;
    procedure OMChangeMessage(var msg: TOmniMessage); message MSG_CHANGE_MESSAGE;
    procedure OMSendMessage(var msg: TOmniMessage); message MSG_SEND_MESSAGE;
  end;

FHelloTask: IOmniTaskControl;

FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Random ' + IntToStr(Random(1234)));

[You can read more about this approach in OmniThreadLibrary Example #4: Bidirectional communication, the OTL way.]
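For readers who have never used Dispatch outside of Windows message handlers, here is a minimal console sketch of the mechanism. TDemoMessage and TReceiver are hypothetical and have nothing to do with TOmniMessage; the only thing Dispatch relies on is that the record starts with a Word-sized message ID.

```pascal
program DispatchDemo;
{$APPTYPE CONSOLE}

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_UNKNOWN        = 99; // no handler is declared for this ID

type
  // Hypothetical message record; the ID must occupy the first two bytes
  TDemoMessage = record
    MsgID: Word;
    Text : string;
  end;

  TReceiver = class
  public
    procedure OMChangeMessage(var msg: TDemoMessage); message MSG_CHANGE_MESSAGE;
    procedure DefaultHandler(var msg); override;
  end;

procedure TReceiver.OMChangeMessage(var msg: TDemoMessage);
begin
  Writeln('Change message: ', msg.Text);
end;

procedure TReceiver.DefaultHandler(var msg);
begin
  // Dispatch ends up here when no method is bound to the message ID
  Writeln('Unhandled message ID ', TDemoMessage(msg).MsgID);
end;

var
  msg     : TDemoMessage;
  receiver: TReceiver;
begin
  receiver := TReceiver.Create;
  try
    msg.MsgID := MSG_CHANGE_MESSAGE;
    msg.Text := 'Random 42';
    receiver.Dispatch(msg); // routed to OMChangeMessage
    msg.MsgID := MSG_UNKNOWN;
    receiver.Dispatch(msg); // routed to DefaultHandler
  finally
    receiver.Free;
  end;
end.
```

Note that the binding between ID and method is fixed at compile time by the message directive, which is exactly the part that has to be declared in advance.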

The Erlang Way

This approach simplifies writing threaded code – at least code that doesn’t depend heavily on shared data structures. But there’s still some room for improvement. For example, do we really have to use numeric messages, which have to be declared in advance? Why couldn’t the task controller just tell the task to execute the OMChangeMessage method?

To cut a long story short – this is now possible. Yesterday I committed a set of OTL modifications that allow you to do this:

type
  TAsyncHello = class(TOmniWorker)
  published
    procedure Change(const data: TOmniValue);
  end;

FHelloTask: IOmniTaskControl;

FHelloTask.Invoke('Change', 'Random ' + IntToStr(Random(1234)));

Yes, the Change method is published here. This is important.

Simple, huh? There’s a small problem, though – there are no compile-time checks. The code sends a string and the compiler can do nothing to verify the validity of this string. If the name is mistyped, you’ll only notice it during program execution.

To fix this problem, OTL allows another form of method invocation which uses a method address instead of the name.

FHelloTask.Invoke(@TAsyncHello.Change, 'Random ' + IntToStr(Random(1234)));

In this case the compiler can check your typing, but it still won’t catch all problems – for example, the following code will compile and then raise an exception during execution.

procedure TfrmTestStringMsgDispatch.btnTestInvalidMsgClick(Sender: TObject);
begin
  if cbStringMessages.Checked then
    // will fail, FooBar method is not defined
    FHelloTask.Invoke('FooBar')
  else
    // will fail, can only invoke methods from the task's class
    FHelloTask.Invoke(@Self.btnTestInvalidMsg);
end;

[All new functionality is exposed in new demo 18_StringMsgDispatch.]

Implementation

To understand how Invoke is implemented, it’s best to trace one such call. First we see that Invoke gets converted into a normal message.

procedure TOmniTaskControl.Invoke(const msgMethod: pointer; msgData: TOmniValue);
begin
  Comm.Send(TOmniInternalAddressMsg.CreateMessage(msgMethod, msgData));
end; { TOmniTaskControl.Invoke }

class function TOmniInternalAddressMsg.CreateMessage(const msgMethod: pointer; msgData:
  TOmniValue): TOmniMessage;
begin
  Result := TOmniMessage.Create(COtlReservedMsgID,
    TOmniInternalAddressMsg.Create(msgMethod, msgData));
end; { TOmniInternalAddressMsg.CreateMessage }

This message has message ID COtlReservedMsgID (which is equal to $FFFF, so from now on please don’t use this message ID for your own purposes). The message data field contains an object which wraps the method address and the message data that was passed to Invoke. Similar code is executed when Invoke is called with a method name parameter.

OK, so that’s how the method name or address travels from the task controller to the task itself using the standard communication channel. But that is only half of the story … the simpler part!

On the receiving side, TOmniTaskExecutor.Asy_DispatchMessages detects the new message and calls DispatchOmniMessage to process it.

  if awaited = idxFirstMessage then
    gotMsg := task.Comm.Receive(msg)
  else begin
    oteInternalLock.Acquire;
    try
      gotMsg := (oteCommList[awaited - idxFirstMessage - 1] as
        IOmniCommunicationEndpoint).Receive(msg);
    finally oteInternalLock.Release; end;
  end;
  if gotMsg and assigned(WorkerIntf) then
    DispatchOmniMessage(msg);

Now that’s where things start to get really interesting, as we have to use RTTI and even extended RTTI to call the appropriate method. It is also the point where I’ll cut the story short. This article is already very long, maybe even too long, and I have much more to say on the subject. Expect part II to be published in a few days. [Of course, if you’re curious you can just look into the code to see how DispatchOmniMessage is implemented!]

Test it!

The newest OmniThreadLibrary code is only available in the repository. No snapshots this time.

I’d still be immensely grateful to anybody who tests the new functionality and shares their thoughts on this approach.


Friday, October 03, 2008

Bulk update

Delphi 2009 was released so now it’s time to publish updates to my various units … In this article I’m just publishing the short changelog; you can expect more details on some enhancements (TGpJoinedStream, 4- and 8- aligned integer, Unicode support in GpStructuredStorage) in the near future.

DSiWin32 1.41

  • Compatible with Delphi 2009.
  • Created DSiInterlocked*64 family of functions by copying the code from http://qc.borland.com/wc/qcmain.aspx?d=6212. Functions were written by Will DeWitt Jr and are included with permission.
  • New functions DSiCopyFileAnimated, DSiConnectToNetworkResource, DSiIsHtmlFormatOnClipboard, DSiGetHtmlFormatFromClipboard, DSiCopyHtmlFormatToClipboard, DSiYield.
  • Added constants FILE_LIST_DIRECTORY, FILE_SHARE_FULL, FILE_ACTION_ADDED, FILE_ACTION_REMOVED, FILE_ACTION_MODIFIED, FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME.
  • Forced {$T-} as the code doesn't compile in {$T+} state.
  • Bug fixed: It was not possible to use DSiTimeGetTime64 in parallel from multiple threads

GpHugeFile 5.05a

  • Optimization: Under some circumstances, lots of unnecessary SetFilePointer calls were made.

GpLists 1.41

  • Works with Delphi 2009.

GpStreams 1.25

  • Added TGpJoinedStream class.
  • Span-storing class can now be modified via TGpScatteredStream.SpanClass.
  • TGpScatteredStream's AddSpan and AddSpanOS now return span offset in the span list.
  • Added bunch of BE_ overloads to the TGpStreamEnhancer class.
  • Small optimization in KeepStreamPositionWrapper destructor.

GpStructuredStorage 2.0

  • Works with Delphi 2009.
  • Added Unicode support to the underlying storage. File and folder names are stored in UTF-16, as are attribute names and values. API is still based on Delphi's 'string' type - meaning that values are converted to Unicode and back on the fly using the current locale.
  • Existing structured storage files will be upgraded automatically unless they are opened for read-only access. Applications compiled with GpStructuredStorage 1.x will not be able to read new/upgraded files. Version is incremented to 2.0.0.0 when a 1.x file is opened unless it is opened in read-only mode. Newly created storages have version 2.0.0.0.

GpStuff 1.13

  • Implemented 4-aligned integer, TGp4AlignedInt.
  • Implemented 8-aligned integer, TGp8AlignedInt.
  • Added function OpenArrayToVarArray, written by Thomas Schubbauer.
  • ReverseWord/ReverseDWord rewritten in assembler (by GJ).
  • Declared MaxInt64 constant.

GpSync 1.21

  • Added optional external message counter to the message queue.

GpTextFile 4.01

  • Added TGpTextFile.Write(ws: WideString) and TGpTextFile.Writeln(s: string) overloads.
  • TGpTextFile.Write[ln] string parameters made 'const'.
  • Bug fixed [found by AKi]: TGpTextFile.Write was not working when using CP_UTF8 codepage.

GpTextStream 1.06

  • Works with Delphi 2009.
  • Exported StringToWideString, WideStringToString, and GetDefaultAnsiCodepage.

GpVersion 2.02

  • Added another CreateVersion overload.
  • Extended IVersion interface with IsHigherThan and IsLowerThan.


Saturday, September 20, 2008

Processing Windows messages in OmniThreadLibrary

In all my ravings about the OmniThreadLibrary I’ve never discussed the .MsgWait decorator – and partly because of that it was not even working in the 1.0 release :-(. Time to fix that!

Today I’ll show you how simple it is to make a synchronous wrapper around an asynchronous component (using OTL, of course). The code itself is not an OTL test project but a unit I’m using to synchronously download web pages. Where’s the problem at all, you’ll say? Indy is synchronous and so is WinInet. True, but I’m using ICS, which is a completely asynchronous solution.

[For the readers who have no idea what I’m talking about: Synchronous means the program calls the code, the code executes for some time and then returns with the web page. Asynchronous means the program calls the code and the code returns immediately. Somewhere in the background, the web page is being retrieved, and when it is fully transferred, your program gets notified in some way. The second way is more flexible, allows you to do more things in parallel and doesn’t block the UI, but it is more complicated to use.]

Sometimes (usually for quick and dirty tests) I need to retrieve a web page without having to think about messages and events and what will be called at which moment. For such times I put together a simple unit which wraps ICS stuff in a thread. It is called GpHttp and will be available on my website in some time. For now, you can download it here. You’ll also need ICS v6 (it should also work with ICS v5 if you remove the Overbyte prefix from used units and classes).

Interface

I wanted to keep the interface really simple. The GpHttp unit exports only two functions, one to execute GET and another POST request. Both accept username and password and both return status code (200 if everything went OK), status text and page contents. In case of a socket problem, WinSock error code is returned in statusCode. (HTTP codes are always below 1,000 and WinSock codes are always above 10,000 so there is no possibility for collision.)

function GpHttpGet(const url, username, password: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;

function GpHttpPost(const url, username, password, postData: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;
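Since HTTP and WinSock codes share one statusCode parameter, the caller may want to tell them apart. The following is a minimal sketch of how that could be done; DescribeStatus is a hypothetical helper that is not part of GpHttp, and its thresholds are taken only from the ranges described above.

```pascal
program StatusCodeDemo;
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Hypothetical helper: classifies a statusCode by the range it falls into.
function DescribeStatus(statusCode: integer): string;
begin
  if statusCode = 200 then
    Result := 'OK'
  else if (statusCode > 0) and (statusCode < 1000) then
    Result := Format('HTTP status %d', [statusCode])
  else if statusCode > 10000 then
    Result := Format('WinSock error %d', [statusCode])
  else
    Result := Format('Unclassified status %d', [statusCode]);
end;

begin
  Writeln(DescribeStatus(200));
  Writeln(DescribeStatus(404));
  Writeln(DescribeStatus(10060)); // WSAETIMEDOUT
  Writeln(DescribeStatus(-1));    // neither range; treated as unclassified
end.
```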

Both functions are simple wrappers around the third (local) function that executes any HTTP request.

function GpHttpGet(const url, username, password: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;
begin
  Result := GpHttpRequest(url, username, password, 'GET', '', statusCode,
    statusText, pageContents);
end; { GpHttpGet }

function GpHttpPost(const url, username, password, postData: string;
  var statusCode: integer; var statusText, pageContents: string): boolean;
begin
  Result := GpHttpRequest(url, username, password, 'POST', postData,
    statusCode, statusText, pageContents);
end; { GpHttpPost }

The real work starts in the GpHttpRequest function. First it creates a worker object and passes the request data to it. The reference to the object (or, more correctly, to the interface implemented by this object) is stored in the worker variable. We’ll need it at the end to retrieve the status code and page contents.

Next, the task is created and run. Notice the call to .MsgWait which instructs the internal thread loop to process Windows messages (more on that later).

Next, we wait for the task to terminate. We’ll only be waiting up to 30 seconds (a constant in the code which could easily be changed into a caller-provided parameter). If the task is not finished in 30 seconds, we’ll terminate it and return False. Otherwise, we’ll retrieve the status code, text and page contents from the worker object via the Implementor trick. (A better way would be to write an IGpHttpRequest interface supporting StatusCode, StatusText and PageContents properties, but sometimes I’m just too lazy …)

function GpHttpRequest(const url, username, password, request,
  postData: string; var statusCode: integer; var statusText,
  pageContents: string): boolean;
var
  task  : IOmniTaskControl;
  worker: IOmniWorker;
begin
  worker := TGpHttpRequest.Create(url, username, password, request, postData);
  task := CreateTask(worker, 'GpHttpRequest').MsgWait.Run;
  Result := task.WaitFor(CGpHttpRequestTimeout_sec * 1000);
  if not Result then
    task.Terminate
  else begin
    statusCode := TGpHttpRequest(worker.Implementor).StatusCode;
    statusText := TGpHttpRequest(worker.Implementor).StatusText;
    pageContents := TGpHttpRequest(worker.Implementor).PageContents;
  end;
end; { GpHttpRequest }

Background Task

The real work is done in the background task [as expected, doh, why are you even mentioning that??]. In the Initialize method, to be more specific.

function TGpHttpRequest.Initialize: boolean;
begin
  hrHttpClient := THttpCli.Create(nil);
  try
    hrHttpClient.NoCache := true;
    hrHttpClient.RequestVer := '1.1';
    hrHttpClient.URL := hrURL;
    hrHttpClient.Username := hrUsername;
    hrHttpClient.Password := hrPassword;
    hrHttpClient.FollowRelocation := true;
    if hrUsername <> '' then
      hrHttpClient.ServerAuth := httpAuthBasic;
    hrHttpClient.SendStream := TStringStream.Create(hrPostData);
    hrHttpClient.RcvdStream := TStringStream.Create('');
    hrHttpClient.OnRequestDone := HandleRequestDone;
    if SameText(hrRequest, 'GET') then
      hrHttpClient.GetASync
    else if SameText(hrRequest, 'POST') then
      hrHttpClient.PostASync
    else
      raise Exception.CreateFmt(
        'TGpHttpRequest.Initialize: Unknown request type %s',
        [hrRequest]);
    Result := true;
  except
    on E:ESocketException do begin
      hrStatusCode := -1;
      hrStatusText := E.Message;
      Result := false;
    end;
  end;
end; { TGpHttpRequest.Initialize }

The code first instantiates a THttpCli object. THttpCli is the ICS’s HTTP client class used to execute HTTP requests asynchronously or synchronously. Here I’m using asynchronous mode because a) THttpCli.GetSync doesn’t have configurable timeout and b) it calls Application.ProcessMessages, which is not a good idea if you want to run your requests from a background thread (which I did).

Next the code initializes a bunch of THttpCli parameters and calls either GetASync or PostASync. If the get or post raises a socket exception, it is remapped to a status code and text and Initialize returns False. That signals the IOmniWorker main loop that it should not proceed with the task execution.

Now we have reached the asynchronous phase. ICS is working in the background and all other threads are just waiting for it to finish. Because ICS’s architecture is message based, somebody must process Windows messages which flow through the ICS itself. That’s why we had to use .MsgWait decorator when preparing the task.

Some time later (possibly before the timeout occurs), THttpCli will fully download the web page and call the HandleRequestDone event handler. Here we’ll just copy relevant data into task’s internal fields and then the code will tell the task (i.e. itself) to terminate.

procedure TGpHttpRequest.HandleRequestDone(sender: TObject;
  rqType: THttpRequest; error: word);
begin
  if error <> 0 then begin
    hrStatusCode := error;
    hrStatusText := 'Socket error';
  end
  else begin
    hrPageContents := TStringStream(hrHttpClient.RcvdStream).DataString;
    hrStatusCode := hrHttpClient.StatusCode;
    hrStatusText := hrHttpClient.ReasonPhrase;
  end;
  Task.Terminate;
end; { TGpHttpRequest.HandleRequestDone }

In the cleanup code (which is, same as Initialize, called automatically from the IOmniWorker main loop) we just have to tear down internal objects.

procedure TGpHttpRequest.Cleanup;
begin
  if assigned(hrHttpClient) then begin
    hrHttpClient.RcvdStream.Free;
    hrHttpClient.SendStream.Free;
    FreeAndNil(hrHttpClient);
  end;
end; { TGpHttpRequest.Cleanup }

And that’s all, folks. It really is that simple.

MsgWait

In case you want to learn more about OTL internals, read ahead …

Let’s take a short look at the .MsgWait implementation. The function itself just sets two internal fields and returns the object itself so that we can chain another method to it.

function TOmniTaskControl.MsgWait(wakeMask: DWORD): IOmniTaskControl;
begin
  Options := Options + [tcoMessageWait];
  otcExecutor.WakeMask := wakeMask;
  Result := Self;
end; { TOmniTaskControl.MsgWait }

The hard work is done in TOmniTaskExecutor.Asy_DispatchMessages. If the tcoMessageWait option is set, MsgWaitForMultipleObjectsEx will also wait for Windows messages (in addition to everything else it does) because it will receive a non-zero waitWakeMask. When a message is detected, the code will call the ProcessThreadMessages method, which simply peeks and dispatches all Windows messages (and Delphi’s internal message dispatch mechanism takes care of the rest).

  if tcoMessageWait in Options then
    waitWakeMask := WakeMask
  else
    waitWakeMask := 0;
  //...
  awaited := MsgWaitForMultipleObjectsEx(numWaitHandles, waitHandles,
    cardinal(timeout_ms), waitWakeMask, flags);
  //...
  else if awaited = (WAIT_OBJECT_0 + numWaitHandles) then //message
    ProcessThreadMessages

procedure TOmniTaskExecutor.ProcessThreadMessages;
var
  msg: TMsg;
begin
  while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) and (Msg.Message <> WM_QUIT) do begin
    TranslateMessage(Msg);
    DispatchMessage(Msg);
  end;
end; { TOmniTaskControl.ProcessThreadMessages }
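To see the wait-plus-message-pump combination in one piece, here is a stripped-down, Windows-only console sketch. It is not the OTL loop: WM_DEMO and hStop are hypothetical names, the program waits on a single event handle, and it posts one message to its own thread just to have something to pump.

```pascal
program MsgWaitDemo;
{$APPTYPE CONSOLE}

uses
  Windows, Messages;

const
  WM_DEMO = WM_USER + 1; // hypothetical private message

var
  awaited: DWORD;
  hStop  : THandle;
  msg    : TMsg;
begin
  hStop := CreateEvent(nil, true, false, nil); // manual-reset, initially unsignalled
  // Touch the queue so that it exists, then post one message to ourselves
  PeekMessage(msg, 0, 0, 0, PM_NOREMOVE);
  PostThreadMessage(GetCurrentThreadId, WM_DEMO, 0, 0);
  repeat
    // Returns WAIT_OBJECT_0 for the event, WAIT_OBJECT_0 + 1 for new messages
    awaited := MsgWaitForMultipleObjectsEx(1, hStop, 5000, QS_ALLINPUT, 0);
    if awaited = WAIT_OBJECT_0 + 1 then begin
      while PeekMessage(msg, 0, 0, 0, PM_REMOVE) do begin
        if msg.message = WM_DEMO then begin
          Writeln('Got WM_DEMO, signalling stop');
          SetEvent(hStop);
        end;
        TranslateMessage(msg);
        DispatchMessage(msg);
      end;
    end
    else if awaited = WAIT_OBJECT_0 then
      Writeln('Stop event signalled')
    else
      Writeln('Timeout or error');
  until awaited <> WAIT_OBJECT_0 + 1;
  CloseHandle(hStop);
end.
```

Passing 0 as the wake mask would turn the call into a plain handle wait, which corresponds to the branch taken when tcoMessageWait is not set.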

The Asy_DispatchMessages is probably the most complicated part of the OTL (once you understand the lock-free structures inside the OtlContainers ;) Once you understand how it works you’ll be fully prepared to write custom thread loops in highly specialized threaded code. But don’t worry, you can use OTL even if you don’t understand the magic hidden inside.


Saturday, September 06, 2008

Interfacing with external programs

Say you have a program. A popular program (at least in some circles) that other people want to write add-ons for. Your program does some job and in some processing steps you want to be able to execute some external code and then proceed according to the result returned by that code. Similar to the way CGI code is executed when a HTTP server processes a HTTP request.

The other day I was trying to enumerate all possible ways to do that. I found the following solutions:

  • Running external program. Data can be passed in program’s standard input and read from its standard output – just like when CGI program is launched from the HTTP server. Simple, stable (it is simple to protect against malfunctioning add-ons), but quite slow.
  • Third-party DLL. Relatively simple, very fast, but can seriously destabilize the whole product. Complicated to upgrade the DLL (must shut down main application to upgrade add-on).
  • [D]COM[+]. Not my bag of Swedish … sorry, wrong movie. Definitely not the way I’d like to pursue. Unstable. Leads to problems that nobody seems to be able to troubleshoot.
  • Windows messages. Messy. Plus the main program runs as a service while add-on maybe wouldn’t.
  • TCP. Implement add-on as a text-processing TCP/IP service (another HTTP server, if we continue the CGI analogy). Interesting idea, but not very simple to implement. Fast when both are running on the same machine. Flexible – each can be shutdown and upgraded independently; processing can be distributed over several computers. Complicated to configure when multiple add-ons are installed (each must be configured to a different port). Firewall and antivirus software may cause problems.
  • Drop folder. Main app drops a file into some folder and waits for the result file. Clumsy, possibly faster than the external program solution (add-on can be always running), simple to implement and very stable.
  • Message queues (as in the MSMQ). Interesting but possibly too complicated for most customers to install and manage.

And now to the main point of my ramblings. What did I miss? Are there more possibilities? If you have any idea how to approach my problem from a different direction, leave me a comment. Don’t mention SOAP, BTW, it is implicitly included in the “add-on as a TCP server” solution.

And thanks!


Wednesday, September 03, 2008

OmniThreadLibrary patterns - How to (not) create a task

From the first public release two months ago (or is it closer to three already?), OmniThreadLibrary has been constantly changing. Only with the 1.0 release has it reached a somewhat stable state. That flux was good, as it was only through the development of the OTL that I discovered how to do some things properly (maybe I was not so wrong in Why is software third time lucky?, after all). But it was also bad, as some introductory topics that I wrote don't accurately reflect the current state anymore. Yes, I know, I should start writing good tutorials. Not fun :( I like writing blog articles more. Today I'll focus on task creation.

OTL offers three different ways to create a task (and a fourth one will be introduced when Delphi 2009 is available). Those three ways are mapped to three overloads of the CreateTask method (OtlTaskControl.pas).

  function CreateTask(worker: TOmniTaskProcedure;
    const taskName: string = ''): IOmniTaskControl; overload;
  function CreateTask(worker: TOmniTaskMethod;
    const taskName: string = ''): IOmniTaskControl; overload;
  function CreateTask(const worker: IOmniWorker;
    const taskName: string = ''): IOmniTaskControl; overload;

The simplest possible way (demoed in test application 2_TwoWayHello; see OmniThreadLibrary Example #3: Bidirectional communication) is to pass a name of a global procedure to the CreateTask. This global procedure must consume one parameter of type IOmniTask (it was described in the OmniThreadLibrary internals - OtlTask article, which is still mostly valid).

CreateTask(RunHelloWorld, 'HelloWorld').Run;

procedure RunHelloWorld(const task: IOmniTask);
begin
end;

A variation on the theme is passing a name of a method to the CreateTask. This approach is used in the test application 1_HelloWorld. The interesting point to make here is that you can declare this method in the same class from which the CreateTask is called. That way you can access the class fields and methods from the threaded code. Just keep in mind that you'll be doing this from another thread so make sure you know what you're doing!

procedure TfrmTestHelloWorld.RunHelloWorld(const task: IOmniTask);
begin
end;

For all except the simplest tasks, you'll use the third approach, because it will give you access to the true OTL power (internal wait loop and message dispatching). To use it, you have to create a worker object, which implements the IOmniWorker interface. An example from the 5_TwoWayHello_without_loop demo:

procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
var
worker: IOmniWorker;
begin
worker := TAsyncHello.Create;
FHelloTask :=
OmniEventMonitor1.Monitor(CreateTask(worker, 'Hello')).
SetTimer(1000, MSG_SEND_MESSAGE).
SetParameter('Delay', 1000).
SetParameter('Message', 'Hello').
Run;
end;

As you're exposing this worker via an interface, Delphi will manage its lifetime. When the task is terminated, the worker instance will be destroyed too.

For the same reason, there's no need to create the worker class in advance. From the demo 7_InitTest:

task := OmniEventMonitor1.Monitor(CreateTask(TInitTest.Create(success), 'InitTest'))
.SetPriority(tpIdle).Run;

There's a catch, though. Because CreateTask expects an interface, you must never pass to it a variable or field that is declared as an object. This will not work:

procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
var
worker: TAsyncHello;
begin
worker := TAsyncHello.Create;
FHelloTask :=
OmniEventMonitor1.Monitor(CreateTask(worker, 'Hello')).
SetTimer(1000, MSG_SEND_MESSAGE).
SetParameter('Delay', 1000).
SetParameter('Message', 'Hello').
Run;
end;

If you need to store some information inside the worker during its execution and access it from the owner, you can use the Implementor property, which will "convert" the IOmniWorker interface back to the implementing object (which you must then cast to your worker class). From the demo 6_TwoWayHello_with_object_worker:

procedure TfrmTestTwoWayHello.actStopHelloExecute(Sender: TObject);
begin
FHelloTask.Terminate;
FHelloTask := nil;
lbLog.ItemIndex := lbLog.Items.Add(Format('%d Hello World''s sent',
[TAsyncHello(FWorker.Implementor).Count]));
end;

BTW, don't just set the task reference (FHelloTask in the example above) to nil when you want to terminate the task. You have to call Terminate or use some other way to notify the task that it must terminate.

BTW2, if you use a parameterless constructor inside the CreateTask, you have to put empty parentheses after the .Create or Delphi will not compile your code. I'm not really sure why. The following segment is from test 14_TerminateWhen:

Log(Format('Task started: %d',
[CreateTask(TMyWorker.Create())
.TerminateWhen(FTerminate).WithCounter(FCounter)
.MonitorWith(OmniTED).Run.UniqueID]));

That completes today's tour.

In the introduction I mentioned the fourth CreateTask overload that will be introduced when Delphi 2009 is ready. OmniThreadLibrary will support anonymous methods so you'll be able to do something like this:

CreateTask(procedure begin MessageBeep(MB_ICONEXCLAMATION); end);


Tuesday, September 02, 2008

OmniThreadLibrary 1.0a released

This is basically a bugfix release. There was a problem with the .MsgWait decorator which I fixed in the repository three days ago. As I want to write about this function soon, I thought it would be nice to make a release first ...

Other changes:

  • TGp4AlignedInt from GpStuff.pas is supposedly D2006 compatible (says a reader) so it has been enabled on that platform. As a result, it is now possible to compile and use OTL in D2006 (said the same source).
  • SpinLock.pas has been updated.
  • Test 6 has been changed to show why one would want to use the IOmniWorker.Implementor function. I'll write more about that very soon.
  • I've included parts of the FastMM4 package into the repository to simplify debugging. FastMM4 was created by Pierre le Riche and is not covered by the OmniThreadLibrary license. It is dual-licensed and you can use it under either the MPL 1.1 or the LGPL 2.1. More details in the included readme file and on the FastMM4 home page.


Monday, September 01, 2008

Introduction to enumerators

The Blaise Pascal magazine #3 is out. Lots of content there, including my Introduction to enumerators article (first page).


Wednesday, August 27, 2008

OmniThreadLibrary 1.0, day 1

The first 24 hours totally exceeded my expectations.

[Figure: day 1 downloads]

Now start using it! :D


Tuesday, August 26, 2008

OmniThreadLibrary 1.0 released

After 164 repository commits and two months of development, OmniThreadLibrary 1.0 was released today.

There were few changes since the 1.0 beta, mostly regarding the TOmniValue record, which can now seamlessly hold Double, Extended and Boolean values.

OmniThreadLibrary 1.0 is available via SVN (http://omnithreadlibrary.googlecode.com/svn/tags/release-1.0) or as a ZIP archive.

If you like OmniThreadLibrary, spread the word around. After all, not every Delphi developer is reading my blog (yet ;). Feel free to include any parts of the marketing blurb (below).

What is OmniThreadLibrary?

OmniThreadLibrary is a simple-to-use threading library for Delphi. Its main "selling" points (besides the price, of course ;) are power, simplicity, and openness. With just a few lines of code, you can set up multiple threads, send messages between them, process Windows messages and more. OmniThreadLibrary doesn't limit you in any way - if it is not powerful enough for you, you can ignore any part of its "smartness" and replace it with your own code.

OmniThreadLibrary is an open source project. It lives in the Google Code and is licensed under the BSD license.

At the moment, OmniThreadLibrary supports Delphi 2007 on the Win32 platform. It will support Delphi 2009 as soon as it's available. Currently, there are no plans to support older Delphi compilers and .NET.

Where can I get more information?

Home page: http://otl.17slon.com/

Web discussion forum: http://otl.17slon.com/forum/

Downloads: http://code.google.com/p/omnithreadlibrary/downloads/list

Issue tracker: http://code.google.com/p/omnithreadlibrary/issues/list

SVN checkout instructions: http://code.google.com/p/omnithreadlibrary/source/checkout

Author's blog: http://thedelphigeek.com

Author's home page: http://gp.17slon.com


Friday, August 22, 2008

OmniThreadLibrary 1.0 beta

Hello, my dear readers. Two weeks I spent there

[Photo: Ugljan]

but sadly I'm back now. :(

My bad, your win - today I made the last scheduled modification to the OTL sources and published version 1.0 beta. [snapshot, repository]

The underlying message format is no longer based on Variants. TOmniValue is now a smart record with some implicit operators. In some cases that means more typing (using .AsInteger and similar) but in other cases (when sending interfaces and objects) TOmniValue is now much simpler to use. Even more, it is slightly smaller and slightly faster.

There were also some changes in the TOmniEventMonitor component, which now uses a hash to access monitored tasks/pools. The old version was implemented with TInterfaceList and that could get quite slow when working with a large number of tasks.

[BTW, I checked the code with Tiburon and it's working without a glitch.]

I've also started to work on the support infrastructure, namely the forum. Home page, FAQ and documentation will also appear - all in due time.


Friday, August 01, 2008

The Delphi Geek goes to standby mode

and so does OmniThreadLibrary. It's bicycle and swimming time.

Comments will stay in limbo as I won't be moderating them. Feel free to post them, though - I'll process everything as soon as I come back.

Sunday, July 27, 2008

OmniThreadLibrary alpha

OmniThreadLibrary is slowly progressing to the alpha stage. Most of the interfaces have been defined and most of the functionality that I want to see in the 1.0 release is present and tested. I have not yet decided what to do with the messaging (the current Variant-based solution can be rough around the edges sometimes) but the rest of the stuff is mostly completed.

Most important changes since the last release:

  • First of all, big thanks to fellow Slovenian programmer Lee_Nover (the guy who wrote the SpinLock unit, BTW) who did a major cleanup job of the OTL code:
    • Added const prefix to parameters in many places (mostly when interfaces were passed - I tend to miss that). OTL should be slightly faster because of that.
    • Removed CreateTask(TOmniWorker) as CreateTask(IOmniWorker) is good enough for all occasions.
    • Cleaned the tests\* tree and fixed tests to compile with new Otl* units.
    • Removed IOmniTaskControl.FreeOnTerminate as it's no longer needed.
  • Created project group containing all test projects. To make it work, all projects in the tests tree were renamed.
  • *** IMPORTANT *** OtlTaskEvents unit was renamed to OtlEventMonitor and TOtlTaskEventDispatcher component was renamed to TOmniEventMonitor. Recompile the package!
  • *** IMPORTANT *** If you're working with a snapshot, you should delete the OmniThreadLibrary folder before unpacking new version as many files were renamed since the last snapshot!
  • Task UniqueID is now int64 (was integer). Somehow I wasn't happy with the fact that the unique ID would roll over after merely 4.3 billion tasks ...
  • Thread priority can be controlled with IOmniTaskControl.SetPriority.
  • Exceptions are trapped and mapped into EXIT_EXCEPTION predefined exit code (OtlCommon.pas). Test in 13_Exceptions.
  • IOmniTaskControl.WithLock provides a simple way to share one lock (critical section, spinlock) between task owner and task worker. Lock can be accessed via the Lock property, implemented in IOmniTaskControl and IOmniTask interfaces. See the 12_Lock test for the example.
  • Eye candy - instead of calling omniTaskEventDispatch.Monitor(task) you can now use task.MonitorWith(omniTaskEventDispatch). The old way is still available. An example from the (new) 8_RegisterComm test:
procedure TfrmTestOtlComm.FormCreate(Sender: TObject);
begin
FCommChannel := CreateTwoWayChannel(1024);
FClient1 := CreateTask(TCommTester.Create(FCommChannel.Endpoint1, 1024))
.MonitorWith(OmniTED)
.Run;
FClient2 := CreateTask(TCommTester.Create(FCommChannel.Endpoint2, 1024))
.MonitorWith(OmniTED)
.Run;
end;
  • IOmniTaskControl.TerminateWhen enables you to add additional termination event to the task. One event can be shared between multiple tasks, allowing you to stop them all at once. Example in the 14_TerminateWhen.
  • Basic support for task groups has been added. At the moment you can start/stop multiple tasks and not much more. To add a task to a group, use group.Add(task) or task.Join(group) syntax. Demo in 15_TaskGroup.
IOmniTaskGroup = interface ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
function Remove(taskControl: IOmniTaskControl): IOmniTaskGroup;
function Add(taskControl: IOmniTaskControl): IOmniTaskGroup;
function RunAll: IOmniTaskGroup;
function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
end; { IOmniTaskGroup }
  • Added support for task chaining. When one task is completed, next task is started. An excerpt from the 16_ChainTo demo:
procedure TfrmTestTaskGroup.btnStartTasksClick(Sender: TObject);
var
task1: IOmniTaskControl;
task2: IOmniTaskControl;
task3: IOmniTaskControl;
taskA: IOmniTaskControl;
begin
lbLog.Clear;
task3 := CreateTask(BgTask, '3').MonitorWith(OmniTED);
task2 := CreateTask(BgTask, '2').MonitorWith(OmniTED).ChainTo(task3);
task1 := CreateTask(BgTask, '1').MonitorWith(OmniTED).ChainTo(task2);
taskA := CreateTask(BgTask, 'A').MonitorWith(OmniTED).ChainTo(
CreateTask(BgTask, 'B').MonitorWith(OmniTED).ChainTo(
CreateTask(BgTask, 'C').MonitorWith(OmniTED)));
task1.Run; taskA.Run;
end;
  • A thread pool has been implemented.  Demo in 11_ThreadPool. I'll write a longer article on using it, but for now the demo should do.

As usual, code is available in the repository and as a snapshot.


OmniThreadLibrary internals - OtlContainers


The time has come to bring the OtlContainers subthread to an end. In previous parts (1, 2, 3, 4) I obsessively described the underlying lock-free structures but hadn't said a word about the higher-level classes that are actually used for message transfer inside the OmniThreadLibrary.

Let's take a quick look at all OtlContainers classes. TOmniBaseContainer is an abstract parent of the whole container hierarchy. Its basic function is to provide memory allocation and lock-free algorithms for all descendants. Most of the code I already described in previous installments and the rest is trivial.

  TOmniBaseContainer = class abstract(TInterfacedObject)
strict protected
obcBuffer : pointer;
obcElementSize : integer;
obcNumElements : integer;
obcPublicChain : TOmniHeadAndSpin;
obcRecycleChain: TOmniHeadAndSpin;
class function InvertOrder(chainHead: POmniLinkedData): POmniLinkedData; static;
class function PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
class procedure PushLink(const link: POmniLinkedData; var chain: TOmniHeadAndSpin); static;
class function UnlinkAll(var chain: TOmniHeadAndSpin): POmniLinkedData; static;
public
destructor Destroy; override;
procedure Empty; virtual;
procedure Initialize(numElements, elementSize: integer); virtual;
function IsEmpty: boolean; virtual;
function IsFull: boolean; virtual;
property ElementSize: integer read obcElementSize;
property NumElements: integer read obcNumElements;
end; { TOmniBaseContainer }

Then there is the base stack implementation, which only adds Push and Pop to the base container. We've seen those two already.

  TOmniBaseStack = class(TOmniBaseContainer)
public
function Pop(var value): boolean; virtual;
function Push(const value): boolean; virtual;
end; { TOmniBaseStack }

The queue is only slightly more complex. It provides Enqueue and Dequeue, which we already know, and overrides Empty and IsEmpty to take the dequeued messages chain into account.

  TOmniBaseQueue = class(TOmniBaseContainer)
strict protected
obqDequeuedMessages: TOmniHeadAndSpin;
public
constructor Create;
function Dequeue(var value): boolean; virtual;
procedure Empty; override;
function Enqueue(const value): boolean; virtual;
function IsEmpty: boolean; override;
end; { TOmniBaseQueue }

Now we come to the interesting part. Basic stack and queue operations are also described with interfaces IOmniStack and IOmniQueue - just in case you'd like to use them in your programs. OTL uses class representation of the queue directly.

  IOmniStack = interface ['{F4C57327-18A0-44D6-B95D-2D51A0EF32B4}']
procedure Empty;
procedure Initialize(numElements, elementSize: integer);
function Pop(var value): boolean;
function Push(const value): boolean;
function IsEmpty: boolean;
function IsFull: boolean;
end; { IOmniStack }

IOmniQueue = interface ['{AE6454A2-CDB4-43EE-9F1B-5A7307593EE9}']
procedure Empty;
procedure Initialize(numElements, elementSize: integer);
function Enqueue(const value): boolean;
function Dequeue(var value): boolean;
function IsEmpty: boolean;
function IsFull: boolean;
end; { IOmniQueue }

Actual implementation of the higher-level structures is exposed with classes TOmniStack and TOmniQueue. Besides implementing IOmniStack and IOmniQueue (respectively), they both implement IOmniNotifySupport and IOmniMonitorSupport.

  TOmniStack = class(TOmniBaseStack, IOmniStack, IOmniNotifySupport, IOmniMonitorSupport)
TOmniQueue = class(TOmniBaseQueue, IOmniQueue, IOmniNotifySupport, IOmniMonitorSupport)

IOmniMonitorSupport describes a container with monitoring support (it notifies the attached monitor whenever a new message is sent). IOmniNotifySupport describes a container that notifies interested parties (by setting an event) that a new message has been sent. Both are used in the OTL - notification support is needed to implement IOmniCommunicationEndpoint.NewMessageEvent in the OtlComm unit and monitoring support is needed to support IOmniTaskControl.MonitorWith in the OtlTaskControl unit.

  IOmniMonitorSupport = interface ['{6D5F1191-9E4A-4DD5-99D8-694C95B0DE90}']
function GetMonitor: IOmniMonitorParams;
//
procedure Notify;
procedure RemoveMonitor;
procedure SetMonitor(monitor: IOmniMonitorParams);
property Monitor: IOmniMonitorParams read GetMonitor;
end; { IOmniMonitorSupport }

IOmniNotifySupport = interface ['{E5FFC739-669A-4931-B0DC-C5005A94A08B}']
function GetNewDataEvent: THandle;
//
procedure Signal;
property NewDataEvent: THandle read GetNewDataEvent;
end; { IOmniNotifySupport }

The following excerpt from the TOmniQueue code demonstrates the implementation of those interfaces. TOmniStack is implemented in a similar manner.

The constructor takes an options parameter, with which you can enable monitoring and/or notification support.

type
TOmniContainerOption = (coEnableMonitor, coEnableNotify);
TOmniContainerOptions = set of TOmniContainerOption;

constructor TOmniQueue.Create(numElements, elementSize: integer;
options: TOmniContainerOptions);
begin
inherited Create;
Initialize(numElements, elementSize);
orbOptions := options;
if coEnableMonitor in Options then
orbMonitorSupport := TOmniMonitorSupport.Create;
if coEnableNotify in Options then
orbNotifySupport := TOmniNotifySupport.Create;
end; { TOmniQueue.Create }

function TOmniQueue.Dequeue(var value): boolean;
begin
Result := inherited Dequeue(value);
if Result then
if coEnableNotify in Options then
orbNotifySupport.Signal;
end; { TOmniQueue.Dequeue }

function TOmniQueue.Enqueue(const value): boolean;
begin
Result := inherited Enqueue(value);
if Result then begin
if coEnableNotify in Options then
orbNotifySupport.Signal;
if coEnableMonitor in Options then
orbMonitorSupport.Notify;
end;
end; { TOmniQueue.Enqueue }

Enqueue calls original enqueueing code and then signals notification (if enabled) and notifies the monitor (if enabled). Dequeue is similar, except that it only triggers notification so that the reader rechecks the input queue.

Stop. Enough words have been said about OtlContainers. Next time, I'll discuss something completely different.


Thursday, July 24, 2008

A lock-free queue, finally!

It was a long and winding road (1, 2, 3) but finally it brought us to the really interesting part - the lock-free queue. Admit it, a stack is fine but it is not really suitable for sending data from point A to point B. When forwarding data inside the application we typically want it to arrive in the same order as it was transmitted and that's what queues are for.

Still, there was a good reason for that lengthy introduction. The lock-free stack we all know and love by now ;) is the basis for the lock-free queue. Even more, most of the stack code is reused and the queue's Enqueue/Dequeue methods are incredibly similar to the stack's Push/Pop.

Let's take a look at the method that inserts new element at the end of the queue. The Enqueue code is not just similar to Push. They are identical. [Compare: Push]

function TOmniBaseQueue.Enqueue(const value): boolean;
var
  linkedData: POmniLinkedData;
begin
  linkedData := PopLink(obcRecycleChain);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(value, linkedData.Data, ElementSize);
  PushLink(linkedData, obcPublicChain);
end; { TOmniBaseQueue.Enqueue }

OK, so enqueueing an item is identical to pushing it. How do we remove items from the head of the queue, then? Dequeue must return not the top element of the stack but the bottom one - that's the element that was enqueued (pushed) first.

We could traverse the stack and somehow remove the last element but removing elements from a lock-free structure is very problematic, as you've already seen. [Where was the problem in TOmniBaseStack? In PopLink. There you go!] Removing elements from the queue is even harder (if not impossible) so let's try another approach.

function TOmniBaseQueue.Dequeue(var value): boolean;
var
  linkedData: POmniLinkedData;
begin
  if obqDequeuedMessages.Head = nil then
    obqDequeuedMessages.Head := InvertOrder(UnlinkAll(obcPublicChain));
  linkedData := PopLink(obqDequeuedMessages);
  Result := assigned(linkedData);
  if not Result then
    Exit;
  Move(linkedData.Data, value, ElementSize);
  PushLink(linkedData, obcRecycleChain);
end; { TOmniBaseQueue.Dequeue }

If you compare it with the Pop method, you'll see that they are very similar. Dequeue does some magic in the first two lines and uses a different chain header in the PopLink call, but from that point onward they are identical.

function TOmniBaseStack.Pop(var value): boolean;
var
linkedData: POmniLinkedData;
begin
linkedData := PopLink(obcPublicChain);
Result := assigned(linkedData);
if not Result then
Exit;
Move(linkedData.Data, value, ElementSize);
PushLink(linkedData, obcRecycleChain);
end; { TOmniBaseStack.Pop }

The big question is, what happens in InvertOrder(UnlinkAll(obcPublicChain)) to make the old Pop code work as a queue? Well, let's draw some pretty pictures. I just luuuv pretty pictures.

Let's say we have a queue with two elements. Element 1 was enqueued first, followed by element 2. Our queue behaves like a stack when enqueueing so it's no wonder that I could just copy the old picture from the stack introduction article to present this state.

[Figure: queue with two elements]

We have two empty nodes in the recycle chain and two allocated nodes in the public chain. Public chain header points to element 2, which points to element 1, which points to nil.

Dequeue is called next and it executes UnlinkAll. This method merely removes all nodes from the chain that was passed as an argument. In our case, all nodes are removed from the public chain. This is done atomically. [How? I'll show you later.]

[Figure: after UnlinkAll]

Now we have a public chain header that points to an empty chain (nil, in other words) and an unnamed pointer that points to the top element in the stack. This unnamed pointer is returned as the UnlinkAll result.

InvertOrder is called next. It walks over the chain that was passed as an argument and reverses each and every pointer. This is done in a simple, non-atomic way, as the chain was already removed from the shared storage so concurrency is not a problem.

[Figure: after InvertOrder]

InvertOrder returns a pointer to the (new) chain head. In our case, this pointer points to node 1, which points to node 2, which points to nil. Result of the InvertOrder call is stored into the class field obqDequeuedMessages (also known as dequeue chain header).

The rest of the Dequeue method simply treats obqDequeuedMessages as a stack chain and pops the top element from it. As the chain was reversed, the top element is the one we need - the one that was enqueued first. The atomic version of PopLink is used for simplicity, but simpler code could be used instead as there can be no inter-thread conflicts.

[Figure: after Dequeue]

On return from Dequeue, the dequeued chain points to node 2, the recycle chain points to node 1 (its data was copied from the node to Dequeue's value parameter and the node was recycled) and the public chain is nil.

Let's assume that a writer enqueues data 3 into the queue. As we've already seen, this is a normal Push operation and we already know how it works.

[Figure: Enqueue after Dequeue]

The dequeued chain still points to node 2, the public chain points to former node 1 (which now holds the value 3) and the recycle chain points to the first empty node.

The reader now executes Dequeue again. As the dequeued chain is not nil, UnlinkAll and InvertOrder are not called. A node is simply popped from the dequeued chain.

[Figure: after second Dequeue]

Dequeued chain is now nil. Next call to Dequeue will call UnlinkAll to remove public chain, InvertOrder to invert it, result will be stored in the dequeued chain and dequeue operation will continue in an already known fashion.
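
To make the walk-through above concrete, here is a minimal single-threaded sketch of the same sequence (enqueue 1 and 2, dequeue, enqueue 3, dequeue until empty). It is a model only, not the OTL code: TNode, publicChain and dequeuedChain are hypothetical stand-ins, plain assignments replace the atomic operations, and nodes are allocated and freed instead of being recycled.

```pascal
program QueueModel;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Data: integer;
  end;

var
  publicChain  : PNode = nil; // filled by Enqueue (behaves like a stack)
  dequeuedChain: PNode = nil; // private to the single reader

// Same as a stack push: the new node becomes the head of the public chain.
procedure Enqueue(value: integer);
var
  node: PNode;
begin
  New(node);
  node^.Data := value;
  node^.Next := publicChain;
  publicChain := node;
end;

// Detaches the whole chain and returns its old head.
// The real code does this atomically; this model does not.
function UnlinkAll(var chain: PNode): PNode;
begin
  Result := chain;
  chain := nil;
end;

// Reverses every Next pointer and returns the new head.
function InvertOrder(head: PNode): PNode;
var
  next: PNode;
begin
  Result := nil;
  while head <> nil do begin
    next := head^.Next;
    head^.Next := Result;
    Result := head;
    head := next;
  end;
end;

function Dequeue(var value: integer): boolean;
var
  node: PNode;
begin
  if dequeuedChain = nil then
    dequeuedChain := InvertOrder(UnlinkAll(publicChain));
  node := dequeuedChain;
  Result := node <> nil;
  if not Result then
    Exit;
  dequeuedChain := node^.Next;
  value := node^.Data;
  Dispose(node); // the real code pushes the node to the recycle chain instead
end;

var
  v: integer;
begin
  Enqueue(1);
  Enqueue(2);
  if Dequeue(v) then
    Writeln(v);
  Enqueue(3);
  while Dequeue(v) do
    Writeln(v);
end.
```

Run as a console program, this should print 1, 2 and 3 on separate lines, which is the order in which the values were enqueued.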

There is one important consequence of this approach. The lock-free queue (at least our implementation) can only have one reader. The problem lies in this non-atomic fragment:

  if obqDequeuedMessages.Head = nil then
    obqDequeuedMessages.Head := InvertOrder(UnlinkAll(obcPublicChain));

If there were two readers, the following could happen:

  • writer pushes one element into the queue
  • reader 1 starts the Dequeue, sees that Head is nil and is stopped just before it calls UnlinkAll
  • reader 2 starts the Dequeue, sees that Head is nil and proceeds with InvertOrder(UnlinkAll())
  • reader 2 updates the Head field of the dequeued chain
  • reader 1 continues its execution and passes the empty public chain into UnlinkAll, which passes nil to InvertOrder, which returns nil, and that nil is stored into the Head field
  • the element that was dequeued by the reader 2 is lost
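
The interleaving above can be replayed by hand in a single thread. The sketch below is only an illustration under assumptions: the two "readers" are simulated by ordinary statements executed in the problematic order, TNode and the variable names are made up, UnlinkAll is a non-atomic stand-in, and InvertOrder is left out because a one-node chain is its own inverse.

```pascal
program TwoReadersRace;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Data: integer;
  end;

// Non-atomic stand-in for UnlinkAll: detach the chain, return its old head.
function UnlinkAll(var chain: PNode): PNode;
begin
  Result := chain;
  chain := nil;
end;

var
  node         : TNode;
  publicChain  : PNode;
  dequeuedHead : PNode;
  reader1SawNil: boolean;
  reader2SawNil: boolean;
begin
  // Writer pushes one element into the queue.
  node.Data := 42;
  node.Next := nil;
  publicChain := @node;
  dequeuedHead := nil;

  // Reader 1 tests Head, sees nil, and is paused before calling UnlinkAll.
  reader1SawNil := (dequeuedHead = nil);

  // Reader 2 tests Head, sees nil, and completes the whole assignment.
  reader2SawNil := (dequeuedHead = nil);
  if reader2SawNil then
    dequeuedHead := UnlinkAll(publicChain);
  Writeln('After reader 2, element is in the dequeued chain: ', (dequeuedHead <> nil));

  // Reader 1 resumes and acts on its stale test; the public chain is empty
  // by now, so it overwrites Head with nil and the element is lost.
  if reader1SawNil then
    dequeuedHead := UnlinkAll(publicChain);
  Writeln('After reader 1, element is in the dequeued chain: ', (dequeuedHead <> nil));
end.
```

The first line reports that the element is present, the second that it is gone, although nobody ever popped it.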

Multiple writers are supported, though, just like in the lock-free stack.

Let's take a quick look at the assembler code. UnlinkAll is quite simple. First it clears the ecx register (sets up the nil pointer), loads chain.Head into eax and atomically swaps chain.Head with ecx (i.e., with nil). Old chain.Head is returned in the Result (eax), which is exactly what we want.

class function TOmniBaseContainer.UnlinkAll(var chain: TOmniHeadAndSpin): POmniLinkedData;
asm
  xor   ecx, ecx                  //ecx := nil
  mov   edx, eax                  //edx = @chain
  mov   eax, [edx]                //eax := chain.Head
@Spin:
  lock cmpxchg [edx], ecx         //Cut Chain.Head
  jnz   @Spin
end; { TOmniBaseContainer.UnlinkAll }

InvertOrder is slightly longer but much simpler as there are no lock operations involved. Code merely walks over the pointer chain and inverts it.

class function TOmniBaseContainer.InvertOrder(chainHead: POmniLinkedData): POmniLinkedData;
asm
  test  eax, eax
  jz    @Exit
  xor   ecx, ecx
@Walk:
  xchg  [eax], ecx                //Turn links
  and   ecx, ecx
  jz    @Exit
  xchg  [ecx], eax
  and   eax, eax
  jnz   @Walk
  mov   eax, ecx
@Exit:
end; { TOmniBaseContainer.InvertOrder }
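
If the xchg ping-pong is hard to follow, the following hypothetical Delphi model may help. It keeps one local variable per register (regEax, regEcx) and mirrors the assembler instruction by instruction. Xchg, TNode and InvertOrderModel are names made up for this sketch, and it assumes (as the assembler does when it addresses [eax]) that the Next pointer is the first field of a node.

```pascal
program InvertOrderTrace;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode; // must be first: the assembler reads it as [eax]
    Data: integer;
  end;

// Models the xchg instruction: swaps the two operands.
procedure Xchg(var a, b: PNode);
var
  tmp: PNode;
begin
  tmp := a;
  a := b;
  b := tmp;
end;

function InvertOrderModel(chainHead: PNode): PNode;
var
  regEax: PNode;
  regEcx: PNode;
begin
  regEax := chainHead;
  if regEax <> nil then begin            // test eax, eax / jz @Exit
    regEcx := nil;                       // xor ecx, ecx
    repeat                               // @Walk:
      Xchg(regEax^.Next, regEcx);        // xchg [eax], ecx
      if regEcx = nil then               // and ecx, ecx / jz @Exit
        Break;
      Xchg(regEcx^.Next, regEax);        // xchg [ecx], eax
      if regEax = nil then begin         // and eax, eax / jnz @Walk
        regEax := regEcx;                // mov eax, ecx
        Break;
      end;
    until False;
  end;
  Result := regEax;                      // @Exit: result is in eax
end;

var
  nodes: array [1..3] of TNode;
  p    : PNode;
begin
  // Build the chain 3 -> 2 -> 1 -> nil, as three pushes would.
  nodes[1].Data := 1; nodes[1].Next := nil;
  nodes[2].Data := 2; nodes[2].Next := @nodes[1];
  nodes[3].Data := 3; nodes[3].Next := @nodes[2];
  p := InvertOrderModel(@nodes[3]);
  while p <> nil do begin
    Writeln(p^.Data);
    p := p^.Next;
  end;
end.
```

Each pass through the loop turns two links, which is why the assembler needs two exit tests. For the three-node chain above the walk should print 1, 2 and 3.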

That's all, folks. You now know how both lock-free OTL structures work and what their limitations are. Now I only have one more article on OtlContainers to write, one that will explain the higher-level TOmniStack and TOmniQueue.


Wednesday, July 23, 2008

A working lock-free stack implementation


This is the third post in the mini-series on a lock-free stack, included in the OmniThreadLibrary. You've learned how the lock-free stack was implemented in the first place and which problem was hidden in that implementation. I've also mentioned that the problem was fixed in the meantime and that the working implementation is already available in the repository and as a snapshot, but I haven't said a word about the new implementation. This will now be rectified.

The initial implementation contained an excellent example of the ABA problem. You can read more about it on Wikipedia, where you'll also learn that the ABA problem is commonly solved by adding ‘tag’ bits. That's exactly what was done in the OTL, except that we didn't use just a few measly bits, but a 4-byte counter.

Instead of containing only a pointer to the first node, each chain head is now composed of two parts — a pointer to the first node and a 4-byte spin count. This second part will make sure that the ABA problem cannot occur.

type
TOmniHeadAndSpin = packed record
Head: POmniLinkedData;
Spin: cardinal;
end;{ TOmniHeadAndSpin }

As the problem lay in the PopLink method, we'll start today's exploration just there. This is the new version in all its glory. (Of course, the assembler is still provided by ‘GJ’; I had nothing to do with it.)

class function TOmniBaseContainer.PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData;
asm
  push  edi
  push  ebx
  mov   edi, eax                  //edi = @chain
@Spin:
  mov   ecx, 1                    //Increment spin reference by 1
  lock xadd [edi + 4], ecx        //Get old spin reference to ecx
  mov   eax, [edi]                //eax := chain.Head
  mov   edx, [edi + 4]            //edx := chain.Spin
  test  eax, eax
  jz    @Exit                     //Is Empty?
  inc   ecx                       //Now we are ready to cmpxchg8b
  cmp   edx, ecx                  //Is reference the same?
  jnz   @Spin
  mov   ebx, [eax]                //ebx := Result.Next
  lock cmpxchg8b [edi]            //Now try to xchg
  jnz   @Spin                     //Do spin ???
@Exit:
  pop   ebx
  pop   edi
end;

You may have noticed that this is now a class function (a class static function, to be more precise). That made more sense as it needs no information from the class — it only works on a chain, which is passed as a parameter. Also, the code is much longer and not very easy to understand, so I'll try the trick that helped in the previous installment. I'll write an approximation of the PopLink in Delphi.

class function TOmniBaseContainer.PopLink(var chain: TOmniHeadAndSpin): POmniLinkedData;
label
  Spin;
var
  chainSpin: cardinal;
  nextNode : POmniLinkedData;
  spinRef  : cardinal;
begin
  //  push  edi
  //  push  ebx
  //  mov   edi, eax                  //edi = @chain
Spin:
  //  mov   ecx, 1                    //Increment spin reference by 1
  //  lock xadd [edi + 4], ecx        //Get old spin reference to ecx
  {.$ATOMIC}
  spinRef := chain.Spin;
  chain.Spin := chain.Spin + 1;
  {.$END ATOMIC}
  //  mov   eax, [edi]                //eax := chain.Head
  //  mov   edx, [edi + 4]            //edx := chain.Spin
  Result := chain.Head;
  chainSpin := chain.Spin;
  //  test  eax, eax
  //  jz    @Exit                     //Is Empty?
  if not assigned(Result) then
    Exit;
  //  inc   ecx                       //Now we are ready to cmpxchg8b
  Inc(spinRef);
  //  cmp   edx, ecx                  //Is reference the same?
  //  jnz   @Spin
  if spinRef <> chainSpin then
    goto Spin;
  //  mov   ebx, [eax]                //ebx := Result.Next
  //  nextNode = ebx
  nextNode := Result.Next;
  //  lock cmpxchg8b [edi]            //Now try to xchg
  {.$ATOMIC}
  if (Result = chain.Head) and (chainSpin = chain.Spin) then begin
    chain.Head := nextNode;
    chain.Spin := spinRef;
  end
  else begin
    chainSpin := chain.Spin;
    Result := chain.Head;
    {.$END ATOMIC}
    //  jnz   @Spin                   //Do spin ???
    goto Spin;
  end;
  //@Exit:
  //  pop   ebx
  //  pop   edi
end;

In the beginning, a little housekeeping is done (two registers are stored away because the Delphi compiler expects us not to modify them) and the address of the chain record is loaded into the edi register.

With a little help from a locked xadd, the current value of the header spin count is loaded into spinRef (the ecx register) and is incremented in the header.

  //  mov   ecx, 1                    //Increment spin reference by 1
  //  lock xadd [edi + 4], ecx        //Get old spin reference to ecx
  {.$ATOMIC}
  spinRef := chain.Spin;
  chain.Spin := chain.Spin + 1;
  {.$END ATOMIC}

The xadd instruction stores the first operand ([edi + 4], which is chain.Spin) into the second operand (ecx) and the sum of both operands (the original value of the second operand is used for the summation) into the first operand.
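
As a small illustration of that exchange-and-add behaviour, here is a non-atomic Delphi model. Xadd is a hypothetical helper written for this sketch (the real instruction does both steps as one locked operation), and the variable names are made up.

```pascal
program XaddDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$Q-,R-} // the counter is meant to wrap around, so no overflow/range checks

// Non-atomic model of "xadd target, source":
// source receives the old target, target receives old target + old source.
procedure Xadd(var target, source: cardinal);
var
  oldTarget: cardinal;
begin
  oldTarget := target;
  target := target + source;
  source := oldTarget;
end;

var
  spin  : cardinal; // plays the role of chain.Spin
  regEcx: cardinal; // plays the role of the ecx register
begin
  spin := 41;
  regEcx := 1;
  Xadd(spin, regEcx);
  Writeln('Spin = ', spin, ', ecx = ', regEcx);   // Spin = 42, ecx = 41

  // At the top of the range the counter simply wraps to zero.
  spin := $FFFFFFFF;
  regEcx := 1;
  Xadd(spin, regEcx);
  Writeln('Spin = ', spin, ', ecx = ', regEcx);   // Spin = 0, ecx = 4294967295
end.
```

After the call, ecx holds the spin value from before the increment, which is why PopLink has to do inc ecx before comparing it with the freshly loaded chain.Spin.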

Then we fetch current chain.Head and chain.Spin and store them into the Result (eax) and chainSpin (edx), respectively. This is a non-atomic fetch and doesn't guarantee that both values really belong to the same state of the chain record. Another thread may have been invoked between those two movs and it could call PopLink on the same chain and modify the header.

//  mov   eax, [edi]                        //eax := chain.Head
// mov edx, [edi +4] //edx := chain.Spin
Result := chain.Head;
chainSpin := chain.Spin;
// test eax, eax
// jz @Exit //Is Empty?
if not assigned(Result) then
Exit;

For the same reason there is no guarantee that chainSpin is indeed equal to spinRef+1, so this is now checked for.

  //  inc   ecx                       //Now we are ready to cmpxchg8b
  Inc(spinRef);
  //  cmp   edx, ecx                  //Is reference the same?
  //  jnz   @Spin
  if spinRef <> chainSpin then
    goto Spin;

Now we're getting ready for the pointer swap. First we need the address of the second node in the ebx register.

//  mov   ebx, [eax]                        //ebx := Result.Next
nextNode := Result.Next;

Finally, we can use cmpxchg8b instruction to do the swap. Cmpxchg8b is a very special beast. It is similar to the cmpxchg, but it compares and swaps full 8 bytes in one go!

//  lock cmpxchg8b [edi]                    //Now try to xchg
{.$ATOMIC}
if (Result = chain.Head) and (chainSpin = chain.Spin) then begin
chain.Head := nextNode;
chain.Spin := spinRef;
end
else begin
chainSpin := chain.Spin;
Result := chain.Head;
{.$END ATOMIC}
// jnz @Spin //Do spin ???
goto Spin;
end;

Cmpxchg8b compares the combination of the edx and eax registers with its operand [edi]. In our case, edi is pointing to the chain (an 8-byte value in which the first 4 bytes are a node pointer and the second 4 a spin count), and eax and edx were loaded from the same structure just a few instructions back. In other words, just like in the initial implementation we're checking if our internal values are still relevant. If that is not true, cmpxchg8b reloads edx and eax from [edi] and the code then jumps back to the Spin label.

If the values match, registers ecx and ebx are copied into the operand [edi]. Here, the value in the ecx register is equal to edx (we just verified that) and ebx contains the pointer to the next node. In other words, the chain head is modified to point to the second node in the chain.

There's not much difference between this implementation and the original one. The biggest and most important change is the introduction of a spin counter, which makes sure that the ABA problem, mentioned in the previous post, cannot occur.
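The head-plus-spin idea can also be sketched in plain Delphi. This is only a minimal sketch under stated assumptions, not the OmniThreadLibrary code: nodes live in a hypothetical Nodes array and are linked by index instead of by pointer (so that head and spin fit into one Int64 on any platform), the names MakeChain, PopIndex and NIL_INDEX are made up for the illustration, and TInterlocked.CompareExchange from System.SyncObjs (Delphi XE2 or newer is assumed) stands in for lock cmpxchg8b. Unlike the real PopLink, the sketch bumps the spin count only as part of the successful swap.

```pascal
program TaggedHeadSketch;
{$APPTYPE CONSOLE}
{$Q-,R-} // the spin count is allowed to wrap around

uses
  System.SyncObjs; // TInterlocked; Delphi XE2 or newer is assumed

const
  NIL_INDEX = $FFFFFFFF; // hypothetical marker for "chain is empty"

type
  // Hypothetical node: Next is an index into Nodes, not a pointer.
  TNode = record
    Next: Cardinal;
  end;

var
  Nodes: array [0..1] of TNode;
  SharedChain: Int64; // low 32 bits = head index, high 32 bits = spin

function MakeChain(head, spin: Cardinal): Int64;
begin
  Result := (Int64(spin) shl 32) or Int64(head);
end;

function PopIndex(var chain: Int64): Cardinal;
var
  oldChain, newChain: Int64;
  spin: Cardinal;
begin
  repeat
    // Atomic 64-bit read: a compare-exchange of 0 with 0 never changes the
    // stored value but returns it in one piece.
    oldChain := TInterlocked.CompareExchange(chain, 0, 0);
    Result := Cardinal(oldChain and $FFFFFFFF);
    if Result = NIL_INDEX then
      Exit;
    spin := Cardinal(oldChain shr 32);
    Inc(spin);
    newChain := MakeChain(Nodes[Result].Next, spin);
    // Head and spin are compared and replaced together, so a head that was
    // popped and pushed back in the meantime no longer matches.
  until TInterlocked.CompareExchange(chain, newChain, oldChain) = oldChain;
end;

begin
  Nodes[0].Next := 1;
  Nodes[1].Next := NIL_INDEX;
  SharedChain := MakeChain(0, 0);             // chain: 0 -> 1, spin = 0
  WriteLn(PopIndex(SharedChain));             // 0
  WriteLn(PopIndex(SharedChain));             // 1
  WriteLn(PopIndex(SharedChain) = NIL_INDEX); // TRUE
  WriteLn(SharedChain shr 32);                // 2, one bump per successful pop
end.
```

Packing both values into one 64-bit word is what makes the check "same head and same spin" a single atomic operation, which is exactly the job cmpxchg8b does in the assembler version.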

[To be completely frank, the ABA situation can still occur, at least in theory. To experience a problem with the new code, one thread would have to stop just before cmpxchg8b and wait there for a very long time. So long, in fact, that other threads would be able to wrap the spin counter from $FFFFFFFF to 0 and back to the original value. A little less than 4.3 billion PopLink calls would have to be made (the spin counter is bumped in PopLink; PushLink, shown below, never touches it). If the original thread is then awakened, and the spin counter contains the exact same value as it had before the thread was paused, the ABA problem would occur. That will never happen in practice, I'm totally sure.]

Besides PopLink and the introduction of spin counters, nothing much has changed. The PushLink method was slightly modified, as it was also changed from a normal method to a class static one, but it still works exactly as before.

class procedure TOmniBaseContainer.PushLink(const link: POmniLinkedData;
  var chain: TOmniHeadAndSpin);
asm
  mov   ecx, eax
  mov   eax, [edx]                          //eax := chain.Head
@Spin:
  mov   [ecx], eax                          //link.Next := chain.Head
  lock cmpxchg [edx], ecx                   //chain.Head := link
  jnz   @Spin
end; { TOmniBaseContainer.PushLink }
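For readers who prefer Delphi to assembler, the same push loop can be approximated as follows. This is a hedged sketch, not the library source: TLinkedData, PLinkedData and the stand-alone PushLink procedure are hypothetical stand-ins, the chain head is a plain Pointer without the spin field, and TInterlocked.CompareExchange from System.SyncObjs (Delphi XE2 or newer assumed) replaces lock cmpxchg. Where the assembler relies on cmpxchg reloading eax after a failed swap, the sketch simply re-reads the head at the top of the loop.

```pascal
program PushLinkSketch;
{$APPTYPE CONSOLE}

uses
  System.SyncObjs; // TInterlocked; Delphi XE2 or newer is assumed

type
  // Hypothetical minimal node; only the Next link matters here.
  PLinkedData = ^TLinkedData;
  TLinkedData = record
    Next: PLinkedData;
  end;

procedure PushLink(link: PLinkedData; var chainHead: Pointer);
var
  oldHead: Pointer;
begin
  repeat
    oldHead := chainHead;      // eax := chain.Head
    link^.Next := oldHead;     // link.Next := chain.Head
    // Install link as the new head only if the head is still oldHead.
  until TInterlocked.CompareExchange(chainHead, link, oldHead) = oldHead;
end;

var
  head: Pointer = nil;
  a, b: TLinkedData;

begin
  PushLink(@a, head);
  PushLink(@b, head);
  WriteLn(head = @b);    // TRUE
  WriteLn(b.Next = @a);  // TRUE
  WriteLn(a.Next = nil); // TRUE
end.
```

A push only ever writes into the node being pushed and into the head, which is why it gets by with a 4-byte compare-and-swap while the pop needs the 8-byte one.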

That's it. This version of the lock-free stack has successfully passed a 4-hour stress test, which makes me believe that it really works. ;)

Next time on the agenda: How the lock-free queue is made. This time I'll really blog about it. Nothing more will distract me. Promise!


Monday, July 21, 2008

TDM Rerun #10: Synchronisation Toolkit Revisited

The plan for this article was to present an implementation of a shared memory pool: a mechanism that allows multiple data producers to send data to one data manipulator. In fact, I already had all the code and half the article completed when I found out that my solution doesn't always work. Everything was OK when both ends of the pool (producer and manipulator) were implemented in the same application or in two 'normal' applications. But if I tried to change one of those apps into an NT service, the shared pool stopped working.

- Synchronisation Toolkit Revisited, The Delphi Magazine 91, March 2003

My 10th TDM article returned to my pet theme: synchronisation and communication. The first part described some aspects of the NT security model and the second implemented a shared memory-based queue, capable of transporting messages from a GUI application to a service and back.

It looks like I'll have to return to that topic again - I got some reports recently that my approach doesn't (always?) work on Vista :(

Links: article (PDF, 54 KB), source code (ZIP, 2.2 MB), current GpSync unit


Lock-free stack problem

In my last treatise on OmniThreadLibrary internals you could read about the lock-free stack structure included in the OmniThreadLibrary. The code was relatively simple to understand (after studying it for hours ;) but alas, it didn't work. The problem only appeared when multiple readers or multiple writers were operating on the same structure, and that's why the initial tests, which used only one reader and one writer, failed to detect it.

I had no idea what could go wrong (at that moment I thought I understood the lock-free stuff in OtlContainers) but luckily the author of the stack code was smarter and located the problem. He explained it to me so I can explain it to you ...

Remember the PopLink code from the previous article? Probably not, so I'll reprint it here in a slightly modified form. [For the original code and comments, see the Implementing lock-free stack article.]

function TOmniBaseContainer.PopLink(var chainHead: POmniLinkedData): POmniLinkedData;
label
  Spin;
var
  second: POmniLinkedData;
begin
//  mov   eax, [edx]
  Result := chainHead;
Spin:
//  test  eax, eax
//  jz    @Exit
  if not assigned(Result) then
    Exit;
//  mov   ecx, [eax]
  second := Result.Next;
//  lock cmpxchg [edx], ecx
  {.$ATOMIC}
  if Result = chainHead then
    chainHead := second
  else begin
    Result := chainHead; //on failure, cmpxchg reloads eax from [edx]
  {.$END ATOMIC}
//  jnz   @Spin
    goto Spin;
  end;
end; { TOmniBaseContainer.PopLink }

I must emphasize again that this is not working code. It might even compile (yes, Pascal/Delphi does have labels and goto!) but it surely won't work, as the part between {.$ATOMIC} and {.$END ATOMIC} won't be executed atomically. Still, this is a nice approximation that allows us to understand the problem that also occurs with the assembler version.

The code is (now that it's coded in Delphi) pretty simple: it accesses first element in the chain, follows its Next pointer to the second element, and then atomically checks whether the chain head still points to the first element and if that is true, changes the chain head to point to the second element.

What can go wrong there? After all, we're checking the chain head and making the switch atomically! When we have only one reader and one writer, nothing much. Problems occur when there are multiple readers or multiple writers working at the same time.

The single point of failure is the lock cmpxchg instruction (the atomic part in Delphi code). There is a problem with the value that is stored in the chain head - when lock cmpxchg is executed, we cannot be sure that the ecx register (or the second variable) still points to a node that is indeed second in the chain.

Let's examine one of many possible failure scenarios:

  • A thread executes the code above up to the atomic part. Immediately after second := Result.Next is executed (Result points to the node first), the thread is stopped and another thread gets the CPU slice.

At that point we have the following data structure: chainHead -> first -> second -> ...

  • Another thread executes the same PopLink code and pops node first from the chain.

The data structure now looks like this: chainHead -> second -> ...

  • Yet another thread pushes a completely different node (let's call it newnode) into the chain.

chainHead -> newnode -> second -> ...

  • The first node is pushed back into the chain.

chainHead -> first -> newnode -> second -> ...

  • Now the first thread continues execution with the lock cmpxchg. The condition still holds - chainHead indeed points to the first node. Therefore, chainHead is modified to point to the second node. But this is not the node chainHead should be pointing to! It should point to the newnode node!

And so everything falls apart. :(
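The scenario above can be replayed deterministically in a single thread by performing the steps by hand in the same order. The program below is only an illustrative sketch: TNode, Push, Pop and ChainAsText are hypothetical helpers, nothing in it is atomic or taken from the library, and the "threads" are just comments marking whose step is being executed.

```pascal
program AbaReplay;
{$APPTYPE CONSOLE}

type
  // Hypothetical node type with a name so the chain can be printed.
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Name: string;
  end;

var
  first, second, newnode: TNode;
  chainHead, observedHead, observedSecond, popped: PNode;

procedure Push(node: PNode);
begin
  node^.Next := chainHead;
  chainHead := node;
end;

function Pop: PNode;
begin
  Result := chainHead;
  if Result <> nil then
    chainHead := Result^.Next;
end;

function ChainAsText: string;
var
  p: PNode;
begin
  Result := 'chainHead';
  p := chainHead;
  while p <> nil do begin
    Result := Result + ' -> ' + p^.Name;
    p := p^.Next;
  end;
end;

begin
  first.Name := 'first';
  second.Name := 'second';
  newnode.Name := 'newnode';
  Push(@second);
  Push(@first);                         // chainHead -> first -> second

  // Thread A: runs PopLink up to the atomic part, then is suspended.
  observedHead := chainHead;            // Result := chainHead
  observedSecond := observedHead^.Next; // second := Result.Next

  popped := Pop;                        // Thread B pops first
  Push(@newnode);                       // Thread C pushes newnode
  Push(popped);                         // first is pushed back
  WriteLn(ChainAsText);                 // chainHead -> first -> newnode -> second

  // Thread A resumes: the comparison succeeds although the chain changed.
  if chainHead = observedHead then
    chainHead := observedSecond;
  WriteLn(ChainAsText);                 // chainHead -> second
end.
```

After the last step newnode is no longer reachable from the chain head, and the node first, which was supposedly popped, still points into the chain.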

Can the code be fixed? Yes, and the fix is already in the repository and is included in the latest snapshot. Next time I'll explain the new approach (and maybe even the operation of the lock-free queue). Stay tuned!


Thursday, July 17, 2008

OmniThreadLibrary progress report

Just a short notice on what we're working on. I'm too tired to write a coherent article longer than 100 words.

  • Lock-free structures are now really, really, really working.
  • Stress tests in tests\10_Containers have been significantly enhanced.
  • New Counter support.

The last item is really interesting and deserves a longer post. In short, it allows you to do:

counter := CreateCounter(2);
CreateTask(worker).WithCounter(counter).Run;
CreateTask(worker).WithCounter(counter).Run;

In worker code:

if Task.Counter.Decrement = 0 then
  Task.Comm.Send(MSG_ALL_DONE);

Decrement is interlocked, of course.
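Why the interlocked decrement matters can be shown without the OmniThreadLibrary at all. The following is a sketch under assumptions, using only the RTL (TThread from System.Classes and TInterlocked from System.SyncObjs, Delphi XE2 or newer assumed); TWorker, Remaining and AllDoneCount are hypothetical names, and bumping AllDoneCount stands in for sending MSG_ALL_DONE. It does not show how the library implements its counter.

```pascal
program CounterSketch;
{$APPTYPE CONSOLE}

uses
  System.Classes,  // TThread
  System.SyncObjs; // TInterlocked; Delphi XE2 or newer is assumed

type
  // Hypothetical worker; the real tasks are created with CreateTask.
  TWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Remaining: Integer = 2;    // plays the role of CreateCounter(2)
  AllDoneCount: Integer = 0; // how many workers saw the counter reach zero
  w1, w2: TWorker;

procedure TWorker.Execute;
begin
  // The decrement and the read of its result are one atomic step, so
  // exactly one worker observes 0, whichever happens to finish last.
  if TInterlocked.Decrement(Remaining) = 0 then
    TInterlocked.Increment(AllDoneCount);
end;

begin
  w1 := TWorker.Create(False);
  w2 := TWorker.Create(False);
  w1.WaitFor;
  w2.WaitFor;
  w1.Free;
  w2.Free;
  WriteLn(AllDoneCount); // 1
end.
```

With a plain Dec(Remaining) followed by a separate comparison, both workers could see 0 or neither could, depending on how the threads interleave.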

 

All that is available in the repository and as a snapshot.
