Language:Chinese VersionEnglish Version

事件驱动架构(EDA)存在声誉问题。当小团队听到这个术语时,他们会想到 Kafka 集群、模式注册表、复杂的消费者群组,以及雇佣的分布式系统博士。企业文献也没有帮助——它假设你有一个平台团队、专门的基础设施预算,以及数十个服务。

但事件驱动模式也能极大地简化小型应用程序。关键是为你的团队规模选择合适的复杂度级别。一个 5 人的初创公司不需要 Kafka。他们需要能够解耦服务、优雅地处理故障,并在时机成熟时能够扩展的模式。

本文将介绍三种复杂度级别的事件驱动架构,并提供每种级别的实际实现模式。从第一级开始,只有当你遇到特定的扩展瓶颈时,才升级到第二级或第三级。

为什么事件对小型团队很重要

考虑一个典型的 SaaS 应用程序。用户注册。接下来会发生什么?

  • 在数据库中创建用户记录
  • 发送欢迎邮件
  • 创建 Stripe 客户
  • 设置默认项目和 workspace
  • 在 Slack 中通知销售团队
  • 在分析中跟踪该事件

在同步架构中,所有这些操作都在注册处理器中发生:

// 同步的噩梦
async function handleSignup(req, res) {
  const user = await db.users.create(req.body);     // 50ms
  await sendWelcomeEmail(user);                       // 800ms
  await stripe.customers.create({ email: user.email }); // 400ms
  await createDefaultWorkspace(user.id);              // 200ms
  await notifySlack(`New signup: ${user.email}`);     // 300ms
  await analytics.track("user.signup", user);         // 150ms

  // 总计:约1900ms — 用户等待近2秒
  // 如果 Stripe 宕机,注册完全失败
  res.json({ user });
}

这有三个关键问题:用户等待所有下游操作,任何单个失败都会阻止整个注册过程,每个新的副作用都会使处理器变得更慢且更脆弱。

事件驱动:注册处理器创建用户并发布一个事件。其他所有事情异步发生:

// 事件驱动:干净、快速、有弹性
async function handleSignup(req, res) {
  const user = await db.users.create(req.body);       // 50ms
  await events.publish("user.signup", { userId: user.id, email: user.email });
  res.json({ user });                                  // 总计:约60ms
}

// 每个副作用都是一个独立的处理器
events.on("user.signup", sendWelcomeEmail);
events.on("user.signup", createStripeCustomer);
events.on("user.signup", createDefaultWorkspace);
events.on("user.signup", notifySlackSales);
events.on("user.signup", trackAnalytics);

响应时间从1900毫秒降至60毫秒。如果Stripe服务宕机,用户仍然可以完成注册——Stripe处理器会独立重试。添加新的副作用只需添加一个处理器,而不是修改关键代码路径。

第1级:进程内事件(0-3个服务)

对于单个应用或单体应用,您不需要消息代理。带有持久化作业队列的进程内事件发射器可以处理大多数事件驱动模式。

使用BullMQ实现(Node.js)

// events/emitter.ts
import { Queue, Worker, QueueEvents } from "bullmq";
import Redis from "ioredis";

const connection = new Redis(process.env.REDIS_URL);

// 每种事件类型一个队列
const queues = new Map<string, Queue>();

export function getQueue(eventName: string): Queue {
  if (!queues.has(eventName)) {
    queues.set(eventName, new Queue(eventName, { connection }));
  }
  return queues.get(eventName)!;
}

export async function publish(eventName: string, data: Record<string, unknown>) {
  const queue = getQueue(eventName);
  await queue.add(eventName, data, {
    attempts: 3,
    backoff: { type: "exponential", delay: 1000 },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  });
}

// 为事件注册处理器
export function on(
  eventName: string,
  handler: (data: Record<string, unknown>) => Promise<void>,
  options?: { concurrency?: number }
) {
  const worker = new Worker(
    eventName,
    async (job) => {
      await handler(job.data);
    },
    {
      connection,
      concurrency: options?.concurrency ?? 5,
      limiter: { max: 10, duration: 1000 }, // 速率限制
    }
  );

  worker.on("failed", (job, err) => {
    console.error(`${eventName}的处理器失败:`, {
      jobId: job?.id,
      attempt: job?.attemptsMade,
      error: err.message,
    });
  });

  return worker;
}
// handlers/signup.ts — 清晰、隔离的处理器
import { on } from "../events/emitter";

