Previous: All-Pairs Shortest Paths Up: Examples

Synchronizing Single-Reader Single-Writer Stream

This example implements a single-reader, single-writer stream. Two operations are defined on such a stream: an append and a removal. A removal from an empty stream suspends until the stream is non-empty. Appends to the stream never suspend.


#include <iostream.h>

class Stream;

class StreamNode { private: int data; StreamNode *sync next; StreamNode (int d) { data = d; } friend class Stream; };

class Stream { private: StreamNode* head; StreamNode* tail;

public: Stream (void) { head = new StreamNode(0); tail = head; }

void append (int a) { StreamNode* addition = new StreamNode(a); tail->next = addition; tail = addition; }

int remove (void) { StreamNode* old_head = head; head = head->next; delete old_head; return head->data; } };

Stream S;

void producer (int n) { for (int i=0; i<n; i++) { cout << "[appending " << i << "]"; S.append(i); } }

void consumer (int n) { for (int i=0; i<n; i++) cout << "Consumer removes : " << S.remove() << endl; }

int main() { par { producer(10); consumer(10); } return 0; }

The last StreamNode of a stream always has an undefined next field. An empty stream is represented by a single StreamNode with an undefined next pointer. See Figure .

Appending an item means creating a new node with the appropriate data, defining the next field of the last StreamNode, and modifying the mutable tail member to point to this new node. Notice that because this action is not atomic, this modification of tail is unprotected. Hence concurrent append() operations are dangerous, and so this is a single-writer stream. A multiple-writer class can be created by simply making append() an atomic operation.

Removing an item requires reading the next field of the StreamNode referenced by the mutable member head. If this field is not defined, the removing thread of control suspends here. Once this field is defined, the data contained in the StreamNode referenced by this next field is returned, the first node is deleted, and the mutable head member is modified to point to this second node. Again, this modification is unprotected, and so concurrent remove() operations are dangerous. Unlike the append() operation, however, this member cannot simply be made atomic to permit multiple readers. This is because atomic functions must not suspend (recall Chapter , Section ).

This example illustrates some of the complexity involved in implementing such synchronization classes that contain mutable members and will be shared between concurrently executing threads of control. There are at least two common errors that are avoided in the above implementation. Both stem from the fact that if a thread of control is suspended on an undefined single-assignment variable, that thread may resume execution immediately once that single-assignment variable has been defined:

  1. When adding a new StreamNode to the stream, the node must be created and the data field initialized before the new node is linked on to the stream. If the node is linked first, and then the data filled in, a suspended remove() operation could resume execution immediately when the next field of the previous node is defined and attempt to access a garbage data field.
  2. The first node can be deleted by a remove() operation as soon as the stream becomes non-empty, which occurs in the second line of the append() function. Thus, we must be careful not to access the contents of this node in the last line of append(). The following code, for example, is incorrect:
    
    void append (int a)  {
      StreamNode* addition = new StreamNode(a);
      tail->next = addition;
      tail = tail->next;             //ERROR: tail may point to a deleted node
    }
    

    If the stream was initially empty (so head and tail) point to the same node), then using the value tail->next may dereference deleted memory, since at this point a remove() operation may have deleted the node referenced by head.

Fortunately, these issues of synchronization and interaction based on shared objects can usually be encapsulated in a small collection of classes. These classes can be rigorously analyzed and verified and used whenever appropriate. For example, libraries that implement semaphores, monitors, and a variety of message-passing channels have been implemented and verified here at Caltech.

paolo@cs.caltech.edu