https://www.cnblogs.com/wonglu/p/8459694.html spark调优2018-02-22 18:06 by 牛仔裤的夏天, 538 阅读, 0 评论, 收藏, 编辑 摘要:鉴于 Spark 基于内存计算这一天性,以下集群资源可能会造成 Spark 程序的瓶颈:CPU,带宽和内存。通常情况下,如果内存足够的情况下,瓶颈就是网络带宽,但有时,你也需要做一些优化,例如以序列化的格式存储RDD,来减少内存使用。本指南将涵盖两个主要主题:数据序列化(这对于良好的网络性能至关重要,并且还可以减少内存使用)、内存调优。同时也会讨论一些较小的主题。 官网地址:https://spark./docs/2.1.0/tuning.html 1.数据序列化 一、数据序列化序列化在分布式系统中扮演着重要的角色,那些会让对象序列化过程缓慢,或是会消耗大量字节存储的序列化格式会大大降低计算速率。通常这是用户在优化Spark应用程序中应该调整的第一件事。Spark旨在在便利性(允许您使用操作中的任何Java类型)和性能之间取得平衡。它提供了下面两种序列化库:
可以通过设置初始化时的 SparkConf 和调用conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")来切换到Kryo 模式。这项配置不仅可以对worker节点之间的shuffle数据起作用,还可以在将RDD序列化到硬盘上时起作用。Kryo 之所以没有成为默认设置是因为使用者需要自行注册一些类,但是我们建议在一些网络密集型应用中尝试使用 kryo 序列化。从 Spark2.0.0 开始,当对于简单类型,简单类型数组和字符串类型的 RDD 进行shuffling,Spark 已经使用 Kryo 进行了内部整合。 Spark自动对许多在Twitter chill库中的AllScalaRegistrar所覆盖的许多常用核心Scala类注册了Kryo。 要使自定义类应用 Kryo 注册,你需要 registerKryoClasses 方法。 val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf) Kryo文档https://github.com/EsotericSoftware/kryo介绍了更多高级注册选项,例如添加自定义序列化代码。 如果你的对象很大,你可能需要增大spark.kryoserializer.buffer配置。这个值需要足够大以至于能够容纳下被序列化的最大的对象。 最后,如果你没有注册你的自定义类,Kryo仍然可以工作,但它将不得不为每个对象存储完整的类名,这是浪费的。 二、内存调优在对内存的使用进行调优时有三个考虑点:用户对象的内存使用量(用户可能希望整个数据集都保存在内存中),访问这些对象的开销和垃圾回收的开销(如果用户的对象周转率很高) 默认情况下,java对象的访问是很快的,但很容易就会消耗比字段中原始数据多2-5倍的空间。这是以下几个原因导致的:
本节会以Spark的内存管理机制的概述开始,然后讨论用户能在应用程序中采用的更有效的内存策略。我们将着重描述如何确定对象的内存占用和如何改变数据结构和序列化方式来降低内存占用。然后,我们会介绍如何优化 Spark的缓存大小和 Java 垃圾回收。 2.1 内存管理概述Spark的内存使用基本上可以分为两大类:执行内存和存储内存。执行内存指的是在shuffle,join,和aggregation计算中使用的内存,存储内存指的是集群中缓存和传播内部数据使用的内存。在Spark中,执行和存储共享一个统一的区域M。当没有执行内存使用时,存储可以获得全部的可用内存,反之亦然。执行在必要的时候可能会驱逐存储,但只有在总存储内存使用量低于某个阈值R时才会触发。用另一句话来说,R描述在统一内存M中一定不会被驱逐的缓存block子集。由于实现的复杂性,存储内存不会驱逐执行内存。 这种设计方案确保了几个令人满意的特性。首先,不使用缓存的应用可以使用全部内存来用于执行,从而消除不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小的不受驱逐的数据库存储空间R。最后,这种方法为各种工作负载提供了合理的开箱即用性能,不需要用户了解内存如何内部划分的专门知识。 尽管有两个相关的配置,但是通常用户不需要对它们进行调整,因为默认值适用于大多数工作负载:
spark.memory.fraction的值应该设置为可以适配JVM的老年代或终身代的使用。具体可以参考下面的GC章节。 2.2 内存消耗确定判断一个数据集到底消耗多少内存的最佳方式是:将数据集加载到 RDD 并将其缓存下来,然后去 Spark Web UI 查看“Storage”页面。这个页面将告诉你,你的 RDD 正在申请多大的内存。 估算某一个特定对象的内存消耗,可以使用SizeEstimator的estimate方法,这对于尝试不同的数据布局来减少内存使用,以及确定一个广播变量将占用每个执行器堆的空间量是很有用的。 2.3 数据结构调优减少内存消耗的首选方法是避免使用会增加开销的java特性,例如基于指针的数据结构和包装器对象。下面是集中解决方法:
2.4 序列化RDD存储当尽管进行了调优,但你的对象仍然太大,无法有效存储时,一个更简单的方法是使用序列化的格式来存储它们以此来减少内存的使用,使用RDD persistance API来设置序列化的存储级别,例如MEMORY_ONLY_SER。Spark将RDD的每一个分区作为一个大的字节数组进行存储。以序列化格式存储数据的唯一缺点是访问速度较慢,因为不得不在使用中反序列化每一个对象。如果您想以序列化的形式缓存数据,那么我们强烈建议使用Kryo,因为它比Java序列化(当然也要比原始Java对象)小得多。 2.5 垃圾回收调优当你的程序中存储的RDD有大量的替换和变更时,JVM垃圾回收可能会造成问题(当对一个 RDD 仅读取一次,然后在其上进行多次操作时并不会带来问题)。当Java需要将旧对象驱逐出去来为新对象腾出空间时,它需要跟踪所有的Java对象来找到未引用的对象。这里需要记住的要点是,垃圾收集的成本与Java对象的数量成正比,因此使用较少对象的数据结构(例如使用int的数组而不是LinkedList)会极大地减少消耗。一个更好的方法是以序列化的形式持久化对象,如上所述:每个RDD的分区只会有一个对象(一个字节数组)。所以当存在 GC 问题时,在尝试其他技巧前,首先要尝试的是使用序列化的缓存。 由于任务的工作内存(运行任务所需的空间量)和在节点上缓存的RDDs之间的干扰, GC也可能是一个问题。我们将讨论如何控制分配给RDD缓存的空间以减轻这个问题。 2.5.1 测量GC的影响 GC调优的第一步是收集GC发生频率和GC时间的统计。可以通过增加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 的Java选项来实现。http://spark./docs/latest/configuration.html#Dynamically-Loading-Spark-Properties中详细描述了将Java参数传递给Spark Job的方法。下次Spark应用程序运行时,就可以看到Woker节点的log会打印出GC信息。注意这些log是在集群中的workder节点,而不是driver程序中。 2.5.2 GC调优 为了进一步优化垃圾收集,我们首先需要了解JVM中关于内存管理的一些基本信息:
Spark中的GC调优的目的是为了确保只有长期存在RDD会存储在老年代中,新生代有足够大的空间来存储短期对象。这有助于在任务执行期间避免收集临时对象造成的full GC。下面是一些可用步骤:
我们的经验表明,GC调优的效果取决于您的应用程序和可用内存的数量。在网上有更多的调优选项,管理频繁的GC发生的频率可以帮助减少开销。 执行器的GC调整标志可以通过设置作业配置中的"spark.executor.extraJavaOptions"来指定。 三、其他3.1 并行级别除非每一个操作的并行度都设置的足够高,要不然集群不会被充分利用。Spark自动根据文件的大小设定了运行在其上的map任务的数量(也可以通过SparkContext.textFile参数来控制),并且对于分布式的reduce操作,例如groupBykey和reduceByKey,它会使用父RDD中最大的分区数量。你可以将并行度作为一个次级参数床底,或是设置在配置文件spark.default.parallelism来改变默认配置。通常情况下,我们推荐为集群中的每个CPU分配2-3个任务。 3.2 Reduce任务的内存使用有些时候,你会因为task中的数据集,例如groupByKey,太大而造成OutOfMemoryError,而不是RDD和内存不匹配。Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等等)会在每个任务中创建一个hash table来执行grouping操作,这个操作经常会很大。最简单的处理方案是增加并行度,让每个任务获取到的数据集更小。Spark对于短于200ms的任务执行的很好,因为它在多个任务中重用一个executor JVM,任务的启动成本很低,因此,你可以安全地将并行级别增加到您的集群中的核心数量。 3.3 广播大变量使用SparkContext中的广播特性,你可以极大地减少序列化任务的大小,和集群中的启动任务开销。如果你的任务用到了driver中的一个大的对象(例如一个static lookup table),可以考虑将它变为广播变量。Spark将每个任务的序列化大小打印在主服务器上,因此您可以查看它来决定您的任务是否太大;一般来说,大于20kb的任务很可能是值得优化的 3.4 数据本地性数据本地性对于Spark任务的性能有很大的影响。如果数据和操作的代码在一起,那么计算往往很快。但是由于代码和数据是分离开的,它们中总会有一方要向另一方传递。通常,将序列化的代码从一个地方发送到另一个地方比传输数据块要快,因为代码的大小比数据要小得多。Spark构建了它围绕数据局部性原则的调度。 数据本地性是数据和处理它的代码之间的距离。下面有基于数据当前维值的几种本地性设置。通过选取最短距离来达成最快的处理速度:
Spark希望把所有的任务都安排在最合适的位置上,但这并不会总是可行的。在没有任何空闲执行机的情况下,Spark会切换到较低的局部性。有两种选择:a. 在同一个服务器上等待CPU空闲,再提交任务 b. 立即在一个其他执行机上开始执行任务,并将数据移动过去。 Spark通常情况下会等待CPU空闲。一旦等待时间超时,它会开始移动数据到较远的空闲CPU上。每个级别之间的等待超时可以单独配置,也可以在一个参数中组合在一起。具体配置参考spark.locality。默认配置通常效果较好,可以根据任务特性来修改这些配置。 四、总结本文是针对Spark应用程序调优中需要注意的主要问题的一个简单指南,主要关注数据序列化和内存调优。对大多数应用来说,切换到Kryo序列化并persist序列化数据可以解决大多数性能问题。 |
|
来自: jasonbetter > 《Spark_Flink》