on("user.signup", async (data) => {
  const { userId, email } = data;
  await emailService.send({
    to: email,
    template: "welcome",
    data: { userId },
  });
}, { concurrency: 10 });

on("user.signup", async (data) => {
  const { userId, email } = data;
  const customer = await stripe.customers.create({
    email,
    metadata: { userId },
  });
  await db.users.update(userId, { stripeCustomerId: customer.id });
}, { concurrency: 3 }); // 为Stripe速率限制设置较低的并发性

这为您提供了重试逻辑、并发控制、速率限制和故障跟踪功能——所有这些都只需要Redis作为基础设施依赖(您可能已经将其用于缓存)。

第2级:消息代理(3-10个服务)

当您有多个需要响应事件的服务时,进程内发射器就不够用了。您需要消息代理。对于小型团队,有两个可靠的选择:

Redis Streams

如果你已经在运行 Redis,那么 Redis Streams 是阻力最小的路径。它提供了带有持久化的 pub/sub、消费者组和确认机制——本质上是一个轻量级的 Kafka。

// Producer: 发布事件到 Redis Stream
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL);

async function publishEvent(stream: string, event: object) {
  const id = await redis.xadd(
    stream,
    "*", // 自动生成 ID
    "data", JSON.stringify(event),
    "type", event.type,
    "timestamp", Date.now().toString()
  );
  return id;
}

// Consumer group: 每个服务都有自己的组
async function createConsumerGroup(stream: string, group: string) {
  try {
    await redis.xgroup("CREATE", stream, group, "0", "MKSTREAM");
  } catch (err) {
    if (!err.message.includes("BUSYGROUP")) throw err;
    // 组已存在,没关系
  }
}

// Consumer: 读取事件并进行确认
async function consumeEvents(
  stream: string,
  group: string,
  consumer: string,
  handler: (event: object) => Promise<void>
) {
  while (true) {
    const results = await redis.xreadgroup(
      "GROUP", group, consumer,
      "COUNT", 10,
      "BLOCK", 5000, // 最多等待 5 秒获取新事件
      "STREAMS", stream, ">"
    );

    if (!results) continue;

    for (const [, messages] of results) {
      for (const [id, fields] of messages) {
        const data = JSON.parse(fields[1]); // "data" 字段
        try {
          await handler(data);
          await redis.xack(stream, group, id);
        } catch (err) {
          console.error(`处理 ${id} 失败:`, err);
          // 消息将重新传递给另一个消费者
        }
      }
    }
  }
}

何时升级到 RabbitMQ 或 NATS

Redis Streams 适用于简单的事件路由。当你需要:

  • 复杂的路由(基于主题、基于头部)
  • 具有复杂重试策略的死信队列
  • 优先队列
  • 大于 512MB 的消息

那么 RabbitMQ 或 NATS 是正确的选择。两者都比 Kafka 简单得多地运行,并且在单个节点上每天可以处理数百万条消息。

第 3 级:事件流(10+ 个服务或实时需求)

当你需要以下功能时,Kafka 或 Redpanda 就派上用场了:

  • 事件重放(重新处理历史事件)
  • 事件溯源(从事件日志派生状态)
  • 实时流处理
  • 分区内的保证顺序

对于真正需要流处理的小团队来说,Redpanda 是更好的选择。它与 Kafka API 兼容,但作为单个二进制文件运行,无需 ZooKeeper/KRaft 的复杂性。

# Redpanda 的 docker-compose.yml(单节点,开发环境)
services:
  redpanda:
    image: redpandadata/redpanda:v24.1.1
    command:
      - redpanda
      - start
      - --smp 1
      - --memory 1G
      - --overprovisioned
      - --kafka-addr PLAINTEXT://0.0.0.0:9092
    ports:
      - "9092:9092"
      - "8081:8081"  # Schema registry
      - "9644:9644"  # Admin API

