Language:Chinese VersionEnglish Version

周日凌晨3点,数据管道决定给你上一课

我被叫醒处理的周日次数多到数不清,而根本原因几乎从来不是硬件故障或代码错误。问题出在数据管道。具体来说,是一个ETL作业,它在周一到周五运行完美,然后在周日悄无声息地崩溃了,因为它在周日预期的数据与一周中其他时间获取的数据完全不同。

生产环境的ETL管道在周日(以及节假日、月末边界、夏令时转换时)会失败,因为它们建立在关于数据的隐式假设上,而这些假设只在大多数情况下成立。最危险的假设是未来会与近期相似。数据管道正是这个假设死亡的地方。

管道在周末失效的七种方式

1. 数据量下降触发误报(或掩盖真实问题)

大多数数据管道监控都包含行数检查:如果今天的批次比昨天的行数显著减少,就触发警报。在周日,对于许多B2B应用来说,交易量确实会合法地下降40-70%。警报每周日都会触发。几周后,值班工程师会将其静音。然后某个周日,管道真的出问题了,但没有人注意到,因为警报已经被静音了。

# 天真的方法:静态阈值
def check_row_count(table, expected_min=10000):
    count = db.query(f"SELECT COUNT(*) FROM {table} WHERE date = CURRENT_DATE")
    if count < expected_min:
        alert(f"Low row count in {table}: {count} < {expected_min}")

# 更好的方法:考虑星期几的阈值
def check_row_count_smart(table):
    current_day = datetime.now().strftime("%A")
    
    # 计算过去4个同一天的预期范围
    stats = db.query("""
        SELECT 
            AVG(row_count) as avg_count,
            STDDEV(row_count) as std_count
        FROM pipeline_metrics 
        WHERE table_name = %s 
          AND day_of_week = %s 
          AND run_date > CURRENT_DATE - INTERVAL '28 days'
    """, [table, current_day])
    
    lower_bound = max(0, stats.avg_count - 2 * stats.std_count)
    upper_bound = stats.avg_count + 2 * stats.std_count
    
    count = get_today_count(table)
    if count < lower_bound or count > upper_bound:
        alert(
            f"Anomalous row count in {table}: {count}. "
            f"Expected {lower_bound:.0f}-{upper_bound:.0f} for {current_day}."
        )

2. 源系统维护窗口

许多上游数据源会在周末安排维护。您的管道会在常规时间尝试连接,但会收到连接被拒绝或超时错误,然后要么大声失败,要么更糟糕的是,成功获取空数据集或部分数据集。

# 一个健壮的提取步骤应该处理维护窗口
import tenacity
import httpx

@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=1, min=60, max=3600),
    retry=tenacity.retry_if_exception_type((httpx.ConnectError, httpx.ReadTimeout)),
    before_sleep=lambda retry_state: logger.warning(
        "source_unavailable",
        attempt=retry_state.attempt_number,
        wait=retry_state.next_action.sleep,
        source="billing-api",
    ),
)
async def extract_billing_data(date: str):
    response = await httpx.AsyncClient().get(
        f"https://billing-api.internal/export?date={date}",
        timeout=300,
    )
    
    if response.status_code == 503:
        raise httpx.ConnectError("源系统处于维护模式")
    
    data = response.json()
    
    # 关键:验证我们获取的是完整数据集
    if data.get("is_partial"):
        raise PartialDataError(
            f"源系统为 {date} 返回了部分数据。 "
            f"预期约 {data.get('expected_count', 'unknown')} 条记录, "
            f"实际得到 {len(data['records'])} 条。可能是维护窗口期间。"
        )
    
    return data

3. 时区和夏令时转换

时钟每年会变化两次。而每年两次,假设一天有24小时的ETL管道就会崩溃。一个处理从午夜到午夜每日批次数据的管道,在夏令时转换期间要么会处理23小时的数据,要么会处理25小时的数据,导致计数不足或重复处理。

# 不好的:简单的日期算术
from datetime import datetime, timedelta

def get_date_range(date):
    start = datetime(date.year, date.month, date.day)
    end = start + timedelta(days=1)  # 不总是24小时!
    return start, end

# 好的:时区感知的日期边界
from zoneinfo import ZoneInfo

