1. #!/usr/bin/env python
    2. # coding: utf-8
    3. # In[1]:
    4. # rdd算子
    5. # In[2]:
    6. from pyspark import SparkConf,SparkContext
    7. conf = SparkConf().setAppName("test").setMaster("local[*]")
    8. # In[3]:
    9. sc = SparkContext(conf = conf)
    10. # In[4]:
    11. data = [1,2,3,4,5,6,7,8,9]
    12. rdd =sc.parallelize(data, numSlices=3)
    13. print(rdd.collect())
    14. # In[6]:
    15. print(rdd.getNumPartitions()) # 获取分区数
    16. # In[7]:
    17. # rdd 读取文件
    18. #小文件较多用wholeTextFile
    19. rdd = sc.textFile("hdfs://bipcluster04/bip/external_data/vipreco/tess_official/entity/20220408/ads_rec_mer_itemRT_v4_v3")
    20. # rdd算子
    21. transformation算子 :返回类型还是rdd
    22. &
    23. action算子:返回类型不是rdd
    24. # In[ ]:
    25. ## wordcount 计算实例
    26. # In[13]:
    27. localFile = sc.textFile("file:///home/mlp/notebooks/202204/helloWorld.txt")
    28. # In[14]:
    29. wordRdd = localFile.flatMap(lambda x: x.split(' '))
    30. # In[15]:
    31. wordsWithOneRdd = wordRdd.map(lambda x: (x,1))
    32. # In[16]:
    33. reduceRdd = wordsWithOneRdd.reduceByKey(lambda a,b: a + b).collect()
    34. # In[17]:
    35. print(reduceRdd)
    36. # In[ ]:
    37. # In[18]:
    38. #groupBy 算子:将rdd分组;
    39. # In[23]:
    40. rdd2= sc.parallelize([1,2,3,2,4,5,6,7,8])
    41. # In[28]:
    42. rdd3 =rdd2.groupBy(lambda num: 'even' if num % 2== 0 else 'odd')
    43. print(rdd3.map(lambda x: (x[0],list(x[1]))).collect())
    44. # In[30]:
    45. # filter过滤
    46. rdd2.filter(lambda x: x % 2 ==1).collect()
    47. #rdd3.filter(lambda x: x % 2 ==1).collect() 被使用
    48. # In[35]:
    49. #distinct 算子
    50. rdd2.distinct().collect()
    51. # In[36]:
    52. # union 将2个rdd合并成一个
    53. # 只合并不去重,不同类型也可以合并
    54. rdd4 = rdd2.distinct()
    55. rdd4.union(rdd2).collect()
    56. # In[41]:
    57. # join 对两个rdd进行join连接,类sql;
    58. # 只能用于二元数组;join leftOuterJoin RightOuterJoin
    59. x = sc.parallelize([(20220410,100),(20220411,120),(20220412,140)])
    60. y = sc.parallelize([(20220410,69123),(20220411,69124)])
    61. # In[42]:
    62. x.join(y).collect()
    63. # In[43]:
    64. x = sc.parallelize([(20220410,100,1),(20220411,120,1),(20220412,140,2)])
    65. y = sc.parallelize([(20220410,69123,2),(20220411,69124,1)])
    66. x.join(y).collect()
    67. # In[45]:
    68. # 两个集合的交集intersection
    69. # 返回两个rdd的交集,一个新的rdd
    70. #rdd.inserection(rdd2)
    71. rdd1 = sc.parallelize([('a',1),('b',1)])
    72. rdd2 = sc.parallelize([('a',1),('c',1)])
    73. inser_rdd = rdd1.intersection(rdd2)
    74. inser_rdd.collect()
    75. # In[46]:
    76. # glom() 将rdd加上嵌套,这个嵌套按照分区来进行
    77. # glom的作用是将同一个分区里的元素合并到一个array里
    78. data = [1,2,3,4,5,6,7,8,9]
    79. rdd =sc.parallelize(data, numSlices=2)
    80. rdd.glom().collect()
    81. # In[49]:
    82. # groupByKey() 针对kv型rdd,按照key进行分组
    83. rdd = sc.parallelize([1,2,2,4,3])
    84. rdd1 = rdd.map(lambda x: (x,1))
    85. rdd2 = rdd1.groupByKey()
    86. rdd2.map(lambda x: (x[0],list(x[1]))).collect()
    87. # In[50]:
    88. # sortBy
    89. # 对rdd进行排序
    90. rdd1.collect()
    91. # In[51]:
    92. rdd1.sortBy(lambda x:x[0]).collect()
    93. # In[52]:
    94. rdd1.sortBy(lambda x:x[1]).collect()
    95. # In[53]:
    96. # sortByKey()
    97. # 针对kv性rdd, 按照key进行排序
    98. #。RDD.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
    99. # In[67]:
    100. rdd = sc.parallelize([('A',1),('b',1),('a',1),('D',1),('F',1),('b',1),('a',2),('b',3)],4)
    101. # In[68]:
    102. rdd.sortByKey().collect()
    103. # In[69]:
    104. rdd.sortByKey(ascending=False, numPartitions=3).collect()
    105. # In[70]:
    106. rdd.sortByKey(ascending=True, numPartitions=1, keyfunc= lambda key:str(key).lower()).collect()
    107. # In[ ]: