Three steps to the blocking collection: [3] Blocking collection
About two months ago I started working on a Delphi clone of the .NET 4 BlockingCollection. The initial release was completed just before the end of 2009, and I started to write a series of articles on TOmniBlockingCollection in early January, but then I got stuck in the dynamic lock-free queue implementation. Instead of writing articles I spent most of my free time working on that code.

Now it is (finally) time to complete the journey. Everything that had to be said about the infrastructure has been said, and I only have to show you the internal workings of the blocking collection itself.

[Step 1: Three steps to the blocking collection: [1] Inverse semaphore]
[Step 2: Dynamic lock-free queue – doing it right]

The blocking collection is exposed as an interface that lives in the OtlCollections unit.

IOmniBlockingCollection = interface(IGpTraceable)

There’s also a class, TOmniBlockingCollection, which implements this interface. This class is public and can be used or reused in your code.

In short, the blocking collection works in the following way: producers store values with Add or TryAdd, consumers remove them with Take or TryTake, and CompleteAdding marks the collection as completed.

The trivial parts

Most of the blocking collection code is fairly trivial.

Add just calls TryAdd and raises an exception if TryAdd fails.

procedure TOmniBlockingCollection.Add(const value: TOmniValue);

CompleteAdding sets two “completed” flags: one boolean flag and one Windows event. The former is used for speed in non-blocking tests, while the latter is used when TryTake has to block.

procedure TOmniBlockingCollection.CompleteAdding;

Take calls TryTake with the INFINITE timeout.

function TOmniBlockingCollection.Take(var value: TOmniValue): boolean;

TryAdd checks if CompleteAdding has been called. If not, the value is stored in the dynamic queue.

There’s a potential problem hiding in TryAdd: between the time the completed flag is checked and the time the value is enqueued, another thread may call CompleteAdding. Strictly speaking, TryAdd should not succeed in that case. However, I cannot foresee a parallel algorithm where this could cause a problem.

function TOmniBlockingCollection.TryAdd(const value: TOmniValue): boolean;

Easy peasy.

The not so trivial part

And now for something completely different … TryTake is a whole different beast. It must return a value when one is available, respect both CompleteAdding and the timeout, and otherwise block until a value arrives, the collection is completed, the timeout elapses, or every consumer is blocked.

Not so easy.

In addition to obcCompletedSignal (the completed event) and obcCollection (the dynamic data queue), it also uses obcObserver (a queue change mechanism used inside the OTL) and obcResourceCount, which is an instance of TOmniResourceCount (the inverse semaphore introduced in Part 1). All of these are created in the constructor:

constructor TOmniBlockingCollection.Create(numProducersConsumers: integer);

TryTake is pretty long so I’ve split it into two parts. Let’s take a look at the non-blocking part first.

First, the code tries to retrieve data from the dynamic queue. If there’s data available, it is returned. End of story.

Otherwise, the completed flag is checked. If CompleteAdding has been called, TryTake returns immediately. It also returns immediately if the timeout is 0.

Otherwise, the code prepares for the blocking wait. The resource counter is allocated (the reasons for this will be provided later), and the observer is attached to the blocking collection. This observer will wake the blocking code when a new value is stored in the collection.

[In the code below you can see a small optimization: if the code is running on a single core, the observer is attached in the TOmniBlockingCollection constructor and detached in the destructor. Before this optimization was introduced, Attach and Detach spent far too much time in busy-wait code (on a single-core computer).]

After all that is set up, the code waits for the value (see the next code block), the observer is detached from the queue and the resource counter is released.

function TOmniBlockingCollection.TryTake(var value: TOmniValue;

The blocking part starts by storing the current time (the millisecond-accurate TimeGetTime is used) and preparing the wait handles. Then it enters a loop which repeats until CompleteAdding has been called, the timeout has elapsed (checked by the Elapsed function, which I’m not showing here for the sake of simplicity; see the source) or a value has been dequeued.
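
Before going into the loop itself, it may help to see the behaviour described so far from the caller's side. The following is only a minimal sketch, not code from the OTL sources or demos. It assumes that IOmniBlockingCollection exposes the Add, TryAdd, CompleteAdding, Take and TryTake methods discussed above, that TryTake takes the timeout in milliseconds as its second parameter, that TOmniValue lives in the OtlCommon unit, and that passing 0 to the constructor means "no resource counter".

```delphi
program BlockingCollectionSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  OtlCommon,       // TOmniValue (assumed unit)
  OtlCollections;  // IOmniBlockingCollection, TOmniBlockingCollection

var
  coll : IOmniBlockingCollection;
  value: TOmniValue;

begin
  // 0 consumers: no resource counter is allocated (assumption, see lead-in).
  coll := TOmniBlockingCollection.Create(0);

  // Empty, not completed, timeout 0: TryTake should return at once
  // without a value instead of blocking.
  Writeln('TryTake on empty collection: ', coll.TryTake(value, 0));

  // Producer side: store two values, then mark the collection as completed.
  coll.Add(1);
  coll.Add(2);
  coll.CompleteAdding;

  // After CompleteAdding, TryAdd reports failure and Add raises an exception.
  Writeln('TryAdd after CompleteAdding: ', coll.TryAdd(3));
  try
    coll.Add(3);
  except
    on E: Exception do
      Writeln('Add after CompleteAdding raised ', E.ClassName);
  end;

  // Consumer side: values already in the queue are still returned.
  // Once the queue is empty, Take sees the completed flag and returns False.
  while coll.Take(value) do
    Writeln('Took ', value.AsInteger);
end.
```

Note that Take does not block forever here, even though it uses the INFINITE timeout: the completed flag is checked before the blocking wait is entered.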
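
In the loop, the code tries again to dequeue a value from the dynamic queue and exits the loop if the dequeue succeeds. Otherwise, WaitForMultipleObjects is called. This wait returns on one of three conditions: the completed event is signalled (CompleteAdding has been called), the observer reports that a new value was stored in the collection, or the resource counter becomes signalled.
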
startTime := DSiTimeGetTime64;

If a new value was enqueued into the dynamic queue, TryDequeue is called again. It is entirely possible that another thread calls that function first and removes the value, causing TryDequeue to fail and WaitForMultipleObjects to be called again. Such is life in the multithreaded world.

Enumerating the blocking collection

The TOmniBlockingCollection enumerator is slightly more powerful than the usual Delphi enumerator. In addition to the usual methods it contains the function Take, which is required by the Parallel architecture (see Parallel.For and Parallel.ForEach.Aggregate for more information).

IOmniValueEnumerator = interface ['{F60EBBD8-2F87-4ACD-A014-452F296F4699}']

TOmniBlockingCollectionEnumerator = class(TInterfacedObject,

The implementation is trivial.

constructor TOmniBlockingCollectionEnumerator.Create(collection: TOmniBlockingCollection);

Example

A not-so-simple how-to on using the blocking collection can be seen in the demo 34_TreeScan. It uses the blocking collection to scan a tree with multiple parallel threads. This demo works in Delphi 2007 and newer.

A better example of using the blocking collection is in the demo 35_ParallelFor. It uses the same approach as demo 34 to scan the tree, except that the code is implemented as an anonymous method, which makes it much simpler than the D2007 version. Of course, this demo works only in Delphi 2009 and above.

This is the full parallel scanner from the 35_ParallelFor demo:

function TfrmParallelForDemo.ParaScan(rootNode: TNode; value: integer): TNode;

The code first creates a cancellation token which will be used to stop the Parallel.ForEach loop. The number of tasks is set to the number of cores accessible from the process and a blocking collection is created. The resource count for this collection is initialized to the number of tasks (the parameter to TOmniBlockingCollection.Create). The root node of the tree is added to the blocking collection. Then Parallel.ForEach is called.
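
The setup just described (a collection whose resource count equals the number of consumers, seeded with a single element) can also be tried without Parallel.ForEach. The sketch below is not taken from the demos. It replaces the ForEach workers with plain TThread descendants and the tree with integers, where "node" n has the children 2n and 2n+1. TConsumer, CNumConsumers and CMaxParent are hypothetical names, and the code assumes that Take returns False once every consumer is blocked on an empty collection, which is the job of the resource counter.

```delphi
program ResourceCountSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  Classes,         // TThread
  OtlCommon,       // TOmniValue (assumed unit)
  OtlCollections;  // IOmniBlockingCollection, TOmniBlockingCollection

const
  CNumConsumers = 4;    // hypothetical; must equal the constructor argument
  CMaxParent    = 1000; // "nodes" below this value have two children

type
  // Hypothetical worker that plays the role of one ForEach task.
  TConsumer = class(TThread)
  strict private
    FQueue: IOmniBlockingCollection;
  protected
    procedure Execute; override;
  public
    constructor Create(const queue: IOmniBlockingCollection);
  end;

constructor TConsumer.Create(const queue: IOmniBlockingCollection);
begin
  FQueue := queue;
  inherited Create(false); // the thread starts once construction completes
end;

procedure TConsumer.Execute;
var
  value: TOmniValue;
  n    : integer;
begin
  // Take blocks while the queue is empty and returns False when the
  // collection is completed or when all consumers are blocked.
  while FQueue.Take(value) do begin
    n := value.AsInteger;
    if n < CMaxParent then begin
      // "Child nodes" go back into the same collection.
      FQueue.TryAdd(2 * n);
      FQueue.TryAdd(2 * n + 1);
    end;
  end;
end;

var
  consumers: array [1..CNumConsumers] of TConsumer;
  i        : integer;
  queue    : IOmniBlockingCollection;

begin
  // Resource count = number of threads that will read from the collection.
  queue := TOmniBlockingCollection.Create(CNumConsumers);
  queue.Add(1); // the "root node"
  for i := 1 to CNumConsumers do
    consumers[i] := TConsumer.Create(queue);
  for i := 1 to CNumConsumers do begin
    consumers[i].WaitFor;
    consumers[i].Free;
  end;
  Writeln('All consumers finished');
end.
```

Nobody calls CompleteAdding in this sketch, so the consumers can only terminate through the resource counter. If the number passed to the constructor were larger than the number of consumer threads, the counter would never reach zero and the program would be expected to hang in WaitFor.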

The IOmniValueEnumerable aspect of the blocking collection is passed to the ForEach. Currently, this is the only way to provide ForEach with data. This interface just tells the ForEach how to generate an enumerator for each worker thread. [At the moment, each worker requires a separate enumerator. This may change in the future.]

IOmniValueEnumerable = interface ['{50C1C176-C61F-41F5-AA0B-6FD215E5159F}']

The code also passes the cancellation token to the ForEach loop and starts the parallel execution (the call to Execute). In each parallel task, the following code is executed (this code is copied from the full ParaScan example above):

procedure (const elem: TOmniValue)

The code is provided with one element from the blocking collection (ForEach takes care of that). If the Value field is the value we’re searching for, nodeResult is set, the blocking collection is put into the CompleteAdding state (so that enumerators in other tasks will terminate their blocking wait, if any) and the ForEach is cancelled.

Otherwise (not the value we’re looking for), all the children of the current node are added to the blocking collection. TryAdd is used (and its return value ignored) because another thread may call CompleteAdding while the for childNode loop is being executed.

That’s all! There is a blocking collection into which nodes are put (via the for childNode loop) and from which they are removed (via the ForEach infrastructure). If child nodes are not provided fast enough, the blocking collection will block on Take and one or more tasks may sleep for some time until new values appear. Only when the value is found are the blocking collection and the ForEach loop completed/cancelled.

This is very similar to the code that was my inspiration for writing the blocking collection:

However, this C# code exhibits a small problem. If the value is not to be found in the tree, the code never stops. Why? All tasks eventually block in the Take method (because the complete tree has been scanned) and nobody calls CompleteAdding and loop.Stop. Does the Delphi code contain the very same problem?

Definitely not! That’s exactly why the resource counter was added to the blocking collection. If the blocking collection is initialized with a number of resources greater than zero, it will allocate a resource counter in the constructor. A resource is allocated from this counter just before a thread blocks in TryTake and released after that. Each blocking wait in TryTake also waits for the resource counter to become signalled. If all threads try to execute a blocking wait, the resource counter drops to zero, signals itself and unblocks all TryTake calls!

This elegant solution has only one problem: the resource counter must be initialized to the number of threads that will be reading from the blocking collection. That’s why in the code above (ParaScan) the same number is passed to the blocking collection constructor (resource counter initialization) and to the ForEach.NumTasks method (number of parallel threads).

Download

TOmniBlockingCollection will be available in OmniThreadLibrary 1.05, which will be released in a few days. For the impatient there is the OTL 1.05 Release Candidate. The only code that will change between 1.05 RC and the release are possible bug fixes.