分享

​Hadoop Yarn 在小米的实践

 520jefferson 2022-09-25 发布于北京


导读:大数据技术的应用已经在各行各业都比较成熟了,基于Hadoop Yarn的调度和资源管理在离线以及在线方面有着举足轻重的地位。本文将分享Hadoop Yarn在小米内部的实践。

今天的分享会围绕以下几点展开:

  • 调度优化实践

  • 资源优化实践

  • Yarn扩展性与其他优化

  • Yarn元仓建设

  • 未来规划

01
调度优化实践

目前小米国内及海外已拥有20+个集群,其中最大的集群节点数已达6000+,最大单集群队列数量也已经达到了1000+。接下来基于目前Hadoop Yarn在小米内部的实践经验,分享一下各个方面所做出的优化。

首先来看一下使用过程中存在的一些问题。

1. 问题修复

(1)节点资源更新导致调度卡顿问题

图片

在平时使用过程中,有时会遇到调度卡顿的现象。定位发现在ResouceManager收到 Nodemanager资源上报时会触发队列资源更新,队列资源更新过程中会获取锁,并且这个锁同时也会在调度线程线程中被使用。此时,如果 ResourceManager 内部队列数量过多,会出现队列资源更新耗时过长导致长时间占有该锁,进一步导致调度性能下降。

目前小米内部针对该情况,采用的优化方式是当节点上报资源时,不直接触发队列资源更新,而是基于异步批量更新的策略,该策略可以缓解短时间大量节点心跳上报或节点频繁上下线导致的调度性能下降问题。

(2)Global Scheduler多线程调度崩溃

图片

小米内部Hadoop Yarn版本是 3.1.0,调度器使用Capacity Scheduler并且开启多线程,在社区该模式也称为 Global Scheduler。Global Scheduler调度模型会存在多个调度线程和一个仲裁线程。在使用Global Scheduler 过程中发现有时候会出现因为调度线程崩溃导致 ResourceManger 调度持续hang住的问题,并且不会进行自我恢复,导致ResourceManger长时间不做任何资源分配,严重影响业务。

通过异常排查定位,发现在队列的排序过程中会因为不满足 JDK TimeSort 算法对于数据的约束从而抛出异常导致线程崩溃。具体的流程是:调度线程在进行队列排序时不会对整颗队列树加锁并且会将分配成功的结果写入backlogs,仲裁线程通过backlogs获取调度线程分配的结果,尝试对分配结果进行apply,当app和queue apply 成功之后会加写锁,更新对应队列资源。但是这个资源更新发生在调度线程对队列排序过程中有概率会打破TimeSort算法对于数据的要求(自反性、传递性和对称性),导致调度线程的崩溃,进一步导致ResourceManger无法响应调度需求。

目前两种解决方法:

第一种会使用legacyMergeSort去替换TimeSort,这种方式比较简单,直接在 JVM 启动时注入一个配置项-Djava.util.Arrays.userLegacyMergeSort=true解决;

第二种是在调度线程进行调度时不使用原始的队列结构而是使用深拷贝的形式将队列中的数据拷贝一份,通过拷贝后的数据进行排序,这种情况下,仲裁线程对队列的 apply就不会影响到整个队列的排序。关于这个问题可以看看内部推给社区的YARN-10178这个issue,其中也描述了定位的过程以及讨论的解决方案。

(3)ResourceLimit计算逻辑导致调度性能问题

图片

在调度线程进行队列调度时,会获取其可使用资源配额ResourceLimit,通过Resource.min基于当前集群层面和队列层面max capacity资源配额,计算得到当前队列能使用的最大资源量。Resource.min基于DRF算法再考虑多纬资源类型情况下返回主导资源中权重较大的值。当前小米内部考虑内存和CPU多维资源。通过 Resource.min基于主导资源计算返回的 ResourceLimit 会出现调度线程能通过资源判断校验,但是仲裁线程中提交失败的情况。

