分享

冷却热数据:从Kafka到Athena

 邸彦强 2021-07-31

在过去的15年里,Leboncoin一直在法国提供分类广告服务,其目前的大小在70多个类别中托管超过4000万广告 - 从房地产到视频游戏 - 每月为3000万活跃用户服务。

冷却热数据:从Kafka到Athena

> The swimming pool room at leboncoin’s UFO HQ

这导致平台上产生的越来越大的数据,既有规模,又有复杂性。2015年左右,我们意识到我们需要提高我们的技术和组织能力,以利用数据,以便为客户提供更好的服务。

这导致我们在其他事情中将我们的数据整合模式从原始日志和数据库转储转移到基于流的方法,因此我们与Apache Kafka的历史 - 分布式事件流/ Pub-Subl平台 - 开始。

但Kafka是一个“在线”系统,更好地用于服务间通信并在发生时存储事件日志,尽管没有无限期。为了能够分析,转换和整合“脱机”数据存储中的所有这些事件 - 例如数据方案 - 更适合大型批量作业和机器学习培训任务,需要一个事件归档工作流程。

本文是我们如何重新制作此工作流程的故事,从我们的“Online”Kafka集群将大量的异构数据暴露于我们的数据群中,以便长期数据保留,分析,法律要求以及作为机器学习“肥料”的机器。我们将探讨我们如何从手动和繁琐的过程到一个完全自动化的工作流程,包括我们在AWS S3 / Athena / Spark上的DataLake上的表的创建和演变,只有他们最重要的地方所需的人类控制和检查。

冷却热数据:从Kafka到Athena

一点历史

Kafka的第一个实际使用情况是受众追踪服务,当时已经达到了接近1亿日记录。基于JSON的序列化将在存储和网络带宽方面证明昂贵。

因此,我们很幸运能够以最佳的事件序列化的最佳实践,使用Avro格式通过其模式注册表的汇合集成。我们甚至提出了我们未来的主题的一些规范,以及未来预期主题类型的一些基线配置选项(基于吞吐量,灵敏度和一致性要求)。

然后,我们在S3上实现了我们自己的自定义解决方案,以便在S3上存档,使得冷库并访问主题作为具有更长保留的数据集,以及用于分析和机器学习培训的能力(首先使用Spark,然后首先使用Spark Athena)。该解决方案负责从Avro Serialization(最适合原子事件)的事件转换为Parquet格式,更适合大型数据集,读取密集型操作。

这是在我们的Air Flow调度程序中编织作为一个小时的DAG,并且几年,符合我们的期望,Kafka的使用量在整个公司中。

冷却热数据:从Kafka到Athena

> The Kafka “hot” to S3/Athena “cold” storage workflow

巩固架构目录

随着使用率的增长,技术和组织的新挑战开始出现。更多的贡献者意味着更多的工作方式和定义事件数据模式,虽然Avro规范非常完整,但在涉及到以下方式时,它不足以保证整个公司的一致性:

  • 强制性字段(标识符,事件ID,时间戳……)
  • 日期格式:字符串或数字?RFC-3339或其他?时间戳以秒或千秒为单位或毫秒?
  • 向后兼容性检查
  • 字段名称约定

以及许多其他“细节”,非常重要,可以简化与数据集一起使用。通过以下方式解决了这一特定问题:

  • 搬到一个专门的平台团队为Kafka的基础设施
  • Git上的统一主题和架构目录库存储库
  • 自动化ACL管理
  • 严格的架构持续整合和部署
  • 向后兼容性检查和更好的工具
  • 通过一套RFC和约定培训/文件

它还奠定了明确的基础,我们在其上建立了本公司所有架构的参考资料,但不仅,我们的子公司和我们的母公司(Adevinta)都能够提供/发布活动。因此,存储库的布局为每个主题的三个静态级别构成:

  • 公司(Leboncoin,Adevinta,L’Argus ……)
  • 域名(广告,付款,消息……)
  • 环境(架构跨暂存/生产一致,但未以同一速度部署)

