Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

🔐 多執行緒鎖機制完全指南

用白話和示意圖理解所有常見的鎖機制


📚 目錄

  1. 無鎖機制 (Lock-Free)
  2. 互斥鎖 (Mutex)
  3. 自旋鎖 (Spinlock)
  4. 讀寫鎖 (Read-Write Lock)
  5. 遞迴鎖 (Recursive Lock)
  6. 條件變數 (Condition Variable)
  7. 信號量 (Semaphore)
  8. 柵欄 (Barrier)
  9. 順序鎖 (Seqlock)
  10. 效能比較總表

1. 無鎖機制 (Lock-Free)

🎯 核心概念

想像一個「旋轉壽司台」:

  • 生產者 = 廚師放壽司
  • 消費者 = 客人拿壽司
  • 不需要服務生協調(不需要鎖)

📊 示意圖

環狀緩衝區(SPSC Queue)
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ A │ B │ C │   │   │   │   │   │
└───┴───┴───┴───┴───┴───┴───┴───┘
  ↑       ↑
  讀      寫

生產者(廚師)         消費者(客人)
只改寫入位置   ←→     只改讀取位置
互不干擾!

💻 程式碼範例

template<typename T>
class LockFreeQueue {
    std::vector<T> buffer_;
    std::atomic<size_t> next_write_index_{0};  // 原子變數
    std::atomic<size_t> next_read_index_{0};
    
public:
    void push(T data) {
        size_t write_pos = next_write_index_.load(std::memory_order_relaxed);
        buffer_[write_pos] = data;
        next_write_index_.store(write_pos + 1, std::memory_order_release);
    }
    
    bool pop(T& data) {
        size_t read_pos = next_read_index_.load(std::memory_order_relaxed);
        size_t write_pos = next_write_index_.load(std::memory_order_acquire);
        
        if (read_pos == write_pos) return false;  // 佇列空
        
        data = buffer_[read_pos];
        next_read_index_.store(read_pos + 1, std::memory_order_release);
        return true;
    }
};

✅ 優點

  • 超快速:沒有 Context Switch
  • 無死鎖:沒有鎖就不會卡住
  • 低延遲:適合即時系統

❌ 缺點

  • 限制多:通常只支援 SPSC(單生產者單消費者)
  • 複雜度高:需要理解記憶體順序(Memory Order)
  • 除錯困難:錯誤不易重現

🎯 使用場景

  • 音訊處理管線
  • 高頻交易系統
  • 網路封包處理

2. 互斥鎖 (Mutex)

🎯 核心概念

想像一個「公共廁所」:

  • 只有一把鑰匙(鎖)
  • 誰拿到鑰匙誰進去
  • 其他人在外面排隊等

📊 示意圖

時間軸:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━→

執行緒 A:  [取得鎖]───執行中───[釋放鎖]
執行緒 B:            [等待......]  [取得鎖]───執行中───
執行緒 C:                          [等待..................]

狀態:
┌─────────────┐
│ 共享資源    │
│ (counter)   │  ← 只有拿到鎖的執行緒能存取
└─────────────┘
      ↑
      🔒 Mutex

💻 程式碼範例

#include <mutex>

class BankAccount {
    int balance_ = 0;
    std::mutex mutex_;  // 保護 balance_
    
public:
    void deposit(int amount) {
        std::lock_guard<std::mutex> lock(mutex_);  // 自動加鎖
        balance_ += amount;
        // 離開作用域自動解鎖
    }
    
    void withdraw(int amount) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (balance_ >= amount) {
            balance_ -= amount;
        }
    }
    
    int get_balance() {
        std::lock_guard<std::mutex> lock(mutex_);
        return balance_;
    }
};

🔄 運作流程

執行緒 A 想要修改 balance_:

1. 執行到 lock_guard
   ↓
2. 嘗試取得 mutex_
   ├─ 成功 → 繼續執行
   └─ 失敗 → 進入睡眠,等作業系統喚醒
   
3. 執行 balance_ += amount
   
4. 離開 {} 作用域
   ↓
5. 自動釋放 mutex_
   ↓
6. 作業系統喚醒等待中的執行緒

✅ 優點

  • 簡單易用:最基本的同步機制
  • 保證互斥:絕對不會有兩個執行緒同時執行
  • 自動管理:搭配 RAII(lock_guard)不會忘記解鎖

❌ 缺點

  • Context Switch 開銷:等待時會睡眠
  • 可能死鎖:互相等待對方的鎖
  • 不公平:無法保證先等待的先取得

🎯 使用場景

  • 銀行轉帳系統
  • 資料庫連線池
  • 任何需要保護共享資源的情境

3. 自旋鎖 (Spinlock)

🎯 核心概念

想像「排隊買演唱會票」:

  • 不離開隊伍(不睡眠)
  • 一直盯著售票口(一直檢查)
  • 票一開賣馬上衝(立即取得)

