# Sum the salaries of each department and sort by the total in descending order
# (dept_num, salary) --> group and sum with reduceByKey --> sort with sortBy
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.textFile("/home/ubuntu/emp.csv")
rdd.take(3)
['emp_num,emp_name,role,leader_num,birth_date,salary,bonus,dept_num',
'7369,SMITH,CLERK,7902,1980/12/17,800,,20',
'7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30']
# Drop the header row: it is the only line ending with the literal column name "dept_num"
rdd = rdd.filter(lambda x: x[-8:] != "dept_num")
rdd.take(3)
['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
'7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
'7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30']
def func1(x):
    # split the CSV line: dept_num is the last field, salary is the third from the end
    t = x.split(",")
    return (t[-1], int(t[-3]))
rdd1 = rdd.map(func1)
rdd1.collect()
[('20', 800),
('30', 1600),
('30', 1250),
('20', 2975),
('30', 1250),
('30', 2850),
('10', 2450),
('20', 3000),
('10', 5000),
('30', 1500),
('20', 1100),
('30', 950),
('20', 3000),
('10', 1300)]
rdd2 = rdd1.reduceByKey(lambda x,y:x+y)
rdd2.collect()
[('20', 10875), ('10', 8750), ('30', 9400)]
rdd2.sortBy(lambda x:x[1],ascending=False).collect()
[('20', 10875), ('30', 9400), ('10', 8750)]
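# The three steps above (map, reduceByKey, sortBy) can also be chained into one
# expression; a minimal sketch using the same func1 and rdd as above:
dept_totals = (rdd.map(func1)
                  .reduceByKey(lambda x, y: x + y)
                  .sortBy(lambda x: x[1], ascending=False))
dept_totals.collect()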
rdd.take(3)
['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
'7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
'7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30']
(rdd.flatMap(lambda x: x.split(","))
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda x: x[1], ascending=False)
    .collect())
[('', 11),
('7698', 6),
('30', 6),
('20', 5),
('SALESMAN', 4),
('7839', 4),
('CLERK', 4),
('10', 3),
('7566', 3),
('MANAGER', 3),
('1250', 2),
('7782', 2),
('7788', 2),
('7902', 2),
('ANALYST', 2),
('3000', 2),
('1981/12/3', 2),
('7369', 1),
('SMITH', 1),
('1980/12/17', 1),
('7499', 1),
('ALLEN', 1),
('1981/2/20', 1),
('1600', 1),
('7521', 1),
('1981/2/22', 1),
('JONES', 1),
('1981/4/2', 1),
('2975', 1),
('MARTIN', 1),
('1981/9/28', 1),
('1400', 1),
('CLARK', 1),
('1981/11/17', 1),
('0', 1),
('1987/5/23', 1),
('1100', 1),
('7900', 1),
('JAMES', 1),
('950', 1),
('1982/1/23', 1),
('800', 1),
('300', 1),
('WARD', 1),
('500', 1),
('7654', 1),
('BLAKE', 1),
('1981/5/1', 1),
('2850', 1),
('1981/6/9', 1),
('2450', 1),
('SCOTT', 1),
('1987/4/19', 1),
('KING', 1),
('PRESIDENT', 1),
('5000', 1),
('7844', 1),
('TURNER', 1),
('1981/9/8', 1),
('1500', 1),
('7876', 1),
('ADAMS', 1),
('FORD', 1),
('7934', 1),
('MILLER', 1),
('1300', 1)]
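# Side note on the word count above: the ('', 11) entry comes from empty fields
# (missing bonus and leader_num values produce empty strings when splitting on commas).
# A minimal sketch that filters them out before counting:
(rdd.flatMap(lambda x: x.split(","))
    .filter(lambda x: x != "")
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda x: x[1], ascending=False)
    .take(5))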
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/home/ubuntu/emp.csv",header=True)
df.show()
+-------+--------+---------+----------+----------+------+-----+--------+
|emp_num|emp_name| role|leader_num|birth_date|salary|bonus|dept_num|
+-------+--------+---------+----------+----------+------+-----+--------+
| 7369| SMITH| CLERK| 7902|1980/12/17| 800| null| 20|
| 7499| ALLEN| SALESMAN| 7698| 1981/2/20| 1600| 300| 30|
| 7521| WARD| SALESMAN| 7698| 1981/2/22| 1250| 500| 30|
| 7566| JONES| MANAGER| 7839| 1981/4/2| 2975| null| 20|
| 7654| MARTIN| SALESMAN| 7698| 1981/9/28| 1250| 1400| 30|
| 7698| BLAKE| MANAGER| 7839| 1981/5/1| 2850| null| 30|
| 7782| CLARK| MANAGER| 7839| 1981/6/9| 2450| null| 10|
| 7788| SCOTT| ANALYST| 7566| 1987/4/19| 3000| null| 20|
| 7839| KING|PRESIDENT| null|1981/11/17| 5000| null| 10|
| 7844| TURNER| SALESMAN| 7698| 1981/9/8| 1500| 0| 30|
| 7876| ADAMS| CLERK| 7788| 1987/5/23| 1100| null| 20|
| 7900| JAMES| CLERK| 7698| 1981/12/3| 950| null| 30|
| 7902| FORD| ANALYST| 7566| 1981/12/3| 3000| null| 20|
| 7934| MILLER| CLERK| 7782| 1982/1/23| 1300| null| 10|
+-------+--------+---------+----------+----------+------+-----+--------+
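# Note: spark.read.csv with header=True still reads every column as a string, which is
# why salary is cast to int below before summing. An alternative (a sketch, not run in
# this session) is to let Spark infer the column types at read time:
# df = spark.read.csv("/home/ubuntu/emp.csv", header=True, inferSchema=True)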
df1 = df.selectExpr("dept_num","cast(salary as int) salary")
import pyspark.sql.functions as F
# DataFrame DSL aggregations: sum, count, mean, max, min
df1.groupBy("dept_num").sum().orderBy(F.col("sum(salary)").desc()).show()
+--------+-----------+
|dept_num|sum(salary)|
+--------+-----------+
| 20| 10875|
| 30| 9400|
| 10| 8750|
+--------+-----------+
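# The same aggregation can be written with an explicit aggregate expression and an
# alias, which avoids the generated "sum(salary)" column name; a minimal sketch:
df1.groupBy("dept_num") \
   .agg(F.sum("salary").alias("total_salary")) \
   .orderBy(F.col("total_salary").desc()) \
   .show()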
df1.registerTempTable("emp")
spark.sql("select dept_num,sum(salary) as salary from emp group by dept_num order by salary desc").show()
+--------+------+
|dept_num|salary|
+--------+------+
| 20| 10875|
| 30| 9400|
| 10| 8750|
+--------+------+
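# registerTempTable (used above) is deprecated since Spark 2.0; the equivalent,
# recommended call is:
# df1.createOrReplaceTempView("emp")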
rdd.collect()
['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
'7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
'7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30',
'7566,JONES,MANAGER,7839,1981/4/2,2975,,20',
'7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30',
'7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30',
'7782,CLARK,MANAGER,7839,1981/6/9,2450,,10',
'7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20',
'7839,KING,PRESIDENT,,1981/11/17,5000,,10',
'7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30',
'7876,ADAMS,CLERK,7788,1987/5/23,1100,,20',
'7900,JAMES,CLERK,7698,1981/12/3,950,,30',
'7902,FORD,ANALYST,7566,1981/12/3,3000,,20',
'7934,MILLER,CLERK,7782,1982/1/23,1300,,10']
def func1(x):
    t = x.split(",")
    return t[-1], int(t[-3])
rdd1 = rdd.map(func1)
rdd1.collect()
[('20', 800),
('30', 1600),
('30', 1250),
('20', 2975),
('30', 1250),
('30', 2850),
('10', 2450),
('20', 3000),
('10', 5000),
('30', 1500),
('20', 1100),
('30', 950),
('20', 3000),
('10', 1300)]
df = rdd1.toDF(["dept_num","salary"])
df.show()
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-12-d1520c28608d> in <module>
----> 1 df = rdd1.toDF(["dept_num","salary"])
2 df.show()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
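# toDF is only attached to RDDs once a SparkSession (or SQLContext) exists in the
# current process, so a bare PipelinedRDD raises AttributeError as above. A minimal
# sketch of the usual fix, assuming a SparkSession as created earlier:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(rdd1, ["dept_num", "salary"])
df.show()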
help(rdd1)
Help on PipelinedRDD in module pyspark.rdd object:
class PipelinedRDD(RDD)
| Pipelined maps:
|
| >>> rdd = sc.parallelize([1, 2, 3, 4])
| >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
| [4, 8, 12, 16]
| >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
| [4, 8, 12, 16]
|
| Pipelined reduces:
| >>> from operator import add
| >>> rdd.map(lambda x: 2 * x).reduce(add)
| 20
| >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
| 20
|
| Method resolution order:
| PipelinedRDD
| RDD
| builtins.object
|
| Methods defined here:
|
| __init__(self, prev, func, preservesPartitioning=False)
| Initialize self. See help(type(self)) for accurate signature.
|
| getNumPartitions(self)
| Returns the number of partitions in RDD
|
| >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
| >>> rdd.getNumPartitions()
| 2
|
| id(self)
| A unique ID for this RDD (within its SparkContext).
|
| ----------------------------------------------------------------------
| Methods inherited from RDD:
|
| __add__(self, other)
| Return the union of this RDD and another one.
|
| >>> rdd = sc.parallelize([1, 1, 2, 3])
| >>> (rdd + rdd).collect()
| [1, 1, 2, 3, 1, 1, 2, 3]
|
| __getnewargs__(self)
|
| __repr__(self)
| Return repr(self).
|
| aggregate(self, zeroValue, seqOp, combOp)
| Aggregate the elements of each partition, and then the results for all
| the partitions, using a given combine functions and a neutral "zero
| value."
|
| The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
| as its result value to avoid object allocation; however, it should not
| modify C{t2}.
|
| The first function (seqOp) can return a different result type, U, than
| the type of this RDD. Thus, we need one operation for merging a T into
| an U and one operation for merging two U
|
| >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
| >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
| >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
| (10, 4)
| >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
| (0, 0)
|
| aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc37857d730>)
| Aggregate the values of each key, using given combine functions and a neutral
| "zero value". This function can return a different result type, U, than the type
| of the values in this RDD, V. Thus, we need one operation for merging a V into
| a U and one operation for merging two U's, The former operation is used for merging
| values within a partition, and the latter is used for merging values between
| partitions. To avoid memory allocation, both of these functions are
| allowed to modify and return their first argument instead of creating a new U.
|
| cache(self)
| Persist this RDD with the default storage level (C{MEMORY_ONLY}).
|
| cartesian(self, other)
| Return the Cartesian product of this RDD and another one, that is, the
| RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
| C{b} is in C{other}.
|
| >>> rdd = sc.parallelize([1, 2])
| >>> sorted(rdd.cartesian(rdd).collect())
| [(1, 1), (1, 2), (2, 1), (2, 2)]
|
| checkpoint(self)
| Mark this RDD for checkpointing. It will be saved to a file inside the
| checkpoint directory set with L{SparkContext.setCheckpointDir()} and
| all references to its parent RDDs will be removed. This function must
| be called before any job has been executed on this RDD. It is strongly
| recommended that this RDD is persisted in memory, otherwise saving it
| on a file will require recomputation.
|
| coalesce(self, numPartitions, shuffle=False)
| Return a new RDD that is reduced into `numPartitions` partitions.
|
| >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
| [[1], [2, 3], [4, 5]]
| >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
| [[1, 2, 3, 4, 5]]
|
| cogroup(self, other, numPartitions=None)
| For each key k in C{self} or C{other}, return a resulting RDD that
| contains a tuple with the list of values for that key in C{self} as
| well as C{other}.
|
| >>> x = sc.parallelize([("a", 1), ("b", 4)])
| >>> y = sc.parallelize([("a", 2)])
| >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
| [('a', ([1], [2])), ('b', ([4], []))]
|
| collect(self)
| Return a list that contains all of the elements in this RDD.
|
| .. note:: This method should only be used if the resulting array is expected
| to be small, as all the data is loaded into the driver's memory.
|
| collectAsMap(self)
| Return the key-value pairs in this RDD to the master as a dictionary.
|
| .. note:: this method should only be used if the resulting data is expected
| to be small, as all the data is loaded into the driver's memory.
|
| >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
| >>> m[1]
| 2
| >>> m[3]
| 4
|
| combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc37857d730>)
| Generic function to combine the elements for each key using a custom
| set of aggregation functions.
|
| Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
| type" C.
|
| Users provide three functions:
|
| - C{createCombiner}, which turns a V into a C (e.g., creates
| a one-element list)
| - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
| a list)
| - C{mergeCombiners}, to combine two C's into a single one.
|
| In addition, users can control the partitioning of the output RDD.
|
| .. note:: V and C can be different -- for example, one might group an RDD of type
| (Int, Int) into an RDD of type (Int, List[Int]).
|
| >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
| >>> def add(a, b): return a + str(b)
| >>> sorted(x.combineByKey(str, add, add).collect())
| [('a', '11'), ('b', '1')]
|
| count(self)
| Return the number of elements in this RDD.
|
| >>> sc.parallelize([2, 3, 4]).count()
| 3
|
| countApprox(self, timeout, confidence=0.95)
| .. note:: Experimental
|
| Approximate version of count() that returns a potentially incomplete
| result within a timeout, even if not all tasks have finished.
|
| >>> rdd = sc.parallelize(range(1000), 10)
| >>> rdd.countApprox(1000, 1.0)
| 1000
|
| countApproxDistinct(self, relativeSD=0.05)
| .. note:: Experimental
|
| Return approximate number of distinct elements in the RDD.
|
| The algorithm used is based on streamlib's implementation of
| `"HyperLogLog in Practice: Algorithmic Engineering of a State
| of The Art Cardinality Estimation Algorithm", available here
| <http://dx.doi.org/10.1145/2452376.2452456>`_.
|
| :param relativeSD: Relative accuracy. Smaller values create
| counters that require more space.
| It must be greater than 0.000017.
|
| >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
| >>> 900 < n < 1100
| True
| >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
| >>> 16 < n < 24
| True
|
| countByKey(self)
| Count the number of elements for each key, and return the result to the
| master as a dictionary.
|
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
| >>> sorted(rdd.countByKey().items())
| [('a', 2), ('b', 1)]
|
| countByValue(self)
| Return the count of each unique value in this RDD as a dictionary of
| (value, count) pairs.
|
| >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
| [(1, 2), (2, 3)]
|
| distinct(self, numPartitions=None)
| Return a new RDD containing the distinct elements in this RDD.
|
| >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
| [1, 2, 3]
|
| filter(self, f)
| Return a new RDD containing only the elements that satisfy a predicate.
|
| >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
| >>> rdd.filter(lambda x: x % 2 == 0).collect()
| [2, 4]
|
| first(self)
| Return the first element in this RDD.
|
| >>> sc.parallelize([2, 3, 4]).first()
| 2
| >>> sc.parallelize([]).first()
| Traceback (most recent call last):
| ...
| ValueError: RDD is empty
|
| flatMap(self, f, preservesPartitioning=False)
| Return a new RDD by first applying a function to all elements of this
| RDD, and then flattening the results.
|
| >>> rdd = sc.parallelize([2, 3, 4])
| >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
| [1, 1, 1, 2, 2, 3]
| >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
| [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
|
| flatMapValues(self, f)
| Pass each value in the key-value pair RDD through a flatMap function
| without changing the keys; this also retains the original RDD's
| partitioning.
|
| >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
| >>> def f(x): return x
| >>> x.flatMapValues(f).collect()
| [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
|
| fold(self, zeroValue, op)
| Aggregate the elements of each partition, and then the results for all
| the partitions, using a given associative function and a neutral "zero value."
|
| The function C{op(t1, t2)} is allowed to modify C{t1} and return it
| as its result value to avoid object allocation; however, it should not
| modify C{t2}.
|
| This behaves somewhat differently from fold operations implemented
| for non-distributed collections in functional languages like Scala.
| This fold operation may be applied to partitions individually, and then
| fold those results into the final result, rather than apply the fold
| to each element sequentially in some defined ordering. For functions
| that are not commutative, the result may differ from that of a fold
| applied to a non-distributed collection.
|
| >>> from operator import add
| >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
| 15
|
| foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc37857d730>)
| Merge the values for each key using an associative function "func"
| and a neutral "zeroValue" which may be added to the result an
| arbitrary number of times, and must not change the result
| (e.g., 0 for addition, or 1 for multiplication.).
|
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
| >>> from operator import add
| >>> sorted(rdd.foldByKey(0, add).collect())
| [('a', 2), ('b', 1)]
|
| foreach(self, f)
| Applies a function to all elements of this RDD.
|
| >>> def f(x): print(x)
| >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
|
| foreachPartition(self, f)
| Applies a function to each partition of this RDD.
|
| >>> def f(iterator):
| ... for x in iterator:
| ... print(x)
| >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
|
| fullOuterJoin(self, other, numPartitions=None)
| Perform a right outer join of C{self} and C{other}.
|
| For each element (k, v) in C{self}, the resulting RDD will either
| contain all pairs (k, (v, w)) for w in C{other}, or the pair
| (k, (v, None)) if no elements in C{other} have key k.
|
| Similarly, for each element (k, w) in C{other}, the resulting RDD will
| either contain all pairs (k, (v, w)) for v in C{self}, or the pair
| (k, (None, w)) if no elements in C{self} have key k.
|
| Hash-partitions the resulting RDD into the given number of partitions.
|
| >>> x = sc.parallelize([("a", 1), ("b", 4)])
| >>> y = sc.parallelize([("a", 2), ("c", 8)])
| >>> sorted(x.fullOuterJoin(y).collect())
| [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
|
| getCheckpointFile(self)
| Gets the name of the file to which this RDD was checkpointed
|
| Not defined if RDD is checkpointed locally.
|
| getStorageLevel(self)
| Get the RDD's current storage level.
|
| >>> rdd1 = sc.parallelize([1,2])
| >>> rdd1.getStorageLevel()
| StorageLevel(False, False, False, False, 1)
| >>> print(rdd1.getStorageLevel())
| Serialized 1x Replicated
|
| glom(self)
| Return an RDD created by coalescing all elements within each partition
| into a list.
|
| >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
| >>> sorted(rdd.glom().collect())
| [[1, 2], [3, 4]]
|
| groupBy(self, f, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc37857d730>)
| Return an RDD of grouped items.
|
| >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
| >>> result = rdd.groupBy(lambda x: x % 2).collect()
| >>> sorted([(x, sorted(y)) for (x, y) in result])
| [(0, [2, 8]), (1, [1, 1, 3, 5])]
|
| groupByKey(self, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc37857d730>)
| Group the values for each key in the RDD into a single sequence.
| Hash-partitions the resulting RDD with numPartitions partitions.
|
| .. note:: If you are grouping in order to perform an aggregation (such as a
| sum or average) over each key, using reduceByKey or aggregateByKey will
| provide much better performance.
|
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
| >>> sorted(rdd.groupByKey().mapValues(len).collect())
| [('a', 2), ('b', 1)]
| >>> sorted(rdd.groupByKey().mapValues(list).collect())
| [('a', [1, 1]), ('b', [1])]
|
| groupWith(self, other, *others)
| Alias for cogroup but with support for multiple RDDs.
|
| >>> w = sc.parallelize([("a", 5), ("b", 6)])
| >>> x = sc.parallelize([("a", 1), ("b", 4)])
| >>> y = sc.parallelize([("a", 2)])
| >>> z = sc.parallelize([("b", 42)])
| >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
| [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
|
| histogram(self, buckets)
| Compute a histogram using the provided buckets. The buckets
| are all open to the right except for the last which is closed.
| e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
| which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
| and 50 we would have a histogram of 1,0,1.
|
| If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
| this can be switched from an O(log n) inseration to O(1) per
| element (where n is the number of buckets).
|
| Buckets must be sorted, not contain any duplicates, and have
| at least two elements.
|
| If `buckets` is a number, it will generate buckets which are
| evenly spaced between the minimum and maximum of the RDD. For
| example, if the min value is 0 and the max is 100, given `buckets`
| as 2, the resulting buckets will be [0,50) [50,100]. `buckets` must
| be at least 1. An exception is raised if the RDD contains infinity.
| If the elements in the RDD do not vary (max == min), a single bucket
| will be used.
|
| The return value is a tuple of buckets and histogram.
|
| >>> rdd = sc.parallelize(range(51))
| >>> rdd.histogram(2)
| ([0, 25, 50], [25, 26])
| >>> rdd.histogram([0, 5, 25, 50])
| ([0, 5, 25, 50], [5, 20, 26])
| >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
| ([0, 15, 30, 45, 60], [15, 15, 15, 6])
| >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
| >>> rdd.histogram(("a", "b", "c"))
| (('a', 'b', 'c'), [2, 2])
|
| intersection(self, other)
| Return the intersection of this RDD and another one. The output will
| not contain any duplicate elements, even if the input RDDs did.
|
| .. note:: This method performs a shuffle internally.
|
| >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
| >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
| >>> rdd1.intersection(rdd2).collect()
| [1, 2, 3]
|
| isCheckpointed(self)
| Return whether this RDD is checkpointed and materialized, either reliably or locally.
|
| isEmpty(self)
| Returns true if and only if the RDD contains no elements at all.
|
| .. note:: an RDD may be empty even when it has at least 1 partition.
|
| >>> sc.parallelize([]).isEmpty()
| True
| >>> sc.parallelize([1]).isEmpty()
| False
|
| isLocallyCheckpointed(self)
| Return whether this RDD is marked for local checkpointing.
|
| Exposed for testing.
|
| join(self, other, numPartitions=None)
| Return an RDD containing all pairs of elements with matching keys in
| C{self} and C{other}.
|
| Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
| (k, v1) is in C{self} and (k, v2) is in C{other}.
|
| Performs a hash join across the cluster.
|
| >>> x = sc.parallelize([("a", 1), ("b", 4)])
| >>> y = sc.parallelize([("a", 2), ("a", 3)])
| >>> sorted(x.join(y).collect())
| [('a', (1, 2)), ('a', (1, 3))]
|
| keyBy(self, f)
| Creates tuples of the elements in this RDD by applying C{f}.
|
| >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
| >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
| >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
| [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
|
| keys(self)
| Return an RDD with the keys of each tuple.
|
| >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
| >>> m.collect()
| [1, 3]
|
| leftOuterJoin(self, other, numPartitions=None)
| Perform a left outer join of C{self} and C{other}.
|
| For each element (k, v) in C{self}, the resulting RDD will either
| contain all pairs (k, (v, w)) for w in C{other}, or the pair
| (k, (v, None)) if no elements in C{other} have key k.
|
| Hash-partitions the resulting RDD into the given number of partitions.
|
| >>> x = sc.parallelize([("a", 1), ("b", 4)])
| >>> y = sc.parallelize([("a", 2)])
| >>> sorted(x.leftOuterJoin(y).collect())
| [('a', (1, 2)), ('b', (4, None))]
|
| localCheckpoint(self)
| Mark this RDD for local checkpointing using Spark's existing caching layer.
|
| This method is for users who wish to truncate RDD lineages while skipping the expensive
| step of replicating the materialized data in a reliable distributed file system. This is
| useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
|
| Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
| data is written to ephemeral local storage in the executors instead of to a reliable,
| fault-tolerant storage. The effect is that if an executor fails during the computation,
| the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
|
| This is NOT safe to use with dynamic allocation, which removes executors along
| with their cached blocks. If you must use both features, you are advised to set
| L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
|
| The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
|
| lookup(self, key)
| Return the list of values in the RDD for key `key`. This operation
| is done efficiently if the RDD has a known partitioner by only
| searching the partition that the key maps to.
|
| >>> l = range(1000)
| >>> rdd = sc.parallelize(zip(l, l), 10)
| >>> rdd.lookup(42) # slow
| [42]
| >>> sorted = rdd.sortByKey()
| >>> sorted.lookup(42) # fast
| [42]
| >>> sorted.lookup(1024)
| []
| >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
| >>> list(rdd2.lookup(('a', 'b'))[0])
| ['c']
|
| map(self, f, preservesPartitioning=False)
| Return a new RDD by applying a function to each element of this RDD.
|
| >>> rdd = sc.parallelize(["b", "a", "c"])
| >>> sorted(rdd.map(lambda x: (x, 1)).collect())
| [('a', 1), ('b', 1), ('c', 1)]
|
| mapPartitions(self, f, preservesPartitioning=False)
| Return a new RDD by applying a function to each partition of this RDD.
|
| >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
| >>> def f(iterator): yield sum(iterator)
| >>> rdd.mapPartitions(f).collect()
| [3, 7]
|
| mapPartitionsWithIndex(self, f, preservesPartitioning=False)
| Return a new RDD by applying a function to each partition of this RDD,
| while tracking the index of the original partition.
|
| >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
| >>> def f(splitIndex, iterator): yield splitIndex
| >>> rdd.mapPartitionsWithIndex(f).sum()
| 6
|
| mapPartitionsWithSplit(self, f, preservesPartitioning=False)
| Deprecated: use mapPartitionsWithIndex instead.
|
| Return a new RDD by applying a function to each partition of this RDD,
| while tracking the index of the original partition.
|
| >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
| >>> def f(splitIndex, iterator): yield splitIndex
| >>> rdd.mapPartitionsWithSplit(f).sum()
| 6
|
| mapValues(self, f)
| Pass each value in the key-value pair RDD through a map function
| without changing the keys; this also retains the original RDD's
| partitioning.
|
| >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
| >>> def f(x): return len(x)
| >>> x.mapValues(f).collect()
| [('a', 3), ('b', 1)]
|
| max(self, key=None)
| Find the maximum item in this RDD.
|
| :param key: A function used to generate key for comparing
|
| >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
| >>> rdd.max()
| 43.0
| >>> rdd.max(key=str)
| 5.0
|
| mean(self)
| Compute the mean of this RDD's elements.
|
| >>> sc.parallelize([1, 2, 3]).mean()
| 2.0
|
| meanApprox(self, timeout, confidence=0.95)
| .. note:: Experimental
|
| Approximate operation to return the mean within a timeout
| or meet the confidence.
|
| >>> rdd = sc.parallelize(range(1000), 10)
| >>> r = sum(range(1000)) / 1000.0
| >>> abs(rdd.meanApprox(1000) - r) / r < 0.05
| True
|
| min(self, key=None)
| Find the minimum item in this RDD.
|
| :param key: A function used to generate key for comparing
|
| >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
| >>> rdd.min()
| 2.0
| >>> rdd.min(key=str)
| 10.0
|
| name(self)
| Return the name of this RDD.
|
| partitionBy(self, numPartitions, partitionFunc=<function portable_hash at 0x7fc37857d730>)
| Return a copy of the RDD partitioned using the specified partitioner.
|
| >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
| >>> sets = pairs.partitionBy(2).glom().collect()
| >>> len(set(sets[0]).intersection(set(sets[1])))
| 0
|
| persist(self, storageLevel=StorageLevel(False, True, False, False, 1))
| Set this RDD's storage level to persist its values across operations
| after the first time it is computed. This can only be used to assign
| a new storage level if the RDD does not have a storage level set yet.
| If no storage level is specified defaults to (C{MEMORY_ONLY}).
|
| >>> rdd = sc.parallelize(["b", "a", "c"])
| >>> rdd.persist().is_cached
| True
|
| pipe(self, command, env=None, checkCode=False)
| Return an RDD created by piping elements to a forked external process.
|
| >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
| ['1', '2', '', '3']
|
| :param checkCode: whether or not to check the return value of the shell command.
|
| randomSplit(self, weights, seed=None)
| Randomly splits this RDD with the provided weights.
|
| :param weights: weights for splits, will be normalized if they don't sum to 1
| :param seed: random seed
| :return: split RDDs in a list
|
| >>> rdd = sc.parallelize(range(500), 1)
| >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
| >>> len(rdd1.collect() + rdd2.collect())
| 500
| >>> 150 < rdd1.count() < 250
| True
| >>> 250 < rdd2.count() < 350
| True
|
| reduce(self, f)
| Reduces the elements of this RDD using the specified commutative and
| associative binary operator. Currently reduces partitions locally.
|
| >>> from operator import add
| >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
| 15
| >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
| 10
| >>> sc.parallelize([]).reduce(add)
| Traceback (most recent call last):
| ...
| ValueError: Can not reduce() empty RDD
|
| reduceByKey(self, func, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc37857d730>)
| Merge the values for each key using an associative and commutative reduce function.
|
| This will also perform the merging locally on each mapper before
| sending results to a reducer, similarly to a "combiner" in MapReduce.
|
| Output will be partitioned with C{numPartitions} partitions, or
| the default parallelism level if C{numPartitions} is not specified.
| Default partitioner is hash-partition.
|
| >>> from operator import add
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
| >>> sorted(rdd.reduceByKey(add).collect())
| [('a', 2), ('b', 1)]
|
| reduceByKeyLocally(self, func)
| Merge the values for each key using an associative and commutative reduce function, but
| return the results immediately to the master as a dictionary.
|
| This will also perform the merging locally on each mapper before
| sending results to a reducer, similarly to a "combiner" in MapReduce.
|
| >>> from operator import add
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
| >>> sorted(rdd.reduceByKeyLocally(add).items())
| [('a', 2), ('b', 1)]
|
| repartition(self, numPartitions)
| Return a new RDD that has exactly numPartitions partitions.
|
| Can increase or decrease the level of parallelism in this RDD.
| Internally, this uses a shuffle to redistribute data.
| If you are decreasing the number of partitions in this RDD, consider
| using `coalesce`, which can avoid performing a shuffle.
|
| >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
| >>> sorted(rdd.glom().collect())
| [[1], [2, 3], [4, 5], [6, 7]]
| >>> len(rdd.repartition(2).glom().collect())
| 2
| >>> len(rdd.repartition(10).glom().collect())
| 10
|
| repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc37857d730>, ascending=True, keyfunc=<function RDD.<lambda> at 0x7fc251eafd08>)
| Repartition the RDD according to the given partitioner and, within each resulting partition,
| sort records by their keys.
|
| >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
| >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
| >>> rdd2.glom().collect()
| [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
|
| rightOuterJoin(self, other, numPartitions=None)
| Perform a right outer join of C{self} and C{other}.
|
| For each element (k, w) in C{other}, the resulting RDD will either
| contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
| if no elements in C{self} have key k.
|
| Hash-partitions the resulting RDD into the given number of partitions.
|
| >>> x = sc.parallelize([("a", 1), ("b", 4)])
| >>> y = sc.parallelize([("a", 2)])
| >>> sorted(y.rightOuterJoin(x).collect())
| [('a', (2, 1)), ('b', (None, 4))]
|
| sample(self, withReplacement, fraction, seed=None)
| Return a sampled subset of this RDD.
|
| :param withReplacement: can elements be sampled multiple times (replaced when sampled out)
| :param fraction: expected size of the sample as a fraction of this RDD's size
| without replacement: probability that each element is chosen; fraction must be [0, 1]
| with replacement: expected number of times each element is chosen; fraction must be >= 0
| :param seed: seed for the random number generator
|
| .. note:: This is not guaranteed to provide exactly the fraction specified of the total
| count of the given :class:`DataFrame`.
|
| >>> rdd = sc.parallelize(range(100), 4)
| >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
| True
|
| sampleByKey(self, withReplacement, fractions, seed=None)
| Return a subset of this RDD sampled by key (via stratified sampling).
| Create a sample of this RDD using variable sampling rates for
| different keys as specified by fractions, a key to sampling rate map.
|
| >>> fractions = {"a": 0.2, "b": 0.1}
| >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
| >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
| >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
| True
| >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
| True
| >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
| True
|
| sampleStdev(self)
| Compute the sample standard deviation of this RDD's elements (which
| corrects for bias in estimating the standard deviation by dividing by
| N-1 instead of N).
|
| >>> sc.parallelize([1, 2, 3]).sampleStdev()
| 1.0
|
| sampleVariance(self)
| Compute the sample variance of this RDD's elements (which corrects
| for bias in estimating the variance by dividing by N-1 instead of N).
|
| >>> sc.parallelize([1, 2, 3]).sampleVariance()
| 1.0
|
| saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None)
| Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
| system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
| converted for output using either user specified converters or, by default,
| L{org.apache.spark.api.python.JavaToWritableConverter}.
|
| :param conf: Hadoop job configuration, passed in as a dict
| :param keyConverter: (None by default)
| :param valueConverter: (None by default)
|
| saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)
| Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
| system, using the old Hadoop OutputFormat API (mapred package). Key and value types
| will be inferred if not specified. Keys and values are converted for output using either
| user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
| C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
| of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
|
| :param path: path to Hadoop file
| :param outputFormatClass: fully qualified classname of Hadoop OutputFormat
| (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
| :param keyClass: fully qualified classname of key Writable class
| (e.g. "org.apache.hadoop.io.IntWritable", None by default)
| :param valueClass: fully qualified classname of value Writable class
| (e.g. "org.apache.hadoop.io.Text", None by default)
| :param keyConverter: (None by default)
| :param valueConverter: (None by default)
| :param conf: (None by default)
| :param compressionCodecClass: (None by default)
|
| saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None)
| Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
| system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
| converted for output using either user specified converters or, by default,
| L{org.apache.spark.api.python.JavaToWritableConverter}.
|
| :param conf: Hadoop job configuration, passed in as a dict
| :param keyConverter: (None by default)
| :param valueConverter: (None by default)
|
| saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)
| Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
| system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
| will be inferred if not specified. Keys and values are converted for output using either
| user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
| C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
| of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
|
| :param path: path to Hadoop file
| :param outputFormatClass: fully qualified classname of Hadoop OutputFormat
| (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
| :param keyClass: fully qualified classname of key Writable class
| (e.g. "org.apache.hadoop.io.IntWritable", None by default)
| :param valueClass: fully qualified classname of value Writable class
| (e.g. "org.apache.hadoop.io.Text", None by default)
| :param keyConverter: (None by default)
| :param valueConverter: (None by default)
| :param conf: Hadoop job configuration, passed in as a dict (None by default)
|
| saveAsPickleFile(self, path, batchSize=10)
| Save this RDD as a SequenceFile of serialized objects. The serializer
| used is L{pyspark.serializers.PickleSerializer}, default batch size
| is 10.
|
| >>> tmpFile = NamedTemporaryFile(delete=True)
| >>> tmpFile.close()
| >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
| >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
| ['1', '2', 'rdd', 'spark']
|
| saveAsSequenceFile(self, path, compressionCodecClass=None)
| Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
| system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
| RDD's key and value types. The mechanism is as follows:
|
| 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
| 2. Keys and values of this Java RDD are converted to Writables and written out.
|
| :param path: path to sequence file
| :param compressionCodecClass: (None by default)
|
| saveAsTextFile(self, path, compressionCodecClass=None)
| Save this RDD as a text file, using string representations of elements.
|
| @param path: path to text file
| @param compressionCodecClass: (None by default) string i.e.
| "org.apache.hadoop.io.compress.GzipCodec"
|
| >>> tempFile = NamedTemporaryFile(delete=True)
| >>> tempFile.close()
| >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
| >>> from fileinput import input
| >>> from glob import glob
| >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
| '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
|
| Empty lines are tolerated when saving to text files.
|
| >>> tempFile2 = NamedTemporaryFile(delete=True)
| >>> tempFile2.close()
| >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
| >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
| '\n\n\nbar\nfoo\n'
|
| Using compressionCodecClass
|
| >>> tempFile3 = NamedTemporaryFile(delete=True)
| >>> tempFile3.close()
| >>> codec = "org.apache.hadoop.io.compress.GzipCodec"
| >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
| >>> from fileinput import input, hook_compressed
| >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
| >>> b''.join(result).decode('utf-8')
| 'bar\nfoo\n'
|
| setName(self, name)
| Assign a name to this RDD.
|
| >>> rdd1 = sc.parallelize([1, 2])
| >>> rdd1.setName('RDD1').name()
| 'RDD1'
|
| sortBy(self, keyfunc, ascending=True, numPartitions=None)
| Sorts this RDD by the given keyfunc
|
| >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
| >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
| [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
| >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
| [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
|
| sortByKey(self, ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda> at 0x7fc251eafe18>)
| Sorts this RDD, which is assumed to consist of (key, value) pairs.
| # noqa
|
| >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
| >>> sc.parallelize(tmp).sortByKey().first()
| ('1', 3)
| >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
| [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
| >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
| [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
| >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
| >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
| >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
| [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
|
| stats(self)
| Return a L{StatCounter} object that captures the mean, variance
| and count of the RDD's elements in one operation.
|
| stdev(self)
| Compute the standard deviation of this RDD's elements.
|
| >>> sc.parallelize([1, 2, 3]).stdev()
| 0.816...
|
| subtract(self, other, numPartitions=None)
| Return each value in C{self} that is not contained in C{other}.
|
| >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
| >>> y = sc.parallelize([("a", 3), ("c", None)])
| >>> sorted(x.subtract(y).collect())
| [('a', 1), ('b', 4), ('b', 5)]
|
| subtractByKey(self, other, numPartitions=None)
| Return each (key, value) pair in C{self} that has no pair with matching
| key in C{other}.
|
| >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
| >>> y = sc.parallelize([("a", 3), ("c", None)])
| >>> sorted(x.subtractByKey(y).collect())
| [('b', 4), ('b', 5)]
|
| sum(self)
| Add up the elements in this RDD.
|
| >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
| 6.0
|
| sumApprox(self, timeout, confidence=0.95)
| .. note:: Experimental
|
| Approximate operation to return the sum within a timeout
| or meet the confidence.
|
| >>> rdd = sc.parallelize(range(1000), 10)
| >>> r = sum(range(1000))
| >>> abs(rdd.sumApprox(1000) - r) / r < 0.05
| True
|
| take(self, num)
| Take the first num elements of the RDD.
|
| It works by first scanning one partition, and use the results from
| that partition to estimate the number of additional partitions needed
| to satisfy the limit.
|
| Translated from the Scala implementation in RDD#take().
|
| .. note:: this method should only be used if the resulting array is expected
| to be small, as all the data is loaded into the driver's memory.
|
| >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
| [2, 3]
| >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
| [2, 3, 4, 5, 6]
| >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
| [91, 92, 93]
|
| takeOrdered(self, num, key=None)
| Get the N elements from an RDD ordered in ascending order or as
| specified by the optional key function.
|
| .. note:: this method should only be used if the resulting array is expected
| to be small, as all the data is loaded into the driver's memory.
|
| >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
| [1, 2, 3, 4, 5, 6]
| >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
| [10, 9, 7, 6, 5, 4]
|
| takeSample(self, withReplacement, num, seed=None)
| Return a fixed-size sampled subset of this RDD.
|
| .. note:: This method should only be used if the resulting array is expected
| to be small, as all the data is loaded into the driver's memory.
|
| >>> rdd = sc.parallelize(range(0, 10))
| >>> len(rdd.takeSample(True, 20, 1))
| 20
| >>> len(rdd.takeSample(False, 5, 2))
| 5
| >>> len(rdd.takeSample(False, 15, 3))
| 10
|
| toDebugString(self)
| A description of this RDD and its recursive dependencies for debugging.
|
| toLocalIterator(self)
| Return an iterator that contains all of the elements in this RDD.
| The iterator will consume as much memory as the largest partition in this RDD.
|
| >>> rdd = sc.parallelize(range(10))
| >>> [x for x in rdd.toLocalIterator()]
| [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
| top(self, num, key=None)
| Get the top N elements from an RDD.
|
| .. note:: This method should only be used if the resulting array is expected
| to be small, as all the data is loaded into the driver's memory.
|
| .. note:: It returns the list sorted in descending order.
|
| >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
| [12]
| >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
| [6, 5]
| >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
| [4, 3, 2]
|
| treeAggregate(self, zeroValue, seqOp, combOp, depth=2)
| Aggregates the elements of this RDD in a multi-level tree
| pattern.
|
| :param depth: suggested depth of the tree (default: 2)
|
| >>> add = lambda x, y: x + y
| >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
| >>> rdd.treeAggregate(0, add, add)
| -5
| >>> rdd.treeAggregate(0, add, add, 1)
| -5
| >>> rdd.treeAggregate(0, add, add, 2)
| -5
| >>> rdd.treeAggregate(0, add, add, 5)
| -5
| >>> rdd.treeAggregate(0, add, add, 10)
| -5
|
| treeReduce(self, f, depth=2)
| Reduces the elements of this RDD in a multi-level tree pattern.
|
| :param depth: suggested depth of the tree (default: 2)
|
| >>> add = lambda x, y: x + y
| >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
| >>> rdd.treeReduce(add)
| -5
| >>> rdd.treeReduce(add, 1)
| -5
| >>> rdd.treeReduce(add, 2)
| -5
| >>> rdd.treeReduce(add, 5)
| -5
| >>> rdd.treeReduce(add, 10)
| -5
|
| union(self, other)
| Return the union of this RDD and another one.
|
| >>> rdd = sc.parallelize([1, 1, 2, 3])
| >>> rdd.union(rdd).collect()
| [1, 1, 2, 3, 1, 1, 2, 3]
|
| unpersist(self)
| Mark the RDD as non-persistent, and remove all blocks for it from
| memory and disk.
|
| values(self)
| Return an RDD with the values of each tuple.
|
| >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
| >>> m.collect()
| [2, 4]
|
| variance(self)
| Compute the variance of this RDD's elements.
|
| >>> sc.parallelize([1, 2, 3]).variance()
| 0.666...
|
| zip(self, other)
| Zips this RDD with another one, returning key-value pairs with the
| first element in each RDD second element in each RDD, etc. Assumes
| that the two RDDs have the same number of partitions and the same
| number of elements in each partition (e.g. one was made through
| a map on the other).
|
| >>> x = sc.parallelize(range(0,5))
| >>> y = sc.parallelize(range(1000, 1005))
| >>> x.zip(y).collect()
| [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
|
| zipWithIndex(self)
| Zips this RDD with its element indices.
|
| The ordering is first based on the partition index and then the
| ordering of items within each partition. So the first item in
| the first partition gets index 0, and the last item in the last
| partition receives the largest index.
|
| This method needs to trigger a spark job when this RDD contains
| more than one partitions.
|
| >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
| [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
|
| zipWithUniqueId(self)
| Zips this RDD with generated unique Long ids.
|
| Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
| n is the number of partitions. So there may exist gaps, but this
| method won't trigger a spark job, which is different from
| L{zipWithIndex}
|
| >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
| [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
|
| ----------------------------------------------------------------------
| Data descriptors inherited from RDD:
|
| __dict__
| dictionary for instance variables (if defined)
|
| __weakref__
| list of weak references to the object (if defined)
|
| context
| The L{SparkContext} that this RDD was created on.