由于大多数Spark计算的内存性质,Spark程序可能会受到集群中任何资源(CPU,网络带宽或内存)的瓶颈。Spark优化主要是围绕着这几个瓶颈展开,优化方式包括序列化调优、内存调优等。
数据序列化
在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。
比如之前,我们要传输数据,从driver端传输到executor端、executor端发送到driver端、广播、shuffle都需要序列化。
序列化库
Spark 旨在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库:
- Java 序列化机制:默认情况下,spark使用此种机制。默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Serializable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。
- Kryo 序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。
Kryo 序列化机制使用方式
// 开启Kyro序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 开启主动注册,以获得最佳性能,人为指定序列化的数据类型,默认是加载全部类型
conf.set("spark.kryo.registrationRequired", "true")
优化Kryo 类库的使用
优化缓存大小
如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个field。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.max参数的值,将其调大。
默认情况下它的值是64,就是说最大能缓存64M的对象,然后进行序列化。可以在必要时将其调大。
预先注册自定义类型
使用自定义类型时需要预先注册好要序列化的自定义的类。
在什么场景下使用Kryo 序列化类库?
- 从 Spark 2.0.0 开始,在内部使用 Kryo 序列化程序来对具有简单类型、简单类型数组或字符串类型的 RDD 进行shuffle
- 在你的算子中使用了别人实现写的且没有实现Serializable,比如hadoop的Text。
- 算子函数使用到了外部的大对象情况。(类似广播变量,driver端传输到executor端)比如我们在外部自定义了一个Map对象,里面包含了100m的数据。然后,在算子函数里面,使用到了这个外部的大对象。此时用广播变量替代大对象。
内存调优
内存都花费在哪了
每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自己还要大。
JAVA对象 = 对象头 + 实例数据 + 对象填充(补余用的,用于保证对象所占空间是8个字节的整数倍)
Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。
Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不仅有对象头,还有指向下一个Entry的指针,通常占用8个字节。
元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如用Integer来存储元素。
下面将从 Spark 中内存管理的概述开始,然后我们讨论可以采取的特定策略,以更有效地使用内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何改进它——通过更改数据结构或以序列化格式存储数据。然后我们将介绍调整 Spark 的缓存大小和 Java 垃圾收集器。
内存管理
元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如用Integer来存储元素。
Spark1.6及以后,引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,提供更好的性能。此种方式使得我们不需要修改内存比例。
如何判断你的程序消耗了多少内存
这里有一个非常简单的办法来判断,你的spark程序消耗了多少内存。
- 首先,自己设置RDD的并行度,有下列方法:
- 在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task 或 partition的数量;
- 用SparkConf.set()方法,设置一个参数,spark.default.parallelism,可以统一设置这个application所有RDD的partition数量。
- 其次,在程序中将RDD cache到内存中,调用RDD.cache()方法即可。(抽样sample,预估大小)
- 最后,观察web ui

val cacheRdd = rdd.cache() //应该根据这个地方cache的结果,进行内存的调节
cacheRdd.count()
优化数据结构
减少内存消耗的第一种方法是避免Java语法特性中所导致的额外内存的开销,比如基于指针的Java数据结构,以及包装类型。
有一个关键的问题,就是优化什么数据结构?其实主要就是优化你的算子函数,内部使用到的局部数据,或者是算子函数外部的数据。都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗和占用。
优化方法:
- 能用数组取代,就不用集合。比如:用Array代替List。
- 能用字符串取代,就不用数组或集合。
- 能用int型取代,就不要用字符串;比如:Map的key可以用int取代字符串。
对多次使用的RDD进行持久化或Checkpoint
RDD 持久化:
如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作。那么就非常有必要对其进行持久化操作,以避免对一个RDD反复进行计算。
此外,如果RDD的持久化数据可能会丢失的(因为使用cache的时候),还要保证高性能,那么可以对RDD进行Checkpoint操作。
checkpoint:
checkpoint的意思就是建立检查点,类似于快照,当DAG计算过程出现问题了就可以从这个快照中恢复,当然我们也可以通过cache或者persist将中间的计算结果放到内存或者磁盘中,但也未必完全可靠,假如内存或者硬盘坏了,也会导致spark从头再根据rdd计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方比如HDFS。
使用方法:
// 设置checkpoint
sc.setCheckpointDir("checkpointt dir")
// 创建rdd
val rdd = sc.textFile("input file")
// rdd 的一些操作
rdd2 = rdd.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _)
// 设置rdd2 持久化,防止向上追溯
val cache = rdd2.cache()
// 执行checkpoint
cache.checkpoint()
cache.count()
选择带有序列化的持久化级别
除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能。如果RDD数据持久化到内存或磁盘时,如果内存不够就可能只缓存RDD的部分数据。
为了提高效率,可以采取序列化持久到内存,这样内存占用少。比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。