def get_date_range_safe(date, tz_name="America/New_York"):
    tz = ZoneInfo(tz_name)
    start = datetime(date.year, date.month, date.day, tzinfo=tz)
    end = datetime(date.year, date.month, date.day, tzinfo=tz) + timedelta(days=1)
    
    # 在夏令时转换期间:
    # 春季向前:end - start = 23小时(这是正确的!)
    # 秋季向后:end - start = 25小时(这也正确!)
    
    # 转换为UTC用于数据库查询
    return start.astimezone(ZoneInfo("UTC")), end.astimezone(ZoneInfo("UTC"))

# 更好的方法:全部以UTC存储和处理
# 只在表示层转换为本地时间

4. 迟到的数据

周末数据常常延迟到达。在早上6点处理”昨日数据”的批处理系统通常假设数据在那时已经完整。在工作日,这通常是正确的。但在周日,周六晚上的数据可能要到中午才能到达,因为源系统的批处理导出是在周末计划运行的。

# 基于水印的完整性检查
async def wait_for_data_completeness(
    source: str,
    date: str,
    expected_watermark: str = "23:59",
    timeout_hours: int = 6,
    check_interval_minutes: int = 15,
):
    """等待源数据包含直到预期水印的事件。"""
    start_time = datetime.now()
    timeout = timedelta(hours=timeout_hours)
    
    while datetime.now() - start_time < timeout:
        latest_event = await db.fetchval(
            "SELECT MAX(event_timestamp) FROM raw_events "
            "WHERE source = $1 AND event_date = $2",
            source, date,
        )
        
        if latest_event and latest_event.strftime("%H:%M") >= expected_watermark:
            logger.info("data_complete", source=source, date=date, 
                       latest_event=str(latest_event))
            return True
        
        logger.info("waiting_for_data", source=source, date=date,
                    latest_event=str(latest_event), expected=expected_watermark)
        await asyncio.sleep(check_interval_minutes * 60)
    
    raise DataIncompleteError(
        f"{source}/{date}的数据在{timeout_hours}小时后仍未完整。 "
        f"最新事件: {latest_event}。预期水印: {expected_watermark}。"
    )

5. 在周五实施的架构变更

有人在周五下午部署了一个架构变更(完全违背常理)。应用程序运行良好,因为它能够处理新的架构。但是,从副本或事件流读取的ETL管道并不知道新列或重命名字段。它在周五晚上和周六运行良好,因为新字段是可空的,管道会忽略额外的字段。到了周日,第一条包含必需新字段的记录到达,管道就崩溃了。

# 转换步骤中的防御性架构处理
import pandas as pd

def transform_orders(raw_df: pd.DataFrame) -> pd.DataFrame:
    # 定义带有默认值的预期架构
    expected_columns = {
        "order_id": {"type": "str", "required": True},
        "user_id": {"type": "str", "required": True},
        "amount": {"type": "float", "required": True},
        "currency": {"type": "str", "default": "USD"},
        "discount_code": {"type": "str", "default": None},
        "fulfillment_type": {"type": "str", "default": "standard"},
    }
    
    # 检查缺失的必需列
    missing_required = [
        col for col, spec in expected_columns.items()
        if spec.get("required") and col not in raw_df.columns
    ]
    if missing_required:
        raise SchemaError(
            f"缺少必需列: {missing_required}. "
            f"可用列: {list(raw_df.columns)}. "
            f"这可能表明上游架构发生了变更。"
        )
    
    # 添加缺失的可选列并设置默认值
    for col, spec in expected_columns.items():
        if col not in raw_df.columns and "default" in spec:
            raw_df[col] = spec["default"]
            logger.warning("schema_drift", column=col, action="added_default",
                          default=spec["default"])
    
    # 警告意外的列
    known_columns = set(expected_columns.keys())
    unknown_columns = set(raw_df.columns) - known_columns
    if unknown_columns:
        logger.warning("schema_drift", new_columns=list(unknown_columns),
                      action="ignored")
    
    return raw_df[list(expected_columns.keys())]

6. 备份作业导致的资源竞争

许多组织会在周末安排数据库备份、索引重建和 VACUUM 操作。这些维护任务与在同一时间运行的 ETL 管道竞争 I/O 和 CPU 资源。原本在周二只需 20 分钟的管道,在周日需要 3 小时,因为数据库同时还在运行完整备份。

# 在运行重查询前检查是否有竞争操作
async def check_database_load(max_active_queries: int = 20):
    active = await db.fetchval("""
        SELECT COUNT(*) FROM pg_stat_activity 
        WHERE state = 'active' 
          AND query NOT LIKE '%pg_stat_activity%'
          AND backend_type = 'client backend'
    """)
    
    if active > max_active_queries:
        logger.warning("high_database_load", active_queries=active,
                      threshold=max_active_queries)
        return False
    return True