用一个实际的例子来解释这种情况:存在一个队列A,资源为<60G, 100core>,其子队列A1的资源为<5G, 100core>,A2的资源为<40G, 70core>。此时对A2队列分配了一个<30G, 1core>的一个资源,此时队列的资源就变成了A<30G, 99core>,A1<5G, 100core>,A2<10G, 69core>。这种情况下如果尝试给A1分配一个<10G, 1core>的资源请求,正常情况下,因为A1的max capcity是<5GB, 100core>,从理解上来讲应该在调度线程尝试调度时因资源检测失败直接拒绝该队列的资源分配。但是由于Resource.min基于DRF算法选择 CPU 为主导资源进行计算,最终会返回<30G, 99core>的ResourceLimit,导致调度线程在尝试分配<10G, 1core>的资源时因小于 ResourceLimit 通过校验写入backlogs。仲裁线程会获取backlogs中的 commit进行apply,由于仲裁线程会严格比较各纬度资源,所以在apply过程中就会因为申请的资源<10G, 1core>大于A1队列的 max 资源<5G, 100core> 失败,产生大量的Failed to accept this proposal 错误严重影响调度性能。

目前在小米内部针对这种情况,再获取ResourceLimit时通过Resources.componentwiseMin函数获取各项资源的最小值,这样可以在调度线程中针对资源配比拒绝掉不合理的请求。关于这个问题可以参考YARN-11083这个issue,其中也描述了定位的过程以及讨论的解决方案。

介绍完使用过程中遇到的问题及对应解决方案后,接下来介绍小米内部针对Hadoop Yarn所做的性能优化。

2. 性能优化

图片

(1)跳过userlimit计算逻辑

因为在小米内部场景中针对单个队列内多用户之间的资源隔离的需求并不是很多,而userlimit的计算逻辑是嵌入在整个调度流程当中,会产生大量的无效计算导致调度效率不高。目前小米内部采取的优化手段是在涉及配置加载重构整个队列树的时候,基于minimum-user-limit-percent以及userlimit-factor这两个参数和队列的资源量计算出一个值,通过该值判断队列内部是否需要用户级别的隔离,如果不需要,会直接在涉及对应队列的调度流程中将userlimit计算跳过避免大量的无效计算逻辑去计算userlimit的操作。

(2)单次分配多个container

Hadoop Yarn的调度原理是先基于队列的排序,在进行各层级的排序后会最后会选择最优的一个container进行调度,如果队列数过多就会产生大量的排序操作,同时也因为在小米内部集群规模比较大,所以可以接受放弃一定的公平性来提升集群的调度性能。另外虽然Capacity Scheduler调度策略内部使用公平算法去进行分配,但并不是绝对的公平。所以目前在小米内部使用的优化策略是调度策略选择出子队列后,尝试分配多个container,去提升单位时间分配的container数量。

(3)效果

图片

小结:平时遇到的问题主要分为性能&功能两大类,首先会定位具体原因,明确具体问题,后根据社区进度以及基于小米内部业务场景进行优化或者修复,来满足小米内部大规模集群下调度的需求。

02

资源优化实践

接下来介绍一下小米内部在使用Hadoop Yarn过程中针对资源优化的一些实践。

1. 弹性调度

图片

小米内部离线集群的使用情况如上图1所示,凌晨因大量例行任务资源占用比较多,白天任务数量相较于凌晨少很多,存在明显的资源低谷,导致白天存在大量的资源浪费。而期望的资源使用方式是如上图2所示,只保障低峰资源的使用,而对于高峰资源的需求,尝试使用公有云实例或者云主机去满足。另外实现这个同时也会带来一些其他问题,作业如何选择以及作业稳定性如何保障。

图片

在作业筛选时,通过时间和优先级两个维度对作业进行筛选。时间维度方面,目前这批弹性节点有可预测的下线时间,所以在选取作业时会考虑作业的运行时长,在节点可预测下线的时间之前能满足作业运行结束的要求;还有一方面就是作业运行的开始时间或者作业的运行时间段不能和节点下线的时间吻合。优先级方面,目前不会针对高优的作业使用弹性调度,而是针对默认或者低优先级的作业使用弹性调度。

