# Total salary per department, sorted by that total in descending order.
# Pipeline: (dept_num, salary) pairs --> group-and-sum with reduceByKey --> sort with sortBy.
from pyspark import SparkContext

# getOrCreate() hands back an already-running context instead of raising the way a
# second SparkContext() constructor call does, so re-executing this cell is safe.
sc = SparkContext.getOrCreate()
    # Load the raw employee CSV as an RDD holding one string per line;
    # the header row is still present at this point.
    rdd = sc.textFile(name="/home/ubuntu/emp.csv")
    rdd.take(num=3)
    1. ['emp_num,emp_name,role,leader_num,birth_date,salary,bonus,dept_num',
    2. '7369,SMITH,CLERK,7902,1980/12/17,800,,20',
    3. '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30']
    # Drop the CSV header row. Comparing against the actual first line is more
    # robust than matching a "dept_num" suffix, which misses the header when lines
    # keep a trailing '\r' (CRLF files) or when the column order changes.
    header = rdd.first()
    # Bind the header as a default argument: the filter runs lazily, so a later
    # reassignment of the global name `header` must not change what gets dropped.
    rdd = rdd.filter(lambda line, header=header: line != header)
    rdd.take(3)
    1. ['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
    2. '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
    3. '7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30']
    def func1(x):
        """Map one employee CSV line to a ``(dept_num, salary)`` pair.

        Fields are indexed from the end of the line: the last field is
        ``dept_num`` and the third-from-last is ``salary``, matching the header
        ``emp_num,emp_name,role,leader_num,birth_date,salary,bonus,dept_num``.
        Empty fields (e.g. a missing bonus) are kept by ``split``, so the
        positions stay stable.

        Raises ValueError if the salary field is not an integer.
        """
        # Strip a trailing CR/LF so CRLF-encoded input does not produce keys like '20\r'.
        fields = x.rstrip("\r\n").split(",")
        return (fields[-1], int(fields[-3]))
    # Turn every data line into a (dept_num, salary) pair and show them all.
    rdd1 = rdd.map(func1, preservesPartitioning=False)
    rdd1.collect()
    1. [('20', 800),
    2. ('30', 1600),
    3. ('30', 1250),
    4. ('20', 2975),
    5. ('30', 1250),
    6. ('30', 2850),
    7. ('10', 2450),
    8. ('20', 3000),
    9. ('10', 5000),
    10. ('30', 1500),
    11. ('20', 1100),
    12. ('30', 950),
    13. ('20', 3000),
    14. ('10', 1300)]
    # Add up the salaries that share the same dept_num key.
    rdd2 = rdd1.reduceByKey(func=lambda running_total, salary: running_total + salary)
    rdd2.collect()
    1. [('20', 10875), ('10', 8750), ('30', 9400)]
    # Order the (dept_num, total) pairs by total, largest first.
    rdd2.sortBy(keyfunc=lambda dept_total: dept_total[1], ascending=False).collect()
    1. [('20', 10875), ('30', 9400), ('10', 8750)]
    # Peek at the first three lines of the filtered RDD.
    rdd.take(num=3)
    1. ['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
    2. '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
    3. '7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30']
    # Count how often every comma-separated field value occurs across all lines,
    # most frequent first. Empty fields (e.g. a missing bonus) are counted as ''.
    (
        rdd.flatMap(lambda line: line.split(","))
        .map(lambda token: (token, 1))
        .reduceByKey(lambda left, right: left + right)
        .sortBy(lambda token_count: token_count[1], ascending=False)
        .collect()
    )
    1. [('', 11),
    2. ('7698', 6),
    3. ('30', 6),
    4. ('20', 5),
    5. ('SALESMAN', 4),
    6. ('7839', 4),
    7. ('CLERK', 4),
    8. ('10', 3),
    9. ('7566', 3),
    10. ('MANAGER', 3),
    11. ('1250', 2),
    12. ('7782', 2),
    13. ('7788', 2),
    14. ('7902', 2),
    15. ('ANALYST', 2),
    16. ('3000', 2),
    17. ('1981/12/3', 2),
    18. ('7369', 1),
    19. ('SMITH', 1),
    20. ('1980/12/17', 1),
    21. ('7499', 1),
    22. ('ALLEN', 1),
    23. ('1981/2/20', 1),
    24. ('1600', 1),
    25. ('7521', 1),
    26. ('1981/2/22', 1),
    27. ('JONES', 1),
    28. ('1981/4/2', 1),
    29. ('2975', 1),
    30. ('MARTIN', 1),
    31. ('1981/9/28', 1),
    32. ('1400', 1),
    33. ('CLARK', 1),
    34. ('1981/11/17', 1),
    35. ('0', 1),
    36. ('1987/5/23', 1),
    37. ('1100', 1),
    38. ('7900', 1),
    39. ('JAMES', 1),
    40. ('950', 1),
    41. ('1982/1/23', 1),
    42. ('800', 1),
    43. ('300', 1),
    44. ('WARD', 1),
    45. ('500', 1),
    46. ('7654', 1),
    47. ('BLAKE', 1),
    48. ('1981/5/1', 1),
    49. ('2850', 1),
    50. ('1981/6/9', 1),
    51. ('2450', 1),
    52. ('SCOTT', 1),
    53. ('1987/4/19', 1),
    54. ('KING', 1),
    55. ('PRESIDENT', 1),
    56. ('5000', 1),
    57. ('7844', 1),
    58. ('TURNER', 1),
    59. ('1981/9/8', 1),
    60. ('1500', 1),
    61. ('7876', 1),
    62. ('ADAMS', 1),
    63. ('FORD', 1),
    64. ('7934', 1),
    65. ('MILLER', 1),
    66. ('1300', 1)]
    from pyspark.sql import SparkSession

    # Return the active SparkSession if there is one, otherwise build a new one.
    spark = (
        SparkSession.builder
        .getOrCreate()
    )
    # Read the same CSV through the DataFrame API; header=True takes column names
    # from the first line. No schema inference is requested, so every column is
    # read as a string.
    df = spark.read.csv(path="/home/ubuntu/emp.csv", header=True)
    df.show()
    1. +-------+--------+---------+----------+----------+------+-----+--------+
    2. |emp_num|emp_name| role|leader_num|birth_date|salary|bonus|dept_num|
    3. +-------+--------+---------+----------+----------+------+-----+--------+
    4. | 7369| SMITH| CLERK| 7902|1980/12/17| 800| null| 20|
    5. | 7499| ALLEN| SALESMAN| 7698| 1981/2/20| 1600| 300| 30|
    6. | 7521| WARD| SALESMAN| 7698| 1981/2/22| 1250| 500| 30|
    7. | 7566| JONES| MANAGER| 7839| 1981/4/2| 2975| null| 20|
    8. | 7654| MARTIN| SALESMAN| 7698| 1981/9/28| 1250| 1400| 30|
    9. | 7698| BLAKE| MANAGER| 7839| 1981/5/1| 2850| null| 30|
    10. | 7782| CLARK| MANAGER| 7839| 1981/6/9| 2450| null| 10|
    11. | 7788| SCOTT| ANALYST| 7566| 1987/4/19| 3000| null| 20|
    12. | 7839| KING|PRESIDENT| null|1981/11/17| 5000| null| 10|
    13. | 7844| TURNER| SALESMAN| 7698| 1981/9/8| 1500| 0| 30|
    14. | 7876| ADAMS| CLERK| 7788| 1987/5/23| 1100| null| 20|
    15. | 7900| JAMES| CLERK| 7698| 1981/12/3| 950| null| 30|
    16. | 7902| FORD| ANALYST| 7566| 1981/12/3| 3000| null| 20|
    17. | 7934| MILLER| CLERK| 7782| 1982/1/23| 1300| null| 10|
    18. +-------+--------+---------+----------+----------+------+-----+--------+
    # Keep only the department and the salary, casting salary from string to int
    # so that it can be summed.
    df1 = df.selectExpr(
        "dept_num",
        "cast(salary as int) salary",
    )
    1. import pyspark.sql.functions as F
    # DataFrame DSL aggregation (grouped data also offers count, mean, max and min).
    # With no arguments, sum() totals every numeric column; dept_num is still a
    # string here, so only salary is summed, producing the column "sum(salary)".
    (
        df1.groupBy("dept_num")
        .sum()
        .orderBy(F.desc("sum(salary)"))
        .show()
    )
    1. +--------+-----------+
    2. |dept_num|sum(salary)|
    3. +--------+-----------+
    4. | 20| 10875|
    5. | 30| 9400|
    6. | 10| 8750|
    7. +--------+-----------+
    # Expose df1 to Spark SQL as the temporary view "emp". registerTempTable has been
    # deprecated since Spark 2.0; createOrReplaceTempView is its replacement and, like
    # it, replaces any existing temporary view with the same name.
    df1.createOrReplaceTempView("emp")
    # Same per-department salary total, expressed in SQL against the "emp" view and
    # ordered by the aggregated total (aliased as salary), largest first.
    spark.sql(
        "select dept_num,sum(salary) as salary from emp group by dept_num order by salary desc"
    ).show()
    1. +--------+------+
    2. |dept_num|salary|
    3. +--------+------+
    4. | 20| 10875|
    5. | 30| 9400|
    6. | 10| 8750|
    7. +--------+------+
    # Bring every remaining data line back to the driver for inspection.
    rdd.collect()
    1. ['7369,SMITH,CLERK,7902,1980/12/17,800,,20',
    2. '7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30',
    3. '7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30',
    4. '7566,JONES,MANAGER,7839,1981/4/2,2975,,20',
    5. '7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30',
    6. '7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30',
    7. '7782,CLARK,MANAGER,7839,1981/6/9,2450,,10',
    8. '7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20',
    9. '7839,KING,PRESIDENT,,1981/11/17,5000,,10',
    10. '7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30',
    11. '7876,ADAMS,CLERK,7788,1987/5/23,1100,,20',
    12. '7900,JAMES,CLERK,7698,1981/12/3,950,,30',
    13. '7902,FORD,ANALYST,7566,1981/12/3,3000,,20',
    14. '7934,MILLER,CLERK,7782,1982/1/23,1300,,10']
    def func1(x):
        """Map one employee CSV line to a ``(dept_num, salary)`` pair.

        Fields are indexed from the end of the line: the last field is
        ``dept_num`` and the third-from-last is ``salary``. Empty fields
        (e.g. a missing bonus) are kept by ``split``, so the positions stay stable.

        Raises ValueError if the salary field is not an integer.
        """
        # Strip a trailing CR/LF so CRLF-encoded input does not produce keys like '20\r'.
        fields = x.rstrip("\r\n").split(",")
        return fields[-1], int(fields[-3])
    # Rebuild the (dept_num, salary) pair RDD with the current func1 and show it.
    rdd1 = rdd.map(func1, preservesPartitioning=False)
    rdd1.collect()
    1. [('20', 800),
    2. ('30', 1600),
    3. ('30', 1250),
    4. ('20', 2975),
    5. ('30', 1250),
    6. ('30', 2850),
    7. ('10', 2450),
    8. ('20', 3000),
    9. ('10', 5000),
    10. ('30', 1500),
    11. ('20', 1100),
    12. ('30', 950),
    13. ('20', 3000),
    14. ('10', 1300)]
    # RDD.toDF is only attached to RDDs once a SparkSession has been constructed in
    # the running process; the AttributeError below shows none existed when this
    # cell ran. Get (or create) the session explicitly and build the DataFrame
    # through it instead of relying on that side effect.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # Column names come from the list; column types are inferred from the
    # (str, int) tuples produced by func1.
    df = spark.createDataFrame(rdd1, ["dept_num", "salary"])
    df.show()
    1. ---------------------------------------------------------------------------
    2. AttributeError Traceback (most recent call last)
    3. <ipython-input-12-d1520c28608d> in <module>
    4. ----> 1 df = rdd1.toDF(["dept_num","salary"])
    5. 2 df.show()
    6. AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
