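RDD operators are divided into transformation operators and action operators.
By the way they process data, transformation operators are further divided into value type, double-value type, and key-value type.
1. Value type
1.1 map
Function signature: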
def map[U: ClassTag](f: T => U): RDD[U]
Description:
Return a new RDD by applying a function to all elements of this RDD.
val data: RDD[Long] = context.range(1, 11)
data.map(i => i * 2)
1.2 mapPartitions
Function signature:
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Description:
Return a new RDD by applying a function to each partition of this RDD.
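The practical difference from map is how often the user function runs: once per partition rather than once per element. The sketch below is a plain-Scala model of that behavior, not Spark itself. Nested `Seq`s stand in for an RDD's partitions, and `MapPartitionsModel`, `mapModel`, `mapPartitionsModel`, and `countCalls` are hypothetical names introduced only for illustration.

```scala
object MapPartitionsModel {
  // An RDD is modeled as a sequence of partitions (an assumption for illustration).
  type Partitions[T] = Seq[Seq[T]]

  // map: f is invoked once for every element.
  def mapModel[T, U](parts: Partitions[T])(f: T => U): Partitions[U] =
    parts.map(_.map(f))

  // mapPartitions: f is invoked once per partition and receives an iterator.
  def mapPartitionsModel[T, U](parts: Partitions[T])(f: Iterator[T] => Iterator[U]): Partitions[U] =
    parts.map(p => f(p.iterator).toVector)

  // Counts how many times the user function runs under mapPartitions.
  def countCalls(parts: Partitions[Long]): Int = {
    var calls = 0
    mapPartitionsModel(parts) { it =>
      calls += 1
      it.map(_ * 2)
    }
    calls
  }
}
```

With two partitions the function runs twice, however many elements they hold, which is why per-call setup such as opening a connection is usually placed inside mapPartitions.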
val data: RDD[Long] = context.range(1, 11)
data.mapPartitions(iterator => iterator.map(i => i * 2))
1.3 mapPartitionsWithIndex
Function signature:
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Description:
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
val data: RDD[Long] = context.range(1, 11)
data.mapPartitionsWithIndex((index, iterator) => iterator.map(num => (index, num)))
1.4 flatMap
Function signature:
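def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
Description:
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
val data: RDD[Long] = context.range(1, 11)
val flatMapData: RDD[Long] = data.flatMap(num => List(num, num * 10))
1.5 glom
Function signature: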
def glom(): RDD[Array[T]]
Description:
Return an RDD created by coalescing all elements within each partition into an array.
val data: RDD[Long] = context.range(1, 11)
val rddArr: RDD[Array[Long]] = data.glom()
1.6 groupBy
Function signature:
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
Description:
Return an RDD of grouped items. Each group consists of a key and a sequence of elements mapping to that key. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
val data: RDD[Long] = context.range(1, 11)
val groupByData: RDD[(Boolean, Iterable[Long])] = data.groupBy(num => num % 2 == 0)
1.7 filter
Function signature:
def filter(f: T => Boolean): RDD[T]
Description:
Return a new RDD containing only the elements that satisfy a predicate.
val data: RDD[Long] = context.range(1, 11)
val filterData: RDD[Long] = data.filter(num => num % 2 == 0)
1.8 sample
Function signature:
def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
Description:
Return a sampled subset of this RDD.
Params:
withReplacement – can elements be sampled multiple times (replaced when sampled out)
fraction – expected size of the sample as a fraction of this RDD's size
  without replacement: probability that each element is chosen; fraction must be in [0, 1]
  with replacement: expected number of times each element is chosen; fraction must be greater than or equal to 0
seed – seed for the random number generator
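To make the meaning of fraction in the without-replacement case concrete, here is a simplified plain-Scala model: every element is kept independently with probability fraction, so the sample size is only expected to be about fraction times the input size. This is a sketch under that assumption, not Spark's sampler. `SampleModel` and `sampleWithoutReplacement` are hypothetical names, and because the random generator differs, the chosen elements will not match what Spark returns for the same seed.

```scala
import scala.util.Random

object SampleModel {
  // Without replacement: keep each element independently with probability `fraction`.
  def sampleWithoutReplacement[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    require(fraction >= 0.0 && fraction <= 1.0, "fraction must be in [0, 1]")
    // A fixed seed makes the selection repeatable.
    val rng = new Random(seed)
    data.filter(_ => rng.nextDouble() < fraction)
  }
}
```

Calling `SampleModel.sampleWithoutReplacement(1L to 100L, 0.2, 807L)` twice returns the same elements both times, and the number of elements is typically near 20 but not guaranteed to be exactly 20.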
val data: RDD[Long] = context.range(1, 101)
val sampleData: RDD[Long] = data.sample(true, 0.2, 807)
1.9 distinct
Function signature:
def distinct(): RDD[T]
Description:
Return a new RDD containing the distinct elements in this RDD.
val distinctRdd: RDD[Long] = data.distinct()
1.10 coalesce
Function signature:
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T]
Description:
Return a new RDD that is reduced into numPartitions partitions.
val lessPartition: RDD[Long] = data.coalesce(4)
1.11 repartition
Function signature:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
Description:
Return a new RDD that has exactly numPartitions partitions.
val morePartition: RDD[Long] = data.repartition(16)
1.12 sortBy
Function signature:
def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
Description:
Return this RDD sorted by the given key function.
val data: RDD[Long] = context.range(1, 101)
val sortRDD: RDD[Long] = data.sortBy(num => num, false)
2. Double-value type
2.1 intersection
Function signature:
def intersection(other: RDD[T]): RDD[T]
Description:
Return the intersection of this RDD and another one.
val data1: RDD[Long] = context.range(1, 11)
val data2: RDD[Long] = context.range(5, 16)
val intersectionRDD: RDD[Long] = data1.intersection(data2)
2.2 union
Function signature:
def union(other: RDD[T]): RDD[T]
Description:
Return the union of this RDD and another one.
val data1: RDD[Long] = context.range(1, 11)
val data2: RDD[Long] = context.range(5, 16)
val unionRDD: RDD[Long] = data1.union(data2)
2.3 subtract
Function signature:
def subtract(other: RDD[T]): RDD[T]
Description:
Return an RDD with the elements from this that are not in other.
val data1: RDD[Long] = context.range(1, 11)
val data2: RDD[Long] = context.range(5, 16)
val subtractRDD: RDD[Long] = data1.subtract(data2)
2.4 zip
Function signature:
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
Description:
Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition.
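The two preconditions (equal partition counts, equal element counts per partition) can be sketched with a plain-Scala model in which nested `Seq`s stand in for partitions. `ZipModel` and `zipModel` are hypothetical names, and the `require` calls only illustrate where a mismatch would be rejected; they are not Spark's actual error handling.

```scala
object ZipModel {
  // Partitions are modeled as nested sequences (an assumption for illustration).
  def zipModel[T, U](left: Seq[Seq[T]], right: Seq[Seq[U]]): Seq[Seq[(T, U)]] = {
    require(left.length == right.length, "both sides need the same number of partitions")
    left.zip(right).map { case (l, r) =>
      // Elements are paired position by position inside matching partitions.
      require(l.length == r.length, "matching partitions need the same number of elements")
      l.zip(r)
    }
  }
}
```

This is why the example passes an explicit partition count of 1 to both `context.range` and `context.makeRDD`: it keeps the two sides aligned.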
val data1: RDD[Long] = context.range(1, 5, 1, 1)
val data2: RDD[String] = context.makeRDD(List("zhangsan", "lisi", "wangwu", "zhaoliu"), 1)
val zipRDD: RDD[(Long, String)] = data1.zip(data2)
3. Key-value type
3.1 partitionBy
Function signature:
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
Description:
Return a copy of the RDD partitioned using the specified partitioner.
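The idea behind a hash partitioner can be sketched in plain Scala: a key goes to the partition given by its hash code modulo the partition count, kept non-negative. The model below is an illustration under that assumption rather than Spark's code; `HashPartitionModel`, `nonNegativeMod`, `partitionFor`, and `partitionBy` are hypothetical helpers, and a `Map` from partition index to records stands in for the repartitioned RDD.

```scala
object HashPartitionModel {
  // Non-negative modulo, so negative hash codes still map to a valid partition index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Partition index for a key: hashCode modulo the number of partitions (null goes to 0).
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  // Groups key-value pairs by the partition index of their key.
  def partitionBy[K, V](data: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    data.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}
```

With two partitions the keys 1 and 3 land in partition 1 and the keys 2 and 4 in partition 0. With a single partition, as in `new HashPartitioner(1)`, every key lands in partition 0.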
import org.apache.spark.HashPartitioner
val data: RDD[(Int, String)] = context.makeRDD(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu"), (4, "zhaoliu")))
val hashPartitionerRDD: RDD[(Int, String)] = data.partitionBy(new HashPartitioner(1))
3.2 reduceByKey
Function signature:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
Description:
Merge the values for each key using an associative and commutative reduce function.
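The function must be associative and commutative because it is applied twice: first to the values of each key inside a partition, then to the partial results coming from different partitions. A plain-Scala model of those two steps, with nested `Seq`s standing in for partitions and with `ReduceByKeyModel`, `combineLocally`, and `reduceByKeyModel` as hypothetical names, might look like this:

```scala
object ReduceByKeyModel {
  // Step 1: reduce the values of each key inside one partition.
  def combineLocally[K, V](part: Seq[(K, V)])(func: (V, V) => V): Map[K, V] =
    part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }

  // Step 2: merge the per-partition results; func is applied again across partitions.
  def reduceByKeyModel[K, V](parts: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] =
    parts.map(p => combineLocally(p)(func)).foldLeft(Map.empty[K, V]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, v)) =>
        a.updated(k, a.get(k).map(func(_, v)).getOrElse(v))
      }
    }
}
```

For addition the result is the same however the records are split across partitions, which is exactly the property the description asks for.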
val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
val reduceByKeyRDD = data.reduceByKey((value1, value2) => value1 + value2)
3.3 groupByKey
Function signature:
def groupByKey(): RDD[(K, Iterable[V])]
Description:
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with the existing partitioner/parallelism level. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
val groupByKeyRDD: RDD[(String, Iterable[Int])] = data.groupByKey()
3.4 aggregateByKey
Function signature:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
Description:
Aggregate the values of each key, using given combine functions and a neutral “zero value”.
val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
val aggregateByKeyRDD: RDD[(String, Int)] = data.aggregateByKey(0)((num1, num2) => num1 + num2, (num1, num2) => num1 + num2)
3.5 foldByKey
Function signature:
def foldByKey( zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
Description:
Merge the values for each key using an associative function and a neutral “zero value” which may be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
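The warning about the zero value matters for a call such as `foldByKey(3)`: a non-neutral zero value is folded in once per key in every partition where that key occurs, so the result depends on how the data is partitioned. The plain-Scala model below sketches this under the assumption that nested `Seq`s stand in for partitions; `FoldByKeyModel`, `foldPartition`, and `foldByKeyModel` are hypothetical names, not Spark APIs.

```scala
object FoldByKeyModel {
  // Within a partition, the first value seen for a key is combined with zeroValue.
  def foldPartition[K, V](part: Seq[(K, V)], zeroValue: V)(func: (V, V) => V): Map[K, V] =
    part.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      acc.updated(k, func(acc.getOrElse(k, zeroValue), v))
    }

  // Across partitions, partial results are merged with func alone.
  def foldByKeyModel[K, V](parts: Seq[Seq[(K, V)]], zeroValue: V)(func: (V, V) => V): Map[K, V] =
    parts.map(p => foldPartition(p, zeroValue)(func)).foldLeft(Map.empty[K, V]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, v)) =>
        a.updated(k, a.get(k).map(func(_, v)).getOrElse(v))
      }
    }
}
```

In this model, a zero value of 3 with addition gives 5 per key when all four records sit in one partition (3 + 1 + 1), but 8 per key when they are split evenly over two partitions ((3 + 1) + (3 + 1)). A zero value of 0 gives 2 in both cases.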
val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
val foldByKeyRDD: RDD[(String, Int)] = data.foldByKey(3)((num1, num2) => num1 + num2)
3.6 sortByKey
Function signature:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
Description:
Sort the RDD by key, so that each partition contains a sorted range of the elements.
val data: RDD[(Int, String)] = context.makeRDD(Array((3, "zhangsan"), (2, "lisi"), (4, "wangwu"), (0, "laoliu"), (1, "xiaoqi")))
val sortByKeyRDD: RDD[(Int, String)] = data.sortByKey(false, 1)
3.7 join
Function signature:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
Description:
Return an RDD containing all pairs of elements with matching keys in this and other.
val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "female"), ("wangwu", "male"), ("laoliu", "male")))
val joinRDD: RDD[(String, (Int, String))] = data1.join(data2)
3.8 leftOuterJoin
Function signature:
def leftOuterJoin[W]( other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
Description:
Perform a left outer join of this and other.
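What "left outer" means for the result type `(V, Option[W])` can be sketched with local collections: every pair from the left side is kept, and the right-hand value is wrapped in an `Option` that is empty when the key has no match. This is a plain-Scala model, not Spark; `LeftOuterJoinModel` and `leftOuterJoinModel` are hypothetical names, and unlike an RDD the model preserves the order of the left input.

```scala
object LeftOuterJoinModel {
  def leftOuterJoinModel[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    // Index the right side by key so each left record can look up its matches.
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    left.flatMap { case (k, v) =>
      rightByKey.get(k) match {
        // One output record per matching right-hand value.
        case Some(ws) => ws.map(w => (k, (v, Option(w))))
        // No match: the left record is still emitted, with an empty Option.
        case None     => Seq((k, (v, Option.empty[W])))
      }
    }
  }
}
```

With the sample data used in this section, "zhaoliu" has no entry on the right and appears as `("zhaoliu", (20, None))`, while "laoliu" exists only on the right and is dropped.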
val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "female"), ("wangwu", "male"), ("laoliu", "male")))
val leftOuterJoinRDD: RDD[(String, (Int, Option[String]))] = data1.leftOuterJoin(data2)
3.9 cogroup
Function signature:
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
Description:
For each key k in this or other, return a resulting RDD that contains a tuple with the list of values for that key in this as well as other.
val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "female"), ("wangwu", "male"), ("laoliu", "male")))
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = data1.cogroup(data2)
4. Action operators
4.1 reduce
Function signature:
def reduce(f: (T, T) => T): T
Description:
Reduces the elements of this RDD using the specified commutative and associative binary operator.
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: (String, Int) = data.reduce((data1, data2) => ("sum", data1._2 + data2._2))
4.2 collect
Function signature:
def collect(): Array[T]
Description:
Return an array that contains all of the elements in this RDD.
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: Array[(String, Int)] = data.collect()
4.3 count
Function signature:
def count(): Long
Description:
Return the number of elements in the RDD.
val result: Long = data.count()
4.4 first
Function signature:
def first(): T
Description:
Return the first element in this RDD.
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: (String, Int) = data.first()
4.5 take
Function signature:
def take(num: Int): Array[T]
Description:
Take the first num elements of the RDD. It works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: Array[(String, Int)] = data.take(1)
4.6 aggregate
Function signature:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
Description:
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
Params:
zeroValue – the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
seqOp – an operator used to accumulate results within a partition
combOp – an associative operator used to combine results from different partitions
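Because zeroValue seeds both the per-partition seqOp and the final combOp, a non-neutral zero value shows up once per partition plus once more. The plain-Scala model below sketches this, assuming nested `Seq`s stand in for partitions and merging partition results in partition order; `AggregateModel` and `aggregateModel` are hypothetical names, not Spark APIs.

```scala
object AggregateModel {
  def aggregateModel[T, U](parts: Seq[Seq[T]], zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // seqOp folds each partition, starting from zeroValue.
    val perPartition = parts.map(_.foldLeft(zeroValue)(seqOp))
    // combOp merges the partition results, again starting from zeroValue.
    perPartition.foldLeft(zeroValue)(combOp)
  }
}
```

With three single-element partitions and a zero value of "A", the model yields "AAzhangsanAlisiAwangwu": four "A"s for three partitions. Spark does not promise the order in which partition results are merged, so for a non-commutative combOp such as string concatenation the names may appear in a different order.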
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
val result: String = data.aggregate("A")((A, data1) => A + data1._1, (str1, str2) => str1 + str2)
4.7 fold
Function signature:
def fold(zeroValue: T)(op: (T, T) => T): T
Description:
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”. The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
Params:
zeroValue – the initial value for the accumulated result of each partition for the op operator, and also the initial value for the combine results from different partitions for the op operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
op – an operator used to both accumulate results within a partition and combine results from different partitions
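The consequence of using zeroValue both inside each partition and in the final merge is that a non-neutral zero value is counted once per partition plus once more. A plain-Scala model, with nested `Seq`s standing in for partitions and with `FoldModel` and `foldModel` as hypothetical names, sketches this:

```scala
object FoldModel {
  def foldModel[T](parts: Seq[Seq[T]], zeroValue: T)(op: (T, T) => T): T = {
    // Each partition is folded on its own, starting from zeroValue.
    val perPartition = parts.map(_.foldLeft(zeroValue)(op))
    // The partition results are folded once more, again starting from zeroValue.
    perPartition.foldLeft(zeroValue)(op)
  }
}
```

For the ages 18, 20, and 21 with a zero value of ("A", 65) and an op that adds the second fields, the model gives ("sum", 319) over three single-element partitions (59 + 4 × 65) but ("sum", 189) over one partition (59 + 2 × 65).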
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
// zeroValue is a single tuple, so it needs its own pair of parentheses
val result: (String, Int) = data.fold(("A", 65))((data1, data2) => ("sum", data1._2 + data2._2))
4.8 countByKey
Function signature:
def countByKey(): Map[K, Long]
Description:
Count the number of elements for each key, collecting the results to a local Map.
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
val result: collection.Map[String, Long] = data.countByKey()
4.9 saveAsTextFile
Function signature:
def saveAsTextFile(path: String): Unit
Description:
Save this RDD as a text file, using string representations of elements.
data.saveAsTextFile("./test")
4.10 foreach
Function signature:
def foreach(f: T => Unit): Unit
Description:
Applies a function f to all elements of this RDD.
val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
data.foreach(data1 => println(data1._1))