📊 示意圖

Mutex(睡眠等待):
執行緒 B: [嘗試鎖]→ 😴 睡覺... → 被喚醒 → [取得鎖]
                    (Context Switch)

Spinlock(忙碌等待):
執行緒 B: [嘗試鎖]→ 🔄🔄🔄 一直檢查... → [取得鎖]
                    (消耗 CPU 但無 Context Switch)

CPU 使用率:
Mutex:     ▓░░░░░░░░░░░▓▓▓▓ (睡眠時不佔用)
Spinlock:  ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓ (一直佔用 CPU)

💻 程式碼範例

#include <atomic>

class Spinlock {
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
    
public:
    void lock() {
        // 一直嘗試直到成功
        while (flag_.test_and_set(std::memory_order_acquire)) {
            // 空轉等待(可加入 CPU pause 指令優化)
            #ifdef __x86_64__
                __builtin_ia32_pause();  // x86 的 PAUSE 指令
            #endif
        }
    }
    
    void unlock() {
        flag_.clear(std::memory_order_release);
    }
};

// 使用範例
class Counter {
    int value_ = 0;
    Spinlock lock_;
    
public:
    void increment() {
        lock_.lock();
        ++value_;
        lock_.unlock();
    }
};

🔄 運作對比

關鍵時刻圖:

Mutex(等待 100 微秒):
[執行] → [睡眠 100μs] → [喚醒] → [執行]
         ↑              ↑
    Context Switch   Context Switch
    (~1-10 微秒)     (~1-10 微秒)

總成本:100 + 1 + 1 = 102 微秒

Spinlock(等待 5 微秒):
[執行] → [空轉 5μs] → [執行]
         ↑
    沒有 Context Switch!

總成本:5 微秒(但 CPU 一直忙碌)

✅ 優點

  • 低延遲:無 Context Switch
  • 適合短臨界區:鎖定時間 < 1 微秒時很快
  • 簡單實作:只需要原子操作

❌ 缺點

  • 浪費 CPU:等待時一直消耗 CPU
  • 可能活鎖:優先權反轉問題
  • 不適合長時間鎖定:會讓其他執行緒餓死

🎯 使用場景

  • 作業系統核心(Kernel)
  • 中斷處理程序
  • 非常短的臨界區(< 100 奈秒)

4. 讀寫鎖 (Read-Write Lock)

🎯 核心概念

想像一個「圖書館」:

  • 讀者:很多人可以同時看書(共享)
  • 作者:只能一個人寫書,寫的時候不能讀(獨佔)

📊 示意圖

時間軸:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━→

讀者 A:  [讀取────────]
讀者 B:    [讀取──────]     ← 可以同時讀
讀者 C:      [讀取────]

寫者 X:               [等待] [寫入] ← 等所有讀者結束
讀者 D:                      [等待] [讀取] ← 等寫者結束

狀態表:
┌─────────────┬─────────────┬─────────────┐
│   讀模式    │   讀模式    │   寫模式    │
├─────────────┼─────────────┼─────────────┤
│ 多人可讀    │ ← 切換 →    │ 只有一人    │
│ 無人可寫    │             │ 無人可讀寫  │
└─────────────┴─────────────┴─────────────┘

💻 程式碼範例

#include <shared_mutex>

class ThreadSafeCache {
    std::map<std::string, std::string> cache_;
    mutable std::shared_mutex mutex_;  // C++17
    
public:
    // 讀取(共享鎖)
    std::string get(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);  // 共享鎖
        auto it = cache_.find(key);
        return it != cache_.end() ? it->second : "";
    }
    
    // 寫入(獨佔鎖)
    void set(const std::string& key, const std::string& value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);  // 獨佔鎖
        cache_[key] = value;
    }
    
    // 檢查存在(共享鎖)
    bool contains(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        return cache_.find(key) != cache_.end();
    }
};

🔄 執行流程

場景:5 個讀者,1 個寫者

步驟 1: 讀者 A, B, C 同時讀取
┌─────────────┐
│ 共享資源    │
│ (Data)      │
└─────────────┘
  ↑   ↑   ↑
  A   B   C    ← 共享鎖,可以同時讀

步驟 2: 寫者 X 嘗試寫入
┌─────────────┐
│ 共享資源    │
│ (Data)      │
└─────────────┘
  ↑   ↑   ↑
  A   B   C    ← 還在讀
      ⏰ X     ← 等待中(無法取得獨佔鎖)

步驟 3: 所有讀者結束,寫者開始
┌─────────────┐
│ 共享資源    │
│ (Data)      │
└─────────────┘
      ↑
      X        ← 獨佔鎖,只有它能存取

步驟 4: 寫者結束,新讀者進入
┌─────────────┐
│ 共享資源    │
│ (Data)      │
└─────────────┘
  ↑   ↑
  D   E        ← 新的讀者可以同時讀

