#!/usr/bin/env python# coding: utf-8# In[1]:# rdd算子# In[2]:from pyspark import SparkConf,SparkContextconf = SparkConf().setAppName("test").setMaster("local[*]")# In[3]:sc = SparkContext(conf = conf)# In[4]:data = [1,2,3,4,5,6,7,8,9]rdd =sc.parallelize(data, numSlices=3)print(rdd.collect())# In[6]:print(rdd.getNumPartitions()) # 获取分区数# In[7]:# rdd 读取文件#小文件较多用wholeTextFilerdd = sc.textFile("hdfs://bipcluster04/bip/external_data/vipreco/tess_official/entity/20220408/ads_rec_mer_itemRT_v4_v3")# rdd算子transformation算子 :返回类型还是rdd&action算子:返回类型不是rdd# In[ ]:## wordcount 计算实例# In[13]:localFile = sc.textFile("file:///home/mlp/notebooks/202204/helloWorld.txt")# In[14]:wordRdd = localFile.flatMap(lambda x: x.split(' '))# In[15]:wordsWithOneRdd = wordRdd.map(lambda x: (x,1))# In[16]:reduceRdd = wordsWithOneRdd.reduceByKey(lambda a,b: a + b).collect()# In[17]:print(reduceRdd)# In[ ]:# In[18]:#groupBy 算子:将rdd分组;# In[23]:rdd2= sc.parallelize([1,2,3,2,4,5,6,7,8])# In[28]:rdd3 =rdd2.groupBy(lambda num: 'even' if num % 2== 0 else 'odd')print(rdd3.map(lambda x: (x[0],list(x[1]))).collect())# In[30]:# filter过滤rdd2.filter(lambda x: x % 2 ==1).collect()#rdd3.filter(lambda x: x % 2 ==1).collect() 被使用# In[35]:#distinct 算子rdd2.distinct().collect()# In[36]:# union 将2个rdd合并成一个# 只合并不去重,不同类型也可以合并rdd4 = rdd2.distinct()rdd4.union(rdd2).collect()# In[41]:# join 对两个rdd进行join连接,类sql;# 只能用于二元数组;join leftOuterJoin RightOuterJoinx = sc.parallelize([(20220410,100),(20220411,120),(20220412,140)])y = sc.parallelize([(20220410,69123),(20220411,69124)])# In[42]:x.join(y).collect()# In[43]:x = sc.parallelize([(20220410,100,1),(20220411,120,1),(20220412,140,2)])y = sc.parallelize([(20220410,69123,2),(20220411,69124,1)])x.join(y).collect()# In[45]:# 两个集合的交集intersection# 返回两个rdd的交集,一个新的rdd#rdd.inserection(rdd2)rdd1 = sc.parallelize([('a',1),('b',1)])rdd2 = sc.parallelize([('a',1),('c',1)])inser_rdd = rdd1.intersection(rdd2)inser_rdd.collect()# In[46]:# glom() 将rdd加上嵌套,这个嵌套按照分区来进行# glom的作用是将同一个分区里的元素合并到一个array里data = [1,2,3,4,5,6,7,8,9]rdd =sc.parallelize(data, numSlices=2)rdd.glom().collect()# In[49]:# groupByKey() 针对kv型rdd,按照key进行分组rdd = sc.parallelize([1,2,2,4,3])rdd1 = rdd.map(lambda x: (x,1))rdd2 = rdd1.groupByKey()rdd2.map(lambda x: (x[0],list(x[1]))).collect()# In[50]:# sortBy # 对rdd进行排序rdd1.collect()# In[51]:rdd1.sortBy(lambda x:x[0]).collect()# In[52]:rdd1.sortBy(lambda x:x[1]).collect()# In[53]:# sortByKey() # 针对kv性rdd, 按照key进行排序#。RDD.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>) # In[67]:rdd = sc.parallelize([('A',1),('b',1),('a',1),('D',1),('F',1),('b',1),('a',2),('b',3)],4)# In[68]:rdd.sortByKey().collect()# In[69]:rdd.sortByKey(ascending=False, numPartitions=3).collect()# In[70]:rdd.sortByKey(ascending=True, numPartitions=1, keyfunc= lambda key:str(key).lower()).collect()# In[ ]: