29 Lock-Based Concurrent Data Structures

Before moving beyond locks, we'll first describe how to use locks in some common data structures. Adding locks to a data structure makes the structure thread safe. Of course, exactly how such locks are added determines both the correctness and the performance of the data structure, and that is the challenge we face here.

Of course, we will be hard-pressed to cover all data structures or all approaches to adding concurrency, as this is a topic that has been studied for years, with thousands of research papers published about it. Thus, we hope to provide a sufficient introduction to the type of thinking required, and point you to some good sources for further study. We found the survey by Moir and Shavit [MS04] to be an excellent source of information.

29.1 Concurrent Counters

One of the simplest data structures is a counter. It is a commonly used structure with a simple interface. We define a simple non-concurrent counter in Figure 29.1.

typedef struct __counter_t {
    int value;
} counter_t;

void init(counter_t *c) {
    c->value = 0;
}

void increment(counter_t *c) {
    c->value++;
}

void decrement(counter_t *c) {
    c->value--;
}

int get(counter_t *c) {
    return c->value;
}

Figure 29.1: A Counter Without Locks

Simple But Not Scalable

As you can see, the non-synchronized counter is a trivial data structure, requiring only a tiny amount of code to implement. We now have our next challenge: how can we make this code thread safe? Figure 29.2 shows how we do so.

typedef struct __counter_t {
    int value;
    pthread_mutex_t lock;
} counter_t;

void init(counter_t *c) {
    c->value = 0;
    Pthread_mutex_init(&c->lock, NULL);
}

void increment(counter_t *c) {
    Pthread_mutex_lock(&c->lock);
    c->value++;
    Pthread_mutex_unlock(&c->lock);
}

void decrement(counter_t *c) {
    Pthread_mutex_lock(&c->lock);
    c->value--;
    Pthread_mutex_unlock(&c->lock);
}

int get(counter_t *c) {
    Pthread_mutex_lock(&c->lock);
    int rc = c->value;
    Pthread_mutex_unlock(&c->lock);
    return rc;
}

Figure 29.2: A Counter With Locks

This concurrent counter is simple and works correctly. In fact, it follows a design pattern common to the simplest and most basic concurrent data structures: it simply adds a single lock, which is acquired when calling a routine that manipulates the data structure and released when returning from the call. In this manner, the structure is similar to one built with monitors [BH73], where locks are acquired and released automatically as you call and return from object methods.
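To see the pattern in use, here is a minimal sketch (not from the original text) that exercises the counter of Figure 29.2 from several threads. It assumes the counter_t type and routines above are compiled into the same program; NUMTHREADS, NUMINCS, and worker() are names invented for this example, and the Pthread_ routines are assumed to be the usual error-checking wrappers around the corresponding pthread_ calls.

#include <pthread.h>
#include <stdio.h>

#define NUMTHREADS (4)
#define NUMINCS    (1000000)

static counter_t counter;            // the locked counter of Figure 29.2

void *worker(void *arg) {
    int i;
    for (i = 0; i < NUMINCS; i++)
        increment(&counter);         // each call acquires/releases the lock
    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t t[NUMTHREADS];
    int i;
    init(&counter);
    for (i = 0; i < NUMTHREADS; i++)
        pthread_create(&t[i], NULL, worker, NULL);
    for (i = 0; i < NUMTHREADS; i++)
        pthread_join(t[i], NULL);
    // with the lock in place, no increments are lost
    printf("final: %d (expected %d)\n", get(&counter), NUMTHREADS * NUMINCS);
    return 0;
}

The result is exact, but every thread serializes on the one lock, so performance drops as threads are added. To regain scalability, the sloppy (approximate) counter shown in Figure 29.5 gives each CPU a local counter protected by its own lock, plus one global counter: updates usually touch only the local counter, and local values are transferred to the global counter only once they reach a threshold.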

typedef struct __counter_t {
    int             global;          // global count
    pthread_mutex_t glock;           // global lock
    int             local[NUMCPUS];  // local count (per cpu)
    pthread_mutex_t llock[NUMCPUS];  // ... and locks
    int             threshold;       // update frequency
} counter_t;

// init: record threshold, init locks, init values
//       of all local counts and global count
void init(counter_t *c, int threshold) {
    c->threshold = threshold;
    c->global = 0;
    pthread_mutex_init(&c->glock, NULL);
    int i;
    for (i = 0; i < NUMCPUS; i++) {
        c->local[i] = 0;
        pthread_mutex_init(&c->llock[i], NULL);
    }
}

// update: usually, just grab local lock and update local amount
//         once local count has risen by 'threshold', grab global
//         lock and transfer local values to it
void update(counter_t *c, int threadID, int amt) {
    pthread_mutex_lock(&c->llock[threadID]);
    c->local[threadID] += amt;                 // assumes amt > 0
    if (c->local[threadID] >= c->threshold) {  // transfer to global
        pthread_mutex_lock(&c->glock);
        c->global += c->local[threadID];
        pthread_mutex_unlock(&c->glock);
        c->local[threadID] = 0;
    }
    pthread_mutex_unlock(&c->llock[threadID]);
}

// get: just return global amount (which may not be perfect)
int get(counter_t *c) {
    pthread_mutex_lock(&c->glock);
    int val = c->global;
    pthread_mutex_unlock(&c->glock);
    return val;  // only approximate!
}

Figure 29.5: Sloppy Counter Implementation
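The comments in Figure 29.5 tell the story: update() usually acquires only the calling thread's local lock, touching the global lock only when the local count reaches the threshold, while get() reads just the global count and is therefore approximate. With amt == 1, each local count holds at most threshold - 1 between transfers, so the value returned by get() can lag the true count by up to NUMCPUS * (threshold - 1): a larger threshold means less contention on the global lock but a sloppier count. The following sketch (not from the original text; the thread body, the ids passed as threadID, and the threshold of 1024 are illustrative assumptions) shows how the counter might be driven with one thread per CPU:

#include <pthread.h>
#include <stdio.h>

static counter_t c;                  // the sloppy counter of Figure 29.5

void *sloppy_worker(void *arg) {
    int id = (int)(long)arg;         // index into local[] and llock[]
    int i;
    for (i = 0; i < 1000000; i++)
        update(&c, id, 1);           // usually contends only on llock[id]
    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t t[NUMCPUS];
    long i;
    init(&c, 1024);                  // threshold: accuracy vs. speed knob
    for (i = 0; i < NUMCPUS; i++)
        pthread_create(&t[i], NULL, sloppy_worker, (void *)i);
    for (i = 0; i < NUMCPUS; i++)
        pthread_join(t[i], NULL);
    // residual local counts below the threshold were never transferred,
    // so this may print slightly less than NUMCPUS * 1000000
    printf("approximate count: %d\n", get(&c));
    return 0;
}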

29.2 Concurrent Linked Lists
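We next examine a more complicated structure, the linked list. Once again we begin with the basic approach: a single lock that protects the entire list, acquired on entry to each routine and released on exit. Figure 29.7 shows the code for this simple list, which handles just initialization, insert, and lookup.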

// basic node structure
typedef struct __node_t {
    int              key;
    struct __node_t *next;
} node_t;

// basic list structure (one used per list)
typedef struct __list_t {
    node_t          *head;
    pthread_mutex_t  lock;
} list_t;

void List_Init(list_t *L) {
    L->head = NULL;
    pthread_mutex_init(&L->lock, NULL);
}

int List_Insert(list_t *L, int key) {
    pthread_mutex_lock(&L->lock);
    node_t *new = malloc(sizeof(node_t));
    if (new == NULL) {
        perror("malloc");
        pthread_mutex_unlock(&L->lock);
        return -1; // fail
    }
    new->key = key;
    new->next = L->head;
    L->head = new;
    pthread_mutex_unlock(&L->lock);
    return 0; // success
}

int List_Lookup(list_t *L, int key) {
    pthread_mutex_lock(&L->lock);
    node_t *curr = L->head;
    while (curr) {
        if (curr->key == key) {
            pthread_mutex_unlock(&L->lock);
            return 0; // success
        }
        curr = curr->next;
    }
    pthread_mutex_unlock(&L->lock);
    return -1; // failure
}

Figure 29.7: Concurrent Linked List
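This version follows the same pattern as the counter, but two details deserve attention. First, List_Insert holds the lock even across malloc(), which does not touch the list and therefore needs no protection. Second, the lock is released on several different paths (the malloc-failure path in insert, and both the found and not-found paths in lookup), a pattern that invites bugs in which some exit path forgets the unlock. Figure 29.8 rewrites the routines so that the lock surrounds only the actual critical section and each routine releases it in exactly one place.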

void List_Init(list_t *L) {
    L->head = NULL;
    pthread_mutex_init(&L->lock, NULL);
}

void List_Insert(list_t *L, int key) {
    // synchronization not needed
    node_t *new = malloc(sizeof(node_t));
    if (new == NULL) {
        perror("malloc");
        return;
    }
    new->key = key;

    // just lock critical section
    pthread_mutex_lock(&L->lock);
    new->next = L->head;
    L->head = new;
    pthread_mutex_unlock(&L->lock);
}

int List_Lookup(list_t *L, int key) {
    int rv = -1;
    pthread_mutex_lock(&L->lock);
    node_t *curr = L->head;
    while (curr) {
        if (curr->key == key) {
            rv = 0;
            break;
        }
        curr = curr->next;
    }
    pthread_mutex_unlock(&L->lock);
    return rv; // now both success and failure
}

Figure 29.8: Concurrent Linked List: Rewritten
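With the rewrite, allocation happens outside the lock, and once the node is allocated the insert itself cannot fail, which is why List_Insert now returns void. List_Lookup funnels both success and failure through the single unlock at the end (as its final comment notes), so no exit path can forget to release the lock.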

29.3 Concurrent Queues
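As with lists, one could always build a concurrent queue by adding one big lock. Instead, Figure 29.9 shows a more concurrent queue due to Michael and Scott, which uses two locks: one for the head of the queue and one for the tail.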

typedef struct __node_t {
    int              value;
    struct __node_t *next;
} node_t;

typedef struct __queue_t {
    node_t          *head;
    node_t          *tail;
    pthread_mutex_t  headLock;
    pthread_mutex_t  tailLock;
} queue_t;

void Queue_Init(queue_t *q) {
    node_t *tmp = malloc(sizeof(node_t));
    tmp->next = NULL;
    q->head = q->tail = tmp;
    pthread_mutex_init(&q->headLock, NULL);
    pthread_mutex_init(&q->tailLock, NULL);
}

void Queue_Enqueue(queue_t *q, int value) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);
    tmp->value = value;
    tmp->next = NULL;

    pthread_mutex_lock(&q->tailLock);
    q->tail->next = tmp;
    q->tail = tmp;
    pthread_mutex_unlock(&q->tailLock);
}

int Queue_Dequeue(queue_t *q, int *value) {
    pthread_mutex_lock(&q->headLock);
    node_t *tmp = q->head;
    node_t *newHead = tmp->next;
    if (newHead == NULL) {
        pthread_mutex_unlock(&q->headLock);
        return -1; // queue was empty
    }
    *value = newHead->value;
    q->head = newHead;
    pthread_mutex_unlock(&q->headLock);
    free(tmp);
    return 0;
}

Figure 29.9: Michael and Scott Concurrent Queue
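Two aspects of this design enable the extra concurrency. Because enqueue only touches the tailLock and dequeue only the headLock, an enqueue and a dequeue can proceed in parallel. And the dummy node allocated in Queue_Init separates head from tail: a dequeue consumes the old dummy (freeing it) and leaves the dequeued node as the new dummy, so an empty queue is simply one where head->next is NULL. A short single-threaded sketch (not from the original text; the values are arbitrary) illustrates the interface, assuming the queue code above is in scope:

#include <assert.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    queue_t q;
    int value;
    Queue_Init(&q);
    // only the dummy node is present, so dequeue reports empty
    assert(Queue_Dequeue(&q, &value) == -1);
    Queue_Enqueue(&q, 7);
    Queue_Enqueue(&q, 8);
    assert(Queue_Dequeue(&q, &value) == 0 && value == 7); // FIFO order
    assert(Queue_Dequeue(&q, &value) == 0 && value == 8);
    printf("done\n");
    return 0;
}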

29.4 Concurrent Hash Table
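We end our discussion with a simple and widely used structure, the hash table. The version in Figure 29.10 handles a fixed number of buckets and does not resize; it is built directly from the concurrent lists developed above.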

#define BUCKETS (101)

typedef struct __hash_t {
    list_t lists[BUCKETS];
} hash_t;

void Hash_Init(hash_t *H) {
    int i;
    for (i = 0; i < BUCKETS; i++) {
        List_Init(&H->lists[i]);
    }
}

int Hash_Insert(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return List_Insert(&H->lists[bucket], key);
}

int Hash_Lookup(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return List_Lookup(&H->lists[bucket], key);
}

Figure 29.10: A Concurrent Hash Table
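Because each bucket is a list_t with its own lock, threads that hash to different buckets use different locks and proceed in parallel. This lock-per-bucket structure is what lets the hash table scale far better than a single-lock list, even though each individual bucket is protected in exactly the same simple way.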

29.5 Summary
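We have seen how locks can be added to common data structures (counters, linked lists, queues, and hash tables) to make them thread safe. The simplest pattern, a single lock acquired at the start of each routine and released at its end, is easy to get right and often performs well enough. When that one lock becomes a bottleneck, more concurrency can be enabled with care: the sloppy counter, the two-lock queue, and the lock-per-bucket hash table each show a different way to do so. Be wary of enabling more concurrency prematurely, as it adds complexity; optimize only when the simple approach is shown to be too slow.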