3. 綫程間同步

3.1. mutex

多個綫程同時訪問共享數據時可能會衝突,這跟前面講信號時所說的可重入性是同樣的問題。比如兩個綫程都要把某個全局變數增加1,這個操作在某平台需要三條指令完成:

  1. 從內存讀變數值到寄存器

  2. 寄存器的值加1

  3. 將寄存器的值寫回內存

假設兩個綫程在多處理器平台上同時執行這三條指令,則可能導致下圖所示的結果,最後變數只加了一次而非兩次。

圖 35.1. 並行訪問衝突

並行訪問衝突

思考一下,如果這兩個綫程在單處理器平台上執行,能夠避免這樣的問題嗎?

我們通過一個簡單的程序觀察這一現象。上圖所描述的現象從理論上是存在這種可能的,但實際運行程序時很難觀察到,為了使現象更容易觀察到,我們把上述三條指令做的事情用更多條指令來做:

		val = counter;
		printf("%x: %d\n", (unsigned int)pthread_self(), val + 1);
		counter = val + 1;

我們在“讀取變數的值”和“把變數的新值保存回去”這兩步操作之間插入一個printf調用,它會執行write系統調用進內核,為內核調度別的綫程執行提供了一個很好的時機。我們在一個循環中重複上述操作幾千次,就會觀察到訪問衝突的現象。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NLOOP 5000

int counter;                /* incremented by threads */

void *doit(void *);

int main(int argc, char **argv)
{
	pthread_t tidA, tidB;

	pthread_create(&tidA, NULL, &doit, NULL);
	pthread_create(&tidB, NULL, &doit, NULL);

        /* wait for both threads to terminate */
	pthread_join(tidA, NULL);
	pthread_join(tidB, NULL);

	return 0;
}

void *doit(void *vptr)
{
	int    i, val;

	/*
	 * Each thread fetches, prints, and increments the counter NLOOP times.
	 * The value of the counter should increase monotonically.
	 */

	for (i = 0; i < NLOOP; i++) {
		val = counter;
		printf("%x: %d\n", (unsigned int)pthread_self(), val + 1);
		counter = val + 1;
	}

	return NULL;
}

我們創建兩個綫程,各自把counter增加5000次,正常情況下最後counter應該等於10000,但事實上每次運行該程序的結果都不一樣,有時候數到5000多,有時候數到6000多。

$ ./a.out
b76acb90: 1
b76acb90: 2
b76acb90: 3
b76acb90: 4
b76acb90: 5
b7eadb90: 1
b7eadb90: 2
b7eadb90: 3
b7eadb90: 4
b7eadb90: 5
b76acb90: 6
b76acb90: 7
b7eadb90: 6
b76acb90: 8
...

對於多綫程的程序,訪問衝突的問題是很普遍的,解決的辦法是引入互斥鎖(Mutex,Mutual Exclusive Lock),獲得鎖的綫程可以完成“讀-修改-寫”的操作,然後釋放鎖給其它綫程,沒有獲得鎖的綫程只能等待而不能訪問共享數據,這樣“讀-修改-寫”三步操作組成一個原子操作,要麼都執行,要麼都不執行,不會執行到中間被打斷,也不會在其它處理器上並行做這個操作。

Mutex用pthread_mutex_t類型的變數表示,可以這樣初始化和銷毀:

#include <pthread.h>

int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
       const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

返回值:成功返回0,失敗返回錯誤號。

pthread_mutex_init函數對Mutex做初始化,參數attr設定Mutex的屬性,如果attrNULL則表示預設屬性,本章不詳細介紹Mutex屬性,感興趣的讀者可以參考[APUE2e]。用pthread_mutex_init函數初始化的Mutex可以用pthread_mutex_destroy銷毀。如果Mutex變數是靜態分配的(全局變數或static變數),也可以用宏定義PTHREAD_MUTEX_INITIALIZER來初始化,相當於用pthread_mutex_init初始化並且attr參數為NULL。Mutex的加鎖和解鎖操作可以用下列函數:

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

返回值:成功返回0,失敗返回錯誤號。

一個綫程可以調用pthread_mutex_lock獲得Mutex,如果這時另一個綫程已經調用pthread_mutex_lock獲得了該Mutex,則當前線程需要掛起等待,直到另一個綫程調用pthread_mutex_unlock釋放Mutex,當前線程被喚醒,才能獲得該Mutex並繼續執行。

