#!/usr/bin/env python
# coding: utf-8
# In[1]:
# rdd算子
# In[2]:
from pyspark import SparkConf,SparkContext
conf = SparkConf().setAppName("test").setMaster("local[*]")
# In[3]:
sc = SparkContext(conf = conf)
# In[4]:
data = [1,2,3,4,5,6,7,8,9]
rdd =sc.parallelize(data, numSlices=3)
print(rdd.collect())
# In[6]:
print(rdd.getNumPartitions()) # 获取分区数
# In[7]:
# rdd 读取文件
#小文件较多用wholeTextFile
rdd = sc.textFile("hdfs://bipcluster04/bip/external_data/vipreco/tess_official/entity/20220408/ads_rec_mer_itemRT_v4_v3")
# rdd算子
transformation算子 :返回类型还是rdd
&
action算子:返回类型不是rdd
# In[ ]:
## wordcount 计算实例
# In[13]:
localFile = sc.textFile("file:///home/mlp/notebooks/202204/helloWorld.txt")
# In[14]:
wordRdd = localFile.flatMap(lambda x: x.split(' '))
# In[15]:
wordsWithOneRdd = wordRdd.map(lambda x: (x,1))
# In[16]:
reduceRdd = wordsWithOneRdd.reduceByKey(lambda a,b: a + b).collect()
# In[17]:
print(reduceRdd)
# In[ ]:
# In[18]:
#groupBy 算子:将rdd分组;
# In[23]:
rdd2= sc.parallelize([1,2,3,2,4,5,6,7,8])
# In[28]:
rdd3 =rdd2.groupBy(lambda num: 'even' if num % 2== 0 else 'odd')
print(rdd3.map(lambda x: (x[0],list(x[1]))).collect())
# In[30]:
# filter过滤
rdd2.filter(lambda x: x % 2 ==1).collect()
#rdd3.filter(lambda x: x % 2 ==1).collect() 被使用
# In[35]:
#distinct 算子
rdd2.distinct().collect()
# In[36]:
# union 将2个rdd合并成一个
# 只合并不去重,不同类型也可以合并
rdd4 = rdd2.distinct()
rdd4.union(rdd2).collect()
# In[41]:
# join 对两个rdd进行join连接,类sql;
# 只能用于二元数组;join leftOuterJoin RightOuterJoin
x = sc.parallelize([(20220410,100),(20220411,120),(20220412,140)])
y = sc.parallelize([(20220410,69123),(20220411,69124)])
# In[42]:
x.join(y).collect()
# In[43]:
x = sc.parallelize([(20220410,100,1),(20220411,120,1),(20220412,140,2)])
y = sc.parallelize([(20220410,69123,2),(20220411,69124,1)])
x.join(y).collect()
# In[45]:
# 两个集合的交集intersection
# 返回两个rdd的交集,一个新的rdd
#rdd.inserection(rdd2)
rdd1 = sc.parallelize([('a',1),('b',1)])
rdd2 = sc.parallelize([('a',1),('c',1)])
inser_rdd = rdd1.intersection(rdd2)
inser_rdd.collect()
# In[46]:
# glom() 将rdd加上嵌套,这个嵌套按照分区来进行
# glom的作用是将同一个分区里的元素合并到一个array里
data = [1,2,3,4,5,6,7,8,9]
rdd =sc.parallelize(data, numSlices=2)
rdd.glom().collect()
# In[49]:
# groupByKey() 针对kv型rdd,按照key进行分组
rdd = sc.parallelize([1,2,2,4,3])
rdd1 = rdd.map(lambda x: (x,1))
rdd2 = rdd1.groupByKey()
rdd2.map(lambda x: (x[0],list(x[1]))).collect()
# In[50]:
# sortBy
# 对rdd进行排序
rdd1.collect()
# In[51]:
rdd1.sortBy(lambda x:x[0]).collect()
# In[52]:
rdd1.sortBy(lambda x:x[1]).collect()
# In[53]:
# sortByKey()
# 针对kv性rdd, 按照key进行排序
#。RDD.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
# In[67]:
rdd = sc.parallelize([('A',1),('b',1),('a',1),('D',1),('F',1),('b',1),('a',2),('b',3)],4)
# In[68]:
rdd.sortByKey().collect()
# In[69]:
rdd.sortByKey(ascending=False, numPartitions=3).collect()
# In[70]:
rdd.sortByKey(ascending=True, numPartitions=1, keyfunc= lambda key:str(key).lower()).collect()
# In[ ]: