29 基于锁的并发数据结构
在结束对锁的讨论之前,我们首先描述如何在一些常用数据结构中使用锁。给数据结构加锁可以使它线程安全。当然,具体如何加锁才能既保证正确性、又获得良好性能,正是我们面临的挑战:
当然,我们很难覆盖为所有数据结构或方法添加并发支持的全部技术,因为这是一个已经研究多年的主题,相关的研究论文数以千计。因此,我们只希望提供足够的介绍,说明所需要的思考方式,并给出一些好的资料供你作进一步学习。我们认为Moir和Shavit的综述[MS04]是一个非常好的信息来源。
29.1 并发计数器
最简单的数据结构之一是计数器。它是一种接口简单、使用普遍的结构。图29.1定义了一个简单的非同步计数器。
/*
 * Figure 29.1: a trivial counter with no synchronization.
 * Concurrent updates from multiple threads will race; this
 * version is only safe for single-threaded use.
 */
typedef struct __counter_t {
    int value;
} counter_t;

/* Reset the counter to zero. */
void init(counter_t *c) {
    c->value = 0;
}

/* Add one (not atomic). */
void increment(counter_t *c) {
    c->value++;
}

/* Subtract one (not atomic). */
void decrement(counter_t *c) {
    c->value--;
}

/* Return the current count. */
int get(counter_t *c) {
    return c->value;
}
Figure 29.1: A Counter Without Locks
简单但不可伸缩
正如你所看到的,非同步计数器是一个微不足道的数据结构,只需少量代码即可实现。现在我们面临下一个挑战:如何使这段代码线程安全?图29.2展示了我们是如何做到这一点的。
/*
 * Figure 29.2: a thread-safe counter. A single mutex protects the
 * value; every operation acquires the lock on entry and releases it
 * before returning (the classic monitor pattern [BH73]).
 *
 * Uses the OSTEP `Pthread_*` wrappers, which abort on failure.
 */
typedef struct __counter_t {
    int value;
    pthread_mutex_t lock;
} counter_t;

/* Initialize the count to zero and set up the mutex. */
void init(counter_t *c) {
    c->value = 0;
    Pthread_mutex_init(&c->lock, NULL);
}

/* Atomically add one to the counter. */
void increment(counter_t *c) {
    Pthread_mutex_lock(&c->lock);
    c->value++;
    // BUG FIX: original called "thread_mutex_unlock" (missing 'P'),
    // an undeclared function; the lock was never released.
    Pthread_mutex_unlock(&c->lock);
}

/* Atomically subtract one from the counter. */
void decrement(counter_t *c) {
    Pthread_mutex_lock(&c->lock);
    c->value--;
    Pthread_mutex_unlock(&c->lock);
}

