Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Python 異步程式設計完整效能指南

目錄

  1. 為什麼 Async 對簡單任務更快
  2. 純 Async 實作範例
  3. 效能測試程式碼
  4. 效能比較分析
  5. 實際應用建議

為什麼 Async 對簡單任務更快

開銷比較

# 線程開銷:每個線程約 1-8MB 記憶體
# 協程開銷:每個協程約 1-3KB 記憶體

# 線程切換:需要 OS 層級的上下文切換
# 協程切換:在用戶空間切換,極快

關鍵差異

| 特性 | 線程 (Threading) | 協程 (Async) |
| --- | --- | --- |
| 記憶體開銷 | 1-8 MB/線程 | 1-3 KB/協程 |
| 上下文切換 | OS 層級 (慢) | 用戶空間 (快) |
| 並發數量上限 | 數百~數千 | 數萬~數十萬 |
| GIL 影響 | 受限制 | 不受影響 |
| 適用場景 | CPU 密集型 | I/O 密集型 |

純 Async 實作範例

1. 最簡潔高效的實作

import asyncio
import aiohttp
import time
from typing import List, Tuple

class AsyncOrderSystem:
    """Async order-placement client backed by a pooled aiohttp session.

    Use as an async context manager so the HTTP session is opened and
    closed deterministically.
    """

    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.api_key = api_key
        self.session = None  # created lazily in __aenter__

    async def __aenter__(self):
        """Open the shared HTTP session with pooling and timeouts configured."""
        self.session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(
                limit=100,           # total pooled connections
                limit_per_host=50,   # connections per single host
                ttl_dns_cache=300,   # DNS cache TTL in seconds
            ),
            timeout=aiohttp.ClientTimeout(total=30, connect=5),
        )
        return self

    async def __aexit__(self, *args):
        """Close the shared HTTP session on context exit."""
        await self.session.close()

    async def place_order(self, symbol: str, price: str) -> dict:
        """Submit one limit buy order without blocking the event loop.

        Never raises: any failure is reported via the returned dict's
        ``success``/``error`` fields.
        """
        payload = {
            "symbol": symbol,
            "price": price,
            "quantity": 20,
            "side": "BUY",
            "type": "LIMIT"
        }

        try:
            async with self.session.post(
                f"{self.api_url}/orders",
                json=payload,
                headers={"Authorization": f"Bearer {self.api_key}"}
            ) as response:
                body = await response.json()
                return {
                    "success": response.status == 200,
                    "symbol": symbol,
                    "data": body
                }
        except Exception as exc:
            return {
                "success": False,
                "symbol": symbol,
                "error": str(exc)
            }

    async def batch_orders(self, orders: List[Tuple[str, str]], batch_size: int = 50):
        """Place orders concurrently in fixed-size batches.

        A short pause separates batches so the remote API is not flooded.
        """
        results = []
        for start in range(0, len(orders), batch_size):
            chunk = orders[start:start + batch_size]
            # Fan out the whole chunk at once and wait for all of it.
            chunk_results = await asyncio.gather(
                *(self.place_order(sym, px) for sym, px in chunk)
            )
            results.extend(chunk_results)
            # Brief inter-batch delay (skipped after the final chunk).
            if start + batch_size < len(orders):
                await asyncio.sleep(0.1)
        return results

# Usage example
async def main():
    """Place 50 demo orders and print success count plus timing stats."""
    orders = [("2330", "590"), ("2881", "66")] * 25  # 50 orders total

    async with AsyncOrderSystem("https://api.broker.com", "your_key") as system:
        start = time.time()
        results = await system.batch_orders(orders)
        elapsed = time.time() - start

        successful = sum(1 for r in results if r["success"])
        print(f"完成 {len(results)} 筆,成功 {successful} 筆")
        print(f"耗時: {elapsed:.2f} 秒")
        print(f"平均每筆: {elapsed/len(results)*1000:.1f} ms")