✅ 優點

  • 讀取效能高:多個讀者不互斥
  • 適合讀多寫少:如快取系統
  • 公平性可調:可優先讀者或寫者

❌ 缺點

  • 寫者可能餓死:讀者不斷進入
  • 實作複雜:需要兩種鎖狀態
  • 額外開銷:比普通 Mutex 慢

🎯 使用場景

  • 快取系統(Redis, Memcached)
  • 組態檔讀取
  • 資料庫索引

5. 遞迴鎖 (Recursive Lock)

🎯 核心概念

想像「俄羅斯娃娃」:

  • 同一個人可以多次開門(同一執行緒可多次加鎖)
  • 但要記得開幾次就要關幾次

📊 示意圖

普通 Mutex(會死鎖):
void funcA() {
    mutex.lock();         // ✅ 第一次加鎖成功
    // ...
    funcB();              // 呼叫 funcB
}

void funcB() {
    mutex.lock();         // ❌ 死鎖!同一執行緒嘗試再次加鎖
    // ...
    mutex.unlock();
}

遞迴鎖(可以重入):
void funcA() {
    recursive_mutex.lock();  // ✅ 計數 = 1
    // ...
    funcB();
}

void funcB() {
    recursive_mutex.lock();  // ✅ 計數 = 2(同一執行緒)
    // ...
    recursive_mutex.unlock(); // 計數 = 1
}
// funcA 結束時 unlock()     // 計數 = 0(真正釋放)

鎖計數器:
執行位置         | lock() 次數 | unlock() 次數 | 計數器
----------------|------------|--------------|-------
進入 funcA       |     1      |      0       |   1
進入 funcB       |     2      |      0       |   2
離開 funcB       |     2      |      1       |   1
離開 funcA       |     2      |      2       |   0  ← 釋放

💻 程式碼範例

#include <mutex>

class FileSystem {
    std::recursive_mutex mutex_;
    std::string root_path_;
    
public:
    void createDirectory(const std::string& path) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        
        // 建立父目錄(遞迴呼叫)
        auto parent = getParentPath(path);
        if (!parent.empty()) {
            ensureDirectoryExists(parent);  // 會再次加鎖!
        }
        
        // 建立目錄
        mkdir(path.c_str(), 0755);
    }
    
    void ensureDirectoryExists(const std::string& path) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);  // 重入鎖
        
        if (!exists(path)) {
            createDirectory(path);  // 又呼叫回去了!
        }
    }
};

🔄 呼叫堆疊範例

呼叫過程:
createDirectory("/a/b/c/d")
  ├─ lock()  [計數=1]
  ├─ ensureDirectoryExists("/a/b/c")
  │    ├─ lock()  [計數=2]  ← 同一執行緒,允許!
  │    ├─ createDirectory("/a/b/c")
  │    │    ├─ lock()  [計數=3]
  │    │    ├─ ensureDirectoryExists("/a/b")
  │    │    │    └─ lock()  [計數=4]
  │    │    │    └─ unlock() [計數=3]
  │    │    └─ unlock() [計數=2]
  │    └─ unlock() [計數=1]
  └─ unlock() [計數=0]  ← 真正釋放

✅ 優點

  • 允許重入:遞迴函式不會死鎖
  • 簡化設計:不需追蹤是否已加鎖
  • 相容 OOP:物件方法互相呼叫方便

❌ 缺點

  • 效能較差:需要記錄擁有者和計數
  • 容易誤用:隱藏設計問題
  • 記憶體開銷:需要額外儲存資訊

🎯 使用場景

  • 樹狀結構遍歷
  • 遞迴演算法
  • 複雜物件方法呼叫

6. 條件變數 (Condition Variable)

🎯 核心概念

想像「餐廳叫號系統」:

  • 客人拿號碼牌等待(wait)
  • 廚師做好餐點通知(notify)
  • 被叫到號碼的去取餐(被喚醒)

📊 示意圖

生產者-消費者模型:

生產者執行緒:                消費者執行緒:
┌─────────────┐              ┌─────────────┐
│ 生產資料     │              │ 等待資料     │
│   ↓         │              │   ↓         │
│ lock()      │              │ lock()      │
│   ↓         │              │   ↓         │
│ queue.push()│              │ while(empty)│
│   ↓         │              │   cv.wait() │ ← 釋放鎖並睡眠
│ cv.notify() │ ────通知───→ │ (被喚醒)    │ ← 重新取得鎖
│   ↓         │              │   ↓         │
│ unlock()    │              │ queue.pop() │
└─────────────┘              │   ↓         │
                             │ unlock()    │
                             └─────────────┘

時間序列:
消費者: [lock]→[檢查空]→[wait 睡眠......]→[被喚醒]→[pop]→[unlock]
生產者:                  [lock]→[push]→[notify]→[unlock]
                                        ↑
                                    喚醒訊號