对于序列化的持久化级别,还可以使用Kryo序列化进一步优化,这样,可以获得更快的序列化速度,并且占用更小的内存空间。
JVM调优
Java虚拟机垃圾回收调优的背景
如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的java对象,并且在垃圾回收时,找到那些已经不在使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。
垃圾回收的性能开销,是跟内存中的对象的数量,成正比的。所以,对于垃圾回收的性能问题,首先要做的就是,使用更高效的数据结构,比如array和string;其次就是在持久化rdd时,使用序列化的持久化级别,而且用Kryo序列化类库,这样,每个partition就只是一个对象——一个字节数组。
我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在spark-submit脚本中,增加一个配置即可,–conf “spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps”。
但是要记住,这里虽然会打印出Java虚拟机的垃圾回收的相关信息,但是是输出到了worker上的日志中,而不是driver的日志中。
其实完全可以通过SparkUI(4040端口)来观察每个stage的垃圾回收的情况。
spark.executor.extraJavaOptions是配置executor的jvm参数
spark.driver.extraJavaOptions是配置driver的jvm参数
垃圾回收机制

首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,首先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收。Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。
如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。
如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。
高级垃圾回收调优
Spark如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为:
- 包括降低存储内存的比例(spark.memory.storageFraction),给年轻代更多的空间,来存放短时间存活的对象;
- 当大对象很多,但minorGC少,说明大对象都进入了老年代,此时给Eden区域分配更大的空间,使用-Xmn(年轻代的heap大小)即可,通常建议给Eden区域,预计大小的4/3;
- 如果使用的是HDFS文件,那么很好估计Eden区域大小,如果每个executor有4个task,然后每个hdfs压缩块解压缩后是该压缩块大小的3倍,每个hdfs块的大小是128M,那么Eden区域的预计大小就是:4 * 3 * 128MB,然后呢,再通过-Xmn参数,将Eden区域大小设置为4 * 3 * 128* 4/3。
总结
根据经验来看,对于垃圾回收的调优,因为jvm的调优是非常复杂和敏感的。除非真的到了万不得已的地步,并且,自己本身又对jvm相关的技术很了解,那么此时进行Eden区域的调节是可以的。
一些高级的参数:
- -XX:SurvivorRatio=4:
设置年轻代中Eden区与Survivor区的大小比值。如果值为4,那么就是Eden跟两个Survivor的比例是4:2,也就是说每个Survivor占据的年轻代的比例是1/6,所以,你其实也可以尝试调大Survivor区域的大小。
- -XX:NewRatio=4:
调节新生代和老年代的比例。如果为4,则年轻代与年老代所占比值为1:4,年轻代占整个堆栈的1/5。
其它设置内存大小的参数:
- -Xms:为jvm启动时分配的内存,比如-Xms200m,表示分配200M。
- -Xmx:为jvm运行过程中分配的最大内存,比如-Xms500m,表示jvm进程最多只能够占用500M内存。
- -Xmn:年轻代的heap大小
- -Xss:为jvm启动的每个线程分配的内存大小
常用shuffle优化
shuffle是一个涉及到CPU(序列化反序列化)、网络IO(跨节点数据传输)以及磁盘IO(shuffle中间结果落盘)的操作。
优化思路:减少shuffle的数据量,减少shuffle的次数
具体方式:
- 能不shuffle的时候尽量不要shuffle数据,可以使用mapjoin(广播变量);
- 能用reduceByKey就不要用groupByKey,因为reducerByKey会在shuffle前进行本地聚合,可以使在shuffle过程中减少磁盘IO;
- spark2.0后已经没有HashShuffleManager,只有SortShuffleManager,SortShuffleManager内部有3种shuffle操作,可适应小中大集群。
- 参数调节:
- spark.reducer.maxSizeInFlight:reduce task的拉取缓存,默认48m
- spark.shuffle.file.buffer:map task的写磁盘缓存,默认32k
- spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次
- spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s
提高并行度(资源足够的情况下)
在执行任务过程中,Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源。才能充分提高Spark应用程序的性能。
Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。
- 使用textFile()、parallelize()方法时的第二个参数设置并行度;
- 使用 coalesce 或 repartition 设置并行度;
- 使用像 reduceByKey的第二个参数设置并行度;
- 使用spark.default.parallelism参数,来设置统一的并行度。Spark官方的推荐是,给集群中的每个cpu core设置2~3个task。
比如说,spark-submit设置了executor数量是10个,每个executor要求分配2个core,那么application总共会有20个core。此时可以设置new SparkConf().set(“spark.default.parallelism”, “60”)来设置合理的并行度,从而充分利用资源。
推荐:一个CPU核对应2-3个task数。
任务运行数量与资源分配:
- Task被执行的并行度 = Executor数目 * 每个Executor核数(=core总个数)
- 当 executor数=2, 每个executor核数=1, task被执行的并行度= 2 * 1 = 2, 8个task就需要迭代4次。
- 当 executor数=2, 每个executor核数=2, task被执行的并行度= 2 * 2 = 4, 8个task就需要迭代2次
因为一个job会划分很多个阶段,所以没必要把所有阶段的task都占有一个CPU核,这样会极大的浪费资源。
分配资源时,尽量task数能整除开 task被执行的并行度,这样不会有CPU核空转。
- 比如 6 executor数=2, 每个executor核数=3, task被执行的并行度= 2 * 3 = 6, 那执行一次后,就有4个核空转,浪费资源。
广播共享数据
如果你的算子函数中,使用到了特别大的数据,那么这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task上去。而是给每个节点拷贝一份,然后节点上的task共享该数据。
这样就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。
数据本地化
数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。
数据本地化,指的是数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别:
- PROCESS_LOCAL:进程本地化。数据和计算它的代码在同一个JVM进程中。
- NODE_LOCAL:节点本地化。数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,但是尽量在读取文件(HDFS文件的block)所在的机器上
- NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分,比如从数据库中获取数据。
- RACK_LOCAL:机架本地化。数据和计算它的代码在一个机架上。
- ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差。
Spark默认会等待一会儿,来期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其他任意一个空闲的executor上。
可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。
spark.locality.wait(3000毫秒) spark.locality.wait.process spark.locality.wait.node spark.locality.wait.rack
数据倾斜
- 可以用hive进行发生倾斜的key做聚合
每几分钟使用hive聚合一下,作用有,但不大。
- 进行数据的清洗,把发生倾斜的刨除,用单独的程序去算倾斜的key。
- 提高shuffle并行度,用随机前缀
方法是打上随机前缀(加盐)先聚合一次,然后去掉随机前缀再聚合一次。适用场景groupby
- 指定“倍数”的数据扩容+随机“倍数”值前缀+自定义partition
适用场景join
- 缩小粒度,比如年月日 省市县。先按照日统计,再按照月,再按照年。前提是有层级关系
- 能不shuffle就不shuffle,能广播就广播
spark-streaming优化
Streaming应用程序中获得最佳性能,需要考虑两件事:
- 通过有效使用群集资源减少每批数据的处理时间。
- 设置正确的批处理大小,使得数据处理跟上数据摄取的速度。
带有receiver的数据接收并行度调优——多个DStream
通过网络接收数据时(比如Kafka、Flume),会将数据反序列化,并存储在Spark的内存中。如果数据接收成为系统的瓶颈,那么可以考虑并行化数据接收。每一个输入DStream都会在某个Worker的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个输入DStream,并且配置它们接收数据源不同的分区数据,达到接收多个数据流的效果。比如说,一个接收两个Kafka Topic的输入DStream,可以被拆分为两个输入DStream,每个分别接收一个topic的数据。这样就会创建两个Receiver,从而并行地接收数据,进而提升吞吐量。多个DStream可以使用union算子进行聚合,从而形成一个DStream。然后后续的transformation算子操作都针对该一个聚合后的DStream即可。
注意这种增加receiver的方法不适合DirectStream直连模式,因直连模式不需要Receiver。
val numStreams = 5
// 每个topic 创建流,流多receiver就多
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
设置socket的revicer

