Go 並行機制完整指南 🐹
📑 目錄結構
這份指南分為以下部分:
第一部分:概覽與基礎
第二部分:高效能原語
第三部分:高級同步機制
第四部分:實戰與最佳實踐
📊 視覺化概覽
Go 並行的選擇流程圖:
┌─────────────────┐
│ 需要並行嗎? │
└─────┬───────────┘
│ 是
▼
┌─────────────────┐ ┌──────────────────┐
│ 簡單並行任務? │───▶│ 使用 Goroutine │
└─────┬───────────┘ 是 │ 🏃 協程 │
│ 否 └──────────────────┘
▼
┌─────────────────┐ ┌──────────────────┐
│ 執行緒間通訊? │───▶│ 使用 Channel │
└─────┬───────────┘ 是 │ 📡 通道 │
│ 否 └──────────────────┘
▼
┌─────────────────┐ ┌──────────────────┐
│ 共享記憶體? │───▶│ 使用 Mutex │
└─────┬───────────┘ 是 │ 🔒 互斥鎖 │
│ 否 └──────────────────┘
▼
┌─────────────────┐ ┌──────────────────┐
│ 多讀少寫? │───▶│ 使用 RWMutex │
└─────┬───────────┘ 是 │ 📖 讀寫鎖 │
│ 否 └──────────────────┘
▼
┌─────────────────┐ ┌──────────────────┐
│ 原子操作? │───▶│ 使用 Atomic │
└─────┬───────────┘ 是 │ ⚛️ 原子類型 │
│ 否 └──────────────────┘
▼
┌─────────────────┐
│ 組合使用多種 │
│ 🎯 混合模式 │
└─────────────────┘
效能與使用場景快速參考
| 類型 | 效能 | 使用場景 | 特點 |
|---|---|---|---|
Goroutine | 🥇 最快 | 並行任務 | 輕量級執行緒 |
Channel | 🥈 很快 | 執行緒通訊 | 類型安全通訊 |
sync/atomic | 🥉 快 | 原子操作 | 無鎖操作 |
RWMutex (讀) | 🏅 中等 | 多讀少寫 | 並行讀取 |
Mutex | 🏅 中等 | 基本互斥 | 簡單可靠 |
WaitGroup | 🏅 中等 | 同步等待 | 任務協調 |
Goroutine 和 Channel 基礎 🏃📡
白話解釋: Goroutine 像輕量級的工人,Channel 像他們之間的傳輸帶
Goroutine + Channel 工作示意圖:
Goroutine1: 🏃 ──┐
Goroutine2: 🏃 ──┼──▶ 📡 Channel ──▶ 🏃 Goroutine3
Goroutine3: 🏃 ──┘
基本 Goroutine 範例
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func basicGoroutineExample() {
fmt.Println("主執行緒開始")
fmt.Printf("CPU 核心數: %d\n", runtime.NumCPU())
var wg sync.WaitGroup
// 啟動多個 goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
fmt.Printf("Goroutine %d 執行第 %d 次\n", id, j+1)
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("Goroutine %d 完成\n", id)
}(i)
}
wg.Wait()
fmt.Println("所有 goroutine 完成")
}
func main() {
basicGoroutineExample()
}
Channel 基本使用範例
package main
import (
"fmt"
"time"
)
func basicChannelExample() {
// 無緩衝通道
ch := make(chan string)
// 發送者 goroutine
go func() {
messages := []string{"Hello", "World", "From", "Go"}
for _, msg := range messages {
fmt.Printf("發送: %s\n", msg)
ch <- msg
time.Sleep(500 * time.Millisecond)
}
close(ch)
}()
// 接收者
for msg := range ch {
fmt.Printf("接收: %s\n", msg)
}
}
// 緩衝通道範例
func bufferedChannelExample() {
// 建立緩衝通道,容量為3
ch := make(chan int, 3)
// 發送者
go func() {
for i := 1; i <= 5; i++ {
fmt.Printf("嘗試發送 %d\n", i)
ch <- i
fmt.Printf("成功發送 %d\n", i)
}
close(ch)
}()
// 接收者故意延遲
time.Sleep(2 * time.Second)
for value := range ch {
fmt.Printf("接收: %d\n", value)
time.Sleep(500 * time.Millisecond)
}
}
生產者-消費者範例
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Output string
Worker int
}
func producerConsumerExample() {
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, 5)
results := make(chan Result, 5)
var wg sync.WaitGroup
// 啟動工作者
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 結果收集器
go func() {
for result := range results {
fmt.Printf("結果: 工作者 %d 完成任務 %d - %s\n",
result.Worker, result.Job.ID, result.Output)
}
}()
// 生產者:發送工作
for j := 1; j <= numJobs; j++ {
job := Job{
ID: j,
Data: fmt.Sprintf("任務資料 %d", j),
}
jobs <- job
}
close(jobs)
wg.Wait()
close(results)
time.Sleep(100 * time.Millisecond) // 等待結果輸出
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("工作者 %d 開始處理任務 %d\n", id, job.ID)
// 模擬工作
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
result := Result{
Job: job,
Output: fmt.Sprintf("處理完成: %s", job.Data),
Worker: id,
}
results <- result
}
fmt.Printf("工作者 %d 結束\n", id)
}
Select 多路復用
package main
import (
"fmt"
"time"
)
func selectExample() {
ch1 := make(chan string)
ch2 := make(chan string)
quit := make(chan bool)
// 發送者1
go func() {
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second)
ch1 <- fmt.Sprintf("通道1訊息 %d", i)
}
}()
// 發送者2
go func() {
for i := 0; i < 5; i++ {
time.Sleep(1500 * time.Millisecond)
ch2 <- fmt.Sprintf("通道2訊息 %d", i)
}
}()
// 超時控制
go func() {
time.Sleep(8 * time.Second)
quit <- true
}()
// 選擇器
for {
select {
case msg1 := <-ch1:
fmt.Printf("收到通道1: %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf("收到通道2: %s\n", msg2)
case <-quit:
fmt.Println("超時退出")
return
case <-time.After(500 * time.Millisecond):
fmt.Println("等待中...")
}
}
}
Mutex 和 RWMutex 🔒📖
白話解釋: Mutex 像廁所門鎖,一次只能一個人用;RWMutex 像圖書館,多人可以看書但寫字時要清場
Mutex vs RWMutex:
Mutex: 🚪🔒 (互斥存取)
RWMutex: 👀👀👀 或 ✍️🚫 (讀者並行,寫者獨佔)
基本 Mutex 範例
package main
import (
"fmt"
"sync"
"time"
)
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
fmt.Printf("計數器增加到: %d\n", c.value)
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func mutexExample() {
counter := &SafeCounter{}
var wg sync.WaitGroup
// 多個 goroutine 並行增加計數器
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
counter.Increment()
time.Sleep(100 * time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Printf("最終計數: %d\n", counter.Value())
}
RWMutex 讀寫鎖範例
package main
import (
"fmt"
"sync"
"time"
)
type ConfigCache struct {
mu sync.RWMutex
settings map[string]string
version int
}
func NewConfigCache() *ConfigCache {
return &ConfigCache{
settings: make(map[string]string),
version: 1,
}
}
func (c *ConfigCache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, exists := c.settings[key]
fmt.Printf("讀取設定 %s: %s (版本: %d)\n", key, value, c.version)
return value, exists
}
func (c *ConfigCache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.settings[key] = value
c.version++
fmt.Printf("更新設定 %s = %s (新版本: %d)\n", key, value, c.version)
}
func (c *ConfigCache) GetAll() map[string]string {
c.mu.RLock()
defer c.mu.RUnlock()
// 複製 map 以避免外部修改
result := make(map[string]string)
for k, v := range c.settings {
result[k] = v
}
return result
}
func rwMutexExample() {
cache := NewConfigCache()
var wg sync.WaitGroup
// 初始化一些設定
cache.Set("theme", "dark")
cache.Set("language", "zh-TW")
// 多個讀者
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
cache.Get("theme")
time.Sleep(100 * time.Millisecond)
}
}(i)
}
// 少數寫者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(200 * time.Millisecond)
cache.Set("theme", fmt.Sprintf("theme_%d", id))
}(i)
}
wg.Wait()
fmt.Println("最終設定:")
for k, v := range cache.GetAll() {
fmt.Printf(" %s: %s\n", k, v)
}
}
效能比較範例
package main
import (
"fmt"
"sync"
"time"
)
func performanceComparison() {
const iterations = 100000
const goroutines = 10
// Mutex 測試
fmt.Println("測試 Mutex 效能...")
start := time.Now()
var mutex sync.Mutex
data := 0
var wg sync.WaitGroup
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations; j++ {
mutex.Lock()
_ = data // 模擬讀取
mutex.Unlock()
}
}()
}
wg.Wait()
mutexTime := time.Since(start)
// RWMutex 測試 (只讀)
fmt.Println("測試 RWMutex 讀取效能...")
start = time.Now()
var rwMutex sync.RWMutex
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations; j++ {
rwMutex.RLock()
_ = data // 模擬讀取
rwMutex.RUnlock()
}
}()
}
wg.Wait()
rwMutexTime := time.Since(start)
fmt.Printf("Mutex 時間: %v\n", mutexTime)
fmt.Printf("RWMutex 時間: %v\n", rwMutexTime)
fmt.Printf("RWMutex 比 Mutex 快 %.2fx\n",
float64(mutexTime.Nanoseconds())/float64(rwMutexTime.Nanoseconds()))
}
Sync 包原語 📦
WaitGroup 同步等待
package main
import (
"fmt"
"sync"
"time"
)
func waitGroupExample() {
var wg sync.WaitGroup
tasks := []string{"任務A", "任務B", "任務C", "任務D"}
fmt.Println("開始執行並行任務...")
for i, task := range tasks {
wg.Add(1)
go func(id int, taskName string) {
defer wg.Done()
fmt.Printf("開始 %s\n", taskName)
// 模擬不同的工作時間
time.Sleep(time.Duration(id+1) * 500 * time.Millisecond)
fmt.Printf("完成 %s\n", taskName)
}(i, task)
}
wg.Wait()
fmt.Println("所有任務完成!")
}
// 錯誤示範:WaitGroup 的常見錯誤
func waitGroupWrongExample() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
// ❌ 錯誤:在 goroutine 內部調用 Add
go func(id int) {
wg.Add(1) // 競爭條件!
defer wg.Done()
fmt.Printf("任務 %d 完成\n", id)
}(i)
}
wg.Wait() // 可能提前結束
}
// 正確示範
func waitGroupCorrectExample() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1) // ✅ 正確:在啟動 goroutine 前調用 Add
go func(id int) {
defer wg.Done()
fmt.Printf("任務 %d 完成\n", id)
}(i)
}
wg.Wait()
}
Once 單次執行
package main
import (
"fmt"
"sync"
)
type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetSingleton() *Singleton {
once.Do(func() {
fmt.Println("建立單例實例...")
instance = &Singleton{data: "我是單例"}
})
return instance
}
func onceExample() {
var wg sync.WaitGroup
// 多個 goroutine 嘗試獲取單例
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
singleton := GetSingleton()
fmt.Printf("Goroutine %d 獲得: %s\n", id, singleton.data)
}(i)
}
wg.Wait()
}
// 初始化函數範例
var config map[string]string
var configOnce sync.Once
func loadConfig() {
configOnce.Do(func() {
fmt.Println("載入配置文件...")
config = map[string]string{
"database_url": "localhost:5432",
"api_key": "secret123",
}
})
}
func getConfig(key string) string {
loadConfig() // 保證只執行一次
return config[key]
}
Cond 條件變數
package main
import (
"fmt"
"sync"
"time"
)
func condExample() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
queue := make([]int, 0)
// 消費者
go func() {
mu.Lock()
defer mu.Unlock()
for len(queue) == 0 {
fmt.Println("消費者等待...")
cond.Wait() // 釋放鎖並等待
}
item := queue[0]
queue = queue[1:]
fmt.Printf("消費者取得: %d\n", item)
}()
// 生產者
go func() {
for i := 1; i <= 3; i++ {
time.Sleep(1 * time.Second)
mu.Lock()
queue = append(queue, i)
fmt.Printf("生產者新增: %d\n", i)
cond.Signal() // 通知等待的 goroutine
mu.Unlock()
}
}()
time.Sleep(5 * time.Second)
}
// 多消費者範例
func multiConsumerExample() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
items := []string{"蘋果", "香蕉", "橘子"}
// 多個消費者
for i := 0; i < 3; i++ {
go func(id int) {
mu.Lock()
defer mu.Unlock()
for len(items) == 0 {
fmt.Printf("消費者 %d 等待中...\n", id)
cond.Wait()
}
if len(items) > 0 {
item := items[0]
items = items[1:]
fmt.Printf("消費者 %d 取得: %s\n", id, item)
}
}(i)
}
time.Sleep(1 * time.Second)
// 喚醒所有等待者
mu.Lock()
fmt.Println("生產者準備喚醒所有消費者")
cond.Broadcast()
mu.Unlock()
time.Sleep(1 * time.Second)
}
Atomic 原子操作 ⚛️
白話解釋: 原子操作像不可分割的動作,要嘛全做完,要嘛不做
基本原子操作
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func basicAtomicExample() {
var counter int64
var wg sync.WaitGroup
// 啟動多個 goroutine 進行原子增加
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 1000; j++ {
atomic.AddInt64(&counter, 1)
}
fmt.Printf("Goroutine %d 完成\n", id)
}(i)
}
wg.Wait()
fmt.Printf("最終計數: %d\n", atomic.LoadInt64(&counter))
}
// 原子標誌範例
func atomicFlagExample() {
var running int32 = 1
var wg sync.WaitGroup
// 工作 goroutine
wg.Add(1)
go func() {
defer wg.Done()
for atomic.LoadInt32(&running) == 1 {
fmt.Println("工作中...")
time.Sleep(500 * time.Millisecond)
}
fmt.Println("工作結束")
}()
// 主執行緒等待後停止
time.Sleep(3 * time.Second)
atomic.StoreInt32(&running, 0)
wg.Wait()
}
Compare-And-Swap (CAS) 操作
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
)
func casExample() {
var value int64 = 10
var wg sync.WaitGroup
// 多個 goroutine 嘗試將值翻倍
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
oldValue := atomic.LoadInt64(&value)
newValue := oldValue * 2
if atomic.CompareAndSwapInt64(&value, oldValue, newValue) {
fmt.Printf("Goroutine %d 成功將 %d 更新為 %d\n",
id, oldValue, newValue)
break
} else {
fmt.Printf("Goroutine %d CAS 失敗,重試...\n", id)
time.Sleep(1 * time.Millisecond)
}
}
}(i)
}
wg.Wait()
fmt.Printf("最終值: %d\n", atomic.LoadInt64(&value))
}
// 無鎖堆疊實現
type LockFreeStack struct {
head unsafe.Pointer
}
type node struct {
data int
next unsafe.Pointer
}
func (s *LockFreeStack) Push(data int) {
newNode := &node{data: data}
for {
oldHead := atomic.LoadPointer(&s.head)
newNode.next = oldHead
if atomic.CompareAndSwapPointer(&s.head, oldHead, unsafe.Pointer(newNode)) {
break
}
}
}
func (s *LockFreeStack) Pop() (int, bool) {
for {
oldHead := atomic.LoadPointer(&s.head)
if oldHead == nil {
return 0, false
}
oldNode := (*node)(oldHead)
newHead := atomic.LoadPointer(&oldNode.next)
if atomic.CompareAndSwapPointer(&s.head, oldHead, newHead) {
return oldNode.data, true
}
}
}
原子值 (atomic.Value)
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Config struct {
Host string
Port int
}
func atomicValueExample() {
var config atomic.Value
// 初始配置
config.Store(Config{Host: "localhost", Port: 8080})
var wg sync.WaitGroup
// 多個讀者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
cfg := config.Load().(Config)
fmt.Printf("讀者 %d: %s:%d\n", id, cfg.Host, cfg.Port)
time.Sleep(200 * time.Millisecond)
}
}(i)
}
// 寫者
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(1 * time.Second)
config.Store(Config{Host: "production", Port: 9090})
fmt.Println("配置已更新")
time.Sleep(1 * time.Second)
config.Store(Config{Host: "backup", Port: 7070})
fmt.Println("配置再次更新")
}()
wg.Wait()
}
Context 上下文 🎯
白話解釋: Context 像控制器,可以取消操作、設定超時、傳遞值
package main
import (
"context"
"fmt"
"time"
)
// 基本超時控制
func contextTimeoutExample() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go func() {
select {
case <-time.After(3 * time.Second):
fmt.Println("工作完成")
case <-ctx.Done():
fmt.Printf("工作被取消: %v\n", ctx.Err())
}
}()
<-ctx.Done()
fmt.Println("主程式結束")
}
// 手動取消
func contextCancelExample() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
fmt.Printf("工作被取消: %v\n", ctx.Err())
return
default:
fmt.Println("工作進行中...")
time.Sleep(500 * time.Millisecond)
}
}
}()
time.Sleep(2 * time.Second)
fmt.Println("發送取消信號")
cancel()
time.Sleep(1 * time.Second)
}
// 值傳遞
func contextValueExample() {
type key string
ctx := context.WithValue(context.Background(), key("userID"), "12345")
ctx = context.WithValue(ctx, key("requestID"), "req-789")
processRequest(ctx)
}
func processRequest(ctx context.Context) {
userID := ctx.Value("userID")
requestID := ctx.Value("requestID")
fmt.Printf("處理請求 - 用戶ID: %v, 請求ID: %v\n", userID, requestID)
// 傳遞給下層函數
handleDatabase(ctx)
}
func handleDatabase(ctx context.Context) {
userID := ctx.Value("userID")
fmt.Printf("資料庫操作 - 用戶ID: %v\n", userID)
}
// 鏈式取消
func contextChainExample() {
// 根上下文,10秒超時
parentCtx, parentCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer parentCancel()
// 子上下文,5秒超時
childCtx, childCancel := context.WithTimeout(parentCtx, 5*time.Second)
defer childCancel()
// 孫上下文,手動取消
grandChildCtx, grandChildCancel := context.WithCancel(childCtx)
defer grandChildCancel()
go func() {
select {
case <-grandChildCtx.Done():
fmt.Printf("孫上下文結束: %v\n", grandChildCtx.Err())
}
}()
// 2秒後手動取消孫上下文
time.Sleep(2 * time.Second)
grandChildCancel()
time.Sleep(1 * time.Second)
}
HTTP 服務器範例
package main
import (
"context"
"fmt"
"net/http"
"time"
)
func httpServerExample() {
http.HandleFunc("/long-task", longTaskHandler)
server := &http.Server{
Addr: ":8080",
Handler: nil,
}
go func() {
fmt.Println("服務器啟動在 :8080")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("服務器錯誤: %v\n", err)
}
}()
// 模擬運行10秒後關閉
time.Sleep(10 * time.Second)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
fmt.Printf("服務器關閉錯誤: %v\n", err)
} else {
fmt.Println("服務器優雅關閉")
}
}
func longTaskHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// 模擬長時間任務
select {
case <-time.After(8 * time.Second):
fmt.Fprintf(w, "任務完成")
case <-ctx.Done():
fmt.Printf("請求被取消: %v\n", ctx.Err())
http.Error(w, "請求被取消", http.StatusRequestTimeout)
}
}
高級並行模式 🚀
Worker Pool 模式
package main
import (
"fmt"
"sync"
"time"
)
type WorkerPool struct {
workerCount int
jobs chan Job
results chan Result
wg sync.WaitGroup
}
type Job struct {
ID int
Data interface{}
}
type Result struct {
Job Job
Output interface{}
Error error
}
func NewWorkerPool(workerCount int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
jobs: make(chan Job, workerCount*2),
results: make(chan Result, workerCount*2),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for job := range wp.jobs {
fmt.Printf("工作者 %d 處理任務 %d\n", id, job.ID)
// 模擬處理時間
time.Sleep(time.Duration(job.ID%3+1) * 500 * time.Millisecond)
result := Result{
Job: job,
Output: fmt.Sprintf("任務 %d 的結果", job.ID),
Error: nil,
}
wp.results <- result
}
fmt.Printf("工作者 %d 結束\n", id)
}
func (wp *WorkerPool) Submit(job Job) {
wp.jobs <- job
}
func (wp *WorkerPool) Stop() {
close(wp.jobs)
wp.wg.Wait()
close(wp.results)
}
func (wp *WorkerPool) Results() <-chan Result {
return wp.results
}
func workerPoolExample() {
pool := NewWorkerPool(3)
pool.Start()
// 提交任務
go func() {
for i := 1; i <= 10; i++ {
pool.Submit(Job{ID: i, Data: fmt.Sprintf("data-%d", i)})
}
pool.Stop()
}()
// 收集結果
for result := range pool.Results() {
if result.Error != nil {
fmt.Printf("任務 %d 失敗: %v\n", result.Job.ID, result.Error)
} else {
fmt.Printf("收到結果: %v\n", result.Output)
}
}
}
Pipeline 管道模式
package main
import (
"fmt"
"sync"
)
// 階段1: 數字生成器
func numberGenerator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 階段2: 平方計算
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 階段3: 結果收集
func collect(in <-chan int) []int {
var results []int
for n := range in {
results = append(results, n)
}
return results
}
func pipelineExample() {
// 建立管道
numbers := numberGenerator(1, 2, 3, 4, 5)
squares := square(numbers)
results := collect(squares)
fmt.Printf("結果: %v\n", results)
}
// 扇出-扇入模式
func fanOutFanInExample() {
numbers := numberGenerator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// 扇出: 多個工作者處理
worker1 := square(numbers)
worker2 := square(numbers)
worker3 := square(numbers)
// 扇入: 合併結果
merged := fanIn(worker1, worker2, worker3)
// 收集結果
for result := range merged {
fmt.Printf("結果: %d\n", result)
}
}
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 為每個輸入通道啟動一個 goroutine
multiplex := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
// 等待所有輸入完成後關閉輸出通道
go func() {
wg.Wait()
close(out)
}()
return out
}
Publish-Subscribe 模式
package main
import (
"fmt"
"sync"
"time"
)
type PubSub struct {
mu sync.RWMutex
subscribers map[string][]chan interface{}
}
func NewPubSub() *PubSub {
return &PubSub{
subscribers: make(map[string][]chan interface{}),
}
}
func (ps *PubSub) Subscribe(topic string) <-chan interface{} {
ps.mu.Lock()
defer ps.mu.Unlock()
ch := make(chan interface{}, 1)
ps.subscribers[topic] = append(ps.subscribers[topic], ch)
return ch
}
func (ps *PubSub) Publish(topic string, data interface{}) {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, ch := range ps.subscribers[topic] {
select {
case ch <- data:
default:
// 非阻塞發送,避免慢消費者阻塞發布者
}
}
}
func (ps *PubSub) Unsubscribe(topic string, ch <-chan interface{}) {
ps.mu.Lock()
defer ps.mu.Unlock()
subs := ps.subscribers[topic]
for i, subscriber := range subs {
if subscriber == ch {
ps.subscribers[topic] = append(subs[:i], subs[i+1:]...)
close(subscriber)
break
}
}
}
func pubSubExample() {
ps := NewPubSub()
// 訂閱者1
news := ps.Subscribe("news")
go func() {
for msg := range news {
fmt.Printf("新聞訂閱者收到: %v\n", msg)
}
}()
// 訂閱者2
sports := ps.Subscribe("sports")
go func() {
for msg := range sports {
fmt.Printf("體育訂閱者收到: %v\n", msg)
}
}()
// 訂閱者3 (也訂閱新聞)
news2 := ps.Subscribe("news")
go func() {
for msg := range news2 {
fmt.Printf("新聞訂閱者2收到: %v\n", msg)
}
}()
// 發布訊息
time.Sleep(100 * time.Millisecond)
ps.Publish("news", "重要新聞:Go 1.22 發布")
ps.Publish("sports", "足球賽事:臺灣 vs 日本")
ps.Publish("news", "科技新聞:AI 新突破")
time.Sleep(1 * time.Second)
}
限制器 (Rate Limiter)
package main
import (
"context"
"fmt"
"sync"
"time"
)
type RateLimiter struct {
tokens chan struct{}
ticker *time.Ticker
done chan struct{}
}
func NewRateLimiter(rate int, capacity int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, capacity),
ticker: time.NewTicker(time.Second / time.Duration(rate)),
done: make(chan struct{}),
}
// 初始填滿令牌桶
for i := 0; i < capacity; i++ {
rl.tokens <- struct{}{}
}
// 定期添加令牌
go func() {
for {
select {
case <-rl.ticker.C:
select {
case rl.tokens <- struct{}{}:
default:
// 桶已滿,丟棄令牌
}
case <-rl.done:
return
}
}
}()
return rl
}
func (rl *RateLimiter) Allow() bool {
select {
case <-rl.tokens:
return true
default:
return false
}
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
select {
case <-rl.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (rl *RateLimiter) Stop() {
rl.ticker.Stop()
close(rl.done)
}
func rateLimiterExample() {
limiter := NewRateLimiter(2, 5) // 每秒2個請求,容量5
defer limiter.Stop()
var wg sync.WaitGroup
// 模擬10個並發請求
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
start := time.Now()
if err := limiter.Wait(ctx); err != nil {
fmt.Printf("請求 %d 超時: %v\n", id, err)
return
}
fmt.Printf("請求 %d 通過,等待時間: %v\n", id, time.Since(start))
}(i)
}
wg.Wait()
}
選擇指南與最佳實踐 🎯
完整選擇決策樹
/*
Go 並行原語選擇指南:
1. 需要並行執行嗎?
└─ 否 → 順序執行
└─ 是 → 繼續
2. 執行緒間需要通訊嗎?
├─ 需要 → Channel (推薦)
│ ├─ 一對一 → 無緩衝 Channel
│ ├─ 一對多 → 緩衝 Channel
│ ├─ 多對一 → 工作者池
│ └─ 複雜路由 → Select + 多 Channel
└─ 不需要 → 繼續
3. 需要共享狀態嗎?
├─ 簡單計數/標誌 → Atomic
├─ 複雜資料結構 → Mutex/RWMutex
│ ├─ 多讀少寫 → RWMutex
│ └─ 讀寫平衡 → Mutex
└─ 不需要 → Goroutine + WaitGroup
4. 需要取消/超時嗎?
└─ 是 → Context
5. 需要同步等待嗎?
├─ 等待多個任務完成 → WaitGroup
├─ 單次初始化 → Once
└─ 條件等待 → Cond
記住:優先使用 Channel,它是 Go 的核心設計理念
*/
效能對比表
| 同步原語 | 延遲 | 吞吐量 | 記憶體使用 | 複雜度 | 適用場景 |
|---|---|---|---|---|---|
Goroutine | 🟢 極低 | 🟢 極高 | 🟢 極小 | 🟢 簡單 | 並行任務 |
Channel | 🟡 中等 | 🟢 高 | 🟡 中等 | 🟢 簡單 | 執行緒通訊 |
sync/atomic | 🟢 極低 | 🟢 極高 | 🟢 極小 | 🟡 中等 | 原子操作 |
RWMutex (讀) | 🟢 低 | 🟢 高 | 🟡 中等 | 🟡 中等 | 多讀少寫 |
Mutex | 🟡 中等 | 🟡 中等 | 🟡 中等 | 🟢 簡單 | 基本互斥 |
WaitGroup | 🟡 中等 | N/A | 🟢 小 | 🟢 簡單 | 同步等待 |
Context | 🟡 中等 | N/A | 🟡 中等 | 🟡 中等 | 取消控制 |
最佳實踐指南
1. Goroutine 管理
package main
import (
"context"
"fmt"
"sync"
"time"
)
// ✅ 好的模式:明確的生命週期管理
func goodGoroutineManagement() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(ctx, id)
}(i)
}
wg.Wait()
fmt.Println("所有工作者完成")
}
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("工作者 %d 收到取消信號\n", id)
return
case <-time.After(500 * time.Millisecond):
fmt.Printf("工作者 %d 工作中\n", id)
}
}
}
// ❌ 避免的模式:洩漏 goroutine
func avoidGoroutineLeak() {
ch := make(chan int)
// 這個 goroutine 可能永遠不會結束
go func() {
for {
select {
case n := <-ch:
fmt.Println(n)
// 缺少退出條件!
}
}
}()
// 如果沒有發送資料,goroutine 會洩漏
}
2. Channel 最佳實踐
// ✅ 好的模式:適當的 Channel 緩衝
func goodChannelBuffering() {
// 無緩衝:用於同步
sync := make(chan bool)
go func() {
// 做一些工作
time.Sleep(1 * time.Second)
sync <- true // 同步信號
}()
<-sync // 等待完成
// 有緩衝:用於解耦
buffer := make(chan int, 10)
// 生產者
go func() {
for i := 0; i < 5; i++ {
buffer <- i
}
close(buffer)
}()
// 消費者
for val := range buffer {
fmt.Println(val)
}
}
// ✅ 好的模式:Channel 方向
func goodChannelDirection() {
ch := make(chan int, 5)
// 只能發送
go producer(ch)
// 只能接收
consumer(ch)
}
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}
func consumer(ch <-chan int) {
for val := range ch {
fmt.Printf("消費: %d\n", val)
}
}
3. 錯誤處理
package main
import (
"fmt"
"sync"
"time"
)
// 強健的錯誤處理
type WorkerResult struct {
ID int
Data interface{}
Error error
}
func robustWorkerPattern() {
jobs := make(chan int, 10)
results := make(chan WorkerResult, 10)
var wg sync.WaitGroup
// 啟動工作者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
results <- WorkerResult{
ID: workerID,
Error: fmt.Errorf("panic: %v", r),
}
}
}()
for job := range jobs {
result := processJob(workerID, job)
results <- result
}
}(i)
}
// 發送任務
go func() {
for i := 1; i <= 10; i++ {
jobs <- i
}
close(jobs)
}()
// 收集結果
go func() {
wg.Wait()
close(results)
}()
// 處理結果
for result := range results {
if result.Error != nil {
fmt.Printf("工作者 %d 錯誤: %v\n", result.ID, result.Error)
} else {
fmt.Printf("工作者 %d 完成: %v\n", result.ID, result.Data)
}
}
}
func processJob(workerID, job int) WorkerResult {
// 模擬可能失敗的工作
if job%7 == 0 {
return WorkerResult{
ID: workerID,
Error: fmt.Errorf("任務 %d 失敗", job),
}
}
time.Sleep(100 * time.Millisecond)
return WorkerResult{
ID: workerID,
Data: fmt.Sprintf("任務 %d 完成", job),
}
}
4. 效能優化技巧
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// 效能監控
type PerformanceMonitor struct {
goroutineCount int64
requestCount int64
errorCount int64
}
func (pm *PerformanceMonitor) IncrementGoroutine() {
atomic.AddInt64(&pm.goroutineCount, 1)
}
func (pm *PerformanceMonitor) DecrementGoroutine() {
atomic.AddInt64(&pm.goroutineCount, -1)
}
func (pm *PerformanceMonitor) IncrementRequest() {
atomic.AddInt64(&pm.requestCount, 1)
}
func (pm *PerformanceMonitor) IncrementError() {
atomic.AddInt64(&pm.errorCount, 1)
}
func (pm *PerformanceMonitor) Report() {
goroutines := atomic.LoadInt64(&pm.goroutineCount)
requests := atomic.LoadInt64(&pm.requestCount)
errors := atomic.LoadInt64(&pm.errorCount)
fmt.Printf("📊 效能報告:\n")
fmt.Printf(" 活躍 Goroutine: %d\n", goroutines)
fmt.Printf(" 系統 Goroutine: %d\n", runtime.NumGoroutine())
fmt.Printf(" 處理請求數: %d\n", requests)
fmt.Printf(" 錯誤數: %d\n", errors)
if requests > 0 {
fmt.Printf(" 錯誤率: %.2f%%\n", float64(errors)/float64(requests)*100)
}
}
// 自適應工作者池
type AdaptiveWorkerPool struct {
minWorkers int
maxWorkers int
current int
jobs chan func()
monitor *PerformanceMonitor
mu sync.Mutex
}
func NewAdaptiveWorkerPool(min, max int) *AdaptiveWorkerPool {
pool := &AdaptiveWorkerPool{
minWorkers: min,
maxWorkers: max,
current: min,
jobs: make(chan func(), max*2),
monitor: &PerformanceMonitor{},
}
// 啟動最小工作者數量
for i := 0; i < min; i++ {
go pool.worker()
}
// 定期調整工作者數量
go pool.autoScale()
return pool
}
func (pool *AdaptiveWorkerPool) worker() {
pool.monitor.IncrementGoroutine()
defer pool.monitor.DecrementGoroutine()
for job := range pool.jobs {
job()
pool.monitor.IncrementRequest()
}
}
func (pool *AdaptiveWorkerPool) Submit(job func()) {
pool.jobs <- job
}
func (pool *AdaptiveWorkerPool) autoScale() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
pool.mu.Lock()
queueLen := len(pool.jobs)
// 如果隊列積壓太多,增加工作者
if queueLen > pool.current && pool.current < pool.maxWorkers {
pool.current++
go pool.worker()
fmt.Printf("擴展工作者池到 %d\n", pool.current)
}
// 如果隊列空閒,減少工作者(實際實現會更複雜)
if queueLen == 0 && pool.current > pool.minWorkers {
// 這裡簡化處理,實際需要優雅關閉工作者
pool.current--
fmt.Printf("縮減工作者池到 %d\n", pool.current)
}
pool.mu.Unlock()
pool.monitor.Report()
}
}
除錯與診斷技巧
1. Goroutine 洩漏檢測
package main
import (
"fmt"
"runtime"
"time"
)
func detectGoroutineLeak() {
initial := runtime.NumGoroutine()
fmt.Printf("初始 Goroutine 數量: %d\n", initial)
// 執行一些可能洩漏的操作
for i := 0; i < 10; i++ {
leakyFunction()
}
// 等待一段時間讓正常的 goroutine 結束
time.Sleep(2 * time.Second)
final := runtime.NumGoroutine()
fmt.Printf("最終 Goroutine 數量: %d\n", final)
if final > initial {
fmt.Printf("⚠️ 可能存在 Goroutine 洩漏: %d 個\n", final-initial)
// 打印 goroutine 堆疊
buf := make([]byte, 1<<16)
stackSize := runtime.Stack(buf, true)
fmt.Printf("Goroutine 堆疊:\n%s\n", buf[:stackSize])
}
}
func leakyFunction() {
ch := make(chan int)
// 這個 goroutine 會洩漏,因為沒有發送者
go func() {
<-ch // 永遠阻塞
}()
}
2. 死鎖檢測
package main
import (
"fmt"
"sync"
"time"
)
func deadlockExample() {
var mu1, mu2 sync.Mutex
// Goroutine 1: 先鎖 mu1,再鎖 mu2
go func() {
mu1.Lock()
defer mu1.Unlock()
time.Sleep(100 * time.Millisecond)
mu2.Lock()
defer mu2.Unlock()
fmt.Println("Goroutine 1 完成")
}()
// Goroutine 2: 先鎖 mu2,再鎖 mu1 (死鎖)
go func() {
mu2.Lock()
defer mu2.Unlock()
time.Sleep(100 * time.Millisecond)
mu1.Lock()
defer mu1.Unlock()
fmt.Println("Goroutine 2 完成")
}()
time.Sleep(1 * time.Second)
fmt.Println("可能發生死鎖")
}
// 死鎖預防:鎖排序
func preventDeadlock() {
var mu1, mu2 sync.Mutex
lockInOrder := func(first, second *sync.Mutex) {
first.Lock()
defer first.Unlock()
second.Lock()
defer second.Unlock()
}
// 總是按照相同順序獲取鎖
go func() {
lockInOrder(&mu1, &mu2)
fmt.Println("Goroutine 1 完成")
}()
go func() {
lockInOrder(&mu1, &mu2) // 相同順序
fmt.Println("Goroutine 2 完成")
}()
time.Sleep(1 * time.Second)
}
學習路徑與總結 🎓
學習路徑建議
🌱 初學者 (0-2個月):
├── 理解 Goroutine 基礎
├── 掌握 Channel 基本用法
├── 學習 WaitGroup 和基本同步
└── 實作簡單並行程式
🚀 中級者 (2-4個月):
├── 深入 Select 和複雜 Channel 模式
├── 掌握 Context 的使用
├── 學習 Mutex 和 Atomic 操作
└── 實作 Worker Pool 等模式
🎯 高級者 (4個月以上):
├── 掌握高級並行模式
├── 效能調優和監控
├── 自訂同步原語
└── 大規模並行系統設計
Go 並行編程的核心理念
💡 設計哲學:
"Don't communicate by sharing memory; share memory by communicating." 不要透過共享記憶體來通訊;要透過通訊來共享記憶體。
🎯 核心原則:
- Goroutine 優先 - 使用輕量級協程而非傳統執行緒
- Channel 為王 - 優先使用 Channel 進行通訊
- CSP 模型 - 基於通訊循序程序的並行模型
- 組合勝過繼承 - 透過介面和組合構建複雜系統
🛠️ 最佳實踐總結:
何時使用什麼:
| 場景 | 推薦方案 | 原因 |
|---|---|---|
| 🔄 執行緒間通訊 | Channel | Go 的核心設計 |
| 🏃 並行任務 | Goroutine + WaitGroup | 輕量且高效 |
| 🔒 共享狀態保護 | Mutex/RWMutex | 當 Channel 不適用時 |
| ⚛️ 簡單原子操作 | sync/atomic | 最高效能 |
| ⏰ 超時和取消 | Context | 標準做法 |
| 🎯 一次性初始化 | sync.Once | 執行緒安全的單例 |
| 📊 效能監控 | pprof + 自訂監控 | 可觀測性 |
常見陷阱與解決方案:
| 問題 | 症狀 | 解決方案 |
|---|---|---|
| Goroutine 洩漏 | 記憶體持續增長 | 使用 Context 控制生命週期 |
| Channel 死鎖 | 程式掛起 | 檢查 Channel 的發送/接收平衡 |
| 競爭條件 | 不一致的結果 | 使用適當的同步原語 |
| 過度同步 | 效能低下 | 重新設計,減少共享狀態 |
效能調優指南:
// 效能調優檢查清單
func performanceTuning() {
// 1. Goroutine 數量控制
// - 避免無限制建立 Goroutine
// - 使用 Worker Pool 模式
// 2. Channel 緩衝優化
// - 根據生產消費速度調整緩衝大小
// - 避免過大的緩衝區導致記憶體浪費
// 3. 鎖競爭最小化
// - 縮短臨界區
// - 使用 RWMutex 優化讀多寫少場景
// - 考慮無鎖資料結構
// 4. 記憶體分配優化
// - 重用物件,減少 GC 壓力
// - 使用 sync.Pool 池化物件
// 5. 監控和診斷
// - 使用 pprof 分析效能
// - 監控 Goroutine 數量
// - 檢測記憶體洩漏
}
進階學習資源
📚 必讀資料:
🔧 實用工具:
go tool pprof- 效能分析go test -race- 競爭條件檢測GODEBUG=schedtrace=1000- 排程器追蹤
🎯 實戰項目建議:
- 聊天伺服器 - 練習 Channel 和 Goroutine
- 爬蟲系統 - 練習 Worker Pool 和限流
- 快取服務 - 練習 RWMutex 和原子操作
- 微服務閘道器 - 練習 Context 和超時控制
總結
Go 的並行模型是其最大的特色之一,透過 Goroutine 和 Channel 提供了一種直觀且高效的並行程式設計方式。記住以下要點:
🎯 核心記憶點:
- Goroutine 輕量 - 可以輕鬆建立數百萬個
- Channel 安全 - 型別安全的通訊機制
- Context 控制 - 優雅的取消和超時處理
- 組合優於繼承 - 透過介面和嵌入構建複雜系統
🚀 進階發展: 隨著經驗累積,你會發現 Go 的並行模型不僅簡單易用,更能幫助你構建高效、可維護的分散式系統。從簡單的 Goroutine 開始,逐步掌握複雜的並行模式,最終能夠設計出優雅的高並行架構。
Go 的哲學是 "簡單而強大",其並行機制完美體現了這一點。透過本指南的學習,相信你已經掌握了 Go 並行程式設計的精髓,現在是時候在實際項目中應用這些知識了!🐹✨
完整指南到此結束。記住:在 Go 中,並行不僅是一種技術,更是一種思維方式。享受 Go 並行程式設計的樂趣吧!