💻 程式碼範例

#include <condition_variable>
#include <mutex>
#include <queue>

template<typename T>
class BlockingQueue {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    
public:
    // 生產者:放入資料
    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        queue_.push(value);
        cv_.notify_one();  // 通知一個等待的消費者
    }
    
    // 消費者:取出資料(會阻塞)
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        
        // 等待直到佇列不為空
        cv_.wait(lock, [this] { return !queue_.empty(); });
        
        T value = queue_.front();
        queue_.pop();
        return value;
    }
    
    // 嘗試取出(不阻塞)
    bool try_pop(T& value, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        
        if (cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); })) {
            value = queue_.front();
            queue_.pop();
            return true;
        }
        return false;  // 逾時
    }
};

🔄 詳細執行流程

初始狀態:佇列空

步驟 1: 消費者 A 嘗試 pop()
┌──────────┐
│ mutex_   │ ← A 取得
├──────────┤
│ queue_   │
│ (空)     │
└──────────┘
消費者 A: "佇列是空的,我要等待"
         cv_.wait() → 釋放 mutex_ → 進入睡眠

步驟 2: 生產者 B 執行 push("資料")
┌──────────┐
│ mutex_   │ ← B 取得(因為 A 釋放了)
├──────────┤
│ queue_   │
│ [資料]   │ ← B 放入
└──────────┘
生產者 B: "通知等待的人"
         cv_.notify_one() → 喚醒 A
         釋放 mutex_

步驟 3: 消費者 A 被喚醒
┌──────────┐
│ mutex_   │ ← A 重新取得
├──────────┤
│ queue_   │
│ [資料]   │ ← A 取出
└──────────┘
消費者 A: "拿到資料了!"
         返回 "資料"
         釋放 mutex_

⚠️ 常見陷阱:虛假喚醒

// ❌ 錯誤寫法(可能虛假喚醒)
cv_.wait(lock);
T value = queue_.front();  // 可能 queue_ 還是空的!

// ✅ 正確寫法(用 predicate)
cv_.wait(lock, [this] { return !queue_.empty(); });
T value = queue_.front();  // 保證有資料

// 等價於:
while (queue_.empty()) {
    cv_.wait(lock);  // 被喚醒後重新檢查
}

✅ 優點

  • 高效等待:不需要忙碌輪詢
  • 自動管理:配合 RAII 不會忘記解鎖
  • 支援逾時:wait_for, wait_until

❌ 缺點

  • 需搭配 Mutex:不能單獨使用
  • 虛假喚醒:需要用 while 迴圈
  • 效能開銷:系統呼叫

🎯 使用場景

  • 生產者-消費者佇列
  • 執行緒池
  • 事件通知系統

7. 信號量 (Semaphore)

🎯 核心概念

想像「停車場」:

  • 有 N 個停車位(資源數量)
  • 車進來 → 可用車位 -1(acquire)
  • 車出去 → 可用車位 +1(release)
  • 車位滿了就在外面等

📊 示意圖

二元信號量(Binary Semaphore)= Mutex
┌─────┐
│  1  │ ← 初始值
└─────┘

執行緒 A:  acquire() → 0 → [執行] → release() → 1
執行緒 B:              ↑ 等待(值=0)              ↑ 被喚醒

計數信號量(Counting Semaphore):資源池
┌─────┐
│  3  │ ← 有 3 個資源
└─────┘

執行緒 A: acquire() → 2
執行緒 B: acquire() → 1
執行緒 C: acquire() → 0
執行緒 D: acquire() → [阻塞,等待有人 release()]

視覺化:
停車場(容量 3):
[🚗] [🚗] [🚗]  ← 滿了,執行緒 D 在外面等
 A    B    C

執行緒 A 離開:
[  ] [🚗] [🚗]  ← 有空位了!執行緒 D 進來
     B    C
[🚗]
 D

💻 程式碼範例

#include <semaphore>  // C++20

// 資料庫連線池(最多 5 個連線)
class ConnectionPool {
    std::counting_semaphore<5> semaphore_{5};  // 初始值 5
    std::vector<Connection*> connections_;
    std::mutex mutex_;
    
public:
    ConnectionPool() {
        // 初始化 5 個連線
        for (int i = 0; i < 5; ++i) {
            connections_.push_back(new Connection());
        }
    }
    
    // 取得連線(會阻塞直到有可用連線)
    Connection* acquire() {
        semaphore_.acquire();  // 可用數 -1(可能阻塞)
        
        std::lock_guard<std::mutex> lock(mutex_);
        Connection* conn = connections_.back();
        connections_.pop_back();
        return conn;
    }
    
    // 歸還連線
    void release(Connection* conn) {
        std::lock_guard<std::mutex> lock(mutex_);
        connections_.push_back(conn);
        
        semaphore_.release();  // 可用數 +1(喚醒等待者)
    }
    