如果一個綫程既想獲得鎖,又不想掛起等待,可以調用pthread_mutex_trylock,如果Mutex已經被另一個綫程獲得,這個函數會失敗返回EBUSY,而不會使綫程掛起等待。

現在我們用Mutex解決先前的問題:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


#define NLOOP 5000

int counter;                /* incremented by threads */
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *doit(void *);

int main(int argc, char **argv)
{
	pthread_t tidA, tidB;

	pthread_create(&tidA, NULL, doit, NULL);
	pthread_create(&tidB, NULL, doit, NULL);

        /* wait for both threads to terminate */
	pthread_join(tidA, NULL);
	pthread_join(tidB, NULL);

	return 0;
}

void *doit(void *vptr)
{
	int     i, val;

	/*
	 * Each thread fetches, prints, and increments the counter NLOOP times.
	 * The value of the counter should increase monotonically.
	 */

	for (i = 0; i < NLOOP; i++) {
		pthread_mutex_lock(&counter_mutex);

		val = counter;
		printf("%x: %d\n", (unsigned int)pthread_self(), val + 1);
		counter = val + 1;

		pthread_mutex_unlock(&counter_mutex);
	}

	return NULL;
}

這樣運行結果就正常了,每次運行都能數到10000。

看到這裡,讀者一定會好奇:Mutex的兩個基本操作lock和unlock是如何實現的呢?假設Mutex變數的值為1表示互斥鎖空閒,這時某個進程調用lock可以獲得鎖,而Mutex的值為0表示互斥鎖已經被某個綫程獲得,其它綫程再調用lock只能掛起等待。那麼lock和unlock的偽代碼如下:

lock:
	if(mutex > 0){
		mutex = 0;
		return 0;
	} else
		掛起等待;
	goto lock;

unlock:
	mutex = 1;
	喚醒等待Mutex的綫程;
	return 0;

unlock操作中喚醒等待綫程的步驟可以有不同的實現,可以只喚醒一個等待綫程,也可以喚醒所有等待該Mutex的綫程,然後讓被喚醒的這些綫程去競爭獲得這個Mutex,競爭失敗的綫程繼續掛起等待。

細心的讀者應該已經看出問題了:對Mutex變數的讀取、判斷和修改不是原子操作。如果兩個綫程同時調用lock,這時Mutex是1,兩個綫程都判斷mutex>0成立,然後其中一個綫程置mutex=0,而另一個綫程並不知道這一情況,也置mutex=0,於是兩個綫程都以為自己獲得了鎖。

為了實現互斥鎖操作,大多數體繫結構都提供了swap或exchange指令,該指令的作用是把寄存器和內存單元的數據相交換,由於只有一條指令,保證了原子性,即使是多處理器平台,訪問內存的匯流排周期也有先後,一個處理器上的交換指令執行時另一個處理器的交換指令只能等待匯流排周期。現在我們把lock和unlock的偽代碼改一下(以x86的xchg指令為例):

lock:
	movb $0, %al
	xchgb %al, mutex
	if(al寄存器的內容 > 0){
		return 0;
	} else
		掛起等待;
	goto lock;

unlock:
	movb $1, mutex
	喚醒等待Mutex的綫程;
	return 0;

unlock中的釋放鎖操作同樣只用一條指令實現,以保證它的原子性。

也許還有讀者好奇,“掛起等待”和“喚醒等待綫程”的操作如何實現?每個Mutex有一個等待隊列,一個綫程要在Mutex上掛起等待,首先在把自己加入等待隊列中,然後置綫程狀態為睡眠,然後調用調度器函數切換到別的綫程。一個綫程要喚醒等待隊列中的其它綫程,只需從等待隊列中取出一項,把它的狀態從睡眠改為就緒,加入就緒隊列,那麼下次調度器函數執行時就有可能切換到被喚醒的綫程。