这给了我们这个布局:

.├── leboncoin│ ├── ads│ │ ├── accepted-ad│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas│ │ ├── action-states│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas│ │ ├── action-states-for-search-only│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas│ │ ├── ad-params-image-inference│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas[...]├── adevinta

然后,每个环境都包含一个“简化的”主题配置,它将通过我们的连续交付工具转换为更详细的Kafka主题配置:

{  'typology': 'job-queue',  'compaction': 'none',  'scope': 'public',  'encoding': 'avro',  'throughput': 'normal',  'ownership': 'team-ads',  'pii': 'HIGH',  'producers': [    'producer_app_1'  ],  'consumers': [    'consumer_app_1',    'consumer_app_2',    'archiver'  ]}

在这些行动之后不久,我们开始看到发表畸形事件的数量明显减少,以及跨越模式的差异减少。

人类处理单位缩放

但这只照顾了我们缩放问题的一部分,主要是为生产者团队(主要是微服务所有者)。我们仍然必须配置我们想要在AWS S3上存档的每个主题,在特定配置中定义:

  • 归档主题的名称
  • 应该存储数据集的路径
  • 输出的分区方案
  • 要使用的事件日期/时间列
  • 预期日期/时间的格式
  • 在我们的Hive Metastore中创建的表的名称(AWS胶水)
  • 创建数据集的敏感性及其生命周期

我们甚至必须使用Spark编写(并维护)Parquet 转换工具的自定义Avro。此外,虽然Avro Serialization是规则,但某些主题的例外情况,我们没有计划,并且不得不处理使用其他方法。

最后,通过在所有环境中手动宣布适当的DDL在所有环境中手动宣布适当的DDL,必须从Avro转换为Hive / Presto,以便在Athena或Prive才能使用,以便在所有环境中的胶水转移中宣布适当的DDL。

总而言之,在一个小型数据 - 工程团队中有几个主题和少数生产商的可行方法,到2018年我们迅速变得无法管理,当我们开始转移到一个特征团队组织时,具有更为分布式的制片人和利益相关者的模式……

随着符合条件主题的数量接近100(现在更接近160),我们必须重新思考我们的工作流程,以充分自动化这种繁荣和易于易于归档的过程。

计划

因此,该计划是完全自动化这一过程,首先从Kafka到S3(在我们已经在我们的自定义过程中留下了我们已经实施的Parquet 转换),然后从S3到雅典娜。

预期的改进是多数:

  • 由于我们将不再需要担心传播配置,因此数据工程团队的生产力增加了
  • 新主题的数据可用性提升,因为在第一次活动发布后不到30分钟的Athena 表可以提供任何新版本
  • 更快地检测和隔离由于切换到非批量入口过程(虽然S3上的写作仍然批量,但归档过程本身将由现场消费者制作)
  • 实时更清晰地清晰的状态,事件速率,误差计数,滞后和每个连接器的内存消耗
  • 在数据结构中添加额外的Kafka特定信息以进行调试和检测异常行为(迟到事件,偏斜分区……)
  • 改进了产品和数据团队的过程和工具之间的对齐

这也是有机会修复我们数据阿拉克布局中的一些错误:

  • 理想的情景是athena中的桶前缀和表名将直接从主题名称和确定性推断出来,但由于多年的手动配置,过去的配置是不一致的
  • 在S3上移动到一个简单的“每天”Hive分区方案到“每小时”方法,同时也将架构添加为分区(即使我们的Avro实践也可确保我们具有兼容的模式,将它们拆分在单独的路径中提高了我们的调试能力在不可预见的典范合并并发症的情况下给出我们的选择)
  • 归一化前缀结构,以便启用数据集存储策略的微调(GDPR的到期和降低成本降低成本的转换)

工作的合适工具

幸运的是,在这段时间(2019年),Kafka周围的生态系统已经进化了很多。kafka 0.8的日子已经过去了,融合平台在一分钟后越来越富裕。我们希望在强大,维护和稳定的和稳定的工具上基于我们的下一个方法,以便能够在AWS S3上有序和高效的方式用这些活动。