    // 嘗試取得(不阻塞)
    Connection* try_acquire() {
        if (semaphore_.try_acquire()) {  // 成功返回 true
            std::lock_guard<std::mutex> lock(mutex_);
            Connection* conn = connections_.back();
            connections_.pop_back();
            return conn;
        }
        return nullptr;  // 無可用連線
    }
};

// 使用範例
void worker_thread(ConnectionPool& pool) {
    Connection* conn = pool.acquire();  // 等待可用連線
    
    // 使用連線執行查詢
    conn->execute("SELECT * FROM users");
    
    pool.release(conn);  // 歸還連線
}

🔄 執行時序

場景:5 個連線的資料庫池

時刻 0: 信號量值 = 5
┌───┬───┬───┬───┬───┐
│ 1 │ 2 │ 3 │ 4 │ 5 │ 所有連線都可用
└───┴───┴───┴───┴───┘

時刻 1: 執行緒 A, B, C, D, E 都 acquire()
信號量值 = 0
┌───┬───┬───┬───┬───┐
│ A │ B │ C │ D │ E │ 連線全部被佔用
└───┴───┴───┴───┴───┘

時刻 2: 執行緒 F, G 嘗試 acquire()
┌─────────┐
│ 等待佇列│
│   F     │ ← 阻塞中
│   G     │ ← 阻塞中
└─────────┘

時刻 3: 執行緒 A release()
信號量值 = 1 → 喚醒 F
┌───┬───┬───┬───┬───┐
│ F │ B │ C │ D │ E │
└───┴───┴───┴───┴───┘
等待佇列:[G]

時刻 4: 執行緒 B release()
信號量值 = 1 → 喚醒 G
┌───┬───┬───┬───┬───┐
│ F │ G │ C │ D │ E │
└───┴───┴───┴───┴───┘

🆚 Semaphore vs Mutex 對比

┌─────────────┬──────────────┬─────────────┐
│   特性      │   Mutex      │  Semaphore  │
├─────────────┼──────────────┼─────────────┤
│ 初始值      │ 1 (unlocked) │ N (可設定)  │
│ 誰能釋放    │ 只有擁有者   │ 任何執行緒  │
│ 用途        │ 互斥存取     │ 資源計數    │
│ 可重入      │ 需 recursive │ 不支援      │
└─────────────┴──────────────┴─────────────┘

✅ 優點

  • 限制並行數:控制資源使用
  • 簡單直觀:概念容易理解
  • 彈性高:支援多種語義(二元、計數)

❌ 缺點

  • 容易誤用:release 不當會破壞計數
  • 無擁有權概念:任何人都能 release
  • 除錯困難:信號量洩漏難以追蹤

🎯 使用場景

  • 連線池、執行緒池
  • 限流器(Rate Limiter)
  • 資源池管理

8. 柵欄 (Barrier)

🎯 核心概念

想像「團體旅遊集合點」:

  • 所有人都到了才能繼續
  • 早到的人等晚到的
  • 最後一人到達 → 所有人一起出發

📊 示意圖

5 個執行緒使用 Barrier(5):

執行緒 1:  ───────────●──→ [等待....]
執行緒 2:  ──────●───────→ [等待....]
執行緒 3:  ────●─────────→ [等待....]
執行緒 4:  ──●───────────→ [等待....]
執行緒 5:  ─────────────●→ [最後到達,釋放所有人!]
                         ↓
執行緒 1:                ──────────→ [繼續執行]
執行緒 2:                ──────────→ [繼續執行]
執行緒 3:                ──────────→ [繼續執行]
執行緒 4:                ──────────→ [繼續執行]
執行緒 5:                ──────────→ [繼續執行]

集合點示意:
到達計數器:0/5 → 1/5 → 2/5 → 3/5 → 4/5 → 5/5 ✓
            ↑     ↑     ↑     ↑     ↑     ↑
           T4    T2    T3    T1    T5   (全員到齊!)

💻 程式碼範例

#include <barrier>  // C++20

// 多階段平行計算
class ParallelProcessor {
    std::barrier<> sync_point_;  // 同步點
    std::vector<std::thread> workers_;
    
public:
    ParallelProcessor(int num_threads) 
        : sync_point_(num_threads) {  // 初始化柵欄
        
        for (int i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this, i] {
                worker_task(i);
            });
        }
    }
    
    void worker_task(int id) {
        // 階段 1:讀取資料
        auto data = read_data(id);
        sync_point_.arrive_and_wait();  // 等所有人讀完
        
        // 階段 2:處理資料
        auto result = process_data(data);
        sync_point_.arrive_and_wait();  // 等所有人處理完
        
        // 階段 3:寫入結果
        write_result(id, result);
        sync_point_.arrive_and_wait();  // 等所有人寫完
    }
};

