生产环境中良好的日志记录实际应该是什么样子
大多数应用程序都会记录日志。但很少有应用程序能做好日志记录。帮助你在10分钟内调试生产环境故障的日志与让你猜测数小时的日志之间的区别,取决于三个特性:结构、上下文和一致性。本指南将带你从基本的打印语句发展到使用OpenTelemetry关联的生产级结构化日志记录——并提供你可以立即应用到应用程序中的实际代码。
为什么非结构化日志在大规模场景下会失败
考虑这条日志行:
2026-03-15 14:23:01 ERROR 用户4821在3次重试后支付失败
从这条日志中你无法回答的问题:哪个服务发出的这条日志?跟踪ID是什么?支付网关返回了什么错误?触发了此请求的是什么?花费了多长时间?
现在考虑结构化的等效日志:
{
"timestamp": "2026-03-15T14:23:01.432Z",
"level": "error",
"service": "order-service",
"version": "1.4.2",
"environment": "production",
"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
"span_id": "00f067aa0ba902b7",
"event": "payment_failed",
"user_id": 4821,
"order_id": "ord_9k2hf7",
"amount_cents": 4999,
"currency": "USD",
"gateway": "stripe",
"retry_count": 3,
"gateway_error_code": "card_declined",
"gateway_error_message": "您的卡片余额不足。",
"duration_ms": 2847,
"message": "3次重试后支付失败"
}
这条日志是可查询的,可以与跟踪关联,并且无需额外上下文就能提供完整的事件全貌。
在Python中设置结构化日志记录
标准库的 logging 模块可以通过正确的配置输出 JSON:
import logging
import json
import time
import traceback
from datetime import datetime, timezone
from typing import Any
class StructuredFormatter(logging.Formatter):
def __init__(self, service_name: str, version: str, environment: str):
super().__init__()
self.service_name = service_name
self.version = version
self.environment = environment
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat(),
"level": record.levelname.lower(),
"service": self.service_name,
"version": self.version,
"environment": self.environment,
"logger": record.name,
"message": record.getMessage(),
}
# 如果有跟踪上下文则添加(由 OTel 仪器填充)
if hasattr(record, 'trace_id'):
log_entry["trace_id"] = record.trace_id
if hasattr(record, 'span_id'):
log_entry["span_id"] = record.span_id
# 添加传递给记录器的任何额外字段
if hasattr(record, 'extra_fields'):
log_entry.update(record.extra_fields)
# 异常信息
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"stacktrace": self.formatException(record.exc_info)
}
return json.dumps(log_entry, default=str)
def setup_logging(service_name: str, version: str, environment: str,
level: str = "INFO"):
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter(service_name, version, environment))
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
root_logger.handlers = [handler]
# 在启动时初始化一次
setup_logging(
service_name="order-service",
version="1.4.2",
environment=os.getenv("APP_ENV", "development")
)
使用绑定记录器的上下文感知日志记录
最常见的错误:将上下文(user_id、request_id 等)作为单独的关键字参数传递给每个日志调用。这既繁琐又不一致。使用自动携带上下文的绑定记录器:
import structlog
# structlog 是 Python 结构化日志记录的黄金标准
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.ExceptionRenderer(),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.BoundLogger,
context_class=dict,
logger_factory=structlog.PrintLoggerFactory()
)
logger = structlog.get_logger()
# 在你的请求处理器中(FastAPI/Flask 中间件)
async def request_middleware(request, call_next):
request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))
user_id = get_user_from_token(request)
# 为此请求中的所有日志调用绑定上下文
structlog.contextvars.bind_contextvars(
request_id=request_id,
user_id=user_id,
path=str(request.url.path),
method=request.method
)
start = time.monotonic()
try:
response = await call_next(request)
logger.info("request_completed",
status_code=response.status_code,
duration_ms=round((time.monotonic() - start) * 1000, 2))
return response
except Exception as e:
logger.error("request_failed",
error=str(e),
duration_ms=round((time.monotonic() - start) * 1000, 2))
raise
finally:
structlog.contextvars.clear_contextvars()
# 在你的调用栈中的任何地方,user_id 和 request_id 都是自动的
def process_payment(order_id: str, amount: int):
log = logger.bind(order_id=order_id, amount_cents=amount)
log.info("payment_started")
try:
result = payment_gateway.charge(amount)
log.info("payment_succeeded", transaction_id=result.id)
return result
except PaymentError as e:
log.error("payment_failed",
gateway_error=e.code,
gateway_message=str(e))
raise
Node.js: 使用 Pino 实现高性能结构化日志
import pino from 'pino';
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
base: {
service: 'api-gateway',
version: process.env.APP_VERSION,
environment: process.env.NODE_ENV,
},
timestamp: pino.stdTimeFunctions.isoTime,
formatters: {
level: (label) => ({ level: label }),
},
// 在开发环境中,使用 pino-pretty 获得人类可读的输出
transport: process.env.NODE_ENV === 'development'
? { target: 'pino-pretty', options: { colorize: true } }
: undefined,
});
// 子日志记录器会继承父上下文
const requestLogger = logger.child({
request_id: req.id,
user_id: req.user?.id,
ip: req.ip,
});
requestLogger.info({ path: req.path, method: req.method }, 'request_received');
将日志与 OpenTelemetry 链路关联
当日志与分布式链路关联时,结构化日志的真正威力才会显现。OpenTelemetry (OTel) 为此提供了标准。当你使用 OTel 为应用程序添加检测时,你会在上下文中获得一个 trace_id 和 span_id,可以将它们注入到每条日志行中。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import structlog
# 初始化 OTel
provider = TracerProvider()
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("order-service")
# 自定义 structlog 处理器以注入 OTel 链路上下文
def add_otel_context(logger, method, event_dict):
span = trace.get_current_span()
if span.is_recording():
ctx = span.get_span_context()
event_dict["trace_id"] = format(ctx.trace_id, '032x')
event_dict["span_id"] = format(ctx.span_id, '016x')
event_dict["trace_flags"] = format(ctx.trace_flags, '02x')
return event_dict
# 添加到 structlog 处理器
structlog.configure(
processors=[
add_otel_context, # 将链路上下文注入每条日志
structlog.contextvars.merge_contextvars,
# ... 其他处理器
]
)
# 现在每条日志行都包含 trace_id 和 span_id
# 你可以从 Grafana Loki 中的日志行直接点击到 Tempo 中的链路
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
logger.info("processing_order", order_id=order_id)
# 这条日志行有 trace_id,可以链接到 Jaeger/Tempo 中的 span
记录什么:重要的事件
记录过多日志几乎和记录过少一样糟糕。始终应记录的高价值事件:
- 服务生命周期:启动、关闭、配置加载、迁移运行
- 认证事件:登录成功/失败、令牌签发/撤销、权限拒绝
- 状态变更:订单创建/取消、支付处理完成、用户角色变更
- 外部服务调用:请求发送、响应接收(状态+持续时间)、错误
- 后台作业执行:作业开始、完成、失败(含持续时间)
- 错误和异常:始终包含堆栈跟踪,始终包含上下文
通常不值得记录的内容:
- 每次缓存命中(记录缓存未命中和错误即可)
- 常规健康检查请求
- 生产环境中的调试级别诊断(使用采样)
高流量服务的日志采样
import random
class SamplingLogger:
def __init__(self, logger, sample_rate: float = 0.01):
self._logger = logger
self.sample_rate = sample_rate
def debug(self, event: str, **kwargs):
"""以 sample_rate 采样调试日志,始终记录错误"""
if random.random() < self.sample_rate:
self._logger.debug(event, **kwargs, _sampled=True,
_sample_rate=self.sample_rate)
def error(self, event: str, **kwargs):
"""始终记录错误"""
self._logger.error(event, **kwargs)
# 对详细的缓存操作进行 1% 采样
cache_logger = SamplingLogger(logger, sample_rate=0.01)
cache_logger.debug("cache_hit", key=cache_key) # 只记录 1% 的情况
测试您的日志输出
日志输出应像其他接口一样进行测试。使用快照测试来捕获意外变更:
import json
import structlog.testing
def test_payment_failure_log():
with structlog.testing.capture_logs() as captured:
try:
process_payment("ord_123", 4999)
except PaymentError:
pass
assert len(captured) == 2
assert captured[0]["event"] == "payment_started"
assert captured[1]["event"] == "payment_failed"
assert captured[1]["order_id"] == "ord_123"
assert captured[1]["amount_cents"] == 4999
assert "gateway_error" in captured[1]
# 验证没有个人信息泄露到日志中
assert "card_number" not in json.dumps(captured[1])
迁移路径
如果您有一个使用非结构化日志的现有应用程序,请增量迁移:
- 立即为所有新代码添加结构化日志
- 使用结构化适配器包装现有日志记录器
- 首先通过中间件添加请求范围上下文(request_id, user_id)—价值最高
- 接下来为外部服务调用添加监控
- 最后添加 OTel 追踪关联
回报是可衡量的:拥有良好结构化日志的团队将解决问题的平均时间缩短了40-60%。当客户报告问题时,您可以在几秒钟内找到确切的日志行,而不是花费30分钟在非结构化文本中进行grep搜索。投资您的日志——它们是生产环境中发生一切情况的主要诊断工具。
