在过去的15年里,Leboncoin一直在法国提供分类广告服务,其目前的大小在70多个类别中托管超过4000万广告 - 从房地产到视频游戏 - 每月为3000万活跃用户服务。 > The swimming pool room at leboncoin’s UFO HQ 这导致平台上产生的越来越大的数据,既有规模,又有复杂性。2015年左右,我们意识到我们需要提高我们的技术和组织能力,以利用数据,以便为客户提供更好的服务。 这导致我们在其他事情中将我们的数据整合模式从原始日志和数据库转储转移到基于流的方法,因此我们与Apache Kafka的历史 - 分布式事件流/ Pub-Subl平台 - 开始。 但Kafka是一个“在线”系统,更好地用于服务间通信并在发生时存储事件日志,尽管没有无限期。为了能够分析,转换和整合“脱机”数据存储中的所有这些事件 - 例如数据方案 - 更适合大型批量作业和机器学习培训任务,需要一个事件归档工作流程。 本文是我们如何重新制作此工作流程的故事,从我们的“Online”Kafka集群将大量的异构数据暴露于我们的数据群中,以便长期数据保留,分析,法律要求以及作为机器学习“肥料”的机器。我们将探讨我们如何从手动和繁琐的过程到一个完全自动化的工作流程,包括我们在AWS S3 / Athena / Spark上的DataLake上的表的创建和演变,只有他们最重要的地方所需的人类控制和检查。 一点历史Kafka的第一个实际使用情况是受众追踪服务,当时已经达到了接近1亿日记录。基于JSON的序列化将在存储和网络带宽方面证明昂贵。 因此,我们很幸运能够以最佳的事件序列化的最佳实践,使用Avro格式通过其模式注册表的汇合集成。我们甚至提出了我们未来的主题的一些规范,以及未来预期主题类型的一些基线配置选项(基于吞吐量,灵敏度和一致性要求)。 然后,我们在S3上实现了我们自己的自定义解决方案,以便在S3上存档,使得冷库并访问主题作为具有更长保留的数据集,以及用于分析和机器学习培训的能力(首先使用Spark,然后首先使用Spark Athena)。该解决方案负责从Avro Serialization(最适合原子事件)的事件转换为Parquet格式,更适合大型数据集,读取密集型操作。 这是在我们的Air Flow调度程序中编织作为一个小时的DAG,并且几年,符合我们的期望,Kafka的使用量在整个公司中。 > The Kafka “hot” to S3/Athena “cold” storage workflow 巩固架构目录随着使用率的增长,技术和组织的新挑战开始出现。更多的贡献者意味着更多的工作方式和定义事件数据模式,虽然Avro规范非常完整,但在涉及到以下方式时,它不足以保证整个公司的一致性:
以及许多其他“细节”,非常重要,可以简化与数据集一起使用。通过以下方式解决了这一特定问题:
它还奠定了明确的基础,我们在其上建立了本公司所有架构的参考资料,但不仅,我们的子公司和我们的母公司(Adevinta)都能够提供/发布活动。因此,存储库的布局为每个主题的三个静态级别构成:
这给了我们这个布局: .├── leboncoin│ ├── ads│ │ ├── accepted-ad│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas│ │ ├── action-states│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas│ │ ├── action-states-for-search-only│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas│ │ ├── ad-params-image-inference│ │ │ ├── prod│ │ │ │ └── schemas│ │ │ └── staging│ │ │ └── schemas[...]├── adevinta 然后,每个环境都包含一个“简化的”主题配置,它将通过我们的连续交付工具转换为更详细的Kafka主题配置:
在这些行动之后不久,我们开始看到发表畸形事件的数量明显减少,以及跨越模式的差异减少。 人类处理单位缩放但这只照顾了我们缩放问题的一部分,主要是为生产者团队(主要是微服务所有者)。我们仍然必须配置我们想要在AWS S3上存档的每个主题,在特定配置中定义:
我们甚至必须使用Spark编写(并维护)Parquet 转换工具的自定义Avro。此外,虽然Avro Serialization是规则,但某些主题的例外情况,我们没有计划,并且不得不处理使用其他方法。 最后,通过在所有环境中手动宣布适当的DDL在所有环境中手动宣布适当的DDL,必须从Avro转换为Hive / Presto,以便在Athena或Prive才能使用,以便在所有环境中的胶水转移中宣布适当的DDL。 总而言之,在一个小型数据 - 工程团队中有几个主题和少数生产商的可行方法,到2018年我们迅速变得无法管理,当我们开始转移到一个特征团队组织时,具有更为分布式的制片人和利益相关者的模式…… 随着符合条件主题的数量接近100(现在更接近160),我们必须重新思考我们的工作流程,以充分自动化这种繁荣和易于易于归档的过程。 计划因此,该计划是完全自动化这一过程,首先从Kafka到S3(在我们已经在我们的自定义过程中留下了我们已经实施的Parquet 转换),然后从S3到雅典娜。 预期的改进是多数:
这也是有机会修复我们数据阿拉克布局中的一些错误:
工作的合适工具幸运的是,在这段时间(2019年),Kafka周围的生态系统已经进化了很多。kafka 0.8的日子已经过去了,融合平台在一分钟后越来越富裕。我们希望在强大,维护和稳定的和稳定的工具上基于我们的下一个方法,以便能够在AWS S3上有序和高效的方式用这些活动。 KAFKA CONNECT及其AWS S3 Sink连接器被选为此新设置的平台。Kafka Connect是一个专用于将数据推入Kafka集群(通过源连接器)的平台,并从其他类型的数据系统(RDBMS,Elasticsearch,DynamoDB,S3 …)中拉出数据输出(宿链路)。所有接线主要在配置中完成,而不是代码,通过对专用REST API的调用。 在内部,它将其作为连接器的基于Java的分布式“Manager”(是生产者,或者在我们的案例中,消费者),完全依赖于现有的Kafka集群以进行状态存储和跨工人同步。无需添加其他基础架构元素,而不是工人本身。 部署本身在Kubernetes上使用官方Confluent Inc平台的扩展Docker映像完成,以便添加一些最小的自定义元素(我们稍后将详细介绍)和一些插件,最值得注意包含S3宿插件和单个消息变换(可以在处理时间内消息发生的一组方便原子转换)。 FROM confluentinc/cp-kafka-connect:5.5.1# Remove bundled (older) version of kafka-connect plugins for S3/GCSRUN rm -rf /usr/share/java/kafka-connect-s3/ /usr/share/java/kafka-connect-gcs/ /usr/share/confluent-hub-components/confluentinc-kafka-connect-gcs/# Install the official kafka-connect-s3 plugin in v5.5.1RUN (echo 1 && yes) | confluent-hub install confluentinc/kafka-connect-s3:5.5.1# Install the latest kafka-connect Single Message Transforms pluginRUN (echo 1 && yes) | confluent-hub install confluentinc/connect-transforms:latest# Install our custom classes in the connect classpathCOPY build/libs/partitioner-latest-all.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/partitioner-latest-all.jarview raw 然后,我们部署到我们的EKS Kubernetes集群(但是,在调整官方Helm图表之后使用官方Helm图表部署到任何裸机或虚拟实例也可能)有点调整到我们的特定风格的监测和公约(例如使用用于监控的JMX + DataDog,因为提供的Helm图表假定Promotheus将被使用)。 调整工作流程该项目的主要目标是自动化该过程来提高其稳定性,并减少维持其近零所需的人力。 我们可以执行此操作的唯一方法是利用我们的现有约定和CI / CD工具集在Git上的Schema存储库:每次在此存储库上执行更改时,会发生以下操作(每个环境):
最后一步是我们为归档目的添加的,通过添加一个简单的Python CLI,它自动配置群集中的连接器,了解模式存储库中声明的每个主题。然后通过我们的持续部署工具,大厅进行更改,然后在主分支机构上的每个合并上触发此CLI。 每个主题拥有单个消费者的想法是确保将每个主题的正确分离为单个工作流程,并确保未经配置的主题可能引起不期望的副作用(尽管那些现在很少见,但它们不是不可能,我们希望将过程中的错误范围限制为最小单位)。这种故意选择有其警告,因为我们稍后会看到。 AWS S3 连接器配置概述以下是这种连接器的结果配置的示例(部分用于简洁起见,但大多数重要的是这里):
解开了很少的事情,因此让我们一步一步地查看每个主要部分。 基本配置和错误处理连接器.CLASS是指定要使用的哪个连接器(作为Kafka Connect中的可插拔类)。显然,这里我们通过指定类别将接收器连接器用于S3,Kafka Connect将用于实例化消费者。 'connector.class': 'io.confluent.connect.s3.S3SinkConnector', 'errors.tolerance': 'none', 'errors.logs.enable': 'true', 'errors.log.include.messages': 'true' 我们还指定如何处理错误(连接,解除序列化,数据提取,转换……)。可用行为是阻止,忽略或将原始事件推向死信队列并继续进行。在这里,我们指定了我们想要阻止任何错误(无容差)。此外,由于这是一个暂存配置,因此我们记录完整的事件,因为它们不包含来自客户的任何数据,并且可以在调试中提供极大的帮助。 对于任何其他类型的错误处理(与Kafka或S3的连接问题,由于系统错误而导致的崩溃,或吊舱重新启动),好消息是整个过程是标准的消费者,偏移只会在推动后才能提交S3成功了。甚至有一些保证一直是一致性,以避免S3上的任何重复项,但这些都需要一些特定的配置: > “Exactly once messaging is a complicated profession” 即使在这些保证中,应始终考虑在您的活动中具有某种唯一ID,仍然能够在下游的这些事件上执行重复。归档过程不是唯一可能发生复制的地方。 格式和序列化然后,有“什么”和“其中”字段,它定义了消耗的主题,桶,前缀和存储类(用于其他实现,例如GCP)。
我们还定义了key.converter或value.Converter类以用于输入的去序列化(这里未指定为Avro是默认值)和输出的格式。此处的格式化序列化(但您也可以使用JSON,AVRO或“RAW”以避免任何转换)。 最重要的是:分区路径.Format,它将在S3上附加到每个分区的地形文件。为了更顺畅地处理破坏变化,我们将Avro Schema版本作为第一前缀,然后使用标准日期/小时分区方案(以下主题上的更多)。 Hive兼容的分区方案然后有最有趣的部分:分区器和时间戳提取器,携手合作,找到应该使用哪个时间在S3上分区事件。 'partitioner.class': 'fr.leboncoin.data.archiver.partitioner.CustomTimePartitioner', 'timestamp.extractor': 'fr.leboncoin.data.archiver.parser.LeboncoinTimestampExtractor', 每个流式事件包含两个单独的,类似但从根本不同的时间信息:
还有归档过程发生的时间,但是这一个是非确定性,因为可能需要若干尝试来存储数据。切勿将其作为参考。 我们需要使用Hive风格的分区在S3上存储此数据,以允许不同查询引擎的分区修剪,这需要尽可能高效地读取数据。 但我们只能使用两个方案中的一个来分区数据。并且没有完美的解决方案:
在Leboncoin,我们使用第二种模式,主要是因为我们的数据平台的历史深深植根于分析。 虽然Kafka Connect附带自己的一组分区,但是可以调整这两种行为,我们必须实现自己的“自动”尝试,以找到候选物之间的业务时空字段,并使用第一个它可以定位为基础的业务时期字段时间分区。它还负责在分区方案中注入模式版本。 这是一种自定义类,实现我们与我们的Kafka集群捆绑的标准界面。界面本身相当简单,因此在我们的自定义Docker图像中编写足够的java并将它们捆绑在一个罐子里,这不是一个巨大的障碍:
该实施还应注意确保在提取日期/时间信息时使用UTC时区,因为有明显的原因。 轻型变换接下来,我们具有单一消息转换,这是一组可配置的简单转换,可以方便地改变输出格式或添加信息字段。 'transforms': 'tombstoneHandler,insertFields', 'transforms.tombstoneHandler.type': 'io.confluent.connect.transforms.TombstoneHandler', 'transforms.tombstoneHandler.behavior': 'ignore', 'transforms.insertFields.type': 'org.apache.kafka.connect.transforms.InsertField$Value', 'transforms.insertFields.partition.field': '_kafka_partition', 'transforms.insertFields.offset.field': '_kafka_offset', 'transforms.insertFields.timestamp.field': '_kafka_timestamp', 在这里,我们添加了将事件写入Kafka(从事件的元数据,因此根据我们之前的定义的Kafka时间),分区和偏移到输出的偏移量,并丢弃所有墓碑(由单独的过程处理。对于GDPR合规性)。 刷新到S3政策最后,我们定义了事件如何刷新到S3。
由于S3不是流友好的数据存储,我们无法发送IT原子事件,并且需要在每个分区处理的特定条件下缓冲/刷新:
方向盘如果没有足够的监测和警报所有主题(其中一些通常在吞吐量休息)的情况下,所有这些都不会完整,以确保我们能够及时处理任何异常,从而维护数据我们数据字母的“Fresh”以供我们的数据科学家,分析师和数据服务使用。 我们的主要指标是实际运行的任务数量,每个主题/分区的滞后金额,负载(尤其是内存)及其对每个工作人员的偏差,以及每个时间帧处理的事件量。 还是一些缺点总而言之,这个解决方案运作良好,但在未来的工作中可能有一些缺点:
尽管有这些缺点,我们必须在未来几个月/年内改善,解决方案非常稳定,使我们能够拔下我们的旧系统,以获得Parquet转换和主题归档。 但是,在此,我们只有一半…… 通过Athena自动化阐述现在我们有一个干净的主题流入我们的S3,我们希望用户能够发现它们并查询它们。如果新的存档主题自动添加到我们的胶水转移(我们的AWS管理的蜂巢转移)作为表格,可供雅典娜(AWS管理的Presto实施)和我们的火花工作? 好吧,首先,我们需要解决一些问题,以及我们将如何:
幸运的是,“有一个应用程序”。AWS Glue服务不仅由托管Hive转移而且还包括ETL工具以及数据集爬虫系统。最后一个功能为我们提供了“扫描”S3(或其他数据源)的路径的能力,并发现相应的数据集(只要覆盖架构兼容性,我们的Avro Workflow保证)。然后,它会使用架构和分区方案添加和更新Hive Metastore。 这正是我们想要做的,所以不需要编码吗?起初似乎太好了……而且,不幸的是,它是…… 爬取不是前进的最快方法这种方法有几个问题:
但也许都不会丢失。我们需要爬行者所需的唯一复杂部分是模式发现和转移维护(显然我们可以用一些Spark魔法编码它,但是这个想法是减少整个归档和发现过程的维护足迹,而不是增加它)。 但此模式维护部件仅需要在非常具体的情况下运行:
我们目前在我们的架构存储库上有大约10个每日提交,其中一些人甚至不会需要更改模式。所以它是可行的,很多空间备用,如果我们可以减少到严格的最低限度,我们需要运行爬行者的时间。 现在,我们如何做到这一点? 来自Lambda与爱❤️我们用于的解决方案是完全事件驱动的: > How the files are processed in small batches for integration of new partitions/schemas into the metastore 我们的Kafka Connect Archiver将新文件推送到S3。这些触发事件,该事件被推到SQS队列。此队列用作要处理的所有新文件的缓冲区。 然后,每10分钟一次,触发Lambda,从队列中弹出那些事件,然后从路径推断出来:
这在每个SQS S3复制消息中定义的路径上的简单regexp不比: 然后,所有遗骸,都是查询:
这确保了我们在必要时只致电一些履带,这是每天最多的触发器,但以非常有效和同质的方式维护分区。 出于性能和成本原因,Lambda将其所有结果从内存中的AWS API缓存,因此每10分钟仅执行一次。 限制虽然完全令人满意,但这里还有一些烦恼:
活动去BRRR,人类去ZZZZ好吧,也许不是“Zzzz”,但至少团队可以专注于一些其他努力,而不是维护数千行的JSON,YAML和DDL。 作为最后一个概述,这是解决方案的完整工作流程,从开始完成: > The complete workflow from the initial producer to the consumer teams/services 人类管理的唯一部分在模式和主题配置的定义中,我们很快就会自信地让这一过程自动运行其课程(无手动阻止到连续交付,或大红色按钮推动),在所有环境中,包括生产。 通过删除这种工作流程中的人为干预的所有基本部分,如预期的那样,我们不仅可以恢复和勾取过程所有逃避我们人类警惕的主题(oops),还可以提高性能,可靠性和及时性所有团队的数据集,同时向生产者和消费者团队提供更一致的信息系统。 在这样做的过程中,我们能够再次工作更高的增值任务,例如咨询关于架构结构的团队,通过机器学习和纯数据服务在更多应用程序中进行船上的数据,并且通常重新加工此数据以使其更多直接可用。 下一步当然,没有任何工作真正过,在未来,我们仍将改善这一过程以使:
(本文由闻数起舞翻译自leboncoin的文章《Cooling down hot data: From Kafka to Athena》,转载请注明出处,原文链接: |
|