一般情況下,如果同一個綫程先後兩次調用lock,在第二次調用時,由於鎖已經被占用,該綫程會掛起等待別的綫程釋放鎖,然而鎖正是被自己占用着的,該綫程又被掛起而沒有機會釋放鎖,因此就永遠處于掛起等待狀態了,這叫做死鎖(Deadlock)。另一種典型的死鎖情形是這樣:綫程A獲得了鎖1,綫程B獲得了鎖2,這時綫程A調用lock試圖獲得鎖2,結果是需要掛起等待綫程B釋放鎖2,而這時綫程B也調用lock試圖獲得鎖1,結果是需要掛起等待綫程A釋放鎖1,於是綫程A和B都永遠處于掛起狀態了。不難想象,如果涉及到更多的綫程和更多的鎖,有沒有可能死鎖的問題將會變得複雜和難以判斷。

寫程序時應該儘量避免同時獲得多個鎖,如果一定有必要這麼做,則有一個原則:如果所有線程在需要多個鎖時都按相同的先後順序(常見的是按Mutex變數的地址順序)獲得鎖,則不會出現死鎖。比如一個程序中用到鎖1、鎖2、鎖3,它們所對應的Mutex變數的地址是鎖1<鎖2<鎖3,那麼所有線程在需要同時獲得2個或3個鎖時都應該按鎖1、鎖2、鎖3的順序獲得。如果要為所有的鎖確定一個先後順序比較困難,則應該儘量使用pthread_mutex_trylock調用代替pthread_mutex_lock調用,以免死鎖。

3.2. Condition Variable

綫程間的同步還有這樣一種情況:綫程A需要等某個條件成立才能繼續往下執行,現在這個條件不成立,綫程A就阻塞等待,而綫程B在執行過程中使這個條件成立了,就喚醒綫程A繼續執行。在pthread庫中通過條件變數(Condition Variable)來阻塞等待一個條件,或者喚醒等待這個條件的綫程。Condition Variable用pthread_cond_t類型的變數表示,可以這樣初始化和銷毀:

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
       const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

返回值:成功返回0,失敗返回錯誤號。

和Mutex的初始化和銷毀類似,pthread_cond_init函數初始化一個Condition Variable,attr參數為NULL則表示預設屬性,pthread_cond_destroy函數銷毀一個Condition Variable。如果Condition Variable是靜態分配的,也可以用宏定義PTHEAD_COND_INITIALIZER初始化,相當於用pthread_cond_init函數初始化並且attr參數為NULL。Condition Variable的操作可以用下列函數:

#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex,
       const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

返回值:成功返回0,失敗返回錯誤號。

可見,一個Condition Variable總是和一個Mutex搭配使用的。一個綫程可以調用pthread_cond_wait在一個Condition Variable上阻塞等待,這個函數做以下三步操作:

  1. 釋放Mutex

  2. 阻塞等待

  3. 當被喚醒時,重新獲得Mutex並返回

pthread_cond_timedwait函數還有一個額外的參數可以設定等待超時,如果到達了abstime所指定的時刻仍然沒有別的綫程來喚醒當前線程,就返回ETIMEDOUT。一個綫程可以調用pthread_cond_signal喚醒在某個Condition Variable上等待的另一個綫程,也可以調用pthread_cond_broadcast喚醒在這個Condition Variable上等待的所有線程。

下面的程序演示了一個生產者-消費者的例子,生產者生產一個結構體串在鏈表的表頭上,消費者從表頭取走結構體。

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>

struct msg {
	struct msg *next;
	int num;
};

struct msg *head;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

void *consumer(void *p)
{
	struct msg *mp;

	for (;;) {
		pthread_mutex_lock(&lock);
		while (head == NULL)
			pthread_cond_wait(&has_product, &lock);
		mp = head;
		head = mp->next;
		pthread_mutex_unlock(&lock);
		printf("Consume %d\n", mp->num);
		free(mp);
		sleep(rand() % 5);
	}
}

void *producer(void *p)
{
	struct msg *mp;
	for (;;) {
		mp = malloc(sizeof(struct msg));
		mp->num = rand() % 1000 + 1;
		printf("Produce %d\n", mp->num);
		pthread_mutex_lock(&lock);
		mp->next = head;
		head = mp;
		pthread_mutex_unlock(&lock);
		pthread_cond_signal(&has_product);
		sleep(rand() % 5);
	}
}