带有receiver的数据接收并行度调优——blockinterval
数据接收并行度调优,除了创建更多输入DStream和Receiver以外,还可以考虑调节block interval。通过参数,spark.streaming.blockInterval,可以设置block interval,默认是200ms。对于大多数Receiver来说,都会将数据切分为一个一个的block。而每个batch中的block数量,则决定了该batch对应的RDD的partition的数量,以及针对该RDD执行transformation操作时,创建的task的数量。每个batch对应的task数量是大约估计的,即batch interval / block interval。
例如说,batch interval为2s,block interval为200ms,会创建10个task。如果你认为每个batch的task数量太少,即低于每台机器的cpu core数量,那么就说明batch的task数量是不够的,因为所有的cpu资源无法完全被利用起来。要为batch增加block的数量,那么就减小block interval。然而,推荐的block interval最小值是50ms,如果低于这个数值,那么大量task的启动时间,可能会变成一个性能开销点。
现在是没有数据接收,所以receiver就是skipped,那这个task的并行度其实是根据任务的cpu core来定,默认情况下,当然可以通过设置spark.streaming.blockInterval来自己指定任务数。
receiver task的并行度是由bacth inerval/block interval决定,初始没有数据的时候,task的数量是由 cpu core来定,随着数据量越来越大,task的数量也在增加,当数据量达到一定规模,task数就能达到 bacth inerval/block interval 数量。