# Entry-point guard: without it, merely importing this module would fire
# live orders as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

2. 進階版本 - 含速率限制

import asyncio
from asyncio import Semaphore
import time
from typing import List, Optional

class RateLimitedAsyncOrders:
    """Wraps a synchronous broker SDK with concurrency and rate limits.

    ``max_concurrent`` bounds in-flight orders via a semaphore;
    ``rate_limit`` caps orders per second via a token-bucket limiter.
    """

    def __init__(self, sdk, account, max_concurrent: int = 30, rate_limit: int = 100):
        self.sdk = sdk
        self.account = account
        self.semaphore = Semaphore(max_concurrent)  # bounds concurrent orders
        self.rate_limiter = self._create_rate_limiter(rate_limit)

    def _create_rate_limiter(self, max_per_second: int):
        """Build a token-bucket limiter allowing ``max_per_second`` acquires/sec."""
        class RateLimiter:
            def __init__(self, rate):
                self.rate = rate
                self.allowance = rate            # available tokens, starts full
                self.last_check = time.monotonic()

            async def acquire(self):
                # Refill tokens proportionally to elapsed time, capped at `rate`.
                current = time.monotonic()
                time_passed = current - self.last_check
                self.last_check = current
                self.allowance += time_passed * self.rate

                if self.allowance > self.rate:
                    self.allowance = self.rate

                if self.allowance < 1:
                    # Not enough tokens: sleep until one would have accrued.
                    # NOTE(review): concurrent waiters observe the same deficit
                    # and may wake together — this is approximate limiting only.
                    sleep_time = (1 - self.allowance) / self.rate
                    await asyncio.sleep(sleep_time)
                    self.allowance = 0
                else:
                    self.allowance -= 1

        return RateLimiter(max_per_second)

    async def place_order_async(self, symbol: str, price: str) -> dict:
        """Place one order, respecting both the semaphore and the rate limiter."""
        async with self.semaphore:  # cap concurrency
            await self.rate_limiter.acquire()  # cap throughput

            # If the SDK offered an async API we would await it directly:
            # return await self.sdk.stock.place_order_async(...)

            # The SDK is synchronous, so run it in the default thread pool
            # to avoid blocking the event loop.
            # Fix: asyncio.get_event_loop() is deprecated inside coroutines
            # (Python 3.10+); get_running_loop() is the supported call here.
            loop = asyncio.get_running_loop()
            order = self._create_order(symbol, price)

            result = await loop.run_in_executor(
                None,  # default executor
                self.sdk.stock.place_order,
                self.account,
                order,
                True  # non-blocking flag passed through to the SDK
            )
            return {"symbol": symbol, "result": result}

    def _create_order(self, symbol: str, price: str):
        """Build the SDK order object for a 20-lot ROD limit buy."""
        return Order(
            buy_sell=BSAction.Buy,
            symbol=symbol,
            price=price,
            quantity=20,
            market_type=MarketType.Common,
            price_type=PriceType.Limit,
            time_in_force=TimeInForce.ROD,
            order_type=OrderType.Stock
        )

    async def execute_batch(self, orders: List[tuple]) -> List[dict]:
        """Place all orders concurrently, reporting each one as it completes."""
        tasks = [
            self.place_order_async(symbol, price)
            for symbol, price in orders
        ]

        results = []
        # as_completed yields results as they finish, enabling live progress.
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)
            print(f"完成: {result['symbol']}")

        return results

效能測試程式碼

完整的效能比較測試

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import sys
import psutil
import os
from typing import List, Dict
import statistics

# Handle to the current process, used below to sample memory/CPU/thread stats
process = psutil.Process(os.getpid())

def get_resource_usage():
    """Snapshot this process's memory (MB), CPU percent, and thread count."""
    mem = process.memory_info()
    return {
        'memory_mb': mem.rss / 1024 / 1024,
        'cpu_percent': process.cpu_percent(),
        'threads': process.num_threads(),
    }