KAFKA CONNECT及其AWS S3 Sink连接器被选为此新设置的平台。Kafka Connect是一个专用于将数据推入Kafka集群(通过源连接器)的平台,并从其他类型的数据系统(RDBMS,Elasticsearch,DynamoDB,S3 …)中拉出数据输出(宿链路)。所有接线主要在配置中完成,而不是代码,通过对专用REST API的调用。

在内部,它将其作为连接器的基于Java的分布式“Manager”(是生产者,或者在我们的案例中,消费者),完全依赖于现有的Kafka集群以进行状态存储和跨工人同步。无需添加其他基础架构元素,而不是工人本身。

部署本身在Kubernetes上使用官方Confluent Inc平台的扩展Docker映像完成,以便添加一些最小的自定义元素(我们稍后将详细介绍)和一些插件,最值得注意包含S3宿插件和单个消息变换(可以在处理时间内消息发生的一组方便原子转换)。

FROM confluentinc/cp-kafka-connect:5.5.1# Remove bundled (older) version of kafka-connect plugins for S3/GCSRUN rm -rf /usr/share/java/kafka-connect-s3/ /usr/share/java/kafka-connect-gcs/ /usr/share/confluent-hub-components/confluentinc-kafka-connect-gcs/# Install the official kafka-connect-s3 plugin in v5.5.1RUN (echo 1 && yes) | confluent-hub install confluentinc/kafka-connect-s3:5.5.1# Install the latest kafka-connect Single Message Transforms pluginRUN (echo 1 && yes) | confluent-hub install confluentinc/connect-transforms:latest# Install our custom classes in the connect classpathCOPY build/libs/partitioner-latest-all.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/partitioner-latest-all.jarview raw

然后,我们部署到我们的EKS Kubernetes集群(但是,在调整官方Helm图表之后使用官方Helm图表部署到任何裸机或虚拟实例也可能)有点调整到我们的特定风格的监测和公约(例如使用用于监控的JMX + DataDog,因为提供的Helm图表假定Promotheus将被使用)。

调整工作流程

该项目的主要目标是自动化该过程来提高其稳定性,并减少维持其近零所需的人力。

我们可以执行此操作的唯一方法是利用我们的现有约定和CI / CD工具集在Git上的Schema存储库:每次在此存储库上执行更改时,会发生以下操作(每个环境):

  • 使用生产者提供的JSON配置(通常,负责微型服务的功能团队)的JSON配置,创建主题(具有吞吐量,分区,分区数量,ACL管理数量)
  • 验证架构,并在Kafka Cluster的架构注册表中注册
  • 在Kafka Connect平台上创建一个新连接器,其配置直接从主题和架构配置中提供的信息推断。在各处都使用一组(覆盖能力)默认值来设置连接器

最后一步是我们为归档目的添加的,通过添加一个简单的Python CLI,它自动配置群集中的连接器,了解模式存储库中声明的每个主题。然后通过我们的持续部署工具,大厅进行更改,然后在主分支机构上的每个合并上触发此CLI。

每个主题拥有单个消费者的想法是确保将每个主题的正确分离为单个工作流程,并确保未经配置的主题可能引起不期望的副作用(尽管那些现在很少见,但它们不是不可能,我们希望将过程中的错误范围限制为最小单位)。这种故意选择有其警告,因为我们稍后会看到。

AWS S3 连接器配置概述

以下是这种连接器的结果配置的示例(部分用于简洁起见,但大多数重要的是这里):

