Spark Notes – RDD Programming

RDD Programming

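We start by programming in spark-shell, which already provides a SparkContext named sc. A standalone program has to create its own SparkContext first.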

Creating RDDs

textFile()

Creates an RDD by loading data from a file system.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
val conf = new SparkConf().setAppName("rddtest").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val data: RDD[String] = sc.textFile("E:\\tmp\\spark\\input\\f1.txt")

parallelize() | makeRDD()

Parallelizes a collection: creates an RDD from a collection that already exists in the driver program.

val arr = Array(1,2,3,4,5)
val data: RDD[Int] = sc.parallelize(arr)
println(data.count()) // count the elements in the RDD

parallelize and makeRDD are the same operator: in the Spark source code, makeRDD simply calls parallelize.

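Number of partitions

Every RDD has partitions. Once an RDD has been created, its partition count is determined as follows.

Reading files from HDFS is the standard way to load data. From the way the computation works we can infer that the number of partitions normally matches the number of blocks, which is what makes data-local computation possible.

Files are read with textFile. Its source code shows that the data is read through TextInputFormat and that the content being read is text.

The structure of the implementation follows the figure above.

FileInputFormat divides each file into pieces, and the number of pieces determines the number of partitions.

A file is treated differently when it is stored in HDFS and when it is used in Spark. HDFS stores it in blocks of 128 MB, while reading is driven by Spark, and the unit that Spark reads is called a split.

Be sure to keep storage and computation apart: the number of partitions depends entirely on the number of splits and has nothing to do with how HDFS stores the file. When the split size equals the block size, however, each split can be processed on the node that holds its block (data-local computation).

The getSplits method is found in FileInputFormat, the parent class of TextInputFormat (LineRecordReader is the reader that TextInputFormat uses to read the lines of a split).

Source code walkthrough: computing splits

The source code that computes the splits is shown below: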

// This method computes the input splits
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] stats = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, stats.length);
    long totalSize = 0;                           // compute total size
    boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
      && job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
    // Sum up the total input size and check that every entry is a file
    List<FileStatus> files = new ArrayList<>(stats.length);
    for (FileStatus file: stats) {                // check we have valid files
      if (file.isDirectory()) {
        if (!ignoreDirs) {
          throw new IOException("Not a file: "+ file.getPath());
        }
      } else {
        files.add(file);
        totalSize += file.getLen();
      }
    }
    // numSplits is the desired number of splits, e.g. the 3 in sc.textFile("a.txt", 3)
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
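        // Check whether the file can be split
        if (isSplitable(fs, path)) {
          // Block size of the file
          long blockSize = file.getBlockSize();
          // Derive the split size from goalSize, minSize and blockSize
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
          // Bytes of the file that have not been assigned to a split yet
          long bytesRemaining = length;
          // Cut off one split per iteration while the remaining bytes exceed
          // 1.1 x splitSize (SPLIT_SLOP). A remainder of at most 1.1 x splitSize
          // is kept as one final split, so a small tail does not cost an extra task.
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            // The split starts at offset length-bytesRemaining and spans splitSize bytes
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);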
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          // Some compressed files, for example, cannot be split; such a file becomes a single split as a whole
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
  // The split size is the smaller of the block size and the goal size,
  // but never less than minSize
  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

For example, the number of partitions of an RDD can be checked with the following statement:

res0.partitions.size

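Files read from HDFS are usually large. When no desired partition count is passed, the default is 2 (defaultMinPartitions, which is min(defaultParallelism, 2)), so goalSize, the total size divided by 2, is normally larger than 128 MB. The split size is then capped at the 128 MB block size, and the number of splits normally equals the number of blocks.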
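The arithmetic described above can be tried out locally. The following is a minimal sketch in plain Scala, not Hadoop's code: it re-implements computeSplitSize and the SPLIT_SLOP loop from the listing for a single, non-empty, splittable file. The names SplitSizeModel and splits are made up for illustration, and the 128 MB block size is the value assumed in these notes.

```scala
object SplitSizeModel {
  // Same tolerance as SPLIT_SLOP in the getSplits listing
  val SplitSlop = 1.1

  // Mirrors computeSplitSize: min(goalSize, blockSize), but never below minSize
  def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  // Returns (offset, length) for every split of one non-empty, splittable file
  def splits(length: Long, numSplits: Int, blockSize: Long, minSize: Long = 1L): List[(Long, Long)] = {
    val goalSize = length / (if (numSplits == 0) 1 else numSplits)
    val splitSize = computeSplitSize(goalSize, minSize, blockSize)
    val out = List.newBuilder[(Long, Long)]
    var remaining = length
    // Cut full-size splits while more than 1.1 x splitSize is left
    while (remaining.toDouble / splitSize > SplitSlop) {
      out += ((length - remaining, splitSize))
      remaining -= splitSize
    }
    // Whatever is left (at most 1.1 x splitSize) becomes the last split
    if (remaining != 0) out += ((length - remaining, remaining))
    out.result()
  }
}
```

Under these assumptions, a 300 MB file with the default of 2 desired partitions yields three splits (128 MB, 128 MB and 44 MB), matching its three blocks. A 130 MB file read with a desired count of 1 stays in a single split, because the remainder is within the 1.1 tolerance.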

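Parallelized collections

The partition count adapts to the number of cores in the cluster: unless a count is passed explicitly, an RDD created from a collection gets as many partitions as the cores the application was started with. Collections are mostly parallelized for testing, so the count adapts by itself to get the best performance out of the available cores.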

scala> sc.makeRDD(Array(1,2,3,4,5,6), 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> sc.makeRDD(Array(1,2,3,4,5,6), 100)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> res0.partitions.size
res2: Int = 10

scala> res1.partitions.size
res3: Int = 100
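Asking for more partitions than there are elements necessarily leaves some partitions empty. The sketch below is a plain-Scala model of cutting a collection into numSlices contiguous index ranges. It is written from our understanding of how a parallelized collection is sliced and should be read as an assumption, not as Spark's code. SliceModel, positions and slice are illustrative names.

```scala
object SliceModel {
  // Start (inclusive) and end (exclusive) index of every slice
  def positions(length: Int, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i.toLong * length) / numSlices).toInt
      val end = (((i + 1).toLong * length) / numSlices).toInt
      (start, end)
    }

  // Cuts data into numSlices contiguous pieces; some may be empty
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] =
    positions(data.length, numSlices).map { case (s, e) => data.slice(s, e) }
}
```

With this model, nine elements in three slices give three slices of three elements each, while six elements in ten slices give ten slices of which four are empty.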

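RDD operations

There are two kinds of RDD operations: transformations and actions.

Every transformation produces a new RDD that is passed on to the next operation. Transformations are evaluated lazily: the chain of transformations only records what has to be done, and nothing is actually computed. Only when an action is encountered is the real computation triggered, from the beginning of the chain to its end.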
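Lazy evaluation can be observed without a cluster. The sketch below is only an analogy: it uses a plain Scala Iterator, whose map is also lazy, and counts how often the mapping function has run before and after a terminal step. It does not use Spark, and LazyModel and run are illustrative names.

```scala
object LazyModel {
  // Returns (calls before the terminal step, calls after it, result)
  def run(): (Int, Int, List[Int]) = {
    var calls = 0
    // Like a transformation: only describes the work, nothing runs yet
    val pipeline = Iterator(1, 2, 3).map { x => calls += 1; x * 2 }
    val before = calls
    // Like an action: forces the whole pipeline to run
    val result = pipeline.toList
    (before, calls, result)
  }
}
```

The function has not been called at all when the pipeline is defined; it runs once per element only when toList consumes the iterator.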

Common transformations

map(func)

Applies a function to every element of the RDD and builds a new RDD from the return values.

scala> sc.makeRDD(Array("1 2 3","4 5 6"),2)
res5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at makeRDD at <console>:24
// The split is not actually executed here; it only runs once an action is called.
scala> res5.map(s => s.split(" "))
res6: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:24

scala> res6.foreach(println)

// Open the Web UI to see the output, which is printed by the executors
// [Ljava.lang.String;@7fe1314b
// [Ljava.lang.String;@6433946c

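The corresponding DAG can be viewed there as well.

println is applied to an Array[String], so it prints the array's default toString representation instead of printing the elements one by one. That explains the output above.

To see the actual contents, use mkString to turn each array into a string.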

res6.foreach(arr => println(arr.mkString(",")))
// Output:
//1,2,3
//4,5,6

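Checking the output this way means opening the Web UI every time. Instead, collect() can be used to bring the data back to the driver, where the arrays can be printed line by line.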

res6.collect
// Result:
// res9: Array[Array[String]] = Array(Array(1, 2, 3), Array(4, 5, 6))
res6.collect().foreach(arr => println(arr.mkString(",")))
// Result:
// 1,2,3
// 4,5,6

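Use collect with care. An RDD is distributed, but collect brings all of its data back to the driver and holds it in the driver's memory, which can cause an out-of-memory error. Collect only small data sets, such as test data.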

flatMap(func)

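Applies a function to every element of the RDD and builds a new RDD from all the elements of the returned collections. Execution order: map, then flatten.

Example 1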

scala> val arr = Array("hello tom", "hello jack hello world", "tom world")
arr: Array[String] = Array(hello tom, hello jack hello world, tom world)

scala> sc.makeRDD(arr)
res10: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at <console>:25

scala> res10.flatMap(_.split(" "))
res11: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:24

scala> res11.collect
res12: Array[String] = Array(hello, tom, hello, jack, hello, world, tom, world)

Example 2

scala> val arr = Array("zhangsan 100 90 80 70", "lisi 60 50 40")
arr: Array[String] = Array(zhangsan 100 90 80 70, lisi 60 50 40)

scala> sc.makeRDD(arr)
res4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at makeRDD at <console>:25

scala> res4.flatMap(x => {
     | val arr = x.split(" ")
     | arr.tail.map((arr.head, _))
     | })
res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at flatMap at <console>:25

scala> res5.collect
res6: Array[(String, String)] = Array((zhangsan,100), (zhangsan,90), (zhangsan,80), (zhangsan,70), (lisi,60), (lisi,50), (lisi,40))

filter(func)

Returns an RDD made up of the elements for which func returns true.

scala> val arr = Array(1,2,3,4,5,6,7,8)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

scala> sc.makeRDD(arr)
res7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:25

scala> res7.filter(_>3)
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:24

scala> res8.collect
res9: Array[Int] = Array(4, 5, 6, 7, 8)
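// The elements are gone but the partitions remain. The default is the total number of cores; this spark-shell was started with six cores, hence six partitions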
scala> res7.partitions.size
res12: Int = 6

scala> res8.partitions.size
res13: Int = 6

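Partitions exist from the moment an RDD is created. An RDD must have partitions, because they are what allows it to be processed in parallel on different machines.

During a computation the partition count can turn out to be too low or too high. For example:

  1. The data is read for a machine learning computation, and the 128 MB of data in one block can only be handled by a single task. In this case more partitions are needed.
  2. A filter removes a large amount of dirty data, so that many partitions are left with very little data. Processing them wastes resources.

In these situations the partition count has to be changed. Operators such as map, flatMap and filter are simple pipelined operators: they run directly on the data loaded in memory and never send data to other machines, so they cannot change the partition count.

Among the operators covered in these notes, only the shuffle operators can change the partition count.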

distinct

Removes duplicate elements from an RDD.

scala> val arr = Array(1,1,2,2,2,3,3,3,4,4,5,5,6,6)
arr: Array[Int] = Array(1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr)
res14: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:25

scala> res14.distinct
res15: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at distinct at <console>:24

scala> res15.collect
res16: Array[Int] = Array(6, 1, 2, 3, 4, 5)

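distinct is implemented on top of a grouping step, and grouping involves a shuffle, so distinct can change the partition count.

The same deduplication can also be done with groupBy: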

scala> sc.makeRDD(arr)
res17: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at makeRDD at <console>:25
// 6 partitions
scala> res17.groupBy(t => t)
res18: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[11] at groupBy at <console>:24
// Another 6 partitions after the shuffle, so 12 partitions and 12 tasks in total
scala> res18.map(_._1).collect
res19: Array[Int] = Array(6, 1, 2, 3, 4, 5)

The result is the same.

The partition count can be changed in either direction, up or down:

scala> sc.makeRDD(arr,3)
res21: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at makeRDD at <console>:25

scala> res21.distinct(5)
res22: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[17] at distinct at <console>:24

scala> res22.partitions.size
res23: Int = 5

mapPartitions()

Works partition by partition: the function is applied to each partition as a whole instead of to each element.

scala> sc.makeRDD(arr)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> res0.map(_*2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:24

scala> res0.map
map   mapPartitions   mapPartitionsWithEvaluator   mapPartitionsWithIndex

scala> res0.map

def map[U](f: Int => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]

scala> res0.mapPartitions
mapPartitions   mapPartitionsWithEvaluator   mapPartitionsWithIndex

scala> res0.mapPartitions

def mapPartitions[U](f: Iterator[Int] => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$6: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]

scala> res0.mapPartitions(it => it.map(_*2))
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at mapPartitions at <console>:24

scala> res3.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16)

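As the signatures show, the function passed to mapPartitions receives an Iterator, whereas the function passed to map receives one element at a time. Every partition has its own Iterator, which is processed as a whole.

mapPartitions is almost the same as map, except that it handles one partition at a time.

mapPartitions is needed when the processing requires a connection, for example to a database.

Example: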

  1. Create a data directory, then create order.txt in it with the following content:
    • 001,1,2,5,5000
    • 002,2,3,6,6000
    • 003,3,4,7,7000
    • 004,4,5,8,8000
  2. The columns are order_id (order number), user_id (user ID), goods_id (product ID), number (quantity) and price.
  3. Create a user table named user in the database:
    • id name age
    • 1 zhangsan 20
    • 2 lisi 30
    • 3 wangwu 40
    • 4 zhaosi 50

For every order in order.txt, look up the user's name in the table by the user ID.

import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test mapPartitions")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.textFile("data/order.txt")
      .map(t=>{
        val strs = t.split(",")
        (strs(0),strs(1).toInt,strs(2).toInt,strs(3).toInt,strs(4).toDouble)
        // orderId, userId, goodsId, number, price
      }).map(t=>{
      val con = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/lmk","root","rootroot")
      val prp = con.prepareStatement("select name from hainiu.user where id = ?")
      val userid = t._2
      prp.setInt(1,userid)
      val result = prp.executeQuery()
      var name:String = null
      while(result.next()){
        name = result.getString("name")
      }
      prp.close()
      con.close()
      (t._1,t._2,name,t._3,t._4,t._5)
    }).foreach(println)
  }
}

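This code should not be used as it stands: inside map, every single element opens its own connection to MySQL. In production there can be tens of millions of elements. MySQL cannot handle that many connections, and creating a connection for every element is very slow.

Moving con and prp out of the map function does not work either: code that is not inside an RDD operator runs locally on the driver, not on the cluster.

The connection object cannot be hoisted into a shared variable, because an object created on the driver has to be serialized before the executors can use it, and connection objects are not serializable.

The optimized code is as follows: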

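import java.sql.DriverManager
import scala.collection.mutable.ListBuffer
import org.apache.spark.{SparkConf, SparkContext}

object Test {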
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test mapPartitions")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.textFile("data/order.txt")
      .map(t=>{
        val strs = t.split(",")
        (strs(0),strs(1).toInt,strs(2).toInt,strs(3).toInt,strs(4).toDouble)
        // orderId, userId, goodsId, number, price
      }).mapPartitions(it=>{
      val con = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/lmk", "root", "rootroot")
      val prp = con.prepareStatement("select name from hainiu.user where id = ?")
      val buffer = ListBuffer[(String,Int,String,Int,Int,Double)]()
      it.foreach(t=>{
        val userid = t._2
        prp.setInt(1, userid)
        val result = prp.executeQuery()
        var name: String = null
        while (result.next()) {
          name = result.getString("name")
        }
        buffer.append((t._1, t._2, name, t._3, t._4, t._5))
      })
      prp.close()
      con.close()
      buffer.toIterator
    }).foreach(println)
  }
}

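With mapPartitions, one connection is created per partition. Far fewer connections are needed, and no serialization problem arises because each connection is created on the executor that uses it.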
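The difference in the number of connections can be illustrated without Spark or MySQL. In the sketch below, partitions are modelled as nested Scala sequences and the database connection is replaced by a hypothetical FakeConnection that only counts how often it is opened. All names (ConnectionCountModel, FakeConnection, perElement, perPartition) are made up for this illustration.

```scala
object ConnectionCountModel {
  // Hypothetical stand-in for a JDBC connection; calls onOpen when created
  final class FakeConnection(onOpen: () => Unit) {
    onOpen()
    def lookup(userId: Int): String = s"user-$userId"
    def close(): Unit = ()
  }

  // map style: one connection per element
  def perElement(partitions: Seq[Seq[Int]]): (Int, Seq[String]) = {
    var opened = 0
    val names = partitions.flatMap { part =>
      part.map { id =>
        val con = new FakeConnection(() => opened += 1)
        try con.lookup(id) finally con.close()
      }
    }
    (opened, names)
  }

  // mapPartitions style: one connection per partition
  def perPartition(partitions: Seq[Seq[Int]]): (Int, Seq[String]) = {
    var opened = 0
    val names = partitions.flatMap { part =>
      val con = new FakeConnection(() => opened += 1)
      // part is a strict Seq, so every lookup finishes before close() runs
      try part.map(con.lookup) finally con.close()
    }
    (opened, names)
  }
}
```

For eight elements in three partitions, the first variant opens eight connections and the second opens three, with identical results. In Spark the partition arrives as a lazy Iterator, so closing the connection right after it.map(...) would close it before any lookup has run. That is why the optimized code collects the results into a ListBuffer before closing.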

mapPartitionsWithIndex

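The function passed to this operator takes two parameters: the partition index and an iterator over all the elements of that partition.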

scala> val arr=Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> sc.makeRDD(arr, 3)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:25

scala> res1.mapPartitionsWithIndex

// Int is the partition index, Iterator[Int] holds the elements of the partition
def mapPartitionsWithIndex[U](f: (Int, Iterator[Int]) => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$9: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]

scala> res1.mapPartitionsWithIndex((index,it)=> {
     | it.map((_,index))
     | })
res4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:24
// The output maps every element to its partition index
scala> res4.collect
res5: Array[(Int, Int)] = Array((1,0), (2,0), (3,0), (4,1), (5,1), (6,1), (7,2), (8,2), (9,2))
// Filter first, then look at the partitions again
scala> res1.filter(_>3)
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at <console>:24

scala> res8.mapPartitionsWithIndex((index, it) => {
     | it.map((_, index))
     | })
res9: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:24
// Partition 0 still exists but no longer holds any data; otherwise there would be no partition 2
scala> res9.collect
res10: Array[(Int, Int)] = Array((4,1), (5,1), (6,1), (7,2), (8,2), (9,2))

saveAsTextFile, used earlier, is the operator for saving data: it writes the result to a directory in HDFS.

Example: writing data to HDFS with mapPartitionsWithIndex.

package com.lmk.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

import java.io.PrintWriter

object TextMapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("sink")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val arr = Array(1,2,3,4,5,6,7,8,9)
    val sink_dir = "data/res"
    // A FileSystem created on the driver cannot be used inside the RDD operator, because it would have to be serialized
    val fs1 = FileSystem.get(new Configuration())
    if(fs1.exists(new Path(sink_dir)))
      throw new Exception("output path already exists ")
    else
      fs1.mkdirs(new Path(sink_dir))
    fs1.close()
    sc.makeRDD(arr, 3).mapPartitionsWithIndex((index, it) => {
      val fs = FileSystem.get(new Configuration())
      val out = fs.create(new Path(sink_dir+"/part-0000"+index))
      val pw = new PrintWriter(out, true)
      it.foreach(line => pw.println(line))
      pw.close()
      out.close()
      // fs is not closed here: FileSystem.get returns a cached instance that tasks in the same JVM share
      Iterator.empty
    }).foreach((t:String)=>{})
  }
}

Key-value (pair) RDD operations

A key-value RDD (pair RDD) is an RDD whose elements are all (key, value) pairs.

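Function | Purpose | Notes
reduceByKey(func) | Merges the values that share a key. RDD[(K, V)] => RDD[(K, V)] | Groups by key and combines the values with func
groupByKey() | Groups the values that share a key. RDD[(K, V)] => RDD[(K, Iterable[V])] | Groups by key only; the values are not combined
mapValues(func) | Applies a function to every value of a pair RDD without changing the key | The values are not combined
flatMapValues(func) | Applies a function that returns a collection to every value of a pair RDD and emits one key-value record, with the original key, for each returned element | -
keys | Returns an RDD that contains only the keys. RDD[(K, V)] => RDD[K] | The keys are not deduplicated
values | Returns an RDD that contains only the values. RDD[(K, V)] => RDD[V] | -
sortByKey() | Returns an RDD sorted by key | Ascending by default; pass false for descending
subtractByKey(other) | Removes the elements whose key also appears in the other RDD | -
cogroup(other) | Groups the data of two RDDs that share a key. RDD[(K, V)], RDD[(K, W)] => RDD[(K, (Iterable[V], Iterable[W]))] | -
join(other) | Inner join of two RDDs. RDD[(K, V)], RDD[(K, W)] => RDD[(K, (V, W))] | Equivalent to INNER JOIN in MySQL
rightOuterJoin(other) | Right outer join of two RDDs. RDD[(K, V)], RDD[(K, W)] => RDD[(K, (Option[V], W))] | Equivalent to RIGHT JOIN in MySQL
leftOuterJoin(other) | Left outer join of two RDDs. RDD[(K, V)], RDD[(K, W)] => RDD[(K, (V, Option[W]))] | Equivalent to LEFT JOIN in MySQL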

reduceByKey(func)

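Merges values by key: it not only groups the values but also combines them. Because it groups and aggregates, it is a shuffle operator and can change the partition count.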

scala> sc.textFile("/test/a.txt")
res15: org.apache.spark.rdd.RDD[String] = /test/a.txt MapPartitionsRDD[13] at textFile at <console>:24

scala> res15.flatMap(_.split(" "))
res16: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at flatMap at <console>:24

scala> res16.map((_, 1))
res17: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:24
// Aggregate by key, combining two values at a time.
// res17.reduceByKey(_+_, 4) would also set the partition count
scala> res17.reduceByKey(_+_)
res18: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:24

scala> res18.collect
res19: Array[(String, Int)] = Array((tom,10), (hello,20), (wrold,10))

Implementing distinct with reduceByKey

scala> val arr = Array(1,1,2,2,3,3,4,4,5,5,6,6)
arr: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr)
res20: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at <console>:25

scala> res20.map((_, null))
res21: org.apache.spark.rdd.RDD[(Int, Null)] = MapPartitionsRDD[18] at map at <console>:24

scala> res21.reduceByKey((a, b) => a)
res22: org.apache.spark.rdd.RDD[(Int, Null)] = ShuffledRDD[19] at reduceByKey at <console>:24

scala> res22.map(_._1)
res23: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at map at <console>:24

scala> res23.collect
res24: Array[Int] = Array(6, 1, 2, 3, 4, 5)

distinct itself is in fact implemented with reduceByKey. Its source code:

  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
      // Create an instance of external append only map which ignores values.
      val map = new ExternalAppendOnlyMap[T, Null, Null](
        createCombiner = _ => null,
        mergeValue = (a, b) => a,
        mergeCombiners = (a, b) => a)
      map.insertAll(partition.map(_ -> null))
      map.iterator.map(_._1)
    }
    partitioner match {
      case Some(_) if numPartitions == partitions.length =>
        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
      case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    }
  }

groupByKey()

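Unlike groupBy, it returns RDD[(K, Iterable[V])], so the grouped values no longer carry the key. It is also a shuffle operator and can change the partition count.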

scala> sc.textFile("/test/a.txt")
res0: org.apache.spark.rdd.RDD[String] = /test/a.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> res0.flatMap(_.split(" "))
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:24

scala> res1.map((_, 1))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:24

scala> res2.groupBy(_._1)
res4: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[5] at groupBy at <console>:24
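// groupBy returns Iterable[(String, Int)], which is awkward to process, so switch to an operator that returns Iterable[Int]
// groupByKey requires (k, v) data; res2.groupByKey(2) would also set the partition count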
scala> res2.groupByKey()
res6: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:24

scala> res6.mapValues(_.sum)
res7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at mapValues at <console>:24

scala> res7.collect
res8: Array[(String, Int)] = Array((tom,10), (hello,20), (wrold,10))

groupByKey can also be used to implement distinct:

scala> val arr = Array(1,1,2,2,3,3,4,4,5,5,6,6)
arr: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr)
res9: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:25

scala> res9.map((_,null))
res10: org.apache.spark.rdd.RDD[(Int, Null)] = MapPartitionsRDD[9] at map at <console>:24

scala> res10.groupByKey()
res12: org.apache.spark.rdd.RDD[(Int, Iterable[Null])] = ShuffledRDD[10] at groupByKey at <console>:24

scala> res12.map(_._1)
res13: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at map at <console>:24

scala> res13.collect
res14: Array[Int] = Array(6, 1, 2, 3, 4, 5)

Differences between reduceByKey(func) and groupByKey()

Both are built on the following shared source code:

  def combineByKeyWithClassTag[C](
// Creates the initial combiner from the first value seen for a key
      createCombiner: V => C,
// Within-partition function: an RDD has partitions, and mergeValue first aggregates the data inside each partition
      mergeValue: (C, V) => C,
// Cross-partition function: then merges the partial results of different partitions
      mergeCombiners: (C, C) => C,
// The partitioner used when the data is fetched during the shuffle
      partitioner: Partitioner,
// Whether to run the combiner on the map side before the shuffle data is written
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError()
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }
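The roles of the three functions are easier to see on a small example. The following sketch models the two phases with plain Scala collections: partitions are nested sequences, mergeValue runs inside each partition, and mergeCombiners merges the partial results. It is an illustration of the idea, not Spark's implementation, and the names CombineByKeyModel, combineByKey and averages are assumptions made for this example.

```scala
object CombineByKeyModel {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Phase 1: combine inside every partition (map side)
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
    }
    // Phase 2: merge the partial results of all partitions (reduce side)
    perPartition.flatten.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
    }
  }

  // Average per key, using a (sum, count) pair as the combiner type C
  def averages(partitions: Seq[Seq[(String, Int)]]): Map[String, Double] =
    combineByKey[String, Int, (Int, Int)](
      partitions,
      v => (v, 1),
      (c, v) => (c._1 + v, c._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    ).map { case (k, (sum, n)) => k -> sum.toDouble / n }
}
```

The average is a case where the combiner type C differs from the value type V, which reduceByKey cannot express directly.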

Source code of groupByKey:

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
// Values are only collected per key; nothing is computed on them
// Within a partition every element is appended to a CompactBuffer, so the data keeps growing
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
// No combiner on the map side (upstream), so the reduce side (downstream) fetches a large amount of data
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

Source code of reduceByKey:

Compared with groupByKey above, it uses less space.

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
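// The two func arguments are, in order, the within-partition function and the cross-partition function
// The map side (upstream) runs a combiner, so the reduce side (downstream) fetches little data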
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
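The effect of the map-side combiner on the amount of shuffled data can be estimated with a small model. The sketch below represents partitions as nested Scala sequences and counts how many records each approach would hand to the shuffle. It is a simplification for illustration only, and ShuffleVolumeModel and its two methods are hypothetical names, not Spark APIs.

```scala
object ShuffleVolumeModel {
  // reduceByKey style: combine inside each partition first, then shuffle.
  // Returns (records shuffled, final result).
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(f: (V, V) => V): (Int, Map[K, V]) = {
    val combined: Seq[Map[K, V]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    }
    val shuffled = combined.map(_.size).sum // one record per key per partition
    val result = combined.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    (shuffled, result)
  }

  // groupByKey style: every record is shuffled as it is.
  def groupByKey[K, V](partitions: Seq[Seq[(K, V)]]): (Int, Map[K, Seq[V]]) = {
    val shuffled = partitions.map(_.size).sum
    val result = partitions.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (shuffled, result)
  }
}
```

For a word count over two partitions with six records and two distinct words each, the first approach shuffles four records and the second shuffles six. The gap grows with the number of repeated keys per partition.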

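Example using groupByKey and reduceByKey

Requirement: first find the three most-visited entries in the whole file, then the two most-visited entries within each subject.

Sample data, roughly as follows: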

http://hive.lmkspaceol.cn/lmk
http://hive.lmkspaceol.cn/lmk
http://hive.lmkspaceol.cn/lmk
http://hive.lmkspaceol.cn/lmk
http://hive.lmkspaceol.cn/lmk
http://hive.lmkspaceol.cn/l
http://hive.lmkspaceol.cn/l
http://hive.lmkspaceol.cn/l
http://hive.lmkspaceol.cn/l
http://hive.lmkspaceol.cn/lm
http://hive.lmkspaceol.cn/l
http://spark.lmkspaceol.cn/l
http://spark.lmkspaceol.cn/l

Code for the three most-visited entries in the whole file:

package com.lmk.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.net.URL

object GroupTest {
  def main(args: Array[String]): Unit = {
    //val url = new URL("http://hive.lmkspaceol.cn/lmk")
    //println(url.getHost.split("\\.")(0))
    // prints hive: getHost returns hive.lmkspaceol.cn, and \\. escapes the dot in the regex
    //println(url.getPath.substring(1))
    // prints lmk: getPath returns /lmk
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("grouptest")
    val sc = new SparkContext(conf)
    val userVisit:RDD[((String, String), Int)] = sc.textFile("data/grouptest.txt")
      .map(t => {
        val url = new URL(t)
        val subject = url.getHost.split("\\.")(0)
        val user = url.getPath.substring(1)
        ((subject, user), 1)
      }).reduceByKey(_+_)
    // The minus sign sorts in descending order; without it the order is ascending
    userVisit.sortBy(-_._2)
      .take(3)
      .foreach(println)
  }
}

Result:

((zookeeper,lmk),6)
((hive,l),5)
((hdfs,lm),5)

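Code for the two most-visited entries within each subject:

// The minus sign sorts in descending order; without it the order is ascending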
//    userVisit.sortBy(-_._2)
//      .take(3)
//      .foreach(println)
    // ((String, String), Int) becomes (String, (String, Int))
    userVisit.map(x => (x._1._1, (x._1._2, x._2)))
      // (String, Iterable[(String, Int)])
      .groupByKey()
      // (String, List[(String, Int)]); take(2) keeps the top two per subject
      .map(x => (x._1, x._2.toList.sortBy(-_._2).take(2)))
      .foreach(println)

Output:

(spark,List((lmk,4), (lm,4)))
(hive,List((l,5), (lmk,5)))
(hdfs,List((lm,5)))
(zookeeper,List((lmk,6), (l,4)))
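The same top-N-per-subject logic can be checked on a small in-memory sample before running it on a cluster. The sketch below uses plain Scala collections in place of RDDs and java.net.URI in place of java.net.URL; both substitutions, the tie-break by user name, and the names TopNPerSubjectModel, parse and topPerSubject are choices made for this illustration.

```scala
import java.net.URI

object TopNPerSubjectModel {
  // "http://hive.lmkspaceol.cn/lmk" becomes (subject, user) = ("hive", "lmk")
  def parse(line: String): (String, String) = {
    val uri = new URI(line)
    (uri.getHost.split("\\.")(0), uri.getPath.substring(1))
  }

  def topPerSubject(lines: Seq[String], n: Int): Map[String, List[(String, Int)]] = {
    // Count visits per (subject, user), like map + reduceByKey(_+_)
    val counts: Map[(String, String), Int] =
      lines.map(parse).groupBy(identity).map { case (k, vs) => k -> vs.size }
    counts.toList
      .map { case ((subject, user), c) => (subject, (user, c)) }
      .groupBy(_._1) // like groupByKey on the subject
      .map { case (subject, rows) =>
        // Descending by count; ties are broken by user name to keep the output stable
        subject -> rows.map(_._2).sortBy { case (user, c) => (-c, user) }.take(n)
      }
  }
}
```

As in the Spark version, all entries of a subject are sorted in memory before take(n) is applied, which is fine as long as a single subject does not hold too many distinct users.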

keys

Returns only the keys.

scala> sc.makeRDD(List((1, "zs"), (2, "ls"), (3, "ww")))
res11: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[2] at makeRDD at <console>:24

scala> res11.keys
res12: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at keys at <console>:24

scala> res12.collect
res13: Array[Int] = Array(1, 2, 3)

The same result can of course be obtained with map:

scala> res11.map(_._1)
res19: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:24

scala> res19.collect
res20: Array[Int] = Array(1, 2, 3)

values

Returns only the values. map can be used for this as well.

scala> res11.values
res23: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at values at <console>:24

scala> res23.collect
res24: Array[String] = Array(zs, ls, ww)

mapValues(func)

Applies a function to the value of every pair and leaves the key unchanged.

scala> val arr = Array(("zs", 3000), ("ls", 4000), ("ww", 5000))
arr: Array[(String, Int)] = Array((zs,3000), (ls,4000), (ww,5000))

scala> sc.makeRDD(arr)
res25: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at makeRDD at <console>:25

scala> res25.mapValues(_+1000)
res26: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at mapValues at <console>:24

scala> res26.collect
res27: Array[(String, Int)] = Array((zs,4000), (ls,5000), (ww,6000))

flatMapValues(func)

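Flattens only the values. The data must be in key-value form, and the key is kept as it is. The function passed to flatMapValues defines how a value is turned into a collection.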

scala> val arr = Array(("zs", Array(100, 93, 90)), ("ls", Array(23, 44, 94, 28)))
arr: Array[(String, Array[Int])] = Array((zs,Array(100, 93, 90)), (ls,Array(23, 44, 94, 28)))

scala> sc.makeRDD(arr)
res33: org.apache.spark.rdd.RDD[(String, Array[Int])] = ParallelCollectionRDD[10] at makeRDD at <console>:25
// The function takes _2, the array, and maps over it to pair every element with the key
scala> res33.flatMap(x => x._2.map((x._1, _)))
res34: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at flatMap at <console>:24

scala> res34.collect
res35: Array[(String, Int)] = Array((zs,100), (zs,93), (zs,90), (ls,23), (ls,44), (ls,94), (ls,28))
// A function is required; here it simply returns the value unchanged
scala> res33.flatMapValues(t => t)
res36: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at flatMapValues at <console>:24

scala> res36.collect
res37: Array[(String, Int)] = Array((zs,100), (zs,93), (zs,90), (ls,23), (ls,44), (ls,94), (ls,28))

Example 2

scala> val arr = Array(("zs", "100, 23, 44"), ("ls", "83, 94, 23"))
arr: Array[(String, String)] = Array((zs,100, 23, 44), (ls,83, 94, 23))

scala> sc.makeRDD(arr)
res38: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[13] at makeRDD at <console>:25
// The operator works on values only, so _ stands for the value; flattening the split value gives the result.
scala> res38.flatMapValues(_.split(", "))
res41: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[15] at flatMapValues at <console>:24

scala> res41.collect
res42: Array[(String, String)] = Array((zs,100), (zs,23), (zs,44), (ls,83), (ls,94), (ls,23))
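What flatMapValues does can be written down in a few lines of plain Scala. The sketch below is a local model on a sequence of pairs, not the Spark operator; FlatMapValuesModel is an illustrative name.

```scala
object FlatMapValuesModel {
  // Expands every value into several elements and pairs each of them with the original key
  def flatMapValues[K, V, U](pairs: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
    pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }
}
```

Called with the data of example 2 and the function _.split(", ").toSeq, the model produces the same six pairs as the transcript above.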

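flatMap example

Sample data

chinese-zs,ls,ww
math-zs,zs
english-ls,ww,zl

Requirement: find out which courses each teacher teaches.

Analysis: the expected result looks like zs-chinese,math. This resembles word count, except that the aggregation concatenates subjects instead of adding up counts.

  1. Work backwards from the result zs-chinese,math: the data one step earlier must be (zs,chinese),(zs,math).
  2. Merging those pairs with reduceByKey(_+","+_) produces the result.

The code is as follows: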

package com.lmk.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FlatMapTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("flatMapTest")
    val sc = new SparkContext(conf)
    val arr = Array("chinese-zs,ls,ww", "math-zs,zs", "english-ls,ww,zl")
    val rdd:RDD[String] = sc.makeRDD(arr)
    rdd.map(x => {
      val strs = x.split("-")
      (strs(0), strs(1))
    }).flatMapValues(_.split(","))
    // swap exchanges key and value
      .map(_.swap)
      .reduceByKey(_+","+_)
      .foreach(a=>println(a._1, a._2))

  }

}

Output:

(zs,chinese,math,math)
(ww,chinese,english)
(ls,chinese,english)
(zl,english)

sortByKey()

Sorts by key. The order is ascending by default; passing false as the argument sorts in descending order.

scala> sc.textFile("/test/a.txt").flatMap(_.split(" ")).map((_, 1))
res0: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:24

scala> res0.reduceByKey(_+_)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:24

scala> res1.collect
res2: Array[(String, Int)] = Array((tom,10), (hello,20), (wrold,10))

scala> res1.sortByKey()
res3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at sortByKey at <console>:24

scala> res3.collect
res4: Array[(String, Int)] = Array((hello,20), (tom,10), (wrold,10))
// Sort in descending order and change the partition count (sorting reorders the data, so there is a shuffle)
scala> res1.sortByKey(false,3)
res5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at sortByKey at <console>:24

scala> res5.collect
res6: Array[(String, Int)] = Array((wrold,10), (tom,10), (hello,20))
// To sort by value, either swap key and value, or key every pair by its value as below
scala> res1.map(x => (x._2, x)).sortByKey().values
res7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at values at <console>:24

scala> res7.collect
res8: Array[(String, Int)] = Array((tom,10), (wrold,10), (hello,20))

sortBy()

scala> res1.sortBy(-_._2)
res10: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[20] at sortBy at <console>:24

scala> res10.collect
res11: Array[(String, Int)] = Array((hello,20), (tom,10), (wrold,10))

scala> res1.sortBy(-_._2,false,4)
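// sortBy takes up to three arguments. The second works as in sortByKey: false means descending, so the minus sign combined with false gives ascending order again. The third sets the partition count.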
def sortBy[K](f: ((String, Int)) => K,ascending: Boolean,numPartitions: Int)(implicit ord: Ordering[K],implicit ctag: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[(String, Int)]

sortBy has no implementation of its own: it calls sortByKey(). The same is true of groupBy, which is built on groupByKey. The source code of sortBy:

  /**
   * Return this RDD sorted by the given key function.
   */
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
// First use keyBy to turn the value computed by f into the key to sort on
    this.keyBy[K](f)
// This is where sortByKey is actually called
        .sortByKey(ascending, numPartitions)
// Finally keep only the values
        .values
  }
  /**
   * Creates tuples of the elements in this RDD by applying `f`.
   */
  def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
    val cleanedF = sc.clean(f)
// For example, sorting (hello,3),(world,2) by _._2 first turns them into (3,(hello,3)),(2,(world,2))
    map(x => (cleanedF(x), x))
  }
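The three steps keyBy, sortByKey and values can be replayed on a local collection. The sketch below is a plain-Scala model of that composition, written for illustration; it is not Spark's code, and SortByModel with its three methods is a hypothetical helper.

```scala
object SortByModel {
  // Step 1: pair every element with the key computed by f
  def keyBy[T, K](data: Seq[T])(f: T => K): Seq[(K, T)] =
    data.map(x => (f(x), x))

  // Step 2: sort the pairs by key, ascending or descending
  def sortByKey[K, V](pairs: Seq[(K, V)], ascending: Boolean = true)(
      implicit ord: Ordering[K]): Seq[(K, V)] =
    pairs.sortBy(_._1)(if (ascending) ord else ord.reverse)

  // Step 3: drop the keys again, so sortBy = keyBy + sortByKey + values
  def sortBy[T, K: Ordering](data: Seq[T], ascending: Boolean = true)(f: T => K): Seq[T] =
    sortByKey(keyBy(data)(f), ascending).map(_._2)
}
```

With the word counts used above, sortBy(data)(-_._2) and sortBy(data, ascending = false)(_._2) both put (hello,20) first, while combining the minus sign with ascending = false puts it last.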

join()

This is an inner join: records are matched by key, and only keys present in both RDDs are kept.

scala> val arr = Array(("zs",300),("ls",400),("ww",500),("zl", 600))
arr: Array[(String, Int)] = Array((zs,300), (ls,400), (ww,500), (zl,600))

scala> val arr2 = Array(("zs",22),("ls",39),("ww",42),("ss",4))
arr2: Array[(String, Int)] = Array((zs,22), (ls,39), (ww,42), (ss,4))

scala> sc.makeRDD(arr)
res16: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at makeRDD at <console>:25

scala> sc.makeRDD(arr2)
res17: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at makeRDD at <console>:25

scala> res16 join res17
res18: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[30] at join at <console>:25

scala> res18.collect
res19: Array[(String, (Int, Int))] = Array((ls,(400,39)), (zs,(300,22)), (ww,(500,42)))
// Compute the pay (the product of the two joined values)
scala> res18.mapValues(x => x._1*x._2)
res20: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[31] at mapValues at <console>:24

scala> res20.collect
res21: Array[(String, Int)] = Array((ls,15600), (zs,6600), (ww,21000))

leftOuterJoin

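Similar to a left join: every record of the left (driving) RDD is kept, and the right-hand values are filled in by key.

// Option[Int] means the value may be missing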
scala> res16 leftOuterJoin res17
res23: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[34] at leftOuterJoin at <console>:25

scala> res23.collect
res24: Array[(String, (Int, Option[Int]))] = Array((ls,(400,Some(39))), (zl,(600,None)), (zs,(300,Some(22))), (ww,(500,Some(42))))
// getOrElse(0) returns the value if there is one and 0 otherwise
scala> res23.mapValues(x => x._1*x._2.getOrElse(0))
res25: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[35] at mapValues at <console>:24

scala> res25.collect
res26: Array[(String, Int)] = Array((ls,15600), (zl,0), (zs,6600), (ww,21000))

rightOuterJoin()

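Similar to a right join. It works like leftOuterJoin except that the right RDD is the driving table, so it is not covered in detail. Note that the return type differs: here the left value is the optional one.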
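
To make the difference in the return type concrete, here is a plain-Scala sketch of the right outer join semantics on local sequences. In Spark the result type of rightOuterJoin is RDD[(K, (Option[V], W))]; RightOuterJoinSketch and its method are hypothetical stand-ins that only model the matching rule, not Spark's distributed implementation.

```scala
object RightOuterJoinSketch {
  // Every record of `right` is kept; the left value is Some(v) when the key
  // exists on the left and None otherwise.
  def rightOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Option[V], W))] = {
    val leftByKey: Map[K, Seq[V]] =
      left.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    right.flatMap { case (k, w) =>
      leftByKey.get(k) match {
        case Some(vs) => vs.map(v => (k, (Option(v), w)))
        case None     => Seq((k, (Option.empty[V], w)))
      }
    }
  }
}
```

With the arr and arr2 data used above, the key ss (present only on the right) would come out as (ss,(None,4)), while zl (present only on the left) would be dropped.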

cogroup()

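Similar to a full outer join: values from both RDDs are grouped by key, and a key that appears in only one RDD is still kept.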

scala> val arr = Array(("zs",300),("ls",400),("ww",500),("zl", 600))
arr: Array[(String, Int)] = Array((zs,300), (ls,400), (ww,500), (zl,600))

scala> val arr2 = Array(("zs",22),("ls",39),("ww",42),("ss",4))
arr2: Array[(String, Int)] = Array((zs,22), (ls,39), (ww,42), (ss,4))

scala> sc.makeRDD(arr)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> sc.makeRDD(arr2)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:25
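// Note that the values are returned as Iterable. In an outer-style grouping each side may hold several values for a key (or none), so each side is returned as a collection.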
scala> res0 cogroup res1
res2: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[3] at cogroup at <console>:25

scala> res2.collect
res3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((ls,(CompactBuffer(400),CompactBuffer(39))), (ss,(CompactBuffer(),CompactBuffer(4))), (zl,(CompactBuffer(600),CompactBuffer())), (zs,(CompactBuffer(300),CompactBuffer(22))), (ww,(CompactBuffer(500),CompactBuffer(42))))
// The value type is different now, so the processing changes accordingly
scala> res2.mapValues(x => x._1.sum * x._2.sum)
res5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:24

scala> res5.collect
res6: Array[(String, Int)] = Array((ls,15600), (ss,0), (zl,0), (zs,6600), (ww,21000))

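Partitioners on RDDs

groupBy, groupByKey, reduceByKey, sortBy, sortByKey, and the join family (cogroup, inner, left, and right joins) generally involve a shuffle.

A shuffle happens because grouping, sorting, and join operators have to redistribute the data across partitions.

During a shuffle, the upstream tasks finish processing their data and write it to their local disks, and the downstream tasks then pull it from those disks.

This redistribution has to follow a rule. The rule that decides where intermediate data goes is called a partitioner, and it normally appears in shuffle operators.

A partitioner can also be used on its own, so that you define the distribution rule yourself.

Grouping operators (groupBy, groupByKey, reduceByKey) use HashPartitioner by default.

Sorting operators (sortBy, sortByKey) use RangePartitioner.

HashPartitioner

Rule: partition index = key.hashCode % number of downstream partitions

With hash-modulo placement, records with the same key always end up in the same partition, whichever partition that turns out to be.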

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> sc.makeRDD(arr)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> res0.mapPartitionsWithIndex((index, x) => x.map((index,_)))
res1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:24

scala> res1.collect
res2: Array[(Int, Int)] = Array((0,1), (1,2), (1,3), (2,4), (3,5), (3,6), (4,7), (5,8), (5,9))

scala> res0.map(x => (x, x))
res3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:24
// There is no partitioner at this point
scala> res0.partitioner
res4: Option[org.apache.spark.Partitioner] = None
// There are 6 partitions here
scala> res3.reduceByKey(_+_)
res5: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at reduceByKey at <console>:24
// Now the RDD carries a HashPartitioner
scala> res5.partitioner
res6: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@6)

scala> res5.mapPartitionsWithIndex((index,x) => x.map((index, _)))
res8: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:24

scala> res8.collect
res9: Array[(Int, (Int, Int))] = Array((0,(6,6)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(2,2)), (3,(3,3)), (3,(9,9)), (4,(4,4)), (5,(5,5)))

In this example the records are partitioned by key.hashCode, and the hashCode of an Int is the number itself.
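
The placement rule can be reproduced in a few lines of plain Scala. This is a sketch rather than Spark's code: HashPartitionSketch and partitionFor are hypothetical names. It assumes, as Spark's HashPartitioner does, that a null key goes to partition 0 and that a negative remainder is shifted into the valid range.

```scala
object HashPartitionSketch {
  // Remainder that is never negative, even for a negative hashCode.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Partition index for a key under hash partitioning.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k    => nonNegativeMod(k.hashCode, numPartitions)
  }
}
```

With 6 partitions, key 7 lands in partition 1 and key 6 in partition 0, which agrees with the collect output above.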

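Because of this rule, the number of downstream partitions can be set to any value with the hash partitioner, as the following session shows: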

scala> res3.reduceByKey(_+_, 20)
res10: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:24

scala> res10.partitions.size
res12: Int = 20

scala> res3.reduceByKey(_+_, 2)
res13: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at reduceByKey at <console>:24

scala> res13.partitions.size
res14: Int = 2

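RangePartitioner

The HashPartitioner rule is very simple: each record is placed purely by its hashCode. The drawback is that it can lead to data skew.

The range partitioning rule relies on two members:

1. rangeBounds

The boundaries. Before the partitioner is used, the boundaries between partitions are determined.

Reservoir sampling is first used to draw a sample that represents the whole data set of unknown size, and the boundaries are set from that sample.

2. getPartition

Once the boundaries exist, getPartition looks up the partition that a given key belongs to.

Because reservoir sampling has to run before the data flow can be decided, creating the partitioner triggers a computation ahead of the main logic.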

scala> sc.textFile("/test/a.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
res17: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:24

scala> res17.sortByKey()
res18: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at sortByKey at <console>:24

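Transformations normally do not trigger computation; only actions do. Here, however, a (partial) computation is triggered anyway, because the data must be sampled first to determine the downstream boundaries before the actual computation runs.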

scala> val arr = Array(1,9,3,4,5,7,2,6,8)
arr: Array[Int] = Array(1, 9, 3, 4, 5, 7, 2, 6, 8)

scala> sc.makeRDD(arr, 3)
res19: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:25

scala> res19.sortBy(x => x)
res20: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at sortBy at <console>:24
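// There is no partitioner here. As shown earlier, sortBy is implemented with sortByKey, but it first maps a key onto each element and afterwards maps the key away again, so the final RDD no longer carries a partitioner.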
scala> res20.partitioner
res21: Option[org.apache.spark.Partitioner] = None

scala> res19.map(t => (t, t))
res23: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[21] at map at <console>:24

scala> res23.sortByKey()
res24: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[24] at sortByKey at <console>:24

scala> res24.partitioner
res25: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@852b)

scala> res26.mapPartitionsWithIndex((index, it)=>it.map((index,_)))
res28: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[28] at mapPartitionsWithIndex at <console>:24
// The records are spread evenly across the partitions
scala> res28.collect
res30: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (0,(4,4)), (0,(5,5)), (1,(6,6)), (1,(7,7)), (1,(8,8)), (1,(9,9)))

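The range partitioner samples first and then fixes the boundaries for the downstream data. The number of partitions can be changed, but the result cannot have more partitions than there are elements, because every partition must contain at least one element.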

scala> res24.sortByKey(true, 20)
res31: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[31] at sortByKey at <console>:24
// The number of partitions is still 9, not 20
scala> res31.partitions.size
res32: Int = 9
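
The two steps of range partitioning, deriving boundaries from a sample and then looking up a key, can be sketched in plain Scala. This is a simplified illustration under stated assumptions: RangePartitionSketch and both method signatures are hypothetical, the sample is passed in directly instead of being drawn by reservoir sampling, and keys are plain Ints in ascending order.

```scala
object RangePartitionSketch {
  // Picks up to numPartitions - 1 upper bounds from a sample of the keys.
  // Duplicate bounds are dropped, so fewer partitions may result than requested.
  def rangeBounds(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty || numPartitions <= 1) Vector.empty
    else {
      val step = sorted.length.toDouble / numPartitions
      (1 until numPartitions)
        .map(i => sorted(math.max(0, math.min(sorted.length - 1, (i * step).toInt - 1))))
        .distinct
        .toVector
    }
  }

  // A key goes to the first partition whose upper bound is >= the key;
  // keys above every bound go to the last partition.
  def getPartition(key: Int, bounds: Vector[Int]): Int = {
    val idx = bounds.indexWhere(key <= _)
    if (idx == -1) bounds.length else idx
  }
}
```

In this sketch, asking for 20 partitions over the nine keys 1 to 9 cannot yield more partitions than there are distinct keys, which mirrors the behaviour observed in the session above.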

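Custom partitioners

In practice you may also need to classify data: you want to define the partitioning rule yourself so that records matching different rules go to different partitions. That is when a custom partitioner is needed.

The requirements for this example are:

  1. Distribute the data from the wordcount exercise to different partitions according to a fixed rule
  2. Put hello in one partition and everything else in the other

Defining a partitioner sends the data to different partitions, so the output files written by the different tasks differ as well.

A custom partitioner has to extend Spark's Partitioner class: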

class MyPartitioner extends Partitioner{
// Number of downstream partitions
  override def numPartitions: Int = ???
// Partition index for a given key
  override def getPartition(key: Any): Int = ???
}

The implementation is as follows:

package com.lmk.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object WordCountForPartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("WordCountForPartitioner")
    val sc = new SparkContext(conf)
    // stage1
    val rdd = sc.textFile("data/a.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      // stage2
      .reduceByKey(_+_)
    // stage3
    val rdd2 = rdd.partitionBy(new MyPartitioner)
    val fs = FileSystem.get(new Configuration())
    val out = "data/res"
    if (fs.exists(new Path(out)))
      fs.delete(new Path(out), true)
    rdd2.saveAsTextFile("data/res")


  }

}

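class MyPartitioner extends Partitioner{
// Number of downstream partitions
  override def numPartitions: Int = 2
// Partition index for a given key: hello goes to partition 0, everything else to partition 1
  override def getPartition(key: Any): Int = {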
    if(key.toString.equals("hello"))
      0
    else
      1
  }
}

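The resulting output files are as follows:

Case study

Requirement: using the data in grouptest.txt, where each line is a URL, compute the top 2 per subject with a custom partitioner.

The requirement breaks down as follows:

  1. First compute the number of visits for each user
  2. Partition the visit counts by subject
  3. Use mapPartitions to take one whole partition at a time and sort it

The code is as follows: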

package com.lmk.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import java.net.URL

object GroupTextForPartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("GroupTextForPartitioner")
    val sc = new SparkContext(conf)
    val rdd:RDD[((String, String), Int)] = sc.textFile("data/grouptest.txt")
      .map(t => {
        val url = new URL(t) // http://hive.lmkspaceol.cn/lmk
        val host = url.getHost // hive.lmkspaceol.cn
        val path = url.getPath // /lmk
        val subject = host.split("\\.")(0) // hive
        val user = path.substring(1) // lmk
        ((subject, user), 1)
        }).reduceByKey(_+_)
    // Get the distinct subjects
    val rdd2:RDD[String] = rdd.keys.map(_._1).distinct()
    // Collect them to the driver as an array; an RDD cannot be passed into the partitioner, an array can
    val arritems = rdd2.collect()
    val rdd1:RDD[((String, String), Int)] = rdd.partitionBy(new UserPartitioner(arritems))
    rdd1.mapPartitions(x => {
      // The data arrives as an iterator, so convert it to a List first and back to an Iterator at the end
      x.toList.sortBy(-_._2).take(2).toIterator
    }).foreach(println)
  }
}
// Data is passed into the class through its constructor
class UserPartitioner(arr:Array[String]) extends Partitioner {

  override def numPartitions: Int = arr.size
  // The partition count equals the array length, so the array index is the partition number
  override def getPartition(key: Any): Int = {
    val item = key.asInstanceOf[(String, String)]._1
    arr.indexOf(item)
    }
}

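The result is as follows:

How join works

A join connects two result sets and has to match records, so let us first check whether a shuffle is involved.

Case: neither RDD has a partitioner and both have the same number of partitions.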

scala> val arr = Array(("zs",300),("ls",400),("ww",500),("zl", 600))
arr: Array[(String, Int)] = Array((zs,300), (ls,400), (ww,500), (zl,600))

scala> val arr2 = Array(("zs",22),("ls",39),("ww",42),("ss",4))
arr2: Array[(String, Int)] = Array((zs,22), (ls,39), (ww,42), (ss,4))

scala> sc.makeRDD(arr)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> sc.makeRDD(arr2)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:25

scala> res0 join res1
res2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[4] at join at <console>:25

scala> res2.collect
res3: Array[(String, (Int, Int))] = Array((ls,(400,39)), (zs,(300,22)), (ww,(500,42)))

scala> res2.partitions.size
res7: Int = 6

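When the partition counts are equal, the join result has the same number of partitions.

When the partition counts differ, the result uses the larger of the two counts.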

scala> sc.makeRDD(arr,4)
res9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at makeRDD at <console>:25

scala> sc.makeRDD(arr2,3)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at makeRDD at <console>:25

scala> res9 join res10
res11: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[9] at join at <console>:25

scala> res11.partitions.size
res12: Int = 4

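If both RDDs have the same number of partitions and the same partitioner, the join involves no shuffle and the partition count stays the same.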

scala> val arr = Array(("zs",300),("ls",400),("ww",500),("zl", 600))
arr: Array[(String, Int)] = Array((zs,300), (ls,400), (ww,500), (zl,600))

scala> val arr2 = Array(("zs",22),("ls",39),("ww",42),("ss",4))
arr2: Array[(String, Int)] = Array((zs,22), (ls,39), (ww,42), (ss,4))

scala> sc.makeRDD(arr)
res17: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at makeRDD at <console>:25

scala> sc.makeRDD(arr2)
res18: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[16] at makeRDD at <console>:25

scala> res17.reduceByKey(_+_)
res19: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at reduceByKey at <console>:24

scala> res18.reduceByKey(_+_)
res20: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:24

scala> res19 join res20
res21: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[21] at join at <console>:25

scala> res21.collect
res22: Array[(String, (Int, Int))] = Array((ls,(400,39)), (zs,(300,22)), (ww,(500,42)))

If both RDDs have a partitioner of the same type but different partition counts, a shuffle occurs and the result uses the larger count.

scala> val arr = Array(("zs",300),("ls",400),("ww",500),("zl", 600))
arr: Array[(String, Int)] = Array((zs,300), (ls,400), (ww,500), (zl,600))

scala> val arr2 = Array(("zs",22),("ls",39),("ww",42),("ss",4))
arr2: Array[(String, Int)] = Array((zs,22), (ls,39), (ww,42), (ss,4))

scala> sc.makeRDD(arr, 4)
res23: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[22] at makeRDD at <console>:25

scala> sc.makeRDD(arr, 3)
res24: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[23] at makeRDD at <console>:25

scala> res23.reduceByKey(_+_)
res26: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[24] at reduceByKey at <console>:24

scala> res24.reduceByKey(_+_)
res27: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[25] at reduceByKey at <console>:24

scala> res26 join res27
res28: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[28] at join at <console>:25

scala> res28.collect
res29: Array[(String, (Int, Int))] = Array((ww,(500,500)), (zs,(300,300)), (zl,(600,600)), (ls,(400,400)))

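If one RDD has a partitioner and the other does not, the result takes the partition count of the RDD that has the partitioner, and a shuffle occurs.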

scala> arr
res32: Array[(String, Int)] = Array((zs,300), (ls,400), (ww,500), (zl,600))

scala> arr2
res33: Array[(String, Int)] = Array((zs,22), (ls,39), (ww,42), (ss,4))

scala> sc.makeRDD(arr, 3)
res34: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:25

scala> sc.makeRDD(arr2, 4)
res35: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:25

scala> res34.reduceByKey(_+_)
res36: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[31] at reduceByKey at <console>:24

scala> res36 join res35
res37: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:25

scala> res37.collect
res38: Array[(String, (Int, Int))] = Array((zs,(300,22)), (ls,(400,39)), (ww,(500,42)))

scala> res37.partitions.size
res39: Int = 3
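
Case study

In real applications, queries that combine several tables are common.

The requirements are:

// Table 1: movies
// movie_id movie_name movie_types
// Sample line: 1, Toy Story (1995), Animation|Children's|Comedy
// Table 2: ratings
// user_id movie_id score timestamp
// Sample line: 1,1193,5,978300760
// Question 1: which movie type does each user like most
// Question 2: the three highest-rated movies of each type
// Question 3: recommend to each user the top three movies of their favourite type

Question 1

The implementation is as follows: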

package com.lmk.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TestMovie {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("testmovie")
    val sc = new SparkContext(conf)
    val movieRDD:RDD[(String,(String,String))] = sc.textFile("data/movies.txt")
      .map(x => {
        val strs = x.split(",")
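        // head returns the first element (the movie id)
        val mid = strs.head
        // reverse.head returns the last element (the type list)
        val types = strs.reverse.head
        // Drop the first element with tail, reverse, drop the first element again (the
        // original last one), then reverse back: what remains is the middle part, the title.
        // The title itself may contain commas, so it can span several elements.
        // mkString(" ") joins those elements into one string separated by spaces.
        val name = strs.tail.reverse.tail.reverse.mkString(" ")
        (mid, name, types)
      }).flatMap(x => {
        x._3.split("\\|").map((x._1, x._2, _))
        // One record per type, e.g. for a title that was split at a comma:
        //(11, American President The (1995),Comedy)
        //(11, American President The (1995),Drama)
        //(11, American President The (1995),Romance)
      }).map(x => (x._1, (x._2, x._3)))
        //        mid     name   type
        // The id becomes the key so that it can be used for the join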

    val ratingsRDD:RDD[(String,(String,Double))] = sc.textFile("data/ratings.txt")
      // 1,16,4.0,1217897793
      .map(x => {
        val strs = x.split(",")
        // The score is used in calculations, so convert it to Double.
        // The movie id is the join key, so it goes first.
        // mid     userid   score
        (strs(1), (strs(0), strs(2).toDouble))
      })
      //              mid       name    type      userid  score
    val baseData:RDD[(String,((String, String), (String, Double)))] = movieRDD.join(ratingsRDD)

    val userTypeCount:RDD[((String,String),Int)] = baseData.map(x => {
      ((x._2._2._1,x._2._1._2), 1)
    }).reduceByKey(_+_)

    val userTypeCountGroup:RDD[(String, Iterable[(String, Int)])] = userTypeCount.map(x => {
      (x._1._1,(x._1._2,x._2))
    }).groupByKey()

    val userTypeTop1:RDD[(String, (String, Int))] = userTypeCountGroup.mapValues(x => {
      // x.toList.sortBy(-_._2).take(1)
      // Sorting everything is wasteful when only the maximum is needed, so scan for it instead of doing a full sort
      var max = 0
      var max_tp: (String, Int) = null
      x.foreach(tp => {
        if (tp._2 > max) {
          max_tp = tp
          max = tp._2
        }
      })
      max_tp
    })

    userTypeTop1.foreach(println)
  }
}
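
The hand-written maximum scan inside mapValues can also be expressed with maxBy from the Scala collections library. The sketch below runs on a plain Iterable; TopOneSketch and favourite are hypothetical names, and returning an Option for the empty case is a design choice of this sketch, not something the original code does.

```scala
object TopOneSketch {
  // Returns the (type, count) pair with the largest count, or None for an empty input.
  def favourite(counts: Iterable[(String, Int)]): Option[(String, Int)] =
    if (counts.isEmpty) None else Some(counts.maxBy(_._2))
}
```

Like the loop, this makes a single pass over the values and avoids a full sort.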

Result:

Question 2

The code is as follows:

package com.lmk.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TestMovieType {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("testmovietype")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val movieRDD:RDD[(String,(String,String))] = sc.textFile("data/movies.txt")
      .map(x => {
        val strs = x.split(",")
        val mid = strs.head
        val types = strs.reverse.head
        val name = strs.tail.reverse.tail.reverse.mkString(" ")
        (mid, name, types)
      }).flatMap(x => {
        x._3.split("\\|").map((x._1, x._2, _))
      }).map(x => (x._1, (x._2, x._3)))
    val ratingsRDD:RDD[(String,(String,Double))] = sc.textFile("data/ratings.txt")
      .map(x => {
        val strs = x.split(",")
        (strs(1), (strs(0), strs(2).toDouble))
      })
    val baseData: RDD[(String, ((String, String), (String, Double)))] = movieRDD.join(ratingsRDD)
    val groupData: RDD[((String, String), Iterable[Double])] = baseData.map(x => {
      ((x._2._1._2, x._2._1._1), x._2._2._2)
    }).groupByKey()

    val avgData: RDD[((String, String), Double)] = groupData.mapValues(x => {
      x.sum / x.size
    })

    val groupAvgData: RDD[(String, Iterable[(String, Double)])] = avgData.map(x => {
      (x._1._1, (x._1._2, x._2))
    }).groupByKey()

    groupAvgData.mapValues(x => {
      x.toList.sortBy(-_._2).take(3)
    }).foreach(println)

  }
}

The result is as follows:

Question 3

The implementation is as follows:

package com.lmk.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TestMovieUser {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("testmovieuser")
    val sc = new SparkContext(conf)
    val movieRDD: RDD[(String, (String, String))] = sc.textFile("data/movies.txt")
      .map(x => {
        val strs = x.split(",")
        val mid = strs.head
        val types = strs.reverse.head
        val name = strs.tail.reverse.tail.reverse.mkString(" ")
        (mid, name, types)
      }).flatMap(x => {
        x._3.split("\\|").map((x._1, x._2, _))
      }).map(x => (x._1, (x._2, x._3)))

    val ratingsRDD: RDD[(String, (String, Double))] = sc.textFile("data/ratings.txt")
      .map(x => {
        val strs = x.split(",")
        (strs(1), (strs(0), strs(2).toDouble))
      })

    val baseData: RDD[(String, ((String, String), (String, Double)))] = movieRDD.join(ratingsRDD)
    val groupData: RDD[((String, String), Iterable[Double])] = baseData.map(x => {
      ((x._2._1._2, x._2._1._1), x._2._2._2)
    }).groupByKey()
    // (type, top three (movie name, average score) pairs of that type)
    val groupTypeData: RDD[(String, List[(String, Double)])] = groupData.mapValues(x => {
      x.sum / x.size
    }).map(x => {
      // Key by type so that the ranking is done per type
      (x._1._1, (x._1._2, x._2))
    }).groupByKey().mapValues(x => {
      x.toList.sortBy(-_._2).take(3)
    })

    // (favourite type, user id)
    val userType: RDD[(String, String)] = baseData.map(x => {
        ((x._2._1._2, x._2._2._1), 1)
      }).reduceByKey(_ + _)
      .map(x => {
        (x._1._2, (x._1._1, x._2))
      }).groupByKey()
      .mapValues(x => {
        var max = 0
        var max_tp: (String, Int) = null
        x.foreach(tp => {
          if (max < tp._2) {
            max_tp = tp
            max = tp._2
          }
        })
        max_tp
      }).map(x => {
        (x._2._1, x._1)
      })
    // (type, movie name) for each of the top three movies of that type
    val typeMovie: RDD[(String, String)] = groupTypeData.flatMap(x => {
      x._2.map(y => {
        (x._1, y._1)
      })
    })
    typeMovie.join(userType).map(x => {
      (x._2._2, x._2._1)
    }).foreach(println)
  }
}

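The result is as follows:

Secondary sort

Sometimes we want to sort by one field first and, when that field is equal, fall back to a second field.

Sorting on a single field with sortBy cannot express this, so one approach is to define a custom ordering: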

package com.lmk.spark

import org.apache.spark.{SparkConf, SparkContext}

object SecondarySort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("SecondarySort")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.textFile("data/score.txt")
      .map(x => {
        val strs = x.split(" ")
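        // Alternative: turn each element into a MyScore object and output MyScore objects.
        // MyScore then needs a toString override for its content to be readable when printed.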
        // new MyScore(strs(0), strs(1).toInt, strs(2).toInt)
        //}).sortBy(x=>x)
        (strs(0), strs(1).toInt, strs(2).toInt)
      // This way the output is the original data, while the ordering follows the custom rule
      }).sortBy(x=>new MyScore(x._1, x._2, x._3))
      .foreach(println)
  }
  // The objects are sent over the network, so the class has to be serializable: mix in Serializable
  class MyScore(var name: String, var math: Int, var english: Int) extends Ordered[MyScore] with Serializable {
    override def compare(that: MyScore): Int = {
      val mathRes = this.math - that.math
      if (mathRes != 0)
        mathRes else this.english - that.english
    }

    override def toString: String = s"name=${name} + math=${math} + english=${english}"
  }
}
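
An alternative that may be worth trying is a tuple key: Scala provides an implicit Ordering for tuples that compares element by element, so (math, english) expresses the same "math first, then english" rule without a custom class. The sketch below runs on a plain Seq; TupleSortSketch and secondarySort are hypothetical names, and the same key function could presumably be handed to sortBy on an RDD.

```scala
object TupleSortSketch {
  // Records are (name, math, english): sort by math first, then by english when math ties.
  def secondarySort(scores: Seq[(String, Int, Int)]): Seq[(String, Int, Int)] =
    scores.sortBy(s => (s._2, s._3))
}
```

A custom Ordered class such as MyScore remains useful when the comparison rule is more involved than a lexicographic comparison of fields.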

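Operations between RDDs

Function | Purpose | Example | Result
union(other) | Produces an RDD containing all elements of both RDDs (duplicates are kept) | rdd.union(other) | {1, 2, 3, 3, 4, 5}
intersection(other) | Produces the intersection of the two RDDs | rdd.intersection(other) | {3}
subtract(other) | Removes the contents of the other RDD (set difference) | rdd.subtract(other) | {1, 2}
cartesian(other) | Cartesian product with the other RDD | rdd.cartesian(other) | {(1, 3), (1, 4)....}

Note: the element types of the two RDDs must match (for union, intersection, and subtract).

union

Merges two RDDs without removing duplicates. It is a plain concatenation: the partition counts are simply added together, and although the number of partitions changes there is no shuffle.

Note that the number of tasks in a stage is determined by the partition count of the last RDD in that stage. Both union and coalesce can have this effect, as shown below: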

scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val arr1 = Array(3,4,5,6,7)
arr1: Array[Int] = Array(3, 4, 5, 6, 7)

scala> sc.makeRDD(arr)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> sc.makeRDD(arr1)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:25

scala> res0 union res1
res2: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:25

scala> res2.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)

scala> res2.partitions.size
res14: Int = 12

intersection

Takes the intersection of two RDDs. As the output shows, the result is deduplicated.

scala> res0 intersection res1
res4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at intersection at <console>:25

scala> res4.collect
res5: Array[Int] = Array(3, 4, 5)

subtract

Removes the contents of another RDD from this RDD, that is, a subtraction.

scala> res0 subtract res1
res6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at subtract at <console>:25

scala> res6.collect
res7: Array[Int] = Array(1, 2)

scala> res1 subtract res0
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at subtract at <console>:25

scala> res8.collect
res9: Array[Int] = Array(6, 7)

cartesian

Builds the Cartesian product with another RDD. Not only the data is multiplied: the partitions are multiplied as well.

scala> res0 cartesian res1
res10: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[17] at cartesian at <console>:25

scala> res10.collect
res11: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (1,7), (2,3), (2,4), (2,5), (2,6), (2,7), (3,3), (3,4), (3,5), (3,6), (3,7), (4,3), (4,4), (4,5), (4,6), (4,7), (5,3), (5,4), (5,5), (5,6), (5,7))

scala> res10.partitions.size
res13: Int = 36

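Changing the number of partitions: coalesce and repartition

These two operators exist specifically to adjust the number of partitions. As seen earlier, operators that shuffle can also change the partition count.

For example groupBy, groupByKey, reduceByKey, and distinct can set any partition count; their partitioner is HashPartitioner.

sortBy and sortByKey can change it too, but not beyond the number of elements; their partitioner is RangePartitioner, which first runs reservoir sampling to balance the data volume and fix the upper and lower bounds.

The difference between the two is that coalesce, by default, can only reduce the number of partitions and does not shuffle, while repartition can both increase and reduce it and always shuffles.

Some situations call for changing the partition count, for example:

  1. When processing a large amount of data the load per partition is too high, so the partition count needs to go up
  2. Data cleansing has removed a lot of dirty data, leaving too many partitions that occupy resources for nothing, so the partition count needs to go down
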
scala> val arr = Array(1,2,3,4,5,6)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> sc.makeRDD(arr)
res15: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:25

scala> res15.coalesce(2)
res16: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[19] at coalesce at <console>:24

scala> res16.partitions.size
res17: Int = 2

scala> res15.coalesce(12)
res18: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[20] at coalesce at <console>:24

scala> res18.partitions.size
res19: Int = 6

scala> res15.repartition(2)
res20: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at repartition at <console>:24

scala> res20.partitions.size
res21: Int = 2

scala> res15.repartition(12)
res22: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at repartition at <console>:24

scala> res22.partitions.size
res23: Int = 12

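Looking at the source of repartition:

repartition simply calls coalesce with shuffle set to true.

coalesce defaults to shuffle = false, so it does not shuffle. When it reduces the partition count, records are not redistributed; existing partitions are merged into one another as whole units.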
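
Why coalesce without a shuffle can shrink but not grow the partition count can be seen in a small model. This is a deliberately simplified sketch: CoalesceSketch is a hypothetical name, partitions are plain Vectors, and the bucket assignment i * target / n is an assumption of the sketch (Spark's CoalescedRDD also takes data locality into account).

```scala
object CoalesceSketch {
  // Merges partitions without moving individual records between them:
  // partition i is appended, as a whole, to bucket i * target / n.
  def coalesce[T](partitions: Vector[Vector[T]], target: Int): Vector[Vector[T]] = {
    require(target > 0, "target must be positive")
    val n = partitions.length
    if (target >= n) partitions // splitting a partition would require a shuffle
    else
      partitions.zipWithIndex
        .groupBy { case (_, i) => i * target / n }
        .toVector
        .sortBy(_._1)
        .map { case (_, members) => members.flatMap(_._1) }
  }
}
```

Requesting 12 partitions from 6 leaves the 6 partitions untouched in this model, matching the coalesce(12) result in the session above.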

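Other transformations

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

zeroValue: the initial value. It must have the same type V as the values.

func: a function whose two input parameters have the same type.

It involves a shuffle and lets you change the number of partitions.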

scala> val arr = Array(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1), ("c", 1))
arr: Array[(String, Int)] = Array((a,1), (b,1), (a,1), (b,1), (a,1), (c,1))

scala> sc.makeRDD(arr, 3)
res29: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at makeRDD at <console>:25

scala> res29.reduceByKey(_+_)
res30: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[36] at reduceByKey at <console>:24

scala> res30.collect
res31: Array[(String, Int)] = Array((c,1), (a,3), (b,2))

scala> res29.foldByKey

def foldByKey(zeroValue: Int)(func: (Int, Int) => Int): org.apache.spark.rdd.RDD[(String, Int)]
def foldByKey(zeroValue: Int,numPartitions: Int)(func: (Int, Int) => Int): org.apache.spark.rdd.RDD[(String, Int)]
def foldByKey(zeroValue: Int,partitioner: org.apache.spark.Partitioner)(func: (Int, Int) => Int): org.apache.spark.rdd.RDD[(String, Int)]

scala> res29.foldByKey(0, 6)(_+_)
res32: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[37] at foldByKey at <console>:24

scala> res32.partitions.size
res33: Int = 6

scala> res32.collect
res35: Array[(String, Int)] = Array((a,3), (b,2), (c,1))

scala> res29.foldByKey(10, 6)(_+_)
res36: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[38] at foldByKey at <console>:24

scala> res36.collect
res37: Array[(String, Int)] = Array((a,33), (b,22), (c,11))
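
The result (a,33) follows from the zero value being applied once per key in every partition. A plain-Scala model makes this visible; FoldByKeySketch is a hypothetical name, partitions are modelled as nested Seqs, and the sketch assumes that makeRDD(arr, 3) splits the six records into three consecutive pairs.

```scala
object FoldByKeySketch {
  // Per partition: fold the values of each key starting from `zero`.
  // Across partitions: merge the partial results of each key with `func` (no zero here).
  def foldByKey[K, V](partitions: Seq[Seq[(K, V)]], zero: V)(func: (V, V) => V): Map[K, V] = {
    val perPartition: Seq[Map[K, V]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(func) }
    }
    perPartition.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
  }
}
```

Key a occurs in all three partitions, so it picks up the zero value three times: (10 + 1) * 3 = 33. Key b occurs in two partitions (22) and key c in one (11).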

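aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

zeroValue (initial value): every key in every partition starts from this value.

seqOp (within a partition): folds the values of a key into the initial value inside each partition.

combOp (across partitions): merges the per-partition results.

It is a reduceByKey with an initial value, where the logic inside a partition may differ from the logic across partitions. It involves a shuffle and uses the hash partitioner.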

scala> val arr = Array(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1), ("c", 1))
arr: Array[(String, Int)] = Array((a,1), (b,1), (a,1), (b,1), (a,1), (c,1))

scala> sc.makeRDD(arr, 3)
res39: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[40] at makeRDD at <console>:25
// The first function says what to do inside each partition, the second what to do across partitions
scala> res39.aggregateByKey(0)(_+_,_+_)
res40: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[41] at aggregateByKey at <console>:24

scala> res40.collect
res41: Array[(String, Int)] = Array((c,1), (a,3), (b,2))

scala> res39.aggregateByKey

def aggregateByKey[U](zeroValue: U)(seqOp: (U, Int) => U,combOp: (U, U) => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]
def aggregateByKey[U](zeroValue: U,numPartitions: Int)(seqOp: (U, Int) => U,combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]
def aggregateByKey[U](zeroValue: U,partitioner: org.apache.spark.Partitioner)(seqOp: (U, Int) => U,combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]

scala> res39.aggregateByKey(10, 9)(_+_,_+_)
res42: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[42] at aggregateByKey at <console>:24

scala> res42.partitions.size
res43: Int = 9
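
The point of having two functions shows best when they differ. The plain-Scala model below takes the maximum inside each partition and sums the maxima across partitions; AggregateByKeySketch is a hypothetical name and the nested Seqs stand in for partitions, so this illustrates the semantics rather than Spark's implementation.

```scala
object AggregateByKeySketch {
  // seqOp folds the values of a key inside one partition, starting from `zero`;
  // combOp merges the per-partition results of the same key.
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)
                             (seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(seqOp) }
    }
    perPartition.flatten.groupBy(_._1).map { case (k, kus) => k -> kus.map(_._2).reduce(combOp) }
  }
}
```

With partitions [(a,1),(a,5),(b,2)] and [(a,3),(b,4)], the per-partition maxima are a=5, b=2 and a=3, b=4, and their sums are a=8 and b=6.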

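combineByKey

def combineByKey[C](
// Creates the initial combiner from the first value seen for a key
createCombiner: V => C,
// Aggregation within a partition
mergeValue: (C, V) => C,
// Aggregation across partitions
mergeCombiners: (C, C) => C): RDD[(K, C)]

createCombiner (converts the data structure): combineByKey walks through all elements of a partition, so the key of each element has either not been seen yet or is the same as the key of an earlier element. For a new key, combineByKey calls createCombiner to create the initial value of the accumulator for that key.

mergeValue (within a partition): for a key that has already been seen in the current partition, mergeValue merges the current accumulator value for that key with the new value.

mergeCombiners (across partitions): every partition is processed independently, so the same key can have several accumulators. If two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners merges the per-partition results.

For records with the same K, this operation combines the V values into one combined value of type C, for example a collection.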

scala> val arr = Array(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1), ("c", 1))
arr: Array[(String, Int)] = Array((a,1), (b,1), (a,1), (b,1), (a,1), (c,1))

scala> sc.makeRDD(arr, 3)
res44: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[43] at makeRDD at <console>:25

scala> res44.combineByKey
combineByKey   combineByKeyWithClassTag

scala> res44.combineByKey

def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(String, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(String, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(String, C)]
// This fails: the parameter types have to be given explicitly
scala> res44.combineByKey(x => x, _+_, _+_)
<console>:24: error: missing parameter type for expanded function ((x$1: <error>, x$2: Int) => x$1.$plus(x$2))
       res44.combineByKey(x => x, _+_, _+_)
                                  ^
<console>:24: error: missing parameter type for expanded function ((x$3: <error>, x$4) => x$3.$plus(x$4))
       res44.combineByKey(x => x, _+_, _+_)
                                       ^
<console>:24: error: missing parameter type for expanded function ((x$3: <error>, x$4: <error>) => x$3.$plus(x$4))
       res44.combineByKey(x => x, _+_, _+_)
                                         ^

scala> res44.combineByKey((x:Int) => x, (a:Int, b:Int) => a + b, (a:Int, b:Int) => a + b)
res46: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[44] at combineByKey at <console>:24

scala> res46.collect
res47: Array[(String, Int)] = Array((c,1), (a,3), (b,2))

scala> res44.combineByKey((x:Int) => x + 10, (a:Int, b:Int) => a + b, (a:Int, b:Int) => a + b)
res48: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[45] at combineByKey at <console>:24

scala> res48.collect
res49: Array[(String, Int)] = Array((c,11), (a,33), (b,22))
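
The three callbacks can be traced in a plain-Scala model. CombineByKeySketch is a hypothetical name, partitions are nested Seqs, and the sketch assumes that makeRDD(arr, 3) splits the six records into three consecutive pairs; it models the semantics only.

```scala
object CombineByKeySketch {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Inside a partition: the first value of a key creates the combiner,
    // every later value of that key is merged into it.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
    }
    // Across partitions: combiners of the same key are merged.
    perPartition.flatten.groupBy(_._1).map { case (k, kcs) => k -> kcs.map(_._2).reduce(mergeCombiners) }
  }
}
```

With createCombiner = x + 10, the extra 10 is added once per key per partition, which is why a (present in three partitions) ends up at 33 in the session above.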

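filterByRange

Filters elements by key range. It is meant to be used on a result set that has already been sorted, and returns the subset whose keys fall inside the given range, both bounds included.

The subset is a slice of the original RDD; the mapping between partitions and elements stays the same.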

scala> val arr = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6))
arr: Array[(Int, Int)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6))

scala> sc.makeRDD(arr, 4)
res11: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at makeRDD at <console>:25

scala> res11.sortByKey()
res12: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[11] at sortByKey at <console>:24

scala> res12.mapPartitionsWithIndex((index, it) => it.map((index, _)))
res13: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[12] at mapPartitionsWithIndex at <console>:24

scala> res13.collect
res14: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (1,(3,3)), (2,(4,4)), (2,(5,5)), (3,(6,6)))

scala> res12.filterByRange(2, 3)
res19: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[16] at filterByRange at <console>:24
// Filtering is done by key range and has nothing to do with partitions
scala> res19.collect
res20: Array[(Int, Int)] = Array((2,2), (3,3))

scala> res12.filterByRange(0, 2)
res22: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at filterByRange at <console>:24

scala> res22.collect
res23: Array[(Int, Int)] = Array((1,1), (2,2))

scala> res19.mapPartitionsWithIndex((index, it) => it.map((index, _)))
res25: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[19] at mapPartitionsWithIndex at <console>:24
// The new RDD has only two partitions
scala> res25.collect
res26: Array[(Int, (Int, Int))] = Array((0,(2,2)), (1,(3,3)))

scala> res12.filterByRange(2, 5)
res29: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[23] at filterByRange at <console>:24

scala> res29.collect
res30: Array[(Int, Int)] = Array((2,2), (3,3), (4,4), (5,5))
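
The behaviour seen above (inclusive bounds, and only two partitions left after filterByRange(2, 3)) can be modelled in plain Scala. This is a sketch under assumptions, not Spark's code: partitions are plain sequences laid out as in the res14 output, and a partition is dropped when its key span cannot overlap the requested range.

```scala
object FilterByRangeSketch {
  type Part = Seq[(Int, Int)]

  // Partitions are assumed to be sorted by key, as after sortByKey().
  def filterByRange(sorted: Seq[Part], lower: Int, upper: Int): Seq[Part] =
    sorted
      // Drop partitions whose keys lie entirely outside [lower, upper].
      .filter(p => p.nonEmpty && p.head._1 <= upper && p.last._1 >= lower)
      // Inside the surviving partitions keep keys in the inclusive range.
      .map(_.filter { case (k, _) => k >= lower && k <= upper })

  // Same layout as the res14 output: four partitions.
  val sorted: Seq[Part] = Seq(
    Seq((1, 1), (2, 2)),
    Seq((3, 3)),
    Seq((4, 4), (5, 5)),
    Seq((6, 6)))
}
```

In this model `filterByRange(sorted, 2, 3)` leaves two partitions holding (2,2) and (3,3), which lines up with the res26 output.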

Action operators (action)

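| Function | Purpose | Example | Result |
| --- | --- | --- | --- |
| collect() | Returns all elements of the RDD as an array | rdd.collect() | {1, 2, 3, 3} |
| collectAsMap() | For pair RDDs; returns the result as a Map | rdd2.collectAsMap() | Map("a"->1, "b"->2) |
| count() | Returns the number of elements in the RDD | rdd.count() | 4 |
| countByValue() | Returns how many times each element occurs in the RDD | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)} |
| take(n) | Returns n elements from the RDD | rdd.take(1) | {1} |
| first() | Returns the first element of the RDD | rdd.first() | 1 |
| top(num) | Returns the largest num elements | rdd.top(2) | {3, 3} |
| takeOrdered(num)(ordering) | Returns the first num elements in the given order | rdd.takeOrdered(2) | {1, 2} |
| reduce(func) | Aggregates the elements with func (two arguments in, one value out), for example a sum | rdd.reduce((a,b)=>a+b) | ((1+2)+3)+3 = 9 |
| fold(zero)(func) | Like reduce, but with an initial value that is used in every partition | rdd.fold(1)((a,b)=>a+b) | With 2 partitions: 1 + ((1+1)+2) + ((1+3)+3) = 12 |
| aggregate(zeroValue)(seqOp,combOp) | Similar to reduce, but the result type may differ from the element type | rdd.aggregate(0)((x,y) => x + y, (x,y) => x + y) | 9 |
| foreach(func) | Applies func to every element | rdd.foreach(println(_)) | Prints all elements on the executor side |
| foreachPartition | Iterates once per partition | | |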

Usage examples follow:

collect()

collect converts the RDD into an array, pulling all of the data from the remote cluster to the driver. Use it with caution.

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> sc.makeRDD(arr, 3)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> res0.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

collectAsMap()

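collectAsMap returns all elements as a Map. The RDD must hold (k, v) pairs. Duplicate keys are removed from the result: when a key occurs more than once, the entry that appears last is the one that is kept.

toMap also removes duplicate keys, so collect + toMap = collectAsMap.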

scala> res0.map(x => (x, x))
res3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:24

scala> res3.collectAsMap
res4: scala.collection.Map[Int,Int] = Map(8 -> 8, 2 -> 2, 5 -> 5, 4 -> 4, 7 -> 7, 1 -> 1, 9 -> 9, 3 -> 3, 6 -> 6)

scala> res3.collect
res5: Array[(Int, Int)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9))
// toMap also removes duplicate keys: collect + toMap = collectAsMap
scala> res5.toMap
res6: scala.collection.immutable.Map[Int,Int] = Map(5 -> 5, 1 -> 1, 6 -> 6, 9 -> 9, 2 -> 2, 7 -> 7, 3 -> 3, 8 -> 8, 4 -> 4)

scala> val arr1 = Array(1,2,3,4,5,2,3,4,5)
arr1: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5)

scala> sc.parallelize(arr1)
res7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:25

scala> res7.map(x => (x, x))
res8: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:24
// Duplicate keys were removed
scala> res8.collectAsMap
res9: scala.collection.Map[Int,Int] = Map(2 -> 2, 5 -> 5, 4 -> 4, 1 -> 1, 3 -> 3)
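
In the transcript above the duplicated keys all carry identical values, so it does not show which entry survives. A minimal sketch with plain Scala collections (the "collect + toMap" half of the equation, with made-up pairs) makes the last-one-wins rule visible:

```scala
object CollectAsMapSketch {
  // Hypothetical pairs: the key "a" appears twice with different values.
  val pairs: Array[(String, Int)] = Array("a" -> 1, "b" -> 2, "a" -> 3)

  // toMap keeps one entry per key; a later pair overwrites an earlier one.
  val asMap: Map[String, Int] = pairs.toMap
}
```

Here `asMap("a")` is 3, the value from the last pair with that key.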

count()

Returns the number of elements in the whole RDD.

scala> res7.count
res12: Long = 9

countByValue()

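Returns the number of times each element occurs in the RDD. The result automatically comes back in (k, v) form, as element -> count.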

scala> val arr1 = Array(1,2,3,4,5,2,3,4,5)
arr1: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5)

scala> sc.parallelize(arr1)
res15: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25

scala> res15.countByValue()
res16: scala.collection.Map[Int,Long] = Map(5 -> 2, 1 -> 1, 2 -> 2, 3 -> 2, 4 -> 2)

scala> sc.textFile("/test/a.txt")
res18: org.apache.spark.rdd.RDD[String] = /test/a.txt MapPartitionsRDD[9] at textFile at <console>:24
// This makes word count very simple, but the return value is a Map, not an RDD that can be transformed further
scala> res18.flatMap(x => x.split(" ")).countByValue()
res20: scala.collection.Map[String,Long] = Map(tom -> 10, hello -> 20, wrold -> 10)
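
What countByValue computes can be written out with plain Scala collections. This is a local sketch of the idea (group equal elements, then count each group), not how Spark distributes the work; the helper name `countByValue` below is just an illustration.

```scala
object CountByValueSketch {
  // Groups equal elements together and counts the size of each group.
  def countByValue[T](items: Seq[T]): Map[T, Long] =
    items.groupBy(identity).map { case (value, group) => value -> group.size.toLong }

  // Word count on a couple of hypothetical lines, mirroring the flatMap above.
  val lines: Seq[String] = Seq("hello tom", "hello world")
  val wordCounts: Map[String, Long] = countByValue(lines.flatMap(_.split(" ")))
}
```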

take(n)

Returns an array made up of the first n elements of the RDD.

scala> val arr1 = Array(1,2,3,4,5,2,3,4,5)
arr1: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5)

scala> sc.parallelize(arr1)
res15: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25

scala> res15.take(3)
res21: Array[Int] = Array(1, 2, 3)

first()

Returns the first element of the RDD.

scala> res15.first
res22: Int = 1

top(num)

Returns the largest num elements. The result is sorted, in descending order.

scala> res15.top(4)
res23: Array[Int] = Array(5, 5, 4, 4)

takeOrdered(num)(ordering)

Returns the first num elements in the given order. The result is sorted, ascending by default.

scala> val arr2 = Array(6,5,4,3,2,1)
arr2: Array[Int] = Array(6, 5, 4, 3, 2, 1)

scala> sc.parallelize(arr2)
res24: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:25

scala> res24.top(3)
res25: Array[Int] = Array(6, 5, 4)

scala> res24.take(3)
res26: Array[Int] = Array(6, 5, 4)

scala> res24.takeOrdered(3)
res27: Array[Int] = Array(1, 2, 3)

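Sorting means the data must be sortable. A custom class has no ordering rule, so you have to supply one yourself, as in the following case:

// To sort, either mix Ordered into the class or create a comparator (an Ordering)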
scala> class Student(val name:String, val age:Int)
defined class Student

scala> val s1 = new Student("zs", 14)
s1: Student = Student@60869c90

scala> val s2 = new Student("ls", 24)
s2: Student = Student@25bdc068

scala> val s3 = new Student("ww", 27)
s3: Student = Student@57bfeb0f

scala> val arr = Array(s1, s2, s3)
arr: Array[Student] = Array(Student@60869c90, Student@25bdc068, Student@57bfeb0f)

scala> sc.parallelize(arr)
res28: org.apache.spark.rdd.RDD[Student] = ParallelCollectionRDD[19] at parallelize at <console>:25

scala> res28.top(2)
<console>:24: error: No implicit Ordering defined for Student.
       res28.top(2)
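
One way to get past this error is to put an implicit Ordering[Student] in scope, since top and takeOrdered take the ordering as an implicit parameter. The sketch below uses plain Scala collections so it runs without a cluster; ordering by age is an assumption made for illustration, and on a real cluster the class would also need to be serializable.

```scala
object StudentOrderingSketch {
  class Student(val name: String, val age: Int)

  // The missing piece: an implicit comparator. Ordering by age is an
  // assumption for this example; any other field would work the same way.
  implicit val byAge: Ordering[Student] = Ordering.by((s: Student) => s.age)

  val students: Seq[Student] =
    Seq(new Student("zs", 14), new Student("ls", 24), new Student("ww", 27))

  // Local stand-in for rdd.top(2): the two largest, in descending order.
  val top2: Seq[String] = students.sorted(byAge.reverse).take(2).map(_.name)

  // Local stand-in for rdd.takeOrdered(2): the two smallest, ascending.
  val smallest2: Seq[String] = students.sorted.take(2).map(_.name)
}
```

With the same implicit val defined in the shell before the call, `res28.top(2)` should find its Ordering and compile.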

reduce(func)

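Aggregates the elements of the dataset with the function func (which takes two arguments and returns one value), for example to compute a sum.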

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> sc.parallelize(arr,3)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> res1.reduce((a, b) => a + b)
res3: Int = 45

scala> res1.reduce((a, b) => a + b * b)
res4: Int = 4573

scala> res1.reduce(_+_)
res5: Int = 45
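
The 4573 above deserves a note: (a, b) => a + b * b is neither associative nor commutative, so the result depends on the order in which partition results are merged. The sketch below models this in plain Scala, assuming the three partitions hold (1,2,3), (4,5,6) and (7,8,9) and that partition results may be merged in any order; it is an illustration, not Spark's scheduler.

```scala
object ReduceOrderSketch {
  val f: (Int, Int) => Int = (a, b) => a + b * b

  // Assumed partition layout for Array(1..9) split into 3 slices.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))

  // Each partition is reduced on its own first: 14, 65, 152.
  val partials: Seq[Int] = partitions.map(_.reduce(f))

  // Merging the partial results in every possible order gives different totals.
  val possibleResults: Set[Int] = partials.permutations.map(_.reduce(f)).toSet
}
```

In this model 4573 appears when the last partition's result (152) is merged first: 152 + 14*14 + 65*65 = 4573. Other merge orders give 23365 or 27343, which is why reduce should be given an associative and commutative function such as _+_.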

fold(zero)(func)

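Works like reduce but takes an initial value, which is used in the computation of every partition. Unlike the by-key variants such as foldByKey, the initial value is applied not only once per partition but also once more in the final merge.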

scala> res1.fold(0)(_+_)
res6: Int = 45
// Three partitions each add 10, and the final merge adds another 10, so the result is 40 larger
scala> res1.fold(10)(_+_)
res7: Int = 85
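
The arithmetic behind 85 can be checked with a small model in plain Scala. It is a sketch that assumes the three partitions hold (1,2,3), (4,5,6) and (7,8,9); the helper `fold` below is illustrative and not Spark's API.

```scala
object FoldSketch {
  // Models RDD.fold: the zero value seeds every partition and the final merge.
  def fold(partitions: Seq[Seq[Int]], zero: Int)(op: (Int, Int) => Int): Int = {
    // Per-partition fold, each one starting from zero.
    val partials = partitions.map(_.foldLeft(zero)(op))
    // Final merge of the partition results, again starting from zero.
    partials.foldLeft(zero)(op)
  }

  // Assumed partition layout for Array(1..9) split into 3 slices.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
}
```

With zero = 10 the partition results are 16, 25 and 34, and the final merge gives 10 + 16 + 25 + 34 = 85.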

aggregate(zeroValue)(seqOp,combOp)

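Similar to reduce, but the result type may differ from the element type. Of the two functions, the first (seqOp) merges values within a partition and the second (combOp) merges the partition results into the overall result.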

scala> res1.aggregate(0)(_+_,_+_)
res12: Int = 45

scala> res1.aggregate(0)(_+_,_*_)
res13: Int = 0

scala> res1.aggregate(0)(_+_,_-_)
res14: Int = -45

scala> res1.aggregate(10)(_+_,_+_)
res15: Int = 85
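
The results above follow from applying seqOp inside each partition and combOp across the partition results, both starting from zeroValue. The plain-Scala sketch below models that, assuming partitions (1,2,3), (4,5,6), (7,8,9); the `aggregate` helper is illustrative, not Spark's API. It also shows the case the description mentions but the transcript does not: a result type different from the element type, here a (sum, count) pair.

```scala
object AggregateSketch {
  // Models RDD.aggregate: seqOp within a partition, combOp across partitions.
  def aggregate[T, U](partitions: Seq[Seq[T]], zero: U)(
      seqOp: (U, T) => U, combOp: (U, U) => U): U =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  // Assumed partition layout for Array(1..9) split into 3 slices.
  val parts: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))

  // Result type (Int, Int) differs from the element type Int.
  val sumAndCount: (Int, Int) = aggregate(parts, (0, 0))(
    (acc, x) => (acc._1 + x, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2))
}
```

In this model `aggregate(parts, 0)(_ + _, _ - _)` is 0 - 6 - 15 - 24 = -45, and `aggregate(parts, 0)(_ + _, _ * _)` is 0 because the final merge starts by multiplying with the zero value.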

foreach(func)

Applies the function func to every element.

scala> res1.foreach(println)
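// Each partition prints its own data where it runs, so on a cluster nothing shows up on the driver side.

The standard output of the executors can be viewed through the web UI.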

foreachPartition

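Iterates once per partition. This suits cases like the earlier one where every element needed a database lookup. The connection code cannot simply be hoisted out of the RDD operation, because then it would run on the driver side and never reach the worker nodes. It has to stay inside the RDD operation, and with this method a connection is opened once per partition instead of once per element, which reduces the number of connections.

foreachPartition is generally used when there is no follow-up operation, because it is an action and triggers the computation, for example when inserting data.

mapPartitions is generally used when the result is processed further, for example for lookups.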

scala> res1.foreachPartition(println)
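
To make the "fewer connections" argument concrete, here is a sketch in plain Scala that only counts how many connections would be opened. `FakeConnection` is a hypothetical stand-in for a real client, and the partitions are plain sequences rather than an RDD.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ConnectionCountSketch {
  // Hypothetical client: it does nothing except count how often it is created.
  final class FakeConnection(opened: AtomicInteger) {
    opened.incrementAndGet()
    def write(key: String, value: String): Unit = ()
    def close(): Unit = ()
  }

  // foreach style: one connection per element.
  def perElement(partitions: Seq[Seq[(String, Int)]]): Int = {
    val opened = new AtomicInteger(0)
    partitions.foreach(_.foreach { case (k, v) =>
      val conn = new FakeConnection(opened)
      try conn.write(k, v.toString) finally conn.close()
    })
    opened.get
  }

  // foreachPartition style: one connection per partition, reused for its elements.
  def perPartition(partitions: Seq[Seq[(String, Int)]]): Int = {
    val opened = new AtomicInteger(0)
    partitions.foreach { part =>
      val conn = new FakeConnection(opened)
      try part.foreach { case (k, v) => conn.write(k, v.toString) }
      finally conn.close()
    }
    opened.get
  }

  // Hypothetical data: 5 elements spread over 2 partitions.
  val data: Seq[Seq[(String, Int)]] =
    Seq(Seq("a" -> 1, "b" -> 2, "c" -> 3), Seq("d" -> 4, "e" -> 5))
}
```

For this data the per-element version opens 5 connections and the per-partition version opens 2.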
Example: writing the results to Redis:
package com.lmk.spark

import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

/**
 * Redis is a key-value database
 * set inserts or updates a value
 * get reads a value
 */
object ForeachPartition2Redis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ForeachPartition2Redis")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)

    sc.textFile("data/a.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
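      .foreachPartition(x => {
        // One connection per partition
        val redis = new Jedis("192.168.149.50", 6379)
        try {
          redis.auth("****")
          x.foreach(tp => {
            // The value is stored as a String, so convert the count
            redis.set(tp._1, tp._2.toString)
          })
        } finally {
          // Close the connection even if a write fails
          redis.close()
        }
      })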
  }
}

Querying Redis to check the result

root@nn1:/home/hadoop# redis-cli -h nn1 -p 6379 -a ***
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
nn1:6379> keys *
1) "word"
2) "tom"
3) "hello"
nn1:6379> get word
"5"
nn1:6379> get tom
"5"
nn1:6379> get hello
"10"