弹性调度方案设计初的目标就是对业务无感知的,目前内部有一个 infra-client 的组件,可以控制用户的依赖包与配置,通过该组件可以灰度用户配置,这样可以做到在用户无感知的情况下,达到弹性资源的使用。

图片

为了保障使用弹性资源的作业稳定性,方案有如下一些细节优化:

①对于作业的大脑ApplicationMaster的调度,尽量避免去调度到弹性节点上,降低弹性资源缩容时对作业稳定性的影响。

②目前社区只支持单Label配置,不支持或表达式的配置。但是因为凌晨的弹性资源可能因为节点异常或者其他因素导致弹性资源的资源量小于作业的资源需求,导致作业 pending,在这种情况下需要可以使用集群的固定实例资源,出于这个需求内部开发支持配置Label或表达式支持同时使用固定实例资源与弹性资源。

③针对Spark作业稳定性保障,如果调度在弹性节点上,当节点下线后,shuffle 数据也会丢失,导致重算,增加了作业异常的风险,内部通过引入Remote Shuffle Service去解决该问题。

④因为当前弹性节点类型是有可预期下线时间点,所以在节点下线之前的一段时间内会对该节点进行Graceful Decommission,这样在节点下线之前的一段时间,会保证Yarn不会继续向这批节点调度作业,同时也要保证已有的作业能在预期的时间内运行完成,尽可能保障作业的稳定性。

关于弹性调度实现及架构的更多细节可以参考《小米Hadoop YARN弹性调度的探索与落地》这个分享。

图片

最终在上线弹性调度之后的成果和预期的资源使用曲线基本吻合,而且在成本方面也有明显降低。

2. 单机节点资源超卖

图片

在集群规模较大时,会遇到明显的资源浪费问题,具体现象是业务申请的container的资源使用量与实际的资源使用量不符合,但是从集群逻辑资源来看资源使用率非常高,从单机物理资源使用情况来看利用率较低,存在浪费,导致目前集群中大量的资源空闲。

基于这种情况,目前小米内部采用的是单机节点资源静态超卖的模式,单机节点资源进行上报时上报比主机资源可用内存更大的值给ResourceManager,即上报real + overuse的资源给ResourceManager,但是这种模式会导致单机出现稳定性问题。比如某个节点成为调度热节点时,节点的物理资源使用率可能接近单机的上限导致触发系统的omkiller,极端情况下会导致整个节点hang住,严重影响作业的生产保障。目前小米内部使用Cgroup进行节点稳定性管控,结合使用NodeManager内部的elastic + enforce两种机制去满足单机节点稳定性的目标。

接下来介绍一下如何使用Cgroup超卖NodeManager资源超卖情况下节点的稳定性。

图片

资源超卖下 NM 基于Cgroup保障单机稳定性主要依赖于Elastic Memory Control的实现。

①单机内存利用率层面,在 Cgroup配置文件中针对hadoop-yarn的配置memory.limit_in_bytes=node.real,同时配置memory.oom_control的oom_kill_disable字段为1。这样,整个NM 的进程树的内存可使用总量受限于memory.limit_in_bytes的配置,一旦整个进程树的内存资源达到限制,内核会冻结整个进程树的执行。

另外,NM内部会监听内核的事件,当发生内存超用的发生时,NM监听的内核事件并触发管控策略,基于相关策略选择container进行kill,将单机利用率降低到安全阈值下,内核会使 NM 启动的 container 继续运行。在小米内部针对于kill的策略进行了扩展,在进行kill时优先选择优先级比较低的container进行kill,针对于同等优先级情况下选择最近启动的container进行kill。

②单个container级别设置memory.limit_in_bytes=reqest,管控单个container的内存限制。通过以上两种方式的限制就能满足单个节点资源超卖的这种机制。

图片

ElasticMemoryController 机制在内部还有其他场景,比如针对流式作业小米内部有专有集群,之前出现过Flink Container Lost过多影响业务的情况。经排查是Flink任务由于堆外内存使用过多造成大量 Container 被NodeManager kill掉。对于这种情况,小米内部在流式集群落地了Elastic Memory Control机制,保障单机不触发OOM-KILLER的情况下,允许container进行资源超用,只有当达到了单机内存上限或者触发稳定性问题的时候,才会进行Container的kill。这样可以尽可能保障业务稳定的情况下让 flink 同学定位以及优化Flink堆外内存溢出或者其他可能的内存使用问题。

