# Compute the total salary of each department, sorted by the total in descending order
# (dept_num, salary) --> group and sum with reduceByKey --> sort with sortBy
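As a quick illustration of the two operators named above, here is a minimal sketch on toy data; it assumes only the SparkContext `sc` that is created in the next cell, and the tuples are made up:

pairs = sc.parallelize([("10", 100), ("20", 200), ("10", 300)])
totals = pairs.reduceByKey(lambda x, y: x + y)              # ('10', 400), ('20', 200)
totals.sortBy(lambda x: x[1], ascending=False).collect()    # [('10', 400), ('20', 200)]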
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.textFile("/home/ubuntu/emp.csv")
rdd.take(3)
['emp_num,emp_name,role,leader_num,birth_date,salary,bonus,dept_num',
 '7369,SMITH,CLERK,7902,1980/12/17,800,,20',
 '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30']
# drop the header row (the only line that ends with "dept_num")
rdd = rdd.filter(lambda x: x[-8:] != "dept_num")
rdd.take(3)
['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
 '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
 '7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30']
def func1(x):
    # split a CSV line and keep (dept_num, salary) as a key-value pair
    t = x.split(",")
    return (t[-1], int(t[-3]))
rdd1 = rdd.map(func1)
rdd1.collect()
[('20', 800),
 ('30', 1600),
 ('30', 1250),
 ('20', 2975),
 ('30', 1250),
 ('30', 2850),
 ('10', 2450),
 ('20', 3000),
 ('10', 5000),
 ('30', 1500),
 ('20', 1100),
 ('30', 950),
 ('20', 3000),
 ('10', 1300)]
# sum the salaries within each department
rdd2 = rdd1.reduceByKey(lambda x, y: x + y)
rdd2.collect()
[('20', 10875), ('10', 8750), ('30', 9400)]
# sort by the summed salary, descending
rdd2.sortBy(lambda x: x[1], ascending=False).collect()
[('20', 10875), ('30', 9400), ('10', 8750)]
rdd.take(3)
['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
 '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
 '7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30']
# word-frequency count over every comma-separated field, sorted by count descending
rdd.flatMap(lambda x: x.split(",")).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect()
[('', 11),
 ('7698', 6),
 ('30', 6),
 ('20', 5),
 ('SALESMAN', 4),
 ('7839', 4),
 ('CLERK', 4),
 ('10', 3),
 ('7566', 3),
 ('MANAGER', 3),
 ('1250', 2),
 ('7782', 2),
 ('7788', 2),
 ('7902', 2),
 ('ANALYST', 2),
 ('3000', 2),
 ('1981/12/3', 2),
 ('7369', 1),
 ('SMITH', 1),
 ('1980/12/17', 1),
 ('7499', 1),
 ('ALLEN', 1),
 ('1981/2/20', 1),
 ('1600', 1),
 ('7521', 1),
 ('1981/2/22', 1),
 ('JONES', 1),
 ('1981/4/2', 1),
 ('2975', 1),
 ('MARTIN', 1),
 ('1981/9/28', 1),
 ('1400', 1),
 ('CLARK', 1),
 ('1981/11/17', 1),
 ('0', 1),
 ('1987/5/23', 1),
 ('1100', 1),
 ('7900', 1),
 ('JAMES', 1),
 ('950', 1),
 ('1982/1/23', 1),
 ('800', 1),
 ('300', 1),
 ('WARD', 1),
 ('500', 1),
 ('7654', 1),
 ('BLAKE', 1),
 ('1981/5/1', 1),
 ('2850', 1),
 ('1981/6/9', 1),
 ('2450', 1),
 ('SCOTT', 1),
 ('1987/4/19', 1),
 ('KING', 1),
 ('PRESIDENT', 1),
 ('5000', 1),
 ('7844', 1),
 ('TURNER', 1),
 ('1981/9/8', 1),
 ('1500', 1),
 ('7876', 1),
 ('ADAMS', 1),
 ('FORD', 1),
 ('7934', 1),
 ('MILLER', 1),
 ('1300', 1)]
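The top entry ('', 11) comes from the empty bonus fields (and KING's empty leader_num) in the CSV. If those blanks should not be counted, one possible variant is to drop empty tokens before counting; a sketch reusing the same `rdd`:

# same word count, but with empty fields filtered out first
rdd.flatMap(lambda x: x.split(",")) \
   .filter(lambda x: x != "") \
   .map(lambda x: (x, 1)) \
   .reduceByKey(lambda x, y: x + y) \
   .sortBy(lambda x: x[1], ascending=False) \
   .collect()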
# switch to the DataFrame / Spark SQL API
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/home/ubuntu/emp.csv", header=True)
df.show()
+-------+--------+---------+----------+----------+------+-----+--------+
|emp_num|emp_name|     role|leader_num|birth_date|salary|bonus|dept_num|
+-------+--------+---------+----------+----------+------+-----+--------+
|   7369|   SMITH|    CLERK|      7902|1980/12/17|   800| null|      20|
|   7499|   ALLEN| SALESMAN|      7698| 1981/2/20|  1600|  300|      30|
|   7521|    WARD| SALESMAN|      7698| 1981/2/22|  1250|  500|      30|
|   7566|   JONES|  MANAGER|      7839|  1981/4/2|  2975| null|      20|
|   7654|  MARTIN| SALESMAN|      7698| 1981/9/28|  1250| 1400|      30|
|   7698|   BLAKE|  MANAGER|      7839|  1981/5/1|  2850| null|      30|
|   7782|   CLARK|  MANAGER|      7839|  1981/6/9|  2450| null|      10|
|   7788|   SCOTT|  ANALYST|      7566| 1987/4/19|  3000| null|      20|
|   7839|    KING|PRESIDENT|      null|1981/11/17|  5000| null|      10|
|   7844|  TURNER| SALESMAN|      7698|  1981/9/8|  1500|    0|      30|
|   7876|   ADAMS|    CLERK|      7788| 1987/5/23|  1100| null|      20|
|   7900|   JAMES|    CLERK|      7698| 1981/12/3|   950| null|      30|
|   7902|    FORD|  ANALYST|      7566| 1981/12/3|  3000| null|      20|
|   7934|  MILLER|    CLERK|      7782| 1982/1/23|  1300| null|      10|
+-------+--------+---------+----------+----------+------+-----+--------+
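With header=True alone the columns are all read as strings, which is why the next cell casts salary to int. An alternative sketch, assuming the same file, is to let Spark infer the types at read time (the name `df_typed` is made up for illustration):

# read again with schema inference so salary comes back as a numeric column
df_typed = spark.read.csv("/home/ubuntu/emp.csv", header=True, inferSchema=True)
df_typed.printSchema()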
df1 = df.selectExpr("dept_num", "cast(salary as int) salary")
import pyspark.sql.functions as F
# DataFrame DSL aggregations: sum, count, mean, max, min
df1.groupBy("dept_num").sum().orderBy(F.col("sum(salary)").desc()).show()
+--------+-----------+
|dept_num|sum(salary)|
+--------+-----------+
|      20|      10875|
|      30|       9400|
|      10|       8750|
+--------+-----------+
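The same result can also be written with an explicit aggregation and a friendlier column name; a small sketch reusing the `df1` and `F` defined above (the alias `total_salary` is made up):

df1.groupBy("dept_num") \
   .agg(F.sum("salary").alias("total_salary")) \
   .orderBy(F.col("total_salary").desc()) \
   .show()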
# register the DataFrame as a temporary table so it can be queried with SQL
df1.registerTempTable("emp")
spark.sql("select dept_num,sum(salary) as salary from emp group by dept_num order by salary desc").show()
+--------+------+
|dept_num|salary|
+--------+------+
|      20| 10875|
|      30|  9400|
|      10|  8750|
+--------+------+
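registerTempTable still works here but has been deprecated since Spark 2.0; on a newer build the equivalent is roughly:

df1.createOrReplaceTempView("emp")
spark.sql("select dept_num,sum(salary) as salary from emp group by dept_num order by salary desc").show()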
rdd.collect()
['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
 '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
 '7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30',
 '7566,JONES,MANAGER,7839,1981/4/2,2975,,20',
 '7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30',
 '7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30',
 '7782,CLARK,MANAGER,7839,1981/6/9,2450,,10',
 '7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20',
 '7839,KING,PRESIDENT,,1981/11/17,5000,,10',
 '7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30',
 '7876,ADAMS,CLERK,7788,1987/5/23,1100,,20',
 '7900,JAMES,CLERK,7698,1981/12/3,950,,30',
 '7902,FORD,ANALYST,7566,1981/12/3,3000,,20',
 '7934,MILLER,CLERK,7782,1982/1/23,1300,,10']
def func1(x):
    t = x.split(",")
    return t[-1], int(t[-3])
rdd1 = rdd.map(func1)
rdd1.collect()
[('20', 800),
 ('30', 1600),
 ('30', 1250),
 ('20', 2975),
 ('30', 1250),
 ('30', 2850),
 ('10', 2450),
 ('20', 3000),
 ('10', 5000),
 ('30', 1500),
 ('20', 1100),
 ('30', 950),
 ('20', 3000),
 ('10', 1300)]
df = rdd1.toDF(["dept_num","salary"])
df.show()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-d1520c28608d> in <module>
----> 1 df = rdd1.toDF(["dept_num","salary"])
      2 df.show()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
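toDF is not defined on the RDD class itself; it only gets attached to RDDs once a SparkSession (or SQLContext) exists in the current process, which does not seem to have been the case when this cell ran. A sketch of the usual fix, going through the session explicitly:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# build the DataFrame through the session instead of relying on the toDF patch
df = spark.createDataFrame(rdd1, ["dept_num", "salary"])
df.show()

# once a session exists, rdd1.toDF(["dept_num", "salary"]) should work as well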
help(rdd1)
Help on PipelinedRDD in module pyspark.rdd object:

class PipelinedRDD(RDD)
 |  Pipelined maps:
 |
 |  >>> rdd = sc.parallelize([1, 2, 3, 4])
 |  >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |
 |  Method resolution order:
 |      PipelinedRDD
 |      RDD
 |      builtins.object
 |
 |  [... full listing of the methods inherited from RDD (aggregate, collect,
 |   filter, map, reduceByKey, sortBy, take, and so on) omitted; note that
 |   toDF does not appear anywhere in this listing ...]