// 使用回調的版本(C++20)
void parallel_map_reduce() {
    const int N = 4;
    std::vector<int> results(N);
    
    // Barrier 完成時執行的回調
    auto on_completion = [&]() noexcept {
        int sum = 0;
        for (int r : results) sum += r;
        std::cout << "總和: " << sum << std::endl;
    };
    
    std::barrier sync_point(N, on_completion);
    
    std::vector<std::thread> threads;
    for (int i = 0; i < N; ++i) {
        threads.emplace_back([&, i] {
            results[i] = compute(i);
            sync_point.arrive_and_wait();  // 最後一人觸發回調
        });
    }
    
    for (auto& t : threads) t.join();
}

🔄 執行時序範例

場景:4 個執行緒計算矩陣

階段 1:每個執行緒計算自己的列
┌─────────────────────────────────┐
│ Thread 0: [計算列 0] → 完成 ●   │
│ Thread 1: [計算列 1] → 完成 ●   │
│ Thread 2: [計算列 2........] →  │ ← 還在算
│ Thread 3: [計算列 3] → 完成 ●   │
└─────────────────────────────────┘
到達計數:3/4(等待 Thread 2)

Thread 2 完成:
┌─────────────────────────────────┐
│ Thread 0: [等待]                │
│ Thread 1: [等待]                │
│ Thread 2: [計算列 2] → 完成 ● → │ 全員到齊!
│ Thread 3: [等待]                │
└─────────────────────────────────┘
到達計數:4/4 ✓ → 所有人繼續

階段 2:同步後開始下一階段
┌─────────────────────────────────┐
│ Thread 0: → [正規化列 0]        │
│ Thread 1: → [正規化列 1]        │
│ Thread 2: → [正規化列 2]        │
│ Thread 3: → [正規化列 3]        │
└─────────────────────────────────┘

🆚 Barrier vs Condition Variable

┌────────────┬─────────────────┬──────────────────┐
│   比較項   │    Barrier      │ Condition Var    │
├────────────┼─────────────────┼──────────────────┤
│ 喚醒方式   │ 最後一人到達    │ 明確 notify      │
│ 等待語義   │ 全員到齊        │ 條件滿足         │
│ 重複使用   │ 自動重置        │ 需手動管理       │
│ 適用場景   │ 多階段同步      │ 生產者-消費者    │
└────────────┴─────────────────┴──────────────────┘

✅ 優點

  • 簡化多階段同步:不需複雜的計數邏輯
  • 自動重置:可重複使用
  • 支援回調:完成時自動執行

❌ 缺點

  • 固定參與者數量:無法動態調整
  • 全員阻塞:一個慢拖累全部
  • 需 C++20:舊編譯器不支援

🎯 使用場景

  • 平行演算法(MapReduce)
  • GPU 計算同步
  • 遊戲引擎幀同步

9. 順序鎖 (Seqlock)

🎯 核心概念

想像「博物館展品」:

  • 參觀者:隨時可以看(讀者)
  • 管理員:偶爾換展品(寫者)
  • 參觀者規則:如果看到管理員在換,重新看一次

📊 示意圖

資料結構:
┌─────────────────────┐
│ sequence: 0 (偶數)  │ ← 版本號
│ data: {x:10, y:20}  │ ← 實際資料
└─────────────────────┘

寫入過程:
步驟 1: 寫者開始
sequence = 1 (奇數)  ← "正在修改中"
寫者: 寫入 x = 15
寫者: 寫入 y = 25
sequence = 2 (偶數)  ← "修改完成"

讀取過程(樂觀讀取):
┌─────────────────────────────────────┐
│ 讀者開始:                            │
│   seq1 = sequence.load()  // 記錄版本│
│   ↓                                  │
│   讀取 data                          │
│   ↓                                  │
│   seq2 = sequence.load()  // 再次檢查│
│   ↓                                  │
│   if (seq1 == seq2 && seq1 % 2 == 0) │
│     → 資料一致,使用它               │
│   else                               │
│     → 重試!                         │
└─────────────────────────────────────┘

時序圖:
讀者 A:  [seq=0] → 讀取 → [seq=0] ✓ 成功
寫者:                  [seq=1] 寫入中 [seq=2]
讀者 B:          [seq=0] → 讀取 → [seq=2] ✗ 重試
                         [seq=2] → 讀取 → [seq=2] ✓ 成功

💻 程式碼範例

#include <atomic>

template<typename T>
class Seqlock {
    std::atomic<unsigned> sequence_{0};
    T data_;
    
public:
    // 寫入(獨佔)
    void write(const T& new_data) {
        // 步驟 1:增加版本號(變奇數)
        unsigned seq = sequence_.load(std::memory_order_relaxed);
        sequence_.store(seq + 1, std::memory_order_release);
        
        // 步驟 2:寫入資料
        data_ = new_data;
        
        // 步驟 3:再次增加版本號(變偶數)
        sequence_.store(seq + 2, std::memory_order_release);
    }
    