在流式集群上线Elastic Memory Control机制后,Flink Container Lost从日级别的1000+降低到了500+,下降了接近50%,对整个Flink作业的稳定性提升非常明显。

3. 基线任务执行优化

图片

基线任务业务可以通过小米内部的数据工厂配置基线通过配置作业的预定产出时间工厂可以推导出其上游各个依赖作业需要启动时间此时存在一个需求就是队列内基线优先级高的任务比基线优先级低的任务更快拿到资源目前 Yarn 队列内优先级的支持通过 fifo + priority 的策略进行排序但是内部需求想在同等优先级下实现公平分配不同优先级下实现高优先级优先分配

针对需求内部实现了 fair + priority 的比较器,去满足整个基线对于高优先级作业的执行的资源需求的优化。

 小总结:

内部资源保障,主要分几个方面:

①首先尽可能去寻求公有云的弹性资源,降本增效。

②尽可能提升单机物理资源利用率。

③资源量一定的情况下高优先级作业可以更快获取资源。

03

Yarn扩展性与其他优化

接下来介绍 Hadoop Yarn 扩展性方面的一些实践,以及使用过程中的其他一些优化。

1. 扩展性

目前小米 Yarn 集群存在两个隐患

  • 如何适配新的机房,如何对外屏蔽新机房的存在以及减少两个机房之间的专线带宽;

  • 单集群因规模较大面临稳定性及扩展性的问题,急需扩展性的方案。

通过对社区实现方式以及业界常见的一些优化手段的调研,以及小米内部更倾向于更靠近社区,所以决定落地 Yarn Federation 架构。

图片

Yarn Federation在小米内部的架构如上图所示,小米内部有一个资源运维平台叫做资源管理服务,通过资源管理服务去做 Federation 规则的管理。目前落地模式是单队列单集群的模式。GPG模块的引入主要是做 zk 中大量 app 数据的清理工作,因为Router在进行app转发的时候要记录app对应的subcluster信息,会导致zookeeper中存在大量的数据,但目前Router是不具备清理功能的,所以需要一个外部的组件去进行清理,所以从社区中引入 GPG 模块实现该需求,当然 GPG 还有更丰富的功能,不过当前内部还未采用。

Federation 架构引入之初的一个目标是对外屏蔽子集群的概念,该需求在 federation 基于 router 转发已经默认支持,另一个需求是如何减少多机房之间的流量带宽,尽可能降低跨机房的数据访问要求。当前内部会基于数据血缘分析,将分析出数据孤岛所对应的链路扔到扩展的 subcluster 去进行运行,达到流量收敛的目的。

接下来介绍一下上面刚刚提到的资源运维平台,即资源管理服务。

图片

资源管理服务是小米内部Yarn运维管理平台,用户可以自助进行队列的增删该查,权限以及队列属性修改等等功能。因为有些队列可能申请过后使用率比较低,所以资源管理服务也会定时的对队列资源进行治理。对于Federation,资源管理服务提供了一个规则配置页面,提交之后会生成一个规则,由管理员进行审批,审批过后,整个规则会写入到zookeeper当中,Router加载zookeeper中的配置,最终这条规则就落地生效了。

在落地Federation架构的过程中,我们遇到了如下问题:

图片

下面介绍一下其中的AM Invalid Token问题。

图片

在内部通过原地升级到Federation架构时遇到一个Invalid Token的问题,造成该问题的原因是因为ResourceManager去NodeManger拉起ApplicationMaster的时候,如果AMRMProxy开启的状态下,会将ApplicationMaster的token换成一个local token,然后ApplicationMaster通过local token与AMRMProxy进行通信,然后AMRMProxy去代理ApplicatonMaster原来的token去和ResourceManger进行通信。由于滚动部署NodeManager与灰度Client不能同时生效,就会造成ApplicationMaster拿着local token直接与ResourceManger进行通信,最终导致Invalid Token,进而导致作业的失败。