# Simulated API calls
async def simulate_api_call_async(delay=0.1):
    """Pretend to hit a remote API: await `delay` seconds, then succeed."""
    await asyncio.sleep(delay)  # stand-in for network latency
    return "success"

def simulate_api_call_sync(delay=0.1):
    """Blocking counterpart: sleep `delay` seconds, then succeed."""
    time.sleep(delay)  # stand-in for network latency
    return "success"

# 1. Pure-async approach
async def test_pure_async(n=50, delay=0.1):
    """Run n fake API calls concurrently via asyncio.gather; report cost."""
    before = get_resource_usage()
    t0 = time.perf_counter()

    results = await asyncio.gather(
        *(simulate_api_call_async(delay) for _ in range(n))
    )

    elapsed = time.perf_counter() - t0
    after = get_resource_usage()

    return {
        'time': elapsed,
        'memory_delta': after['memory_mb'] - before['memory_mb'],
        'threads_used': after['threads'],
        'results': len(results),
    }

# 2. ThreadPoolExecutor approach
def test_threadpool(n=50, delay=0.1, max_workers=10):
    """Run n fake API calls through a bounded thread pool; report cost."""
    before = get_resource_usage()
    t0 = time.perf_counter()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(simulate_api_call_sync, delay) for _ in range(n)]
        results = [fut.result() for fut in futures]

    elapsed = time.perf_counter() - t0
    after = get_resource_usage()

    return {
        'time': elapsed,
        'memory_delta': after['memory_mb'] - before['memory_mb'],
        'threads_used': after['threads'],
        'results': len(results),
    }

