This example implements a single-reader, single-writer stream. Two operations are defined on such a stream: an append and a removal. A removal from an empty stream suspends until the stream is non-empty. Appends to the stream never suspend.
#include <iostream.h>

class Stream;

class StreamNode {
private:
  int data;
  StreamNode *sync next;
  StreamNode (int d) { data = d; }
  friend class Stream;
};

class Stream {
private:
  StreamNode* head;
  StreamNode* tail;

public:
  Stream (void) { head = new StreamNode(0); tail = head; }

  void append (int a) {
    StreamNode* addition = new StreamNode(a);
    tail->next = addition;
    tail = addition;
  }

  int remove (void) {
    StreamNode* old_head = head;
    head = head->next;
    delete old_head;
    return head->data;
  }
};

Stream S;

void producer (int n) {
  for (int i=0; i<n; i++) {
    cout << "[appending " << i << "]";
    S.append(i);
  }
}

void consumer (int n) {
  for (int i=0; i<n; i++)
    cout << "Consumer removes : " << S.remove() << endl;
}

int main() {
  par {
    producer(10);
    consumer(10);
  }
  return 0;
}
The last StreamNode of a stream always has an undefined
next field. An empty stream is represented by a single
StreamNode with an undefined next pointer. See
Figure .
Appending an item means creating a new node with the appropriate data, defining the next field of the last StreamNode, and modifying the mutable tail member to point to this new node. Notice that because this action is not atomic, this modification of tail is unprotected. Hence concurrent append() operations are dangerous, and so this is a single-writer stream. A multiple-writer class can be created by simply making append() an atomic operation.
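The multiple-writer variant can be sketched in standard C++, which has no atomic functions of the CC++ kind. The sketch below assumes a std::mutex is an acceptable stand-in for making append() atomic. The names Node, MultiWriterList, and count_after_concurrent_appends are hypothetical and exist only for illustration. The next field is an ordinary pointer here, because the list is read only after every writer has finished.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical node type: a plain pointer replaces the CC++ sync pointer,
// since this sketch never reads the list while writers are running.
struct Node {
    int data;
    Node* next = nullptr;
    explicit Node(int d) : data(d) {}
};

class MultiWriterList {
    Node* head;
    Node* tail;
    std::mutex append_lock;  // assumption: stands in for an atomic append()
public:
    MultiWriterList() : head(new Node(0)), tail(head) {}
    ~MultiWriterList() {
        while (head) { Node* n = head->next; delete head; head = n; }
    }

    void append(int a) {
        Node* addition = new Node(a);  // allocate outside the critical section
        std::lock_guard<std::mutex> lock(append_lock);
        tail->next = addition;         // both updates happen as one unit,
        tail = addition;               // so writers cannot interleave here
    }

    // Counts appended items; call only after all writers have been joined.
    int size() const {
        int n = 0;
        for (Node* p = head->next; p; p = p->next) n++;
        return n;
    }
};

// Starts several writers and reports how many items ended up in the list.
int count_after_concurrent_appends(int writers, int per_writer) {
    MultiWriterList list;
    std::vector<std::thread> threads;
    for (int w = 0; w < writers; w++)
        threads.emplace_back([&list, per_writer] {
            for (int i = 0; i < per_writer; i++) list.append(i);
        });
    for (auto& t : threads) t.join();
    return list.size();
}
```

With the lock in place, no append is lost: every item from every writer is linked into the list. Without it, two writers could both read the same tail, and one of the new nodes would be overwritten.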
Removing an item requires reading the next field of the
StreamNode referenced by the mutable member head. If
this field is not defined, the removing thread of control suspends here.
Once this field is defined, the data contained in the StreamNode
referenced by this next field is returned, the first node
is deleted, and the mutable head member is modified to point
to this second node. Again, this modification is unprotected,
and so concurrent remove() operations are dangerous.
Unlike the append() operation, however, this member cannot
simply be made atomic to permit multiple readers. This is because
atomic functions must not suspend (recall Chapter , Section ).
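The suspending removal can be approximated in standard C++, which has no sync variables. The sketch below is an emulation under stated assumptions, not the CC++ mechanism itself. SyncPtr is a hypothetical class that uses a mutex and a condition variable to imitate a single-assignment pointer, and run_stream_demo is a hypothetical driver. As in the example above, it supports one writer and one reader only.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a CC++ sync pointer: assigned at most once,
// and read() blocks until the assignment has happened.
template <typename T>
class SyncPtr {
    std::mutex m;
    std::condition_variable cv;
    T* value = nullptr;
public:
    void define(T* v) {
        std::lock_guard<std::mutex> lock(m);
        assert(value == nullptr);  // single assignment only
        value = v;
        // Notify while still holding the lock: the reader cannot return from
        // read() (and delete this node) until define() has released m.
        cv.notify_all();
    }
    T* read() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return value != nullptr; });
        return value;
    }
};

struct StreamNode {
    int data;
    SyncPtr<StreamNode> next;
    explicit StreamNode(int d) : data(d) {}
};

class Stream {
    StreamNode* head;  // touched only by the single reader
    StreamNode* tail;  // touched only by the single writer
public:
    Stream() : head(new StreamNode(0)), tail(head) {}
    ~Stream() { delete head; }  // assumes the stream has been drained

    void append(int a) {
        StreamNode* addition = new StreamNode(a);
        tail->next.define(addition);
        tail = addition;  // use the local pointer, never re-read tail->next
    }

    int remove() {
        StreamNode* old_head = head;
        head = head->next.read();  // blocks while next is undefined
        delete old_head;
        return head->data;
    }
};

// Runs one producer and one consumer; returns the values the consumer saw.
std::vector<int> run_stream_demo(int n) {
    Stream s;
    std::vector<int> seen;
    std::thread producer([&] { for (int i = 0; i < n; i++) s.append(i); });
    std::thread consumer([&] {
        for (int i = 0; i < n; i++) seen.push_back(s.remove());
    });
    producer.join();
    consumer.join();
    return seen;
}
```

Calling run_stream_demo(10) should return the values 0 through 9 in order, however the two threads interleave, because the consumer blocks in read() whenever it catches up with the producer.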
This example illustrates some of the complexity involved in implementing such synchronization classes that contain mutable members and will be shared between concurrently executing threads of control. There are at least two common errors that are avoided in the above implementation. Both stem from the fact that if a thread of control is suspended on an undefined single-assignment variable, that thread may resume execution immediately once that single-assignment variable has been defined:
void append (int a) {
  StreamNode* addition = new StreamNode(a);
  tail->next = addition;
  tail = tail->next;   //ERROR: tail may point to a deleted node
}
If the stream was initially empty (so head and tail point to the same node), then using the value tail->next may dereference deleted memory, since by this point a concurrent remove() operation may already have deleted the node referenced by head.
Fortunately, these issues of synchronization and interaction based on shared objects can usually be encapsulated in a small collection of classes. These classes can be rigorously analyzed and verified and used whenever appropriate. For example, libraries that implement semaphores, monitors, and a variety of message-passing channels have been implemented and verified here at Caltech.