{  'connector.class': 'io.confluent.connect.s3.S3SinkConnector',  'errors.tolerance': 'none',  'errors.logs.enable': 'true',  'errors.log.include.messages': 'true',  'topics': 'leboncoin_staging_ads_ad-publications_public_avro',  's3.bucket.name': 'leboncoin_kafka_events_bucket',  'topics.dir': 'staging/topics/ads/raw/parquet',  'storage.class': 'io.confluent.connect.s3.storage.S3Storage',  'format.class': 'io.confluent.connect.s3.format.parquet.ParquetFormat',  'timestamp.extractor': 'fr.leboncoin.data.archiver.parser.LeboncoinTimestampExtractor',  'transforms': 'tombstoneHandler,insertFields',  'transforms.tombstoneHandler.type': 'io.confluent.connect.transforms.TombstoneHandler',  'transforms.tombstoneHandler.behavior': 'ignore',  'transforms.insertFields.type': 'org.apache.kafka.connect.transforms.InsertField$Value',  'transforms.insertFields.partition.field': '_kafka_partition',  'transforms.insertFields.offset.field': '_kafka_offset',  'transforms.insertFields.timestamp.field': '_kafka_timestamp',  'rotate.interval.ms': '3600000',  'rotate.schedule.interval.ms': '1200000',  'flush.size': '500000',  'path.format': ''schema_version'=VV/'event_date'=YYYY-MM-dd/'event_hour'=HH',  'partitioner.class': 'fr.leboncoin.data.archiver.partitioner.CustomTimePartitioner'}

解开了很少的事情,因此让我们一步一步地查看每个主要部分。

基本配置和错误处理

连接器.CLASS是指定要使用的哪个连接器(作为Kafka Connect中的可插拔类)。显然,这里我们通过指定类别将接收器连接器用于S3,Kafka Connect将用于实例化消费者。

'connector.class': 'io.confluent.connect.s3.S3SinkConnector', 'errors.tolerance': 'none', 'errors.logs.enable': 'true', 'errors.log.include.messages': 'true'

我们还指定如何处理错误(连接,解除序列化,数据提取,转换……)。可用行为是阻止,忽略或将原始事件推向死信队列并继续进行。在这里,我们指定了我们想要阻止任何错误(无容差)。此外,由于这是一个暂存配置,因此我们记录完整的事件,因为它们不包含来自客户的任何数据,并且可以在调试中提供极大的帮助。

对于任何其他类型的错误处理(与Kafka或S3的连接问题,由于系统错误而导致的崩溃,或吊舱重新启动),好消息是整个过程是标准的消费者,偏移只会在推动后才能提交S3成功了。甚至有一些保证一直是一致性,以避免S3上的任何重复项,但这些都需要一些特定的配置:

冷却热数据:从Kafka到Athena

> “Exactly once messaging is a complicated profession”

即使在这些保证中,应始终考虑在您的活动中具有某种唯一ID,仍然能够在下游的这些事件上执行重复。归档过程不是唯一可能发生复制的地方。

格式和序列化

然后,有“什么”和“其中”字段,它定义了消耗的主题,桶,前缀和存储类(用于其他实现,例如GCP)。

  'topics': 'leboncoin_staging_ads_ad-publications_public_avro',  's3.bucket.name': 'leboncoin_kafka_events_bucket',  'topics.dir': 'staging/topics/ads/raw/parquet',  'storage.class': 'io.confluent.connect.s3.storage.S3Storage',  'format.class': 'io.confluent.connect.s3.format.parquet.ParquetFormat',  'path.format': ''schema_version'=VV/'event_date'=YYYY-MM-dd/'event_hour'=HH',

我们还定义了key.converter或value.Converter类以用于输入的去序列化(这里未指定为Avro是默认值)和输出的格式。此处的格式化序列化(但您也可以使用JSON,AVRO或“RAW”以避免任何转换)。

最重要的是:分区路径.Format,它将在S3上附加到每个分区的地形文件。为了更顺畅地处理破坏变化,我们将Avro Schema版本作为第一前缀,然后使用标准日期/小时分区方案(以下主题上的更多)。

Hive兼容的分区方案

然后有最有趣的部分:分区器和时间戳提取器,携手合作,找到应该使用哪个时间在S3上分区事件。

'partitioner.class': 'fr.leboncoin.data.archiver.partitioner.CustomTimePartitioner', 'timestamp.extractor': 'fr.leboncoin.data.archiver.parser.LeboncoinTimestampExtractor',