int main(int argc, char *argv[]) 
{
	pthread_t pid, cid;  

	srand(time(NULL));
	pthread_create(&pid, NULL, producer, NULL);
	pthread_create(&cid, NULL, consumer, NULL);
	pthread_join(pid, NULL);
	pthread_join(cid, NULL);
	return 0;
}

執行結果如下:

$ ./a.out 
Produce 744
Consume 744
Produce 567
Produce 881
Consume 881
Produce 911
Consume 911
Consume 567
Produce 698
Consume 698

習題

1、在本節的例子中,生產者和消費者訪問鏈表的順序是LIFO的,請修改程序,把訪問順序改成FIFO。

3.3. Semaphore

Mutex變數是非0即1的,可看作一種資源的可用數量,初始化時Mutex是1,表示有一個可用資源,加鎖時獲得該資源,將Mutex減到0,表示不再有可用資源,解鎖時釋放該資源,將Mutex重新加到1,表示又有了一個可用資源。

信號量(Semaphore)和Mutex類似,表示可用資源的數量,和Mutex不同的是這個數量可以大於1。

本節介紹的是POSIX semaphore庫函數,詳見sem_overview(7),這種信號量不僅可用於同一進程的綫程間同步,也可用於不同進程間的同步。

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t * sem);
int sem_destroy(sem_t * sem);

semaphore變數的類型為sem_t,sem_init()初始化一個semaphore變數,value參數表示可用資源的數量,pshared參數為0表示信號量用於同一進程的綫程間同步,本節只介紹這種情況。在用完semaphore變數之後應該調用sem_destroy()釋放與semaphore相關的資源。

調用sem_wait()可以獲得資源,使semaphore的值減1,如果調用sem_wait()時semaphore的值已經是0,則掛起等待。如果不希望掛起等待,可以調用sem_trywait()。調用sem_post()可以釋放資源,使semaphore的值加1,同時喚醒掛起等待的綫程。

上一節生產者-消費者的例子是基于鏈表的,其空間可以動態分配,現在基于固定大小的環形隊列重寫這個程序:

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

#define NUM 5
int queue[NUM];
sem_t blank_number, product_number;

void *producer(void *arg) 
{
	int p = 0;
	while (1) {
		sem_wait(&blank_number);
		queue[p] = rand() % 1000 + 1;
		printf("Produce %d\n", queue[p]);
		sem_post(&product_number);
		p = (p+1)%NUM;
		sleep(rand()%5);
	}
}

void *consumer(void *arg) 
{
	int c = 0;
	while (1) {
		sem_wait(&product_number);
		printf("Consume %d\n", queue[c]);
		queue[c] = 0;
		sem_post(&blank_number);
		c = (c+1)%NUM;
		sleep(rand()%5);
	}
}

int main(int argc, char *argv[]) 
{
	pthread_t pid, cid;  

	sem_init(&blank_number, 0, NUM);
	sem_init(&product_number, 0, 0);
	pthread_create(&pid, NULL, producer, NULL);
	pthread_create(&cid, NULL, consumer, NULL);
	pthread_join(pid, NULL);
	pthread_join(cid, NULL);
	sem_destroy(&blank_number);
	sem_destroy(&product_number);
	return 0;
}

習題

1、本節和上一節的例子給出一個重要的提示:用Condition Variable可以實現Semaphore。請用Condition Variable實現Semaphore,然後用自己實現的Semaphore重寫本節的程序。

3.4. 其它綫程間同步機制

如果共享數據是隻讀的,那麼各綫程讀到的數據應該總是一致的,不會出現訪問衝突。只要有一個綫程可以改寫數據,就必須考慮綫程間同步的問題。由此引出了讀者寫者鎖(Reader-Writer Lock)的概念,Reader之間並不互斥,可以同時讀共享數據,而Writer是獨占的(exclusive),在Writer修改數據時其它Reader或Writer不能訪問數據,可見Reader-Writer Lock比Mutex具有更好的並發性。

用掛起等待的方式解決訪問衝突不見得是最好的辦法,因為這樣畢竟會影響系統的並發性,在某些情況下解決訪問衝突的問題可以儘量避免掛起某個綫程,例如Linux內核的Seqlock、RCU(read-copy-update)等機制。

關於這些同步機制的細節,有興趣的讀者可以參考[APUE2e][ULK]