分享

反应式事件驱动系统及推荐实践

 新用户0175WbuX 2022-01-29

  本文讨论如何构建反应式事件驱动系统及其推荐实践,这是 《构建事件驱动的云应用和服务》 系列教程的第三篇。

  正如开篇讨论的那样,在一个反应式事件驱动的系统中,发布者发出事件来调用(触发)订阅者的操作;这种模式中的事件实际上与内部函数调用、HTTP请求或RPC调用没有什么区别,从技术上讲,你可以将这种模式改造任何工作流,包括单体和基于HTTP RESTful/RPC的微服务系统。常见的用例包括支付处理、预订/保留,以及任何长期运行的操作(如视频转码)。

  该模式有以下优点:

  高度解耦:发布者和订阅者之间几乎没有依赖性,服务现在可以单独演进更好的可扩展性(使用正确配置的消息队列/流方案):消息队列/流方案可以保留大量的消息,并随着用户处理事件的节奏自我适应更好的可扩展性:开发人员可以随时添加/删除订阅者,如设置多个工作流同时处理同一事件流。

  但缺点是追踪事件流程变得越来越难。另外,在没有执行路径的情况下,无法保证发布者收到回复,要想得到反馈,可能还得自己去做查询。

  在本教程中,您将使用 Stripe 和 Google Cloud Functions 构建一个简单的支付处理工作流,客户通过 Stripe 使用信用卡支付,您的服务完成订单并发送确认邮件。请注意,这个工作流和许多使用反应式模式构建的工作流一样,都是事务性的;更确切地说,这个工作流中的所有步骤要么全部成功,要么全部失败,而且是在每一种情况下,包括错误、崩溃和平台不可用(如Google Cloud Functions脱机)。

  该演示项目使用 Cloud Pub/Sub 来进行消息队列/流。Cloud Pub/Sub 是一个托管的解决方案,发布者和用户通过 Cloud Pub/Sub 主题(topic)处理消息(事件)。

  反应式事件驱动系统及推荐实践

  工作流程如下:

  客户通过APP进行消费。Stripe 向客户的信用卡收费,并发送一个 webhook 事件给 Cloud Function fulfillment (如果收费成功)或 cancellation (如果收费失败)。fulfillment 履行完成订单,并向 Cloud Pub/Sub 发布 orderProcessed 事件。 cancellation 拒绝订单,也向 Cloud Pub/Sub 发布 orderProcessed 事件。如果 fulfillment 或 cancellation 无法处理订单,Stripe 发送的 webhook 事件将保存在 DLQs 中,以便进一步检查。email 接收到 orderProcessed 事件,并发送确认邮件。如果 email 无法发送邮件,那么 orderProcessed 事件将被保存在DLQ中,以便进一步检查。stats 也会接收到这个事件,并将订单的ID写到 BigQuery 中以备参考。

  译者注:实操细节请参考英文原文。

  如果你喜欢在本地运行,请按照下面的步骤进行操作。

  设置好你的Python开发环境。安装Python 3.安装 Google Cloud SDK。按照 gcp_tutorial.md 继续进行。

  这个工作流所涉及的事件在 events.yaml 中定义。该演示项目使用 CloudEvents Generator准备的事件库来写入和读取事件。

  至少有一次交付,和正好是一次交付At least once delivery, and exactly once delivery

  在单体系统中,内部函数调用始终是同步的;如果进行一次函数调用,它将被调用一次,而且是精确的一次。不幸的是,在反应式事件驱动系统中,情况并非如此:大多数消息队列/流式解决方案,如 Cloud Pub/Sub 和 Apache Kafka(0.11之前),都只能保证至少一次交付,也就是说,所有的事件最终都会被交付,尽管订阅者可能会看到少量的事件两次或更多次。

  注意:

  即使你的消息队列/流解决方案承诺了精确的一次性交付,你也应该非常谨慎。这些承诺通常有一个前提条件:对于Kafka来说,精确的一次性处理是一个端到端的保证,你的应用程序必须设计成不违反该属性;另一方面,Google Cloud Tasks则偏向于在必要时保证执行,而不是精确的一次性执行,这意味着它对99.999%的任务承诺精确的一次性交付,但不是全部。

  如果你的订阅者不是幂等的话,至少一次交货会在你期待偏少的时候引起问题。以本教程的演示项目为例,一个订单不应该被执行两次。因此,强烈建议你通过事务将传入的事件(或者至少是它们的ID)通过数据库持久化,并使用这些记录拒绝任何重复的事件。此外,您应该为您的事件设置一个TTL(time to live)值(例如10分钟),这样您就不需要永远持久化所有的事件;任何不在数据库中但陈旧的事件也应该被拒绝。

  这个演示使用Cloud Firestore,一个托管的NoSQL数据库解决方案来保存事件。它首先检查事件的过期日期,然后使用事务以确保该事件之前从未被处理过。通过云函数开始时的事务把关,即使在执行过程中,例如部署时崩溃,每个事件也只会被处理一次。

  async function fulfill (req, res) {

  ...

  // Rejects stale events.

  utils.checkForExpiredEvents('POSIX', paymentIntent.created, EVENT_MAX_AGE);

  // Checks if the event is duplicated.

  await utils.guaranteeExactlyOnceDelivery(firestoreClient, EVENT_COLLECTION, paymentIntent.id, JSON.stringify(paymentIntent));

  ...

  }

  // Checks if an event has expired.

  exports.checkForExpiredEvents=function (format, eventTimestamp, maxAge) {

  let age;

  switch (format) {

  case 'RFC3339':

  age=Date.now() - Date.parse(eventTimestamp);

  break;

  case 'POSIX':

  age=Date.now() - eventTimestamp * 1000;

  break;

  default:

  throw Error('INVALID_TIMESTAMP');

  }

  if (age > maxAge) {

  throw Error('EXPIRED_EVENT');

  }

  };

  // Checks if an event is duplicated.

  exports.guaranteeExactlyOnceDelivery=async function (firestoreClient, collection, eventId, context) {

  const ref=firestoreClient.collection(collection).doc(eventId);

  return firestoreClient.runTransaction(async t=> {

  const doc=await t.get(ref);

  if (!doc.exists) {

  await t.set(ref, { context: context });

  } else {

  throw Error('DUPLICATE_EVENT');

  }

  });

  };

  此外,一些消息队列/流解决方案也不能保证消息的顺序。在这个用例中,消息的顺序是无关紧要的,但是,如果你的系统要求以正确的顺序接收消息,你就必须以某种方式持久化事件,并自行排序。

  正如前面所解释的,反应式事件驱动系统最突出的一个缺点是,你不能像在单体系统中的try-catch块(或其等价物)那样观察系统中的事件(命令)流。比如说,如果二手事件流突然停止(可能是由于代码中的bug),你将无法在事件流的上游(事件流的开始)或下游(事件流的结束)捕捉到它;为了排除故障,你必须逐一检查每一个发布者和订阅者。

  幸运的是,有一些工具可以提供帮助。 分布式日志 、 集中式监控 和 分布式跟踪 是反应式事件驱动系统(以及几乎所有分布式系统)中可观察性的三大支柱。

  分布式日志 有助于将所有逻辑上连接的组件的日志关联起来(例如在同一事件流中)。集中式监控 有助于收集来自所有逻辑上连接的组件的指标。分布式跟踪 有助于跟踪每个步骤在流中所花费的时间,从而衡量其性能。

  下面的两个例子,分别展示了如何使用 Stackdriver Logging、Stackdriver Monitoring 和OpenCensus 进行分布式日志和跟踪,前者使用订单的ID来分组日志,后者使用唯一的ID来分组同一跟踪下的span。

  const {Logging}=require('@google-cloud/logging');

  const logging=new Logging();

  // const logName='customLog';

  // const resourceType='cloud_function';

  // const labels={

  // 'source': 'fulfillment',

  // 'orderId': 'myOrderId'

  // };

  async function distributedLog(logName, resourceType, customLabels, text) {

  const log=logging.log(logName);

  const text=text;

  const metadata={

  labels: customLabels,

  resource: {

  type: resourceType

  }

  }

  const entry=log.entry(metadata, text);

  await log.write(entry);

  console.log(text);

  }

  可以在 Stackdriver 日志查看器中用过滤器 label.orderId=myOrderId 查询,以跟踪处理该特定订单时产生的所有日志。

  const tracing=require('@opencensus/nodejs');

  const { StackdriverTraceExporter }=require('@opencensus/exporter-stackdriver');

  const exporter=new StackdriverTraceExporter({projectId: "YOUR-GCP-PROJECT"});

  const tracer=tracing.registerExporter(exporter).start({ samplingRate: 1 }).tracer();

  function A () {

  let traceId;

  tracer.startRootSpan({ name: 'A' }, rootSpan=> {

  doSomeWorkInA();

  traceId=rootSpan.traceId();

  rootSpan.end();

  });

  // Pass traceId to another function

  }

  function B () {

  let traceId='The trace ID passed from A';

  tracer.startRootSpan({ name: 'B', traceId: traceId }, rootSpan=> {

  doSomeWorkInB();

  rootSpan.end();

  });

  // Keep passing the traceId if needed

  }

  当在 Stackdriver Trace Viewer 中查看跟踪时,即使两个span在两个不同的实例中运行,也会被关联到同一个跟踪。您可以用诸如 orderId 来注释跟踪,以备参考。

  一般来说,在单体或基于HTTP RESTful/RPC的微服务系统中,开发人员必须一丝不苟地检查每一个可能的输入,并尽力从异常中恢复。这部分原因是由于函数调用(或HTTP请求/RPC调用)从核心上说同步的;如果一个异常没有被捕获(和恢复),那么调用本身就会永远消失–没有简单的恢复方法。

  而反应式事件驱动系统中的事件则是不可变的,可以很容易恢复,这要归功于作为中介的消息队列/流解决方案。尽管你可能觉得奇怪,但建议在反应式事件驱动系统中采取 “让它崩溃 “的不良态度,专注于订阅者中的幸福路径,简单地拒绝任何你无法处理的事件到DLQ(死信队列)中。

  重要

  显然,这种态度并不适用于对业务逻辑至关重要的异常,这些异常仍然需要尽快捕获并处理。比如说在这个演示项目中,当一个订单因为bug而无法执行时,你仍然需要在代码中尽快修复它。或者,你可以拒绝所有的死事件到DLQ,并设置专门的worker来处理特定类型的错误。

  死信队列(Dead letter queues),顾名思义,就是死信事件的消息队列。它允许开发人员

  检查潜在的问题和BUG检测和分析问题的模式配置专职工作人员处理错误

  DLQ的好处在于它能准确地捕捉到事件生产者发送的事件,让你有机会获得第二次机会来轻松地修正问题。例如,如果由于某种原因,传递给 fulfillment 的订单ID暂时损坏,而你很快就找到了修复它们的方法,你可以从DLQ中找回所有被拒绝的事件,并要求 fulfillment 再次处理它们。

  备注

  Cloud Pub/Sub,与一些消息队列/流的选项不同,它没有内置DLQ支持。因此,我们在这里设置了一个单独的主题作为DQL;在你的生产系统中,你可能会有拒绝事件的选项,而不需要在代码中进行额外的设置。

  // A wrapper for rejecting dead-lettered events.

  module.exports=functions.https.onRequest(async (req, res)=> {

  try {

  return await fulfill(req, res);

  } catch (err) {

  console.log(err.message);

  // Stripe webhook events are delivered via HTTP. Here it is wrapped in

  // a Cloud Event and rejected to DLQ via Cloud Pub/Sub.

  const stripeWebhookEventRejected=utils.createStripeWebhookEventRejectedEvent(SOURCE, req.headers, req.body, err.message);

  await utils.publishEvent(pubSubClient, DLQ, stripeWebhookEventRejected);

  res.status(400).end();

  return Promise.resolve();

  }

  });

  async function fulfill (req, res) {

  ...

  }

  你可以在任何时候暂停,然后重新启动一部分事件流。您的消息队列/流解决方案应该能够在这段时间内(在Cloud Pub/Sub的情况下是7天)暂停事件,这样就可以在离开的地方恢复。要在演示项目中做到这一点,请在Cloud Console中删除与您希望暂停的云功能相关联的订阅。您可以在以后准备好后再重新创建一个。

  默认情况下,Cloud Pub/Sub 会根据您的 Cloud 函数的事件消费速率自动控制流速。Cloud Functions 会自动伸缩;如果您想限制 Cloud Function 实例的最大数量,从而调节流量速度,请调整 Cloud Functions 的可伸缩性设置。其他消息队列/流解决方案和计算平台可能也有类似的设置。

  Cloud Pub/Sub 和其他一些消息队列/流解决方案具有快照功能,允许您重放事件序列。当您在为您的反应式事件驱动系统的更新代码时,这将是非常有用的:只需要求 Cloud Pub/Sub 将相同的事件序列在另一个时间段传递给新代码,您就可以有一个可以用于比较的基础。

  另外,正如本系列教程的开篇所介绍的那样,在反应式事件驱动系统中,只要有消息队列/流解决方案的存在,你就可以随时添加/删除订阅者。这就实现了镜像模式,在这里你可以设置一个新版本的订阅者与旧版本的订阅者并排工作,以尝试一下新的功能。例如,这个演示项目增加了一个统计云函数,可以与email 并排工作,将订单状态保存到BigQuery。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多