    // 讀取(樂觀)
    T read() const {
        T result;
        unsigned seq1, seq2;
        
        do {
            // 記錄版本號
            seq1 = sequence_.load(std::memory_order_acquire);
            
            // 如果是奇數,表示正在寫入,等一下
            while (seq1 & 1) {
                seq1 = sequence_.load(std::memory_order_acquire);
            }
            
            // 讀取資料
            result = data_;
            
            // 檢查版本號是否改變
            seq2 = sequence_.load(std::memory_order_acquire);
            
        } while (seq1 != seq2);  // 不一致就重試
        
        return result;
    }
};

// 實際應用:座標追蹤
struct Position {
    double x, y, z;
    uint64_t timestamp;
};

Seqlock<Position> player_position;

// 遊戲執行緒(寫者)
void update_position(double x, double y, double z) {
    Position pos{x, y, z, get_timestamp()};
    player_position.write(pos);
}

// 渲染執行緒(讀者)
void render_player() {
    Position pos = player_position.read();
    draw_at(pos.x, pos.y, pos.z);
}

🔄 競爭條件處理

場景:讀者在寫者修改期間讀取

初始狀態:
┌────────────────────┐
│ sequence = 100     │
│ x = 10, y = 20     │
└────────────────────┘

時刻 T1:讀者開始讀取
讀者: seq1 = 100 (偶數,OK)
讀者: 讀取 x = 10

時刻 T2:寫者開始寫入(在讀者讀完之前)
寫者: sequence = 101 (奇數)  ← 標記"正在寫"
寫者: x = 15
寫者: y = 25
寫者: sequence = 102 (偶數)

時刻 T3:讀者讀完檢查
讀者: 讀取 y = 25  ← 可能是新值!
讀者: seq2 = 102
讀者: seq1(100) != seq2(102) → 資料不一致!
讀者: 重試...

時刻 T4:讀者重試
讀者: seq1 = 102
讀者: 讀取 x = 15, y = 25
讀者: seq2 = 102
讀者: seq1 == seq2 → 成功!✓

🆚 Seqlock vs RWLock

┌─────────────┬──────────────┬─────────────┐
│   特性      │   Seqlock    │   RWLock    │
├─────────────┼──────────────┼─────────────┤
│ 讀取成本    │ 無鎖(快)   │ 需加鎖(慢)│
│ 寫入成本    │ 無鎖         │ 獨佔鎖      │
│ 讀失敗處理  │ 重試         │ 阻塞等待    │
│ 資料一致性  │ 最終一致     │ 強一致      │
│ 適用場景    │ 讀多寫少     │ 讀多寫中    │
└─────────────┴──────────────┴─────────────┘

✅ 優點

  • 讀取超快:完全無鎖
  • 寫者不阻塞讀者:讀者自己重試
  • 無死鎖:沒有鎖就不會卡

❌ 缺點

  • 讀者可能餓死:頻繁寫入時一直重試
  • 資料大小限制:複製成本高
  • 寫者需互斥:多寫者需額外機制

🎯 使用場景

  • 即時系統(股票價格)
  • 遊戲引擎(位置更新)
  • 系統監控(統計資料)

10. 效能比較總表

📊 延遲對比

操作延遲(單位:奈秒,以現代 x86-64 為準):

Lock-Free (CAS):      ~10-50 ns
Spinlock:             ~50-100 ns
Mutex (無競爭):       ~50-100 ns
Mutex (有競爭):       ~1,000-10,000 ns  ← Context Switch
RWLock (讀):          ~100-200 ns
RWLock (寫):          ~1,000-10,000 ns
Semaphore:            ~1,000-5,000 ns
Condition Variable:   ~1,000-10,000 ns

圖示:
Lock-Free  ▓░░░░░░░░░░░░░░░░░░░░  (極快)
Spinlock   ▓▓░░░░░░░░░░░░░░░░░░░  (快)
Mutex(無)  ▓▓░░░░░░░░░░░░░░░░░░░  (快)
Mutex(有)  ▓▓▓▓▓▓▓▓▓▓░░░░░░░░░░  (慢)
Semaphore  ▓▓▓▓▓▓▓▓░░░░░░░░░░░░  (中慢)

🎯 使用場景選擇表

┌────────────────────┬─────────────────────────────────┐
│     場景           │          推薦鎖機制              │
├────────────────────┼─────────────────────────────────┤
│ SPSC 佇列          │ Lock-Free Queue                 │
│ 共享計數器         │ Atomic / Spinlock               │
│ 銀行轉帳           │ Mutex                           │
│ 快取系統           │ RWLock / Seqlock                │
│ 遞迴函式保護       │ Recursive Mutex                 │
│ 生產者-消費者      │ Condition Variable              │
│ 連線池             │ Semaphore                       │
│ 平行演算法同步     │ Barrier                         │
│ 即時股價更新       │ Seqlock                         │
│ 核心 Interrupt     │ Spinlock                        │
└────────────────────┴─────────────────────────────────┘