数据接收并行度调优——task
如果每秒钟启动的task过于多,比如每秒钟启动50个,那么发送这些task去Worker节点上的Executor的性能开销,会比较大,而且此时基本就很难达到毫秒级的延迟了。当然也要结合优化你的数据结构,尽量减少序列化后task的大小(注意:使用Kryo序列化只能优化shuffle数据不能用来序列化task),从而减少发送这些task到各个Worker节点上的Executor的时间。可以将每个batch的处理时间减少100毫秒。
数据处理并行度调优
如果在计算的任何stage中使用的并行task的数量没有足够多,那么集群资源是无法被充分利用的。举例来说,对于分布式的reduce操作,比如reduceByKey和reduceByKeyAndWindow,默认的并行task的数量是由spark.default.parallelism参数决定的。你可以在reduceByKey等操作中,传入第二个参数,手动指定该操作的并行度,也可以调节全局的spark.default.parallelism参数。

数据序列化调优
数据序列化造成的系统开销可以由序列化格式来减小。在流传输过程中,有两种类型的数据需要序列化:
输入数据:
默认情况下,接收到的输入数据,是存储在Executor的内存中的,使用的持久化级别是StorageLevel.MEMORY_AND_DISK_SER_2。这意味着,数据被序列化为字节从而减小GC开销,并且会复制其它executor进行失败的容错。因此,数据首先会存储在内存中,然后在内存不足时会溢写到磁盘上,从而为流式计算来保存所有需要的数据。这里的序列化有明显的性能开销——Receiver必须反序列化从网络接收到的数据,然后再使用Spark的序列化格式序列化数据。
流式计算操作生成的持久化RDD:
流式计算操作生成的持久化RDD,可能会持久化到内存中。例如,窗口操作默认就会将数据持久化在内存中,因为这些数据后面可能会在多个窗口中被使用,并被处理多次。流式计算操作生成的RDD的默认持久化级别是StorageLevel.MEMORY_ONLY_SER ,默认就会减小GC开销。
在上述的场景中,使用Kryo序列化类库可以减小CPU和内存的性能开销。使用Kryo时,一定要考虑注册自定义的类,并且禁用对应引用的tracking(spark.kryo.referenceTracking)。
- spark.kryo.referenceTracking
true : 当用Kryo序列化时,跟踪是否引用同一对象。如果你的对象图有环,这是必须的设置。如果他们包含相同对象的多个副本,这个设置对效率是有用的。如果你知道不在这两个场景,那么可以禁用它以提高效率
batch interval调优(最重要)
如果想让一个运行在集群上的Spark Streaming应用程序可以稳定,它就必须尽可能快地处理接收到的数据。换句话说,批量数据的处理速度应与生成它们的速度一样快。对于一个应用来说,可以通过观察Spark UI上的batch处理时间来定。batch处理时间必须小于batch interval时间。
基于流式计算的本质,batch interval对于,在固定集群资源条件下,应用能保持的数据接收速率,会有巨大的影响。例如,在WordCount例子中,对于一个特定的数据接收速率,应用业务可以保证每2秒打印一次单词计数,而不是每500ms。因此batch interval需要被设置得,让预期的数据接收速率可以在生产环境中保持住。
为你的应用计算正确的batch大小的比较好的方法,是在一个很保守的batch interval,比如5~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据速率,可以检查每个batch的处理时间的延迟,如果处理时间与batch interval基本吻合,那么应用就是稳定的。否则,如果batch调度的延迟持续增长,那么就意味应用无法跟得上这个速率,也就是不稳定的。因此你要想有一个稳定的配置,可以尝试提升数据处理的速度,或者增加batch interval。记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢复即可。
测试代码:
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
object SparkStreamingSocket {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSocket")
// 设置初始并行度5,初始是5个分区
// conf.set("spark.default.parallelism","5")
// conf.set("spark.streaming.blockInterval", "1000ms")
// 创建StreamingContext
// 批次间隔时间5秒,也就是说每5秒攒一批数据并处理
val ssc = new StreamingContext(conf,Durations.seconds(1))
// socket流的缓存级别: storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 6666)
// DStream 转换操作
val reduceByKeyDS: DStream[(String, Long)] = inputDS.countByValue()
reduceByKeyDS.foreachRDD((rdd,t) =>{
// 使每批次数据处理达到2s以上
Thread.sleep(2000)
println(s"count time:${t}\t${rdd.collect().toBuffer}")
})
// 启动流式计算
ssc.start()
// 阻塞一直运行下去,除非异常退出或关闭
ssc.awaitTermination()
}
}
内存调优
Spark Streaming应用需要的集群内存资源,是由使用的transformation操作类型决定的。举例来说,如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来说,如果想要做一个简单的map-filter-store操作,那么需要使用的内存就很少。
通常来说,通过Receiver接收到的数据,会使用StorageLevel.MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上。而溢写到磁盘上,是会降低应用的性能的。因此,通常是建议为应用提供它需要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估。
内存调优的另外一个方面是垃圾回收。对于流式应用程序,如果要获得低延迟,肯定不想要有因为JVM垃圾回收导致的长时间延迟。
减少存储空间的方式:
- DStream的持久化:输入数据和某些操作生产的中间RDD,默认持久化时都会序列化为字节。与非序列化的方式相比,这会降低内存和GC开销。使用Kryo序列化机制可以进一步减少内存使用和GC开销。
- 进一步降低内存使用率,可以对数据进行压缩,由spark.rdd.compress参数控制(默认false)。