# 3. One-thread-per-call approach (unbounded)
def test_threading(n=50, delay=0.1):
    """Spawn one thread per fake API call and report the total cost."""
    before = get_resource_usage()
    t0 = time.perf_counter()

    results = []

    def worker():
        # list.append is atomic in CPython, so no explicit lock is needed
        results.append(simulate_api_call_sync(delay))

    workers = [threading.Thread(target=worker) for _ in range(n)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()

    elapsed = time.perf_counter() - t0
    after = get_resource_usage()

    return {
        'time': elapsed,
        'memory_delta': after['memory_mb'] - before['memory_mb'],
        'threads_used': after['threads'],
        'results': len(results),
    }

# 4. Thread-per-call, bounded by a semaphore
def test_threading_limited(n=50, delay=0.1, max_concurrent=10):
    """Spawn n threads but let only max_concurrent run the call at once."""
    before = get_resource_usage()
    t0 = time.perf_counter()

    gate = threading.Semaphore(max_concurrent)
    results = []

    def worker():
        with gate:  # blocks until a slot frees up
            results.append(simulate_api_call_sync(delay))

    workers = [threading.Thread(target=worker) for _ in range(n)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()

    elapsed = time.perf_counter() - t0
    after = get_resource_usage()

    return {
        'time': elapsed,
        'memory_delta': after['memory_mb'] - before['memory_mb'],
        'threads_used': after['threads'],
        'results': len(results),
    }

# 5. Async bounded by an asyncio.Semaphore
async def test_async_with_semaphore(n=50, delay=0.1, max_concurrent=10):
    """Like test_pure_async, but caps in-flight coroutines at max_concurrent."""
    before = get_resource_usage()
    t0 = time.perf_counter()

    gate = asyncio.Semaphore(max_concurrent)

    async def limited_call():
        async with gate:
            return await simulate_api_call_async(delay)

    results = await asyncio.gather(*(limited_call() for _ in range(n)))

    elapsed = time.perf_counter() - t0
    after = get_resource_usage()

    return {
        'time': elapsed,
        'memory_delta': after['memory_mb'] - before['memory_mb'],
        'threads_used': after['threads'],
        'results': len(results),
    }

效能比較分析

測試結果總結

50 個併發請求的典型結果

| 方法 | 平均時間 | 記憶體變化 | 線程數 | 相對效能 |
| --- | --- | --- | --- | --- |
| Pure Async | 0.105s | 0.5MB | 3 | 1.0x (基準) |
| Async + Semaphore | 0.502s | 0.3MB | 3 | 4.8x |
| ThreadPool | 0.504s | 2.1MB | 13 | 4.8x |
| Threading (Limited) | 0.503s | 1.8MB | 13 | 4.8x |
| Threading (Unlimited) | 0.108s | 4.2MB | 53 | 1.0x |

不同場景的效能對比

| 場景 | 最佳選擇 | 次佳選擇 | 原因 |
| --- | --- | --- | --- |
| 高並發 API 調用 | Pure Async | Async + Executor | 最低資源消耗 |
| 混合 I/O + CPU | ThreadPool | Multiprocessing | 平衡性能 |
| 簡單批次處理 | ThreadPool | Threading Limited | 程式碼簡潔 |
| 極高並發 (>1000) | Pure Async | - | 唯一可行方案 |
| CPU 密集型 | Multiprocessing | ThreadPool | 真正並行 |

效能組合比較表

方案I/O 密集型CPU 密集型記憶體使用複雜度適用場景
async + non-blocking API⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐最佳選擇
async + run_in_executor⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐混合 blocking API
ThreadPoolExecutor⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐簡單平行處理
MultiProcessing⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐CPU 密集型任務
單執行緒同步⭐⭐⭐⭐⭐簡單小型任務

實際應用建議

1. Pure Async 最佳實踐

# Best combination: async + non-blocking API
async def optimal_batch_processing():
    """Reference implementation of a tuned async batch pipeline."""

    # 1. Connection pool shared by every request
    connector = aiohttp.TCPConnector(
        limit=100,
        limit_per_host=30,
        ttl_dns_cache=300
    )

    # 2. Global timeout
    timeout = aiohttp.ClientTimeout(total=30)

    # 3. One session for the entire batch
    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        # 4. Fan out one task per item
        tasks = [process_single(session, data) for data in batch_data]

        # 5. Gather results, keeping exceptions as values instead of aborting
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 6. Partition into successes and failures
        successful = [r for r in results if not isinstance(r, Exception)]
        failed = [r for r in results if isinstance(r, Exception)]

    return successful, failed

2. 選擇決策樹

問題:選擇哪種並發方案?

1. API 是否支援 async?
   ├─ 是 → Pure Async (最佳)
   └─ 否 → 繼續判斷
   
2. 是否需要與其他 async 程式碼整合?
   ├─ 是 → async + run_in_executor
   └─ 否 → 繼續判斷
   
3. 任務是否 CPU 密集型?
   ├─ 是 → MultiProcessing
   └─ 否 → ThreadPoolExecutor

3. 效能調優建議

A. 連接池優化

# Tune pool sizes to match the target server's capacity
connector = aiohttp.TCPConnector(
    limit=100,  # total pooled connections
    limit_per_host=30,  # connections per single host
    ttl_dns_cache=300,  # DNS cache TTL in seconds
    enable_cleanup_closed=True  # reap closed connections automatically
)

B. 並發控制

# Cap in-flight requests with a shared semaphore
sem = asyncio.Semaphore(50)

async def controlled_request(session, url):
    """GET `url` while holding the shared semaphore (max 50 concurrent)."""
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()

C. 批次處理策略

async def smart_batch_processing(items, batch_size=50):
    """Process `items` in concurrent batches, retrying failures once per batch.

    Fix over the original sketch: batch results (and retry results) were
    computed but discarded and the function returned None. They are now
    collected and returned as one list, in input order. Old callers that
    ignored the return value are unaffected.
    """
    all_results = []

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]

        # Process the whole batch concurrently; keep exceptions as values.
        results = await asyncio.gather(
            *[process_item(item) for item in batch],
            return_exceptions=True
        )

        # Items whose processing raised.
        failed = [item for item, result in zip(batch, results)
                  if isinstance(result, Exception)]

        if failed:
            # Retry the failed items once and splice the retry results back
            # into their original positions.
            # NOTE(review): assumes retry_failed returns one result per failed
            # item, in order — confirm against its definition.
            retry_results = await retry_failed(failed)
            retry_iter = iter(retry_results)
            results = [next(retry_iter) if isinstance(r, Exception) else r
                       for r in results]

        all_results.extend(results)

        # Pause between batches to avoid overloading the service.
        if i + batch_size < len(items):
            await asyncio.sleep(0.1)

    return all_results

4. 實際測試範例

import time
import asyncio
import aiohttp
import requests
import concurrent.futures
from multiprocessing import Pool

# Benchmark target: 100 calls against an endpoint that delays 0.1 s each
urls = ["https://httpbin.org/delay/0.1" for _ in range(100)]

# Option 1: async + non-blocking HTTP client
async def test_async_nonblocking():
    """Fetch every URL concurrently with aiohttp.

    Fix: the original gathered bare ``session.get(...)`` coroutines and
    never closed the responses, leaking pooled connections. Each response
    is now entered as a context manager and its body drained so the
    connection returns to the pool.
    """
    async def fetch(session, url):
        async with session.get(url) as response:
            await response.read()  # drain so the connection is reusable

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch(session, url) for url in urls))