/* Atomically read the current count. */
int get(counter_t *c) {
    Pthread_mutex_lock(&c->lock);
    int rc = c->value;
    Pthread_mutex_unlock(&c->lock);
    return rc;
}
Figure 29.2: A Counter With Locks
这个同步计数器简单而有效。事实上,它遵循了最简单、最基本的并发数据结构的通用设计模式:只添加一把锁,在调用操作该数据结构的函数时获取锁,并在调用返回时释放锁。通过这种方式,它类似于用监视器(monitor)[BH73]构建的数据结构,其中锁在调用函数和从函数返回时被自动获取和释放。
typedef struct __counter_t {int global; // global countpthread_mutex_t glock; // global lockint local[NUMCPUS]; // local count (per cpu)pthread_mutex_t llock[NUMCPUS]; // ... and locksint threshold; // update frequency} counter_t;// init: record threshold, init locks, init values// of all local counts and global countvoid init(counter_t *c, int threshold) {c->threshold = threshold;c->global = 0;pthread_mutex_init(&c->glock, NULL);int i;for (i = 0; i < NUMCPUS; i++) {c->local[i] = 0;pthread_mutex_init(&c->llock[i], NULL);}}// update: usually, just grab local lock and update local amount// once local count has risen by ’threshold’, grab global// lock and transfer local values to itvoid update(counter_t *c, int threadID, int amt) {pthread_mutex_lock(&c->llock[threadID]);c->local[threadID] += amt; // assumes amt > 0if (c->local[threadID] >= c->threshold) { // transfer to globalpthread_mutex_lock(&c->glock);c->global += c->local[threadID];pthread_mutex_unlock(&c->glock);c->local[threadID] = 0;}pthread_mutex_unlock(&c->llock[threadID]);}// get: just return global amount (which may not be perfect)int get(counter_t *c) {pthread_mutex_lock(&c->glock);int val = c->global;pthread_mutex_unlock(&c->glock);return val; // only approximate!}
Figure 29.5: Sloppy Counter Implementation
29.2 并发链表
/*
 * Figure 29.7: a concurrent linked list guarded by one big lock.
 * Every operation holds `lock` for its entire duration.
 */

// A single list node.
typedef struct __node_t {
    int key;
    struct __node_t *next;
} node_t;

// One list: head pointer plus the lock protecting it.
typedef struct __list_t {
    node_t *head;
    pthread_mutex_t lock;
} list_t;

/* Create an empty list and initialize its lock. */
void List_Init(list_t *L) {
    L->head = NULL;
    pthread_mutex_init(&L->lock, NULL);
}

/*
 * Push `key` at the head of the list.
 * Returns 0 on success, -1 if allocation fails
 * (the lock is released on every path).
 */
int List_Insert(list_t *L, int key) {
    pthread_mutex_lock(&L->lock);
    node_t *node = malloc(sizeof(node_t));
    if (node == NULL) {
        perror("malloc");
        pthread_mutex_unlock(&L->lock);
        return -1;                    // fail
    }
    node->key = key;
    node->next = L->head;
    L->head = node;
    pthread_mutex_unlock(&L->lock);
    return 0;                         // success
}

/*
 * Search the list for `key`.
 * Returns 0 if found, -1 otherwise.
 */
int List_Lookup(list_t *L, int key) {
    pthread_mutex_lock(&L->lock);
    for (node_t *cur = L->head; cur != NULL; cur = cur->next) {
        if (cur->key == key) {
            pthread_mutex_unlock(&L->lock);
            return 0;                 // success
        }
    }
    pthread_mutex_unlock(&L->lock);
    return -1;                        // failure
}
Figure 29.7: Concurrent Linked List
/*
 * Figure 29.8: the concurrent list rewritten so the lock covers only
 * the true critical section, and each function has a single unlock
 * path. (Relies on the list_t / node_t declarations from Figure 29.7.)
 */

/* Create an empty list and initialize its lock. */
void List_Init(list_t *L) {
    L->head = NULL;
    pthread_mutex_init(&L->lock, NULL);
}

/*
 * Push `key` at the head of the list. Allocation happens outside the
 * lock; on malloc failure the insert is silently dropped after
 * reporting via perror.
 */
void List_Insert(list_t *L, int key) {
    // synchronization not needed for the allocation
    node_t *node = malloc(sizeof(node_t));
    if (node == NULL) {
        perror("malloc");
        return;
    }
    node->key = key;
    // just lock the critical section: splicing into the list
    pthread_mutex_lock(&L->lock);
    node->next = L->head;
    L->head = node;
    pthread_mutex_unlock(&L->lock);
}

/*
 * Search the list for `key`. Returns 0 if found, -1 otherwise;
 * both outcomes funnel through the single unlock at the end.
 */
int List_Lookup(list_t *L, int key) {
    int rv = -1;
    pthread_mutex_lock(&L->lock);
    node_t *cur = L->head;
    while (cur != NULL && rv != 0) {
        if (cur->key == key)
            rv = 0;
        else
            cur = cur->next;
    }
    pthread_mutex_unlock(&L->lock);
    return rv;                        // covers both success and failure
}
Figure 29.8: Concurrent Linked List: Rewritten
29.3 并发队列
/*
 * Figure 29.9: the Michael & Scott two-lock concurrent queue.
 * A permanent dummy node lets enqueue (tail lock) and dequeue
 * (head lock) proceed concurrently without touching each other's
 * pointers.
 */
typedef struct __node_t {
    int value;
    struct __node_t *next;
} node_t;

typedef struct __queue_t {
    node_t *head;                 // points at the dummy node
    node_t *tail;                 // last real node (or the dummy)
    pthread_mutex_t headLock;     // serializes dequeuers
    pthread_mutex_t tailLock;     // serializes enqueuers
} queue_t;

/* Allocate the dummy node and initialize both locks. */
void Queue_Init(queue_t *q) {
    node_t *tmp = malloc(sizeof(node_t));
    // BUG FIX: the original dereferenced tmp without checking malloc;
    // Queue_Enqueue already asserts, so do the same here.
    assert(tmp != NULL);
    tmp->next = NULL;
    q->head = q->tail = tmp;
    pthread_mutex_init(&q->headLock, NULL);
    pthread_mutex_init(&q->tailLock, NULL);
}

/* Append `value` at the tail; only the tail lock is held. */
void Queue_Enqueue(queue_t *q, int value) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);
    tmp->value = value;
    tmp->next = NULL;

    pthread_mutex_lock(&q->tailLock);
    q->tail->next = tmp;
    q->tail = tmp;
    pthread_mutex_unlock(&q->tailLock);
}

/*
 * Remove the oldest element into *value; only the head lock is held.
 * Returns 0 on success, -1 if the queue is empty. The old dummy is
 * freed and the dequeued node becomes the new dummy.
 */
int Queue_Dequeue(queue_t *q, int *value) {
    pthread_mutex_lock(&q->headLock);
    node_t *tmp = q->head;
    node_t *newHead = tmp->next;
    if (newHead == NULL) {
        pthread_mutex_unlock(&q->headLock);
        return -1;                // queue was empty
    }
    *value = newHead->value;
    q->head = newHead;
    pthread_mutex_unlock(&q->headLock);
    free(tmp);
    return 0;
}
Figure 29.9: Michael and Scott Concurrent Queue
29.4 并发哈希表
#define BUCKETS (101)typedef struct __hash_t {list_t lists[BUCKETS];} hash_t;void Hash_Init(hash_t *H) {int i;for (i = 0; i < BUCKETS; i++) {List_Init(&H->lists[i]);}}int Hash_Insert(hash_t *H, int key) {int bucket = key % BUCKETS;return List_Insert(&H->lists[bucket], key);}int Hash_Lookup(hash_t *H, int key) {int bucket = key % BUCKETS;return List_Lookup(&H->lists[bucket], key);}
Figure 29.10: A Concurrent Hash Table