每个流式事件包含两个单独的,类似但从根本不同的时间信息:

  • 事件发生的时间,这意味着在现实世界中发生的时间(用户发布广告,付款经历,请单击)。让我们称之为“商业时间”。按照惯例,我们总是把它放在架构里(在格式化时,我们对我们的约定的一些不同的解释,但必须存在)
  • 将事件摄入到流媒体解决方案中的时间,这意味着它在KAFKA上生成的时间,默认存储在每个事件的元数据中(如图所示)。让我们称之为“kafka时间”。

还有归档过程发生的时间,但是这一个是非确定性,因为可能需要若干尝试来存储数据。切勿将其作为参考。

冷却热数据:从Kafka到Athena

我们需要使用Hive风格的分区在S3上存储此数据,以允许不同查询引擎的分区修剪,这需要尽可能高效地读取数据。

但我们只能使用两个方案中的一个来分区数据。并且没有完美的解决方案:

  • 使用Kafka时间是最简单的,因为它将由每个事件的元数据提供(因此它始终存在标准格式,并且可以轻松检索),并且应该单调增量(每个分区)。但是,它不代表实时:由于技术或商业原因,可能会延迟进入Kafka的事件可能会延迟,几分钟或几天。因此,分析和下游算法不能用它作为可靠的真理来源。存储的数据集必须重新解释并重新处理以在使用之前匹配实际业务时间。
  • 使用业务时代是棘手的,常见于惯例,无法保证该字段将在那里,将有效,并将是连贯的。此外,如果生产者写得不好,它也可能在单个分区中产生秩序的事件,从而使过程复杂化。但它具有在其目标分区模式(无论如何必须完成的东西)的良好优势,即使对于晚期事件(也需要具体处理/监控)。

在Leboncoin,我们使用第二种模式,主要是因为我们的数据平台的历史深深植根于分析。

虽然Kafka Connect附带自己的一组分区,但是可以调整这两种行为,我们必须实现自己的“自动”尝试,以找到候选物之间的业务时空字段,并使用第一个它可以定位为基础的业务时期字段时间分区。它还负责在分区方案中注入模式版本。

这是一种自定义类,实现我们与我们的Kafka集群捆绑的标准界面。界面本身相当简单,因此在我们的自定义Docker图像中编写足够的java并将它们捆绑在一个罐子里,这不是一个巨大的障碍:

public interface Partitioner<T> {  void configure(Map<String, Object> config);  /**   * Returns string representing the output path for a sinkRecord to be encoded and stored.   *   * @param sinkRecord The record to be stored by the Sink Connector   * @return The path/filename the SinkRecord will be stored into after it is encoded   */  String encodePartition(SinkRecord sinkRecord);  /**   * Returns string representing the output path for a sinkRecord to be encoded and stored.   *   * @param sinkRecord The record to be stored by the Sink Connector   * @param nowInMillis The current time in ms. Some Partitioners will use this option, but by   *                    default it is unused.   * @return The path/filename the SinkRecord will be stored into after it is encoded   */  default String encodePartition(SinkRecord sinkRecord, long nowInMillis) {    return encodePartition(sinkRecord);  }  String generatePartitionedPath(String topic, String encodedPartition);  List<T> partitionFields();}

该实施还应注意确保在提取日期/时间信息时使用UTC时区,因为有明显的原因。

轻型变换

接下来,我们具有单一消息转换,这是一组可配置的简单转换,可以方便地改变输出格式或添加信息字段。

'transforms': 'tombstoneHandler,insertFields', 'transforms.tombstoneHandler.type': 'io.confluent.connect.transforms.TombstoneHandler', 'transforms.tombstoneHandler.behavior': 'ignore', 'transforms.insertFields.type': 'org.apache.kafka.connect.transforms.InsertField$Value', 'transforms.insertFields.partition.field': '_kafka_partition', 'transforms.insertFields.offset.field': '_kafka_offset', 'transforms.insertFields.timestamp.field': '_kafka_timestamp',