🔍 決策樹

開始
  │
  ├─ 是否需要共享資料?
  │   ├─ 否 → 無需鎖(Thread-Local Storage)
  │   └─ 是 ↓
  │
  ├─ 是否為 SPSC 模式?
  │   ├─ 是 → Lock-Free Queue
  │   └─ 否 ↓
  │
  ├─ 臨界區是否極短(< 100ns)?
  │   ├─ 是 → Spinlock / Atomic
  │   └─ 否 ↓
  │
  ├─ 是否讀多寫少(讀寫比 > 10:1)?
  │   ├─ 是 → RWLock / Seqlock
  │   └─ 否 ↓
  │
  ├─ 是否需要限制資源數量?
  │   ├─ 是 → Semaphore
  │   └─ 否 ↓
  │
  ├─ 是否需要條件等待?
  │   ├─ 是 → Condition Variable
  │   └─ 否 ↓
  │
  ├─ 是否有遞迴呼叫?
  │   ├─ 是 → Recursive Mutex
  │   └─ 否 ↓
  │
  └─ 預設選擇 → Mutex

⚡ 優化建議

1. 減少鎖粒度
   ❌ 整個物件一個大鎖
   ✅ 不同欄位用不同鎖

2. 避免鎖順序死鎖
   ❌ 執行緒 A: lock(L1) → lock(L2)
      執行緒 B: lock(L2) → lock(L1)
   ✅ 統一鎖順序:都先 L1 再 L2

3. 使用 RAII 管理鎖
   ❌ mutex.lock();
      do_something();
      mutex.unlock();  // 可能忘記
   ✅ std::lock_guard<std::mutex> lock(mutex);

4. 考慮無鎖替代
   ❌ mutex + counter++
   ✅ std::atomic<int> counter;
      counter.fetch_add(1);

5. 批次操作減少鎖次數
   ❌ for (item : items) {
        lock();
        process(item);
        unlock();
      }
   ✅ lock();
      for (item : items) {
        process(item);
      }
      unlock();

📝 總結

快速記憶口訣

🚀 速度要求高?        → Lock-Free / Spinlock
🔐 簡單互斥就好?      → Mutex
📖 讀多寫少?          → RWLock / Seqlock
🔁 會遞迴呼叫?        → Recursive Mutex
⏰ 需要條件等待?      → Condition Variable
🎫 限制資源數量?      → Semaphore
🚧 多階段同步?        → Barrier

常見錯誤

❌ 忘記解鎖               → 使用 RAII (lock_guard)
❌ 死鎖                   → 統一鎖順序
❌ 虛假喚醒               → Condition Variable 用 while
❌ 過度使用 Spinlock      → 長臨界區用 Mutex
❌ 忽略記憶體順序         → Atomic 正確使用 memory_order

📚 延伸閱讀


文件版本: 1.0
最後更新: 2026-01-09
授權: CC BY-NC-SA 4.0


附錄:實用程式碼片段

A1. 簡易效能測試框架

#include <chrono>
#include <iostream>

template<typename Func>
double benchmark(Func f, int iterations = 1000000) {
    auto start = std::chrono::high_resolution_clock::now();
    
    for (int i = 0; i < iterations; ++i) {
        f();
    }
    
    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
    
    return duration.count() / (double)iterations;
}

// 使用範例
int main() {
    std::mutex mutex;
    int counter = 0;
    
    double avg_ns = benchmark([&] {
        std::lock_guard<std::mutex> lock(mutex);
        ++counter;
    });
    
    std::cout << "平均每次操作: " << avg_ns << " ns\n";
}

A2. 死鎖檢測器

#include <map>
#include <thread>
#include <mutex>

class DeadlockDetector {
    std::map<std::thread::id, std::vector<void*>> held_locks_;
    std::mutex detector_mutex_;
    
public:
    void acquire(void* lock) {
        std::lock_guard<std::mutex> guard(detector_mutex_);
        auto tid = std::this_thread::get_id();
        held_locks_[tid].push_back(lock);
        check_for_cycles(tid);
    }
    
    void release(void* lock) {
        std::lock_guard<std::mutex> guard(detector_mutex_);
        auto tid = std::this_thread::get_id();
        auto& locks = held_locks_[tid];
        locks.erase(std::remove(locks.begin(), locks.end(), lock), locks.end());
    }
    
private:
    void check_for_cycles(std::thread::id tid) {
        // 簡化版:檢查是否有執行緒持有多個鎖
        if (held_locks_[tid].size() > 1) {
            std::cerr << "警告:執行緒持有多個鎖,可能死鎖!\n";
        }
    }
};

希望這份指南能幫助你理解各種鎖機制!🎓