
A Summary of 35 Common Spark Operators


RDD operators fall into two categories: transformation operators and action operators.

Based on how they process data, transformation operators are further divided into value, double-value (two-RDD), and key-value types.
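
All of the snippets below assume an existing SparkContext named context. A minimal local setup, sketched here with an illustrative master URL and application name, might look like:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// "local[*]" and the app name are placeholder choices for local experiments.
val conf = new SparkConf().setMaster("local[*]").setAppName("operator-demo")
val context = new SparkContext(conf)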

1. Value type

1.1 map

Function signature:

def map[U: ClassTag](f: T => U): RDD[U]

Function description:

Return a new RDD by applying a function to all elements of this RDD.

val data: RDD[Long] = context.range(1, 11)
data.map(i => i * 2)
1.2 mapPartitions

Function signature:

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]

Function description:

Return a new RDD by applying a function to each partition of this RDD.

val data: RDD[Long] = context.range(1, 11)
data.mapPartitions(iterator => iterator.map(i => i * 2))
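
Because the function runs once per partition rather than once per element, mapPartitions is a good fit when each partition needs an expensive setup step. A minimal sketch over the same data RDD (java.text.NumberFormat stands in for e.g. a database connection):

data.mapPartitions { iterator =>
  val formatter = java.text.NumberFormat.getInstance() // built once per partition
  iterator.map(n => formatter.format(n))
}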
1.3 mapPartitionsWithIndex

Function signature:

def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]

Function description:

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

val data: RDD[Long] = context.range(1, 11)
data.mapPartitionsWithIndex((index, iterator) => iterator.map(num => (index, num)))
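
One common use is isolating the data of a single partition for debugging. A minimal sketch over the same data RDD:

// Keep only the elements that live in partition 0.
data.mapPartitionsWithIndex((index, iterator) => if (index == 0) iterator else Iterator.empty)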
1.4 flatMap

Function signature:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

Function description:

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

val data: RDD[String] = context.makeRDD(List("hello spark", "hello scala"))
data.flatMap(line => line.split(" "))
1.5 glom

Function signature:

def glom(): RDD[Array[T]]

Function description:

Return an RDD created by coalescing all elements within each partition into an array.

val data: RDD[Long] = context.range(1, 11)
val rddArr: RDD[Array[Long]] = data.glom()
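
Since glom exposes each partition as an array, per-partition aggregates become one-liners. A sketch, assuming no partition is empty (arr.max would throw on an empty array):

val partitionMax: RDD[Long] = data.glom().map(arr => arr.max)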
1.6 groupBy

Function signature:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

Function description:

Return an RDD of grouped items. Each group consists of a key and a sequence of elements mapping to that key. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.

val data: RDD[Long] = context.range(1, 11)
val groupByData: RDD[(Boolean, Iterable[Long])] = data.groupBy(num => num % 2 == 0)
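
For small data, the groups can be collected and printed to see the (key, values) structure; this is a debugging sketch only, since collect pulls everything to the driver:

groupByData.collect().foreach { case (isEven, nums) =>
  println(s"$isEven -> ${nums.mkString(", ")}")
}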
1.7 filter

Function signature:

def filter(f: T => Boolean): RDD[T]

Function description:

Return a new RDD containing only the elements that satisfy a predicate.

val data: RDD[Long] = context.range(1, 11)
val filterData: RDD[Long] = data.filter(num => num % 2 == 0)
1.8 sample

Function signature:

def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]

Function description:

Return a sampled subset of this RDD.

Params:
withReplacement – can elements be sampled multiple times (replaced when sampled out)
fraction – expected size of the sample as a fraction of this RDD's size:
  without replacement: probability that each element is chosen; fraction must be in [0, 1]
  with replacement: expected number of times each element is chosen; fraction must be >= 0
seed – seed for the random number generator

val data: RDD[Long] = context.range(1, 101)
val sampleData: RDD[Long] = data.sample(true, 0.2, 807)
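
For comparison, a sketch of sampling without replacement: each element is kept independently with probability 0.2, so the sample size is only approximately 20 out of 100:

val sampleNoReplacement: RDD[Long] = data.sample(withReplacement = false, fraction = 0.2)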
1.9 distinct

Function signature:

def distinct(): RDD[T]

Function description:

Return a new RDD containing the distinct elements in this RDD.

val distinctRdd: RDD[Long] = data.distinct()
1.10 coalesce

Function signature:

def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T]

Function description:

Return a new RDD that is reduced into numPartitions partitions.

val lessPartition: RDD[Long] = data.coalesce(4)
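
By default coalesce avoids a shuffle and can only reduce the partition count; passing shuffle = true allows it to rebalance data or to increase partitions. A sketch:

val rebalanced: RDD[Long] = data.coalesce(16, shuffle = true)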
1.11 repartition

Function signature:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

Function description:

Return a new RDD that has exactly numPartitions partitions.

val morePartition: RDD[Long] = data.repartition(16)
1.12 sortBy

Function signature:

def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

Function description:

Return this RDD sorted by the given key function.

val data: RDD[Long] = context.range(1, 101)
val sortRDD: RDD[Long] = data.sortBy(num => num, false)
2. Double-value type

2.1 intersection

Function signature:

def intersection(other: RDD[T]): RDD[T]

Function description:

Return the intersection of this RDD and another one.

val data1: RDD[Long] = context.range(1, 11)
val data2: RDD[Long] = context.range(5, 16)
val intersectionRDD: RDD[Long] = data1.intersection(data2)
2.2 union

Function signature:

def union(other: RDD[T]): RDD[T]

Function description:

Return the union of this RDD and another one.

val data1: RDD[Long] = context.range(1, 11)
val data2: RDD[Long] = context.range(5, 16)
val unionRDD: RDD[Long] = data1.union(data2)
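
Note that union keeps duplicates (here 5 through 10 appear twice); unlike SQL's UNION, deduplication must be requested explicitly. A sketch:

val dedupedUnion: RDD[Long] = data1.union(data2).distinct()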
2.3 subtract

Function signature:

def subtract(other: RDD[T]): RDD[T]

Function description:

Return an RDD with the elements from this that are not in other.

val data1: RDD[Long] = context.range(1, 11)
val data2: RDD[Long] = context.range(5, 16)
val subtractRDD: RDD[Long] = data1.subtract(data2)
2.4 zip

Function signature:

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

Function description:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition.

val data1: RDD[Long] = context.range(1, 5, 1, 1)
val data2: RDD[String] = context.makeRDD(List("zhangsan", "lisi", "wangwu", "zhaoliu"), 1)
val zipRDD: RDD[(Long, String)] = data1.zip(data2)
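
The single-partition arguments above are deliberate: zip fails at job execution time if the two RDDs differ in partition count or in the number of elements per partition, as in this sketch:

// Would throw a SparkException: this side has five elements, data1 has four.
// data1.zip(context.range(1, 6, 1, 1)).collect()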
3. Key-value type

3.1 partitionBy

Function signature:

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

Function description:

Return a copy of the RDD partitioned using the specified partitioner.

val data: RDD[(Int, String)] = context.makeRDD(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu"), (4, "zhaoliu")))
import org.apache.spark.HashPartitioner
val hashPartitionerRDD: RDD[(Int, String)] = data.partitionBy(new HashPartitioner(1))
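
To verify the placement, partition contents can be inspected by reusing mapPartitionsWithIndex (1.3); with HashPartitioner(1), every pair lands in partition 0. A sketch:

hashPartitionerRDD
  .mapPartitionsWithIndex((index, iterator) => iterator.map(kv => (index, kv)))
  .collect()
  .foreach(println)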
3.2 reduceByKey

Function signature:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

Function description:

Merge the values for each key using an associative and commutative reduce function.

val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
val reduceByKeyRDD = data.reduceByKey((value1, value2) => value1 + value2)
3.3 groupByKey

Function signature:

def groupByKey(): RDD[(K, Iterable[V])]

Function description:

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with the existing partitioner/parallelism level. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.

val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
val groupByKeyRDD: RDD[(String, Iterable[Int])] = data.groupByKey()
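
Aggregating after groupByKey works, but it shuffles every value; reduceByKey (3.2) combines values map-side before the shuffle and is usually preferable for aggregations. A sketch of the groupByKey-then-sum pattern:

val sums: RDD[(String, Int)] = groupByKeyRDD.map { case (key, values) => (key, values.sum) }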
3.4 aggregateByKey

Function signature:

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)]

Function description:

Aggregate the values of each key, using given combine functions and a neutral “zero value”.

val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
val aggregateByKeyRDD: RDD[(String, Int)] = data.aggregateByKey(0)((num1, num2) => num1 + num2, (num1, num2) => num1 + num2)
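
The point of aggregateByKey over reduceByKey is that the intra-partition and inter-partition functions may differ. A sketch that takes the maximum within each partition, then sums those per-partition maxima:

val maxThenSum: RDD[(String, Int)] = data.aggregateByKey(0)(math.max, _ + _)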
3.5 foldByKey

Function signature:

def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

Function description:

Merge the values for each key using an associative function and a neutral “zero value” which may be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).

val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
// Note: the zero value is applied once per partition that holds a key, so a
// non-neutral value like 3 makes the result depend on the partitioning.
val foldByKeyRDD: RDD[(String, Int)] = data.foldByKey(3)((num1, num2) => num1 + num2)
3.6 sortByKey

Function signature:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)]

Function description:

Sort the RDD by key, so that each partition contains a sorted range of the elements.

val data: RDD[(Int, String)] = context.makeRDD(Array((3, "zhangsan"), (2, "lisi"), (4, "wangwu"), (0, "laoliu"), (1, "xiaoqi")))
val sortByKeyRDD: RDD[(Int, String)] = data.sortByKey(false, 1)
3.7 join

Function signature:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

Function description:

Return an RDD containing all pairs of elements with matching keys in this and other.

val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "female"), ("wangwu", "male"), ("laoliu", "male")))
val joinRDD: RDD[(String, (Int, String))] = data1.join(data2)
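
This is an inner join, so only keys present on both sides survive: "zhaoliu" (left only) and "laoliu" (right only) are dropped. Printing the result makes this visible:

joinRDD.collect().foreach(println)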
3.8 leftOuterJoin

Function signature:

def leftOuterJoin[W](
      other: RDD[(K, W)],
      partitioner: Partitioner): RDD[(K, (V, Option[W]))]

Function description:

Perform a left outer join of this and other.

val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "female"), ("wangwu", "male"), ("laoliu", "male")))
val leftOuterJoinRDD: RDD[(String, (Int, Option[String]))] = data1.leftOuterJoin(data2)
3.9 cogroup

Function signature:

def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

Function description:

For each key k in this or other1 or other2 or other3, return a resulting RDD that contains a tuple with the list of values for that key in this, other1, other2 and other3.

val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "female"), ("wangwu", "male"), ("laoliu", "male")))
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = data1.cogroup(data2)
4. Action operators

4.1 reduce

Function signature:

def reduce(f: (T, T) => T): T

Function description:

Reduces the elements of this RDD using the specified commutative and associative binary operator.

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: (String, Int) = data.reduce((data1, data2) => ("sum", data1._2 + data2._2))
4.2 collect

Function signature:

def collect(): Array[T]

Function description:

Return an array that contains all of the elements in this RDD.

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: Array[(String, Int)] = data.collect()
4.3 count

Function signature:

def count(): Long

Function description:

Return the number of elements in the RDD.

val result: Long = data.count()
4.4 first

Function signature:

def first(): T

Function description:

Return the first element in this RDD.

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: (String, Int) = data.first()
4.5 take

Function signature:

def take(num: Int): Array[T]

Function description:

Take the first num elements of the RDD. It works by first scanning one partition, and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
val result: Array[(String, Int)] = data.take(1)
4.6 aggregate

Function signature:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

Function description:

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
Params:
zeroValue – the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
seqOp – an operator used to accumulate results within a partition
combOp – an associative operator used to combine results from different partitions

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
val result: String = data.aggregate("A")((acc, pair) => acc + pair._1, (str1, str2) => str1 + str2)
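
A classic use of aggregate is computing a sum and a count in a single pass so an average can be derived on the driver. A self-contained sketch:

val nums: RDD[Int] = context.makeRDD(List(1, 2, 3, 4), 2)
val (sum, count) = nums.aggregate((0, 0))(
  (acc, n) => (acc._1 + n, acc._2 + 1), // seqOp: fold one element into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2)  // combOp: merge per-partition pairs
)
val avg = sum.toDouble / count          // 2.5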
4.7 fold

Function signature:

def fold(zeroValue: T)(op: (T, T) => T): T

Function description:

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”. The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
Params:
zeroValue – the initial value for the accumulated result of each partition for the op operator, and also the initial value for the combine results from different partitions for the op operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
op – an operator used to both accumulate results within a partition and combine results from different partitions

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
val result: (String, Int) = data.fold(("A", 65))((data1, data2) => ("sum", data1._2 + data2._2))
4.8 countByKey

Function signature:

def countByKey(): Map[K, Long]

Function description:

Count the number of elements for each key, collecting the results to a local Map.

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
val result: collection.Map[String, Long] = data.countByKey()
4.9 saveAsTextFile

Function signature:

def saveAsTextFile(path: String): Unit

Function description:

Save this RDD as a text file, using string representations of elements.

data.saveAsTextFile("./test")
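
The path names a directory that receives one part-* file per partition. To force a single output file (at the cost of parallelism), repartition first; the path below is illustrative:

data.repartition(1).saveAsTextFile("./test-single")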
4.10 foreach

Function signature:

def foreach(f: T => Unit): Unit

Function description:

Applies a function f to all elements of this RDD.

val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
// Note: f runs on the executors, so in a cluster the println output appears in
// executor logs rather than on the driver console.
data.foreach(data1 => println(data1._1))