# 同时检查长时间运行的维护操作
async def check_for_maintenance():
    maintenance = await db.fetch("""
        SELECT pid, query, state, now() - query_start as duration
        FROM pg_stat_activity
        WHERE query ILIKE '%vacuum%' 
           OR query ILIKE '%reindex%'
           OR query ILIKE '%pg_dump%'
        ORDER BY duration DESC
    """)
    
    if maintenance:
        logger.info("maintenance_detected", 
                   operations=[{
                       "pid": m["pid"], 
                       "type": m["query"][:50],
                       "duration": str(m["duration"])
                   } for m in maintenance])
    return maintenance

7. “没有有效数据”的问题

在某些星期日,确实可能没有数据。一个 B2B SaaS 产品在星期日可能没有任何交易。数据管道需要区分”因为没有数据所以是星期日”和”因为提取失败所以没有数据”这两种情况。这听起来比实际要难。

# 跟踪提取元数据以区分空数据与提取失败
@dataclass
class ExtractionResult:
    source: str
    date: str
    record_count: int
    extraction_started: datetime
    extraction_completed: datetime
    source_responded: bool
    source_reported_complete: bool
    
    @property
    def is_empty_but_valid(self) -> bool:
        return (
            self.record_count == 0 
            and self.source_responded 
            and self.source_reported_complete
        )
    
    @property
    def is_suspicious_empty(self) -> bool:
        return (
            self.record_count == 0 
            and (not self.source_responded or not self.source_reported_complete)
        )

async def extract_and_validate(source: str, date: str):
    result = await extract_data(source, date)
    
    if result.is_suspicious_empty:
        alert(
            f"从 {source} 提取 {date} 的数据返回了 0 条记录 "
            f"但数据源未确认完整性。 "
            f"数据源响应: {result.source_responded}。 "
            f"在将此运行标记为成功前需要调查。"
        )
    elif result.is_empty_but_valid:
        logger.info("empty_extraction_valid", source=source, date=date,
                    note="数据源确认该期间无数据")

构建弹性管道:检查清单

  1. 使用星期感知的阈值进行容量检查和异常检测。
  2. 处理维护窗口,使用重试和指数退避策略。
  3. 以 UTC 格式存储和处理时间戳。仅在展示层转换为本地时间。
  4. 实现基于水印的完整性检查,而不是假设数据在固定时间已准备好。
  5. 防御性地验证模式,包括已知列检查和默认值处理。
  6. 监控资源竞争,避免在备份窗口期间运行重型 ETL。
  7. 使用数据源元数据区分有效空数据和提取失败
  8. 使管道具有幂等性。应该能够重新运行任何日期的任何管道而不会重复数据。
  9. 使用周末数据进行测试。您的 CI 管道应包含测试装置,模拟星期日的数据量、夏令时转换和延迟到达的数据。
  10. 维护管道日历,将已知的不规则事件(节假日、夏令时、月末、季末)映射到预期的管道行为变化。

运营成熟度模型

级别 特征 周日行为
1: 临时性 无监控,手动修复 直到周一才知道出问题
2: 响应式 基本警报,手动修复 值班人员收到通知,手动重新运行
3: 防御性 重试机制,验证,感知日期的阈值 大多数问题自动修复,真正故障会通知
4: 主动性 异常检测,完整性检查,自动修复 流水线自动调整到周末模式

大多数团队处于第2级。达到第3级需要一周的专注工程工作。达到第4级需要持续投入,但通过减少值班负担和提高数据质量而获得回报。

数据流水线在周日崩溃,因为工程师是在周二构建它们的。周二是干净、完整且准时到达的数据。周日的数据则是混乱、不完整、延迟,有时甚至完全缺失。如果你的流水线无法优雅地处理周日数据,它就不是生产流水线——它只是一个碰巧每天运行的周二流水线。为周日而构建,其他所有日子都会自行解决。

By Michael Sun

Founder and Editor-in-Chief of NovVista. Software engineer with hands-on experience in cloud infrastructure, full-stack development, and DevOps. Writes about AI tools, developer workflows, server architecture, and the practical side of technology. Based in China.

Leave a Reply

Your email address will not be published. Required fields are marked *

You missed