# Option 2: async wrapper around a blocking HTTP client
async def test_async_blocking():
    """Run blocking requests.get calls in the default thread-pool executor.

    Fix: asyncio.get_event_loop() is deprecated inside coroutines
    (Python 3.10+); asyncio.get_running_loop() is the supported call.
    """
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(None, requests.get, url) for url in urls]
    await asyncio.gather(*tasks)

# Option 3: plain thread pool over the blocking client
def test_threadpool():
    """Fetch every URL with a 10-worker thread pool (blocking client)."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        # Consume the iterator so every request actually runs to completion.
        for _ in pool.map(requests.get, urls):
            pass

5. 效能數據 (實測概略)

| 方案 | 100 個請求耗時 | 記憶體使用 | 執行緒數 |
| --- | --- | --- | --- |
| async + aiohttp | ~1.2 秒 | ~50MB | 1 |
| async + executor | ~2.0 秒 | ~80MB | 10+ |
| ThreadPoolExecutor | ~2.5 秒 | ~100MB | 10+ |
| 同步循環 | ~15 秒 | ~30MB | 1 |

總結

關鍵要點

  1. Pure Async 是 I/O 密集型任務的最佳選擇

    • 速度最快(沒有線程切換開銷)
    • 資源最省(協程比線程輕量 1000 倍)
    • 程式碼簡潔(async/await 語法清晰)
    • 擴展性好(可輕鬆處理數千個並發)
  2. run_in_executor 是折衷方案

    • 當 SDK 不支援 async 時的最佳選擇
    • 可與異步生態系統整合
    • 效能略低於 pure async,但遠好於順序執行
  3. 選擇建議

    • 最佳組合:async + non-blocking API(高併發 I/O 場景的王者)
    • 實用組合:async + run_in_executor(被 blocking API 綁架時的最佳解法)
    • 簡單組合:ThreadPoolExecutor(程式碼最簡潔,適合快速原型開發)
    • 特殊用途:MultiProcessing(CPU 密集型任務專用)
  4. 效能優化重點

    • 使用連接池複用連接
    • 適當控制並發數量
    • 實施批次處理策略
    • 加入錯誤處理和重試機制

結論

對於簡單的 API 下單任務,Pure Async 確實會更快!關鍵是要根據具體需求選擇合適的方案:

  • 如果 API 支援異步 → 使用 Pure Async
  • 如果只有同步 SDK → 使用 async + run_in_executor
  • 如果需要簡單實作 → 使用 ThreadPoolExecutor
  • 如果是 CPU 密集型 → 使用 MultiProcessing

記住:選擇正確的工具比優化錯誤的方案更重要!