分享

Apache Spark优化技术

 新用户0175WbuX 2022-02-22

  Apache Spark优化技术

  在讨论各种优化技术之前,请快速回顾一下Spark如何运行

  Spark如何运行:

  用户使用集群模式下的spark-submit提交应用程序(也有本地和客户端模式,但要考虑生产情况)。

  然后,spark-submit实用程序将与Resource Manager通信,以在数据节点之一中启动Application Master。

  该驱动程序将在Application Master Container中启动。

  Apache Spark优化技术

  之后,Spark驱动程序将与资源管理器进行通信,以启动容器来处理数据。

  然后,资源管理器将分配容器,Spark驱动程序将在所有分配的容器上启动执行程序,并分配要运行的任务。

  现在,所有执行程序将直接与Spark Driver程序通信,并且所有执行程序的输出将由Spark Driver程序收集。

  在每个执行器上完成任务后,驱动程序将调用SparkContext.stop()方法,该方法将终止执行器并释放基础资源。

  注意:请记住,Application Master(包含驱动程序)和Executor进程都在各自的容器上运行。 在这里,容器不过是内存和CPU的分配。 因此,如果我们可以有效利用基础资源并优化执行过程,那么整体性能将会提高。

  基本的Spark存储器管理:

  spark中使用的内存可以归类为-

  执行内存:指用于洗牌,联接,排序和聚合期间的计算的内存。

  存储内存:指用于在集群中缓存和循环内部数据的存储。

  用户内存:用于存储RDD转换所需的数据,例如RDD依赖项的跟踪信息。

  保留的内存:用于为系统保留和存储Spark的内部对象。

  在Spark中,执行和存储共享一个统一的区域。 意味着,当不执行任何操作时,存储可以利用所有可用的内存,反之亦然。 如果需要,仅执行可以退出存储,直到达到一定阈值。

  Apache Spark优化技术

  内存管理级别很少,例如-Spark级别,Yarn级别,JVM级别和OS级别。

  SPARK使用多个执行程序和核心:

  每个火花作业包含一个或多个动作。 这些动作分为多个阶段。 为了实现并行性,这些阶段分为多个任务,这些任务分布在执行者之间。 这些任务中的每一个都在处理数据的子集,并且可以并行运行。

  因此,当我们在每个执行器中有多个内核(不超过5个)时,spark基本上会使用这些额外的内核来产生额外的线程,并且这些线程将同时执行任务。

  使用更多数量的执行器和核心并不意味着性能会更高。 我们需要找出最佳数量的执行程序和核心以有效地执行我们的任务。

  为什么优化很重要:

  在生产环境中,Spark在分布式计算机上运行,并且分布式系统也可以由其他一些应用程序使用。 这意味着不同应用程序正在利用底层资源。 需要进行优化才能通过使用最佳资源来更快地执行作业。 资源的过度利用或利用不足可能会对作业的执行时间和处理能力产生负面影响。

  构建优化器中的Spark非常高效,此过程可确保Spark具有最佳性能。

  但是,由于我们知道不同的应用程序具有不同的行为,因此,基于我们的应用程序和处理行为的性质,我们需要对其进行微调和优化。

  理解前:

  在开始实施优化技术之前,我们应该对以下方面有清楚的了解–

  1.我们在哪个分布式系统上。

  2.分布式系统的性质及其工作方式。

  3.分布式系统的资源详细信息。

  4.由平台团队设置的已定义火花分配和配置详细信息。

  5.群集的当前利用率和即将到来的负载。

  6.您的应用程序和工作的性质。

  7.流入数据类型和频率。

  8.业务期望,需求和SLA(如果有)。

  9.对应用程序和工作流程有很好的了解。

  优化技术:

  我们基本上应该关注两个主要领域–

  1.应用代码级别

  2.集群配置级别

  1.应用代码级别:

  在编写Spark应用程序时,应遵循许多最佳实践和标准技巧教程,以使其在执行和资源利用方面更加方便。 在这里,我重点介绍几个重要的标准:

  A)明智地进行分区:在随机操作期间,spark默认会创建200个分区。 作为开发人员,我们应确定发生混洗的区域,并根据处理后的数据量在此操作期间定义适当的分区号。

  要检查数据帧(df)上的现有分区,请执行以下操作:

  p=df.rdd.getNumberPartitions()

  要检查分区中数据的分布方式:

  print('distribution:'+ str(df.rdd.glom()。map(len).collect()))

  请记住,对于所有应用程序而言,分区太少(可能会导致较少的并发,数据扭曲和资源使用不当)或分区太多(可能导致任务调度花费比实际执行时间更多的时间)对任何应用程序都是有害的。 在我们的应用程序代码中或多或少地处理着相同数量的数据,那么我们可以在会话级别定义shuffle分区:

  spark.conf.set(" spark.sql.shuffle.partitions"," 40")

  否则,根据洗牌操作期间的需求,我们可以使用repartition(40)或coalesce(40)定义大小。

  请记住,重新分区和合并之间的区别:重新分区会进行完整的混洗,并使用均匀分布的数据创建新分区。 Coalesce使用现有分区的地方可以最大程度地减少混洗的数据量。

  注意:默认情况下,火花随机播放块不能超过2GB。 在每个分区中有一个128MB以内的块来实现并行性总是好事。

  B)缓存数据:尝试缓存将在程序中多次使用的数据框,以避免重复的转换步骤。 但是请记住避免不必要的缓存,并且不要忘记取消缓存的数据帧。

  C)避免使用UDF:请记住,火花优化器不够聪明,无法理解用户定义函数中编写的逻辑,因此优化引擎将与逻辑保持一致并按原样执行。 除非需要,否则不要创建不必要的UDF。

  D)随机播放的成本很高:随机播放操作在分布式环境中总是很昂贵的,因此,请避免不必要的随机播放操作。

  E)使用广播:将小型数据集加载到内存中,以便与大型数据集合并以避免混洗。

  F)盐化:如果我们的数据偏斜,则可以在使数据集倾斜的值上创建存储桶,然后对这些存储桶执行操作,最后合并所有存储桶。

  G)其他:在编码时,我们应该重点关注的几个重点,例如-消除冗余操作,最小化I / O操作,选择正确的文件格式(如果在范围内),在生产代码中不要使用show() 。

  2.集群配置级别:

  在这里,我们将讨论:

  A.覆盖spark默认配置。

  B.定义最佳基础资源以加快执行速度。

  A.覆盖SPARK默认配置:

  A.1)启用堆外内存:

  我们知道改组是一项昂贵的操作,要对大型数据集更快地进行操作,我们可以启用堆外内存。 堆外内存将有助于在本机内存中分配随机播放数据结构。 这意味着它们将不受JVM内存管理器的管理,也不受任何垃圾回收的约束。 这将有助于加快执行速度。 注意,在启用此属性期间,我们需要定义堆外大小。

  sparkmory.offHeap.enabled=true

  sparkmory.offHeap.size=3g(这是一个样本值,将根据需要更改)

  A.2)消除磁盘I / O瓶颈:

  在讨论这一点之前,我们应该了解spark在磁盘I / O的实际位置。 执行存储器用于存储中间的混洗行。 当我们的应用程序读取越来越多的数据时,可能达到执行内存的阈值。 一旦达到极限,数据就会被排序并溢出到磁盘上,并作为临时溢出文件存储。 磁盘上可能有多个临时溢出文件,最后我们的应用程序读取了所有这些溢出文件并将它们合并在一起,并作为Final Shuffle文件存储在磁盘上。

  Apache Spark优化技术

  我们知道磁盘访问比内存访问慢,并且当有许多临时溢出文件可用于合并时,磁盘访问可能成为一项工作的性能瓶颈。

  这也可能适用于少量数据。 我们可能认为我们的数据足够小,无法容纳到内存中,但由于最终的混洗输出必须返回磁盘,因此不一定总是正确的。

  随机播放和溢出使用的默认缓冲区大小为32KB,但根据需要我们可以相应地定义:

  spark.shuffle.file.buffer=1MB

  A.3)音调压缩块大小:

  默认压缩块大小为32KB,这不是最佳选择。 我们可以减少随机播放和溢出文件的大小。 通过增加压缩块大小将其减少20%。

  sparkpression.lz4.blockSize=512KB

  注意:默认情况下,spark提供四个编解码器:lz4,lzf,snappy和zstd。 在这里,我考虑了Iz4。

  B.定义最佳的基础资源:

  我们需要了解执行操作并相应地定义它们需要多少资源。 这对于我们的项目以及在同一群集上运行的其他应用程序非常重要。 下面我将解释确定最佳资源的逻辑和计算。

  可能有两种情况–

  -定义了动态分配

  -未定义动态分配

  通常在生产集群中将定义动态分配。 但是,在两种情况下,我们都可以使用以下计算得出最佳资源值并进行相应协商。

  我们可以定义许多配置细节,但是这些细节基于作业执行行为,并且可能因项目而异。 因此,我将主要介绍如何找出–

  ·核心数

  ·执行者人数

  ·执行者回忆

  在开始计算资源之前,我们应该了解以下内容-

  ·我们火花工作的性质(频率,依赖性等)

  ·我们正在处理的数据种类(格式,类型,流入频率等)

  ·处理数据量

  ·群集详细信息(节点数,总内存,可用内核数)

  Apache Spark优化技术

  例:

  为简单起见,我们将以下配置详细信息作为示例,并进行计算:

  群集详细信息:

  节点=5

  每个节点的核心数=32………总核心数=5 * 32=160

  每个节点的内存=512 GB…………总内存=5 * 512=2560 GB

  考虑整个集群都用于我们的应用程序处理。

  第1步:确定核心数量:我们知道,对于并发处理,每个执行程序都应具有多个核心。 但是研究表明,具有5个以上并发任务的应用程序会产生问题。 因此,我们应始终考虑<=5个内核。 让我们以CORES=5为标准。

  步骤2:操作系统和守护程序的内存和内核:注意,操作系统和其他可用守护程序在节点上运行所需的内核和内存很少。 这些价值我们可以从平台支持团队获得。 现在,假设为此目的每个节点分配2个内核和2GB内存。 因此,为OS和其他守护程序分配了内存=(5 * 2)=10GB,内核=(5 * 2)=10。

  所以,

  Spark执行的可用内存=(2560–10)=2550 GB

  Spark执行的可用内核=(160–10)=150

  步骤3:确定执行者人数:

  我们可能需要的执行者总数=(总核心数/每个执行者的核心数)=(150/5)=30

  作为标准,我们需要1个执行者来执行YARN中的Application Master

  因此,我们需要处理的EXECUTOR的最终数量=(30–1)=29

  步骤4:确定执行器内存:我们已经计算出Spark执行的总可用内存=2550 GB,执行器总数=29。因此,

  每个执行者的内存=(总可用内存/执行者总数)

  =(2550/29)?87 GB

  现在,还需要较小的开销内存来确定每个执行器和其他处理对YARN的内存请求。 我们可以使用以下计算来确定-

  开销内存=max(384mb,.07 * spark_executor_memory)

  =最大值(384mb,.07 * 87gb)

  =最大(384mb,6.09gb)?6 GB

  因此,每个执行器的最终计算内存=87–6=81 GB

  重要说明:必须尽可能减少执行程序内存,因为它可能导致JVM垃圾回收延迟。 这个事实也适用于小型执行程序,因为多个任务可以在单个JVM实例上运行。

  总结:经过以上所有计算,我们最终得到以下最优数:

  CORES=5,EXECUTORs=29,EXECUTOR MEMORY=81GB

  请记住,此数字是根据定义的公式和概念计算的,但是在生产中,情况可能有所不同,其中更有可能启用动态分配,并且多个应用程序共享相同的基础资源。

  因此,一旦我们了解了上述概念并能够确定最佳数,在进行优化时,我们也应考虑以下几点:

  一世。 根据处理能力和处理流入数据量,我们需要执行一些测试运行,并根据最佳执行时间确定数量。

  ii。 如果我们知道在一段持续时间内需要处理多少数据,则可以进行后向计算以了解预期的作业执行时间以及需要调整的时间。

  iii。 如果启用了动态分配,请检查maxExecutors,minExecutors和initialExecutors的spark.dynamicAllocation属性。 这些配置有助于了解在执行过程中将捕获多少基础资源,并考虑到我们的资源假设(如上计算),确保最小和最大之间的范围不应太小或太大。

  iv。 确保在动态分配中将spark.executor.cores定义为5。

  v。检查spark.drivermory,spark.executormory和spark.drivermoryOverhead的动态分配详细信息。 请记住,这些记忆将在作业执行期间为每个驱动程序和执行程序占用。 因此,我们需要确保不要分配太多或更少的内存。

  重要说明:上面的计算是考虑到该作业将处理大量文件,但是有可能需要处理少量文件,并且在这种情况下根本不需要大量资源。 这有点棘手,不是直截了当的。 在这种情况下,请首先使用上述逻辑并获得最佳资源计数。 如果我们已经有一个生产作业正在运行,请找出其执行行为,并尝试使用最少数量的执行程序(基于传入文件的大小)。 执行一些测试以获得最佳计数。 另外,在此过程中,请尽量不要为每个执行程序使用大量内存。

  结论:

  在开始优化我们的Spark作业之前,我们了解所有我们应该知道的细节。

  一旦我们有了清晰的画面,就可以使用最佳实践和技术来微调我们的代码,这些代码最终将在具有更好性能的最佳资源上运行。

  通过计算,我们可以获得工作所需的最佳资源详细信息。 而且,一旦有了,我们就可以进行一些演示运行并找出实际的最佳数量。

  还有许多其他事实将帮助我们了解执行的本质并找到最佳资源。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多