在这里,我们添加了将事件写入Kafka(从事件的元数据,因此根据我们之前的定义的Kafka时间),分区和偏移到输出的偏移量,并丢弃所有墓碑(由单独的过程处理。对于GDPR合规性)。

刷新到S3政策

最后,我们定义了事件如何刷新到S3。

  'rotate.interval.ms': '3600000',  'rotate.schedule.interval.ms': '1200000',  'flush.size': '500000'

由于S3不是流友好的数据存储,我们无法发送IT原子事件,并且需要在每个分区处理的特定条件下缓冲/刷新:

  • 当已处理特定量的事件(刷新)时:这是为了避免S3上的非常大的文件,并限制连接器上的内存消耗(具有内部内存缓冲区)。
  • 当特定量的运行时间(rotate.schedule.interval.ms)经过(壁钟)时:即使没有新事件到达,也可以确保定期刷新事件,特别是在低吞吐量环境/主题中特别有用。在这里,我们希望为每个分区强制每〜20分钟创建S3的新文件。
  • 当特定的时间在提取的分区字段中传递(数据时钟):由于我们的Hive分区是由业务时的1H桶事件,如果我们输入新的桶,我们希望创建新文件。

方向盘

如果没有足够的监测和警报所有主题(其中一些通常在吞吐量休息)的情况下,所有这些都不会完整,以确保我们能够及时处理任何异常,从而维护数据我们数据字母的“Fresh”以供我们的数据科学家,分析师和数据服务使用。

冷却热数据:从Kafka到Athena
冷却热数据:从Kafka到Athena

我们的主要指标是实际运行的任务数量,每个主题/分区的滞后金额,负载(尤其是内存)及其对每个工作人员的偏差,以及每个时间帧处理的事件量。

还是一些缺点

总而言之,这个解决方案运作良好,但在未来的工作中可能有一些缺点:

  • 内存消耗很高。Kafka Connect缓冲了内存中的事件,而不是磁盘。这对大多数来说是有道理的,但对于S3,我们每20分钟只刷新到S3,对于某些主题,这可能意味着很多事件。此外,在已经完整主题上启动新连接器将涉及许多待维持的缓冲区,特别是如果刷新配置未正确设置。更好的解决方案是在临时磁盘空间中缓存此缓冲区,更适合这种汇。Kafka Connect的未来功能贡献,也许是S3 Sink插件内部工作的一些调查,也许?
  • 文件太多了。我们故意选择每20分钟冲洗一次。这意味着每个分区的新文件每20分钟,其中一些具有> 70分区的一些最大主题。这是每小时210个Parquet文件,或每天5040个文件。我们的一些主题具有足够大的吞吐量,以证明这么多,但这远非理想的大多数(默认情况下,parquet块大小为128 MB)。将需要第二阶段“重新压缩”,以优化和降低读者的成本。
  • 可以完善负载平衡。每个连接器都有特定的最大“任务”,它会处理主题分区的子集,并以循环时尚为主题分区的子集。但是,这不是理想的,即使我们考虑了每个主题的预期吞吐量来增加任务数量,作为大主题或具有偏斜分区的主题可能会消耗更多内存,导致不同节点的不均匀消耗群集。
  • 没有自动重启失败任务。虽然罕见,任务有时可能会失败。大多数故障涉及重试机制,但一旦达到重试阈值,任务就会移动到“失败”状态。目前没有明确的机制可以定期将这些任务重试在失败的状态下,其中一些可能能够恢复其操作(例如,我们有时会在主题ACL中遇到Mishaps,导致任务失败。修复ACL需要明确重启之后的任务)。
  • 不合时宜的重新平衡。当节点发生故障或被我们的Kubernetes集群重新安排时,消费者重新平衡以处理预期的丢失的连接器和任务。但是,当更多资源变得可用时,重新分发均缺少任务所需的时间似乎并不非常调整,并且可以长于预期的晦涩难懂的原因。
  • 从JSON到Parquet的转换需要一个明确的JSON架构(尽管这可能是越来越好)。我们选择在JSON中保留归档过程输出的少数主题。