事件设计模式

无论您在哪个层面操作,这些模式决定了您的事件驱动架构是否可维护:

1. 事件命名约定

// 使用过去时态、领域特定的名称
// 好的:
"user.signup.completed"
"order.payment.failed"
"project.member.invited"

// 不好的:
"createUser"        // 祈使句 — 这是一个命令,不是事件
"USER_CREATED"      // 没有领域上下文
"handle-signup"     // 名称中包含实现细节

2. 带版本控制的事件模式

// 每个事件都有一个一致的封装结构
interface EventEnvelope<T> {
  id: string;           // 唯一事件ID(UUID)
  type: string;         // 事件名称
  version: number;      // 模式版本
  timestamp: string;    // ISO 8601格式
  source: string;       // 哪个服务产生的此事件
  correlationId: string; // 用于跨服务追踪
  data: T;              // 实际的有效载荷
}

// 示例
const event: EventEnvelope<UserSignupData> = {
  id: "evt_abc123",
  type: "user.signup.completed",
  version: 2,
  timestamp: "2026-03-30T10:00:00Z",
  source: "auth-service",
  correlationId: "req_xyz789",
  data: {
    userId: "usr_456",
    email: "developer@example.com",
    plan: "starter",
  },
};

3. 幂等处理器

事件将被多次传递。每个处理器必须是幂等的:

// 使用事件ID去重实现幂等处理器
async function handleUserSignup(event: EventEnvelope<UserSignupData>) {
  // 检查是否已处理过此事件
  const processed = await db.processedEvents.findUnique({
    where: { eventId: event.id },
  });
  if (processed) {
    console.log(`事件 ${event.id} 已处理,跳过`);
    return;
  }

  // 处理事件
  await stripe.customers.create({
    email: event.data.email,
    metadata: { userId: event.data.userId },
  });

  // 标记为已处理
  await db.processedEvents.create({
    data: { eventId: event.id, processedAt: new Date() },
  });
}

监控事件驱动系统

EDA 最大的运营挑战是可见性。当注册处理器静默失败时,直到用户抱怨未收到欢迎邮件,才会有人知道。

需要跟踪的关键指标:

  • 队列深度:有多少未处理的事件正在等待?队列增长表明消费者无法跟上处理速度。
  • 处理延迟:从事件发布到处理器完成的时间。峰值表明下游服务存在问题。
  • 每个处理器的错误率:哪些处理器失败了,以及失败频率如何?
  • 死信队列大小:有多少事件已经耗尽了所有重试次数?
// 使用 BullMQ 进行最小化监控
import { Queue } from "bullmq";

async function getQueueMetrics(queueName: string) {
  const queue = new Queue(queueName, { connection });
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
    queue.getDelayedCount(),
  ]);

  return { queueName, waiting, active, completed, failed, delayed };
}

// 作为 /metrics 端点暴露给 Prometheus
app.get("/metrics", async (req, res) => {
  const queues = ["user.signup", "order.created", "payment.processed"];
  const metrics = await Promise.all(queues.map(getQueueMetrics));

  let output = "";
  for (const m of metrics) {
    const name = m.queueName.replace(/./g, "_");
    output += `queue_waiting{queue="${name}"} ${m.waiting}n`;
    output += `queue_failed{queue="${name}"} ${m.failed}n`;
    output += `queue_active{queue="${name}"} ${m.active}n`;
  }
  res.type("text/plain").send(output);
});

何时不应使用事件

事件驱动架构增加了间接性。并非每个操作都适合使用事件:

  • 同步用户流程:如果用户需要立即响应且该响应依赖于结果(例如,显示收据前需要支付确认),则保持同步。
  • 无副作用的简单 CRUD 操作:在笔记应用中创建笔记不需要事件系统。
  • 需要强一致性的操作:如果两个操作必须同时成功或失败,则使用数据库事务,而非事件。

对于大多数小型团队而言,适量的事件驱动架构应该是选择性的:将事件用于副作用,保持核心业务逻辑同步。这样可以用20%的复杂性获得80%的好处。

By

Leave a Reply

Your email address will not be published. Required fields are marked *

You missed