您的系统将会崩溃。问题在于崩溃的方式有多优雅。
当我们的支付服务因第三方欺诈检测 API 开始超时而宕机时,我正在值班。超时时间是 30 秒。我们有 200 个并发请求,每个请求都在等待一个永远不会到来的响应,同时占用着一个线程。在两分钟内,线程池被耗尽,我们的整个支付服务——不仅仅是欺诈检查——变得无响应。订单、退款、余额查询——一切都瘫痪了,仅仅因为一个下游依赖变慢了。
这是弹性模式旨在防止的典型故障模式。不是戏剧性的服务器着火场景,而是那种安静的、级联式的故障,其中一个慢速依赖拖垮了所有东西。熔断器、舱壁和重试模式是工程工具,它们可以在故障演变为完全中断之前将其控制住。
熔断器:停止调用已失效的服务
熔断器监控对下游服务的调用,当失败超过阈值时,它会断开。一旦断开,它会立即拒绝所有请求而不实际调用下游服务,给它时间恢复。
熔断器有三种状态:
- 关闭: 正常操作。请求正常通过。失败次数被计数。
- 开启: 失败超过阈值。所有请求立即被拒绝,不调用下游服务。
- 半开: 冷却期过后,允许有限数量的测试请求通过。如果它们成功,熔断器关闭。如果它们失败,熔断器再次开启。
Python 实现
import time
import httpx
from enum import Enum
from dataclasses import dataclass, field
from threading import Lock
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: float = 30.0
half_open_max_calls: int = 3
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
_half_open_calls: int = field(default=0, init=False)
_lock: Lock = field(default_factory=Lock, init=False)
@property
def state(self) -> CircuitState:
with self._lock:
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return self._state
def record_success(self):
with self._lock:
if self._state == CircuitState.HALF_OPEN:
self._half_open_calls += 1
if self._half_open_calls >= self.half_open_max_calls:
self._state = CircuitState.CLOSED
self._failure_count = 0
else:
self._failure_count = 0
def record_failure(self):
with self._lock:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.OPEN
def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
raise CircuitOpenError(
f"Circuit is open. Will retry after {self.recovery_timeout}s. "
f"Last failure: {time.time() - self._last_failure_time:.1f}s ago."
)
try:
result = func(*args, **kwargs)
self.record_success()
return result
except Exception as e:
self.record_failure()
raise
class CircuitOpenError(Exception):
pass
# 使用示例
fraud_check_breaker = CircuitBreaker(
failure_threshold=5,
recovery_timeout=30.0,
)
async def check_fraud(transaction):
try:
return fraud_check_breaker.call(
httpx.post,
"https://fraud-api.example.com/check",
json=transaction.dict(),
timeout=5.0,
)
except CircuitOpenError:
# 降级策略:允许交易但标记为需要人工审核
return FraudResult(approved=True, requires_review=True)
实践中的断路器
当电路断开时的回退行为才是真正的工程判断所在。选项包括:
| 策略 | 使用时机 | 示例 |
|---|---|---|
| 返回缓存数据 | 数据陈旧是可以接受的 | 产品目录,用户偏好 |
| 返回默认值 | 存在安全的默认值 | 默认运费估算,功能标志关闭 |
| 优雅降级 | 功能是可选的 | 跳过推荐,跳过分析 |
| 快速失败并返回清晰错误 | 没有安全的回退方案 | 支付处理,身份验证检查 |
| 排队稍后处理 | 操作可以是异步的 | 电子邮件通知,webhook 交付 |
舱壁:隔离爆炸范围
舱壁模式借鉴了造船业:船只都有防水隔舱,这样当一个隔舱船体受损时不会导致整艘船只沉没。在软件中,舱壁用于隔离资源,这样某个组件的故障不会耗尽其他组件所需的资源。
线程池舱壁
最常见的舱壁实现是为不同的下游依赖项使用独立的线程池(或连接池、或信号量):
import asyncio
from dataclasses import dataclass
@dataclass
class Bulkhead:
name: str
max_concurrent: int
max_wait: float = 5.0
def __post_init__(self):
self._semaphore = asyncio.Semaphore(self.max_concurrent)
self._waiting = 0
async def execute(self, coro):
self._waiting += 1
try:
acquired = await asyncio.wait_for(
self._semaphore.acquire(),
timeout=self.max_wait,
)
except asyncio.TimeoutError:
self._waiting -= 1
raise BulkheadFullError(
f"Bulkhead '{self.name}' is full. "
f"{self.max_concurrent} calls in progress, "
f"{self._waiting} waiting."
)
self._waiting -= 1
try:
return await coro
finally:
self._semaphore.release()
class BulkheadFullError(Exception):
pass
# 为每个下游服务使用独立的舱壁
payment_bulkhead = Bulkhead("payment-api", max_concurrent=20, max_wait=5.0)
fraud_bulkhead = Bulkhead("fraud-api", max_concurrent=10, max_wait=3.0)
inventory_bulkhead = Bulkhead("inventory-api", max_concurrent=30, max_wait=5.0)
async def process_order(order):
# 每个调用都是隔离的。如果欺诈API变慢并且所有10个插槽
# 都被占用,它不能从支付或库存中窃取容量。
payment = await payment_bulkhead.execute(
check_payment(order.payment_method)
)
fraud = await fraud_bulkhead.execute(
check_fraud(order)
)
inventory = await inventory_bulkhead.execute(
reserve_inventory(order.items)
)
没有舱壁时,所有下游调用共享一个资源池。当一个服务变慢时,它会独占共享资源,导致所有其他服务调用受到影响。使用舱壁后,一个缓慢的欺诈API只能消耗其分配的10个并发插槽。支付和库存的剩余容量保持不受影响。
重试模式:工具箱中最危险的工具
重试是最常实现也最常被错误实现的弹性模式。一个简单的重试循环可能会将小故障转变为灾难性的重试风暴,使你试图访问的服务不堪重负。
错误的做法
# 绝不要这样做
async def call_service(url, payload):
for attempt in range(5):
try:
return await httpx.post(url, json=payload, timeout=10)
except Exception:
pass # 立即重试
raise Exception("服务不可用")
# 为什么这是危险的:
# - 没有退避策略:尽可能快地重试,给失败的服务造成巨大压力
# - 没有抖动:所有客户端在同一时间重试
# - 对所有异常都重试,包括 400 Bad Request
# - 5次重试 * N个客户端 = 5N个请求发送给已经挣扎的服务
正确的做法:带抖动的指数退避
import random
import asyncio
import httpx
async def call_with_retry(
url: str,
payload: dict,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retryable_status_codes: set = {429, 502, 503, 504},
):
last_exception = None
for attempt in range(max_retries + 1):
try:
response = await httpx.AsyncClient().post(
url, json=payload, timeout=5.0
)
if response.status_code < 400:
return response
if response.status_code not in retryable_status_codes:
# 客户端错误(4xx) - 不要重试
raise NonRetryableError(
f"请求失败,状态码 {response.status_code}: {response.text}"
)
last_exception = HttpError(response.status_code, response.text)
except (httpx.ConnectTimeout, httpx.ReadTimeout) as e:
last_exception = e
except httpx.ConnectError as e:
last_exception = e
if attempt < max_retries:
# 完全抖动的指数退避
delay = min(base_delay * (2 ** attempt), max_delay)
jittered_delay = random.uniform(0, delay)
await asyncio.sleep(jittered_delay)
raise RetriesExhaustedError(
f"在{max_retries + 1}次尝试后失败。最后一个错误: {last_exception}"
)
重试预算模式
更好的方法是使用重试预算,将重试限制为总流量的一定百分比:
from collections import deque
from time import time
class RetryBudget:
"""限制在时间窗口内重试次数为总请求的百分比。"""
def __init__(self, max_retry_ratio=0.1, window_seconds=60, min_retries_per_second=10):
self.max_retry_ratio = max_retry_ratio
self.window_seconds = window_seconds
self.min_retries_per_second = min_retries_per_second
self._requests = deque()
self._retries = deque()
def _cleanup(self):
cutoff = time() - self.window_seconds
while self._requests and self._requests[0] < cutoff:
self._requests.popleft()
while self._retries and self._retries[0] < cutoff:
self._retries.popleft()
def record_request(self):
self._requests.append(time())
def can_retry(self) -> bool:
self._cleanup()
total_requests = len(self._requests)
total_retries = len(self._retries)
# 始终允许最低重试率
if total_retries < self.min_retries_per_second * self.window_seconds:
return True
# 检查重试是否超出预算
if total_requests == 0:
return True
return (total_retries / total_requests) < self.max_retry_ratio
def record_retry(self):
self._retries.append(time())
# 使用方式:重试预算在服务的所有调用者之间共享
payment_retry_budget = RetryBudget(max_retry_ratio=0.1) # 最多10%的重试
组合模式
这些模式一起使用效果最佳。以下是它们的组合方式:
async def resilient_call(service_name, url, payload):
"""
调用流程:
1. 检查断路器(如果打开则快速失败)
2. 获取舱壁槽位(如果容量耗尽则失败)
3. 使用重试逻辑进行调用
4. 在断路器中记录结果
"""
breaker = circuit_breakers[service_name]
bulkhead = bulkheads[service_name]
budget = retry_budgets[service_name]
# 步骤1:断路器检查
if breaker.state == CircuitState.OPEN:
return get_fallback(service_name, payload)
# 步骤2:舱壁
async with bulkhead:
# 步骤3:带重试的调用
try:
result = await call_with_retry(
url, payload,
max_retries=2 if budget.can_retry() else 0,
)
breaker.record_success()
return result
except Exception as e:
breaker.record_failure()
raise
库推荐
您不必从头实现这些模式。大多数语言都有生产级别的库:
| 语言 | 库 | 支持的模式 |
|---|---|---|
| Java/Kotlin | Resilience4j | 断路器、舱壁、重试、速率限制器、时间限制器 |
| Go | sony/gobreaker | 断路器 |
| Go | avast/retry-go | 带退避的重试 |
| Python | tenacity | 带退避和抖动的重试 |
| Python | pybreaker | 断路器 |
| Node.js | cockatiel | 断路器、舱壁、重试、超时 |
| .NET | Polly | 断路器、舱壁、重试、超时、回退 |
弹性模式不是在事情变得严重时才添加的可选复杂性。它们是小型依赖故障和导致整个值班轮换人员被惊醒的两小时停机之间的区别。从你最慢的下游依赖开始使用断路器。当你有三个以上的下游服务时添加舱壁。在重试循环放大下一次事件之前实施重试预算。凌晨3点的你会感激不尽的。