尽管有这些缺点,我们必须在未来几个月/年内改善,解决方案非常稳定,使我们能够拔下我们的旧系统,以获得Parquet转换和主题归档。

但是,在此,我们只有一半……

通过Athena自动化阐述

现在我们有一个干净的主题流入我们的S3,我们希望用户能够发现它们并查询它们。如果新的存档主题自动添加到我们的胶水转移(我们的AWS管理的蜂巢转移)作为表格,可供雅典娜(AWS管理的Presto实施)和我们的火花工作?

好吧,首先,我们需要解决一些问题,以及我们将如何:

  • 从S3的Parquet文件中推断表的DDL,包括分区方案和存储位置
  • 在新表/新架构版本时创建/更新Metastore中的表定义
  • 在到达S3时,在Metastore中注册新分区

幸运的是,“有一个应用程序”。AWS Glue服务不仅由托管Hive转移而且还包括ETL工具以及数据集爬虫系统。最后一个功能为我们提供了“扫描”S3(或其他数据源)的路径的能力,并发现相应的数据集(只要覆盖架构兼容性,我们的Avro Workflow保证)。然后,它会使用架构和分区方案添加和更新Hive Metastore。

这正是我们想要做的,所以不需要编码吗?起初似乎太好了……而且,不幸的是,它是……

爬取不是前进的最快方法

这种方法有几个问题:

  • 这很慢。爬虫器在现有分区下重新列出整个S3树,以检测任何新分区/更改。这可能需要一段时间用于存储数十万个镶木地板文件的主题…这也意味着我们每个主题需要一个爬虫,并行运行每个发现。
  • 它的价格昂贵。在我们的地区,爬虫机每DPU-ogp-0.44美元收费,最低计费的时间为10分钟。每次运行超过0.05美元,我们想要为每个主题触发每小时。无需进行数学,这是我们的预算。
  • 它没有扩展。在履带并发性上,它看起来有100个难题100。这意味着不超过100多个爬虫可以同时运行。我们目前有〜160个主题(每个环境)归档,并且它一直在生长……
  • 这是不透明的。胶水爬虫的实际运行时的代码不是开源,并且缺乏关于执行每次操作(架构转换,推断,冲突解决)的实现细节的方法。

但也许都不会丢失。我们需要爬行者所需的唯一复杂部分是模式发现和转移维护(显然我们可以用一些Spark魔法编码它,但是这个想法是减少整个归档和发现过程的维护足迹,而不是增加它)。

但此模式维护部件仅需要在非常具体的情况下运行:

  • 添加新架构时
  • 当现有的架构更改时

我们目前在我们的架构存储库上有大约10个每日提交,其中一些人甚至不会需要更改模式。所以它是可行的,很多空间备用,如果我们可以减少到严格的最低限度,我们需要运行爬行者的时间。

现在,我们如何做到这一点?

来自Lambda与爱❤️

我们用于的解决方案是完全事件驱动的:

冷却热数据:从Kafka到Athena

> How the files are processed in small batches for integration of new partitions/schemas into the metastore

我们的Kafka Connect Archiver将新文件推送到S3。这些触发事件,该事件被推到SQS队列。此队列用作要处理的所有新文件的缓冲区。

然后,每10分钟一次,触发Lambda,从队列中弹出那些事件,然后从路径推断出来:

  • 公司(以及数据库名称)
  • 主题名称(因此表名)
  • 架构版本
  • 分区信息

这在每个SQS S3复制消息中定义的路径上的简单regexp不比:

然后,所有遗骸,都是查询:

  • Glue爬虫引用,看看特定表是否存在爬网。该遗漏将通过CD进程抢先地创建,其中包含包含主题/表名的通告。如果存在爬虫,那么我们需要维护相应的表。
  • Metastore查看是否已存在表和分区。如果表格不存在,我们立即触发爬虫(它将负责表格创建)。如果表存在,但不是分区,我们会添加它。如果存在分区,我们可以安全地忽略它。
  • 如果架构版本不存在分区方案,那么这意味着模式是新的:我们再次重新触发爬虫,这应该拿起新的架构并很快发布它。