针对这种情况,目前内部的解决方案是在拉起ApplicaitonMaster的时候通过环境变量决定AMRMProxy是否替换Token,如果环境变量开启,AMRMProxy才会进行token的替换,如果环境变量未开启,即使AMRMProxy开启,也不会进行token的替换,这样ApplicationMaster会直接通过原来token与ResourceManager进行通信。之后等NodeManager更新完成并开启 AMRMProxy之后,通过将Client的 scheduler.address配置以及环境变量打开完成请求的转发。

2. 其他优化 - RM App State To MySQL

图片

ResouceManger内部会缓存很多已经结束的app,主要是为了提供给业务通过访问Yarn Web UI去获取已结束作业的执行数据,进行问题定位。但是如果ResourceManger缓存大量的已结束的app,会导致内存压力特别大,特别在进行高可用切换时耗时非常长。另外即使ResourceManager缓存大量的已结束的app的数据,也解决不了结束时间较久的作业的数据的访问。

针对这种情况,目前小米内部在app finish时通过state store去写zookeeper的时候同时也会全量写一份数据到mysql,这样当用户通过Yarn UI去访问已经结束的app 数据,如果内存中没有,就会去mysql加载该app的数据,并且内部会模拟recover的流程去恢复当时app的数据,这样业务无论访问过期多久的app都可以访问到,同时也解决了ResourceManager内存的压力问题 。

3. 其他优化

图片

在运维集群的过程中,会遇到用户希望知道container中作业运行慢的原因,如果经常去线上进行排查会是一个耗时耗力的过程,所以基于这一背景,内部在Yarn UI上扩展了一个功能,业务可以直接通过Yarn UI上jstack的一个链接直接 dump下来container对应的thread信息,极大地提升了运维的效率。

同时还有一个优化就是在container执行前注入了一个hook机制,这个机制会在 container 拉起来时进行container的一些必要的校验,如果业务提的container不满足需求,就会直接拒绝该请求。比如前阶段的log4j的漏洞问题,就可以通过该hook机制,在container执行之前直接扫描初有问题的container并且进行实时拦截。

还有一个问题就是NodeManager日志上报比较慢的问题,因为目前NodeManager上报日志是在app进行初始化的时候,就将该app的信息放入线程池中,当单机的流式作业过多时就会占满线程池。此时如果有批处理的app finish的话,由于该app拿不到线程,就导致外部长时间看不到日志。目前的解决方案是不在app init的时候进行日志监控上报,改为只在app finish的时候进行日志上报。

另外在维护Yarn的过程中,肯定想对离线资源的使用数据做一个观测,比如当前运行了哪些app,是哪种类型的,资源占比是多少,运行了多久等等信息,所以基于这个需求背景建立了Yarn元仓。

04

Yarn元仓建设

图片

Yarn元仓中包含了几大类型的数据,包括占比、趋势、成本,以及队列的满载率和空闲率。基于元仓的数据可以为弹性调度提供数据支撑,并且可以通过不同的占比和趋势掌控集群的变动,产出的元仓数据表还可以提供给外部进行数据分析。

图片

目前的数据基于app event change log,内部在进行app事件转换的时候,会同步记录一份日志,同时会将这部分的数据写入内部的消息队列Talos,然后通过数据同步工具将数据写入到iceberg表中,之后通过flinksql将数据进行处理。同时针对正在运行的任务会通过定时任务从ResourceMananger API中获取,之后将app table和app running table 采用 T+1生成一张大宽表供分析使用。

图片

上面就是目前小米内部Yarn元仓的部分看板,通过其可以很快分析出某个时间段内提交的作业的数量,作业的类型,以及作业的运行时长,同时也能获取到某个时间段内内存及CPU的使用情况。

05

未来规划

图片

在未来的发展中,我们将通过离在线混部的方式在提高在线集群资源利用率的同时充分利用内部闲置资源;同时引入动态超卖去解决静态超卖所存在的问题;将作业调度和资源调度进行通盘探讨,以实现小米离线集群资源的高效利用。 

 今天的分享就到这里,谢谢大家。


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多