Prev: locks Next: condition-variables
The first data structure is a counter. A first attempt is simple but inefficient, it doesn’t use multi-threading:
typedef struct __counter_t {
int value;
} counter_t;
void init(counter_t *c) { c->value = 0; }
void increment(counter_t *c) { c->value++; }
void decrement(counter_t *c) { c->value--; }
int get(counter_t *c) { return c->value; }
Next, we add a single lock that we acquire for all operations:
typedef struct __counter_t {
int value;
;
pthread_mutex_t lock} counter_t;
void init(counter_t *c) {
->value = 0;
c(&c->lock, NULL);
Pthread_mutex_init}
void increment(counter_t *c) {
(&c->lock);
Pthread_mutex_lock->value++;
c(&c->lock);
Pthread_mutex_unlock}
void decrement(counter_t *c) {
(&c->lock);
Pthread_mutex_lock->value--;
c(&c->lock);
Pthread_mutex_unlock}
int get(counter_t *c) {
(&c->lock);
Pthread_mutex_lockint rc = c->value;
(&c->lock);
Pthread_mutex_unlockreturn rc;
}
This is now thread-safe, but performs worse than the single-threaded example due to lock overhead.
We can do better by allowing stale reads. Instead of synchronizing on every operation, we have a set of local counters, which are then synchronized to the global counter every once in a while.
typedef struct __counter_t {
int global; // global count
; // global lock
pthread_mutex_t glockint local[NUMCPUS]; // per-CPU count
[NUMCPUS]; // ... and locks
pthread_mutex_t llockint threshold; // update frequency
} counter_t;
// init: record threshold, init locks, init values
// of all local counts and global count
void init(counter_t *c, int threshold) {
->threshold = threshold;
c->global = 0;
c(&c->glock, NULL);
pthread_mutex_initint i;
for (i = 0; i < NUMCPUS; i++) {
->local[i] = 0;
c(&c->llock[i], NULL);
pthread_mutex_init}
}
// update: usually, just grab local lock and update
// local amount; once local count has risen ’threshold’,
// grab global lock and transfer local values to it
void update(counter_t *c, int threadID, int amt) {
int cpu = threadID % NUMCPUS;
(&c->llock[cpu]);
pthread_mutex_lock->local[cpu] += amt;
cif (c->local[cpu] >= c->threshold) {
// transfer to global (assumes amt>0)
(&c->glock);
pthread_mutex_lock->global += c->local[cpu];
c(&c->glock);
pthread_mutex_unlock->local[cpu] = 0;
c}
(&c->llock[cpu]);
pthread_mutex_unlock}
// get: just return global amount (approximate)
int get(counter_t *c) {
(&c->glock);
pthread_mutex_lockint val = c->global;
(&c->glock);
pthread_mutex_unlockreturn val; // only approximate!
}
This performs much better – having much higher performance (at the cost of accuracy). If we can say that reads are much less often, but require accuracy, they can synchronize between all locks and sacrifice performance for accuracy.
Concurrency can be added to linked lists by adding a lock for inserts and lookups:
void List_Init(list_t *L) {
->head = NULL;
L(&L->lock, NULL);
pthread_mutex_init}
void List_Insert(list_t *L, int key) {
// synchronization not needed
*new = malloc(sizeof(node_t));
node_t if (new == NULL) {
("malloc");
perrorreturn;
}
->key = key;
new
// just lock critical section
(&L->lock);
pthread_mutex_lock->next = L->head;
new->head = new;
L(&L->lock);
pthread_mutex_unlock}
int List_Lookup(list_t *L, int key) {
int rv = -1;
(&L->lock);
pthread_mutex_lock*curr = L->head;
node_t while (curr) {
if (curr->key == key) {
= 0;
rv break;
}
= curr->next;
curr }
(&L->lock);
pthread_mutex_unlockreturn rv; // now both success and failure
}
This list doesn’t scale. Instead, we can do
hand-over-hand-locking
, where every node has a lock. This
makes operations like inserts faster, but having to traverse the list
incurs the overhead of locking, which makes this approach slow.
For a concurrent queue, this implementation uses a dummy node and a lock on the head and tail for operations:
typedef struct __node_t {
int value;
struct __node_t *next;
} node_t;
typedef struct __queue_t {
*head;
node_t *tail;
node_t , tail_lock;
pthread_mutex_t head_lock}
void Queue_Init(queue_t *q) {
*tmp = malloc(sizeof(node_t));
node_t ->next = NULL;
tmp->head = q->tail = tmp;
q(&q->head_lock, NULL);
pthread_mutex_init(&q->tail_lock, NULL);
pthread_mutex_init}
void Queue_Enqueue(queue_t *q, int value) {
*tmp = malloc(sizeof(node_t));
node_t (tmp != NULL);
assert->value = value;
tmp->next = NULL;
tmp
(&q->tail_lock);
pthread_mutex_lock->tail->next = tmp;
q->tail = tmp;
q(&q->tail_lock);
pthread_mutex_unlock}
int Queue_Dequeue(queue_t *q, int *value) {
(&q->head_lock);
pthread_mutex_lock*tmp = q->head;
node_t *new_head = tmp->next;
node_t if (new_head == NULL) {
(&q->head_lock);
pthread_mutex_unlockreturn -1; // queue was empty
}
*value = new_head->value;
->head = new_head;
q(&q->head_lock);
pthread_mutex_unlock(tmp);
freereturn 0;
}
There are also faster concurrent queues using hazard pointers and Compare-and-Swap routines. To enqueue to a queue with Compare-and-swap, the data structure tries to enqueue a node with compare-and-swap. If it succeeds, great. If it fails, it retries after re-reading the current tail node. If it doesn’t succeed, it may try to yield or apply another strategy.
A concurrent hashtable uses separate chaining and the concurrent lists from before. This uses a lock per hash bucket, so the data structure only has to lock parts of itself based on an access or not.
#define BUCKETS (101)
typedef struct __hash_t {
[BUCKETS];
list_t lists} hash_t;
void Hash_Init(hash_t *H) {
int i;
for (i = 0; i < BUCKETS; i++)
(&H->lists[i]);
List_Init}
int Hash_Insert(hash_t *H, int key) {
return List_Insert(&H->lists[key % BUCKETS], key);
}
int Hash_Lookup(hash_t *H, int key) {
return List_Lookup(&H->lists[key % BUCKETS], key);
}
Prev: locks Next: condition-variables