这确保了我们在必要时只致电一些履带,这是每天最多的触发器,但以非常有效和同质的方式维护分区。

出于性能和成本原因,Lambda将其所有结果从内存中的AWS API缓存,因此每10分钟仅执行一次。

限制

虽然完全令人满意,但这里还有一些烦恼:

  • 直到最近,Athena引擎无法合并包含突变子结构的表中的架构。这是一个巨大的痛苦,因为我们在自动化整个过程中变得更加普遍,并且有时可能导致对某些表的可读性打破变化。它常常通过最近发布的Athena 发动机V2发布,升级底层Presto引擎来解决这个问题,即使对于复杂的嵌套架构,现在也运作良好。
  • Glue履带部分仍然是不透明的,并且无法预测其在不兼容的模式(在我们的过程中不应该发生的情况下的每一个举动,而是……)非常令人失望。根据一些闭合来源,我们不是100%舒适的舒适,潜在的非常复杂的逻辑,因为这是架构推论,即使它到目前为止我们从未失败过。
  • 分区完整性检测,或“我应该考虑分区足够完整的分区以便处理”(除了不可避免但相当罕见的晚期活动之外),比它看起来更难。首先,我们有一个非常复杂的方法检查Kafka Connect集群的每个消费者偏移的状态,以确保所有分区都传递了某个阈值。但是,由于某些主题并不总是在所有分区上发布足够的事件,这被证明与任何其他过程一样不可靠。对此的干净解决方案是在所有生产商中强制执行PARED水印,我们目前没有做过。所以我们改为懒惰,简单的方式:如果H + 1开始出现,那么它意味着H就在那里(Modulo一个安全阈值)。它可能看起来很简单,但它做了工作,没有任何价值丢失。此外,它也让事情变得更加容易阅读和调试。

活动去BRRR,人类去ZZZZ

好吧,也许不是“Zzzz”,但至少团队可以专注于一些其他努力,而不是维护数千行的JSON,YAML和DDL。

作为最后一个概述,这是解决方案的完整工作流程,从开始完成:

冷却热数据:从Kafka到Athena

> The complete workflow from the initial producer to the consumer teams/services

人类管理的唯一部分在模式和主题配置的定义中,我们很快就会自信地让这一过程自动运行其课程(无手动阻止到连续交付,或大红色按钮推动),在所有环境中,包括生产。

通过删除这种工作流程中的人为干预的所有基本部分,如预期的那样,我们不仅可以恢复和勾取过程所有逃避我们人类警惕的主题(oops),还可以提高性能,可靠性和及时性所有团队的数据集,同时向生产者和消费者团队提供更一致的信息系统。

在这样做的过程中,我们能够再次工作更高的增值任务,例如咨询关于架构结构的团队,通过机器学习和纯数据服务在更多应用程序中进行船上的数据,并且通常重新加工此数据以使其更多直接可用。

下一步

当然,没有任何工作真正过,在未来,我们仍将改善这一过程以使:

  • 将所有这些数据集清算在唯一的界面中以改善数据发现性
  • 使用新列更新架构时通知,或创建新表
  • 用于更好的读取性能的表分区大小自动压实和优化,降低成本
  • 完全自动gdpr pseudonamatation和数据集的匿名
  • 标准化访问控制管理
  • 降低成本降低和更好的动态缩放的内存占用空间
  • 用专用工具更换胶水爬虫,以保持架构推理逻辑的完全所有权

(本文由闻数起舞翻译自leboncoin的文章《Cooling down hot data: From Kafka to Athena》,转载请注明出处,原文链接:
https:///leboncoin-engineering-blog/cooling-down-hot-data-from-kafka-to-athena-5918a628bd98)

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多