Drawing on problems encountered in real projects, together with answers from https://stackoverflow.com/, this post collects common issues in PySpark development. Comments and corrections are welcome! It will be updated as new issues come up.

01-Launching-PySpark-from-Jupyter-Notebook

It is recommended to install Anaconda first.
If you are used to testing code in Jupyter Notebook and want to keep doing so with PySpark, only the following few steps are needed.

Edit the ~/.bashrc file

    nano ~/.bashrc

Add the following lines:

    export PATH="/opt/anaconda3/bin:$PATH"
    export PYSPARK_PYTHON="python3"
    export PYSPARK_DRIVER_PYTHON="ipython3"
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.open_browser=False --allow-root --NotebookApp.ip='192.168.111.106' --NotebookApp.port=8889"

Then source it:

    source ~/.bashrc

Now running pyspark launches a Jupyter Notebook:

    [root@server106 pyspark_app]# pyspark
    [TerminalIPythonApp] WARNING | Subcommand `ipython notebook` is deprecated and will be removed in future versions.
    [TerminalIPythonApp] WARNING | You likely want to use `jupyter notebook`... continue in 5 sec. Press Ctrl-C to quit now.
    [I 16:27:26.624 NotebookApp] Serving notebooks from local directory: /home/ydzhao/pyspark_app
    [I 16:27:26.624 NotebookApp] 0 active kernels
    [I 16:27:26.624 NotebookApp] The Jupyter Notebook is running at: http://192.168.111.106:8889/?token=b7a226911d027509bf277fe40641b3572608ae47b26983f7
    [I 16:27:26.624 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
    [C 16:27:26.625 NotebookApp]
    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://192.168.111.106:8889/?token=b7a226911d027509bf277fe40641b3572608ae47b26983f7
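Once the notebook opens in the browser, the pyspark driver has already created the usual shell variables. A minimal sanity check for the first cell (assuming the default shell-provided `sc`):

    # quick check that the SparkContext created by pyspark is usable
    print(sc.version)
    print(sc.parallelize(range(100)).sum())   # should print 4950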

02-PySpark-INFO-Log-Verbosity

Spark 1.6.2

    log4j = sc._jvm.org.apache.log4j
    log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

    # keep the Spark console from printing too much
    def SetLogger(sc):
        logger = sc._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
        logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
        logger.LogManager.getLogger("INFO").setLevel(logger.Level.ERROR)
        logger.LogManager.getLogger("WARN").setLevel(logger.Level.ERROR)
        logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)

Spark 2.0+

    spark.sparkContext.setLogLevel('WARN')

The old way

Copy conf/log4j.properties.template to conf/log4j.properties, then edit log4j.properties:

    Change log4j.rootCategory=INFO, console  to  log4j.rootCategory=WARN, console

The available log levels:

    OFF   (most specific, no logging)
    FATAL (most specific, little data)
    ERROR - log only in case of errors
    WARN  - log only in case of warnings or errors
    INFO  (default)
    DEBUG - log detailed steps (and all levels above)
    TRACE (least specific, a lot of data)
    ALL   (least specific, all data)
    # pyspark.SparkContext.setLogLevel
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.\
        master('local').\
        appName('foo').\
        getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

03-Accessing-Relational-Database-Tables-from-PySpark

To access a database table from the pyspark shell, start the shell with the JDBC driver on the classpath.

Download ojdbc6-11.2.0.3.jar in advance.

    pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar"

When submitting a job, add the same option:

    spark-submit --master yarn \
        --deploy-mode cluster \
        --num-executors 25 \
        --executor-cores 2 \
        --driver-memory 4g \
        --executor-memory 4g \
        --conf spark.broadcast.compress=true \
        --jars "/data/spark/ojdbc6-11.2.0.3.jar" /home/project/test.py
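With the driver jar on the classpath, a table can then be loaded through the JDBC data source. A minimal sketch (the connection details below are placeholders; the same pattern appears again in section 08):

    # read an Oracle table via JDBC; url, dbtable, user and password are placeholders
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:oracle:thin:@192.168.111.107:1521:orcl",
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="SOME_TABLE",
        user="username",
        password="password").load()
    df.printSchema()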

04-Filtering-in-PySpark

Use boolean expressions:

    '&' for 'and'
    '|' for 'or'
    '~' for 'not'

‘~’ for ‘not’

    rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
    df = sqlContext.createDataFrame(rdd, ["id", "score"])
    l = [10, 18, 20]
    df.filter(~df.score.isin(l))   # keeps (0,1), (0,1), (0,2), (1,2)
    df.where(df.score.isin(l))     # keeps (1,10), (1,20), (3,18), (3,18), (3,18)

‘&’ for ‘and’

‘|’ for ‘or’

    df1.filter((df1.LON == 0) | (df1.LAT == 0)).count()
    # 24105
    df1.filter((df1.LON == 0) & (df1.LAT == 0)).count()
    # 14252
    df1.filter(~df1.LON == 0).count()
    # AnalysisException: "cannot resolve 'NOT LON' due to data type mismatch:
    # argument 1 requires boolean type, however, 'LON' is of double type.;"
    df1.filter(~df1.LON.isNotNull()).count()
    # 0
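The AnalysisException above comes from operator precedence: `~` binds tighter than `==`, so `~df1.LON` is evaluated first. Wrapping the comparison in parentheses gives the intended negation:

    df1.filter(~(df1.LON == 0)).count()   # rows where LON is not 0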

05-Looping-over-Columns-with-List-Comprehensions

    import pyspark.sql.functions as fn

    df.select([c for c in df.columns if c != 'id'])
    # (number of missing values, row id)
    df_miss.rdd.map(lambda row: (sum([c == None for c in row]), row['id'])).sortByKey(ascending=False).collect()
    df_miss.agg(*[(1.00 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()
    df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
    df_miss_no_income.agg(*[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).\
        toPandas().to_dict('records')[0]
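Counting nulls per column follows the same list-comprehension pattern. A small sketch, assuming the same `df_miss` and `import pyspark.sql.functions as fn` as above:

    # one aggregation per column: count rows where the column is null
    df_miss.select([
        fn.count(fn.when(fn.col(c).isNull(), c)).alias(c + '_nulls')
        for c in df_miss.columns
    ]).show()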

06-Creating-a-DataFrame-from-an-RDD

    # RDD to DataFrame
    from pyspark.sql import types

    fraud = sc.textFile('file:///root/ydzhao/PySpark/Chapter04/ccFraud.csv.gz')
    header = fraud.first()

    header
    # '"custID","gender","state","cardholder","balance","numTrans","numIntlTrans","creditLine","fraudRisk"'

    fraud.take(3)
    # ['"custID","gender","state","cardholder","balance","numTrans","numIntlTrans","creditLine","fraudRisk"',
    #  '1,1,35,1,3000,4,14,2,0',
    #  '2,2,2,1,0,9,0,18,0']

    fraud.count()
    # 10000001

(1) Drop the header row and convert every field to an integer; the result is still an RDD.

    fraud = fraud.filter(lambda row: row != header).map(lambda row: [int(x) for x in row.split(',')])
    fraud.take(3)
    # [[1, 1, 35, 1, 3000, 4, 14, 2, 0],
    #  [2, 2, 2, 1, 0, 9, 0, 18, 0],
    #  [3, 2, 2, 1, 0, 27, 9, 16, 0]]

(2) Build the DataFrame schema

h[1:-1] strips the surrounding double quotes from each column name.

    schema = [
        *[
            types.StructField(h[1:-1], types.IntegerType(), True) for h in header.split(',')
        ]
    ]
    schema = types.StructType(schema)

(3) Create the DataFrame

    # Spark 2.0+
    # fraud_df = spark.createDataFrame(fraud, schema)
    # Spark 1.6.2
    fraud_df = sqlContext.createDataFrame(fraud, schema)
    fraud_df.printSchema()
    # root
    #  |-- custID: integer (nullable = true)
    #  |-- gender: integer (nullable = true)
    #  |-- state: integer (nullable = true)
    #  |-- cardholder: integer (nullable = true)
    #  |-- balance: integer (nullable = true)
    #  |-- numTrans: integer (nullable = true)
    #  |-- numIntlTrans: integer (nullable = true)
    #  |-- creditLine: integer (nullable = true)
    #  |-- fraudRisk: integer (nullable = true)
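A quick check that the resulting DataFrame is usable, sketched with the columns above:

    # simple aggregations on the new DataFrame
    fraud_df.groupby('gender').count().show()
    fraud_df.describe('balance', 'numTrans').show()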

08-Data-Preprocessing-with-PySpark

Reading data into an RDD, RDD actions, creating a DataFrame from an RDD, reading from an Oracle database, adding a column, casting types, filtering, joining, aggregating, deduplicating and sorting (based on Spark 1.6.2).

    rdd = sc.textFile('/sh/signaling/2016/10/22/TRAFF_20161022232800.txt.lzo')
    rdd1 = rdd.map(lambda x: x.split(",")).map(lambda line: (line[2] + line[3], 1)).reduceByKey(lambda x, y: x + y)
    df1 = sqlContext.createDataFrame(rdd1, ['LACCELL', 'NUM'])

    df = sqlContext.read.format("jdbc").options(url="jdbc:oracle:thin:@192.168.111.107:1521:orcl"
        , driver="oracle.jdbc.driver.OracleDriver"
        , dbtable="TBL_LAC_CELL_TAZ"
        , user="shanghai_base"
        , password="shanghai_base").load()

    from pyspark.sql import types
    df2 = df.\
        withColumn('LACCELL', df.LAC_CELL.cast(types.LongType())).\
        select('LACCELL', 'TAZID').filter(df.TAZID > 100000)

    df3 = df1.\
        join(df2, df1.LACCELL == df2.LACCELL, 'right').\
        select(df2.TAZID.alias('GRID_1KM'), df1.LACCELL, df1.NUM).\
        dropna(how='any').\
        orderBy(["GRID_1KM", "LACCELL"], ascending=[1, 0])
    df3.show()
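The aggregated result can then be persisted, for example to HDFS as Parquet. A sketch; the output path is a placeholder:

    # write the joined/aggregated result back to HDFS
    df3.write.mode('overwrite').parquet('/sh/signaling_agg/2016/10/22')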

09-Creating-and-Transforming-RDDs

    # 1. Read text files into an RDD, transform, and save
    rdd = sc.textFile('/nj/signaling/2016/07/11/TRAFF_20160711235500.txt')
    rdd_save = sc.textFile('/nj/signaling/2016/07/11').\
        filter(lambda line: int(line.split(",")[0], 16) % 2 == 0).\
        saveAsTextFile("/nj/signaling2/2016/07/11")
    # 2. Parse each line with a custom function
    file = sc.textFile('/GPS/FeiTian/2017/07/*')

    def line2record(line):
        segs = line.split(" ")
        return (segs[0], segs[1], segs[2], segs[5])

    file.map(line2record).take(10)
    # [('2017-07-27', '23:59:57', '21582', '0'),
    #  ('2017-07-27', '23:59:58', '133386', '0'),
    #  ('2017-07-27', '22:59:00', '130387', '0'),
    #  ('2017-07-27', '23:59:57', '125899', '0'),
    #  ('2017-07-27', '22:59:58', '142358', '0'),
    #  ('2017-07-27', '23:59:58', '110065', '0'),
    #  ('2017-07-27', '22:59:58', '136810', '0'),
    #  ('2017-07-27', '23:59:58', '139889', '1'),
    #  ('2017-07-27', '23:59:57', '19877', '2'),
    #  ('2017-07-27', '23:56:54', '32764', '0')]
    # 3. Build custom records by combining fields, then group by key
    rdd = sc.textFile('/GPS/FeiTian/2017/07/201707302338.dt')
    rdd_split = rdd.map(lambda x: (x.split(" ")[2],
                                   (x.split(" ")[0] + " " + x.split(" ")[1],
                                    x.split(" ")[3],
                                    x.split(" ")[4],
                                    x.split(" ")[5],
                                    x.split(" ")[6],
                                    x.split(" ")[7],
                                    x.split(" ")[8],
                                    )))
    rdd_split.take(3)
    # [('33712',
    #   ('2017-07-30 23:37:55', '121.74655', '31.051933', '0', '0', '0.0', '05')),
    #  ('135249',
    #   ('2017-07-30 23:37:56', '121.4169', '31.2813', '0', '6', '11.0', '02')),
    #  ('11292',
    #   ('2017-07-30 23:37:20', '121.5148', '31.1553', '0', '2', '0.0', '02'))]

    rdd_groupbykey = rdd_split.groupByKey().map(lambda x: (x[0], list(x[1])))
    rdd_groupbykey.take(5)
    # [('21664',
    #   [('2017-08-30 23:38:29', '121.77598', '30.958338', '0', '4', '0.0', '04')]),
    #  ('1114',
    #   [('2017-07-30 23:38:33', '121.58', '31.3436', '1', '7', '0.0', '02')]),
    #  ('172072',
    #   [('2017-07-30 23:38:39', '121.4904', '31.351997', '1', '2', '0.0', '05')]),
    #  ('160125',
    #   [('2017-07-30 23:38:00', '121.344795', '31.19619', '0', '6', '0.0', '05'),
    #    ('2017-07-30 23:38:40', '121.344795', '31.19619', '0', '6', '0.0', '05')]),
    #  ('135655',
    #   [('2017-07-30 23:38:28', '121.4507', '31.3406', '1', '0', '0.0', '02')])]

    rdd_groupbykey.map(lambda x: x[1]).take(5)
    # [[('2017-08-30 23:38:29', '121.77598', '30.958338', '0', '4', '0.0', '04')],
    #  [('2017-07-30 23:38:33', '121.58', '31.3436', '1', '7', '0.0', '02')],
    #  [('2017-07-30 23:38:39', '121.4904', '31.351997', '1', '2', '0.0', '05')],
    #  [('2017-07-30 23:38:00', '121.344795', '31.19619', '0', '6', '0.0', '05'),
    #   ('2017-07-30 23:38:40', '121.344795', '31.19619', '0', '6', '0.0', '05')],
    #  [('2017-07-30 23:38:28', '121.4507', '31.3406', '1', '0', '0.0', '02')]]
    # 4. Parse fields and build a DataFrame with an explicit schema
    rdd = sc.textFile('file:///root/ydzhao/Two_passengers_and_one_danger/20171219_20171219135232.txt', 4)

    from datetime import datetime
    rdd_split = rdd.map(lambda x: (datetime.strptime(x.split("@@")[0], "%Y-%m-%d %H:%M:%S"),
                                   x.split("@@")[1],
                                   x.split("@@")[2],
                                   x.split("@@")[3],
                                   int(x.split("@@")[4]) * 0.000001,
                                   int(x.split("@@")[5]) * 0.000001,
                                   int(x.split("@@")[6]),
                                   int(x.split("@@")[7]),
                                   int(x.split("@@")[8]),
                                   x.split("@@")[9],
                                   int(x.split("@@")[10]),
                                   x.split("@@")[11],
                                   x.split("@@")[12]))

    from pyspark.sql.types import *
    schema = StructType([
        StructField("TimeStamp", TimestampType(), True),
        StructField("VehicleID", StringType(), True),
        StructField("VehiclePlateColor", StringType(), True),
        StructField("MessageSeq", StringType(), True),
        StructField("Lng", DoubleType(), True),
        StructField("Lat", DoubleType(), True),
        StructField("TerminalSpeed", IntegerType(), True),
        StructField("DrivingSpeed", IntegerType(), True),
        StructField("TotalMile", IntegerType(), True),
        StructField("Direction", StringType(), True),
        StructField("Altitude", IntegerType(), True),
        StructField("StatusBit", StringType(), True),
        StructField("AlarmStatus", StringType(), True)
    ])
    df2 = sqlContext.createDataFrame(rdd_split, schema)
    df2.registerTempTable("df2")
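With the temp table registered, the data can be queried with plain SQL. A sketch; the speed threshold below is arbitrary:

    # query the registered temp table; column names come from the schema above
    sqlContext.sql("""
        SELECT VehicleID, TimeStamp, Lng, Lat, DrivingSpeed
        FROM df2
        WHERE DrivingSpeed > 80
        ORDER BY DrivingSpeed DESC
    """).show(10)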

10-Flattening-Nested-JSON

How to flatten a nested JSON structure.

    df = sqlContext.read.json('file:///home/UnicomGSM/data/20170614103000.json')
    df.printSchema()
    # root
    #  |-- RoadSegState: struct (nullable = true)
    #  |    |-- DateTimeDelay: long (nullable = true)
    #  |    |-- Datetime: string (nullable = true)
    #  |    |-- Description: string (nullable = true)
    #  |    |-- IntersectionDelay: long (nullable = true)
    #  |    |-- IsRoadIntersection: string (nullable = true)
    #  |    |-- MobileNumber: long (nullable = true)
    #  |    |-- Number: long (nullable = true)
    #  |    |-- RoadSegID: string (nullable = true)
    #  |    |-- SigNumber: long (nullable = true)
    #  |    |-- Speed: double (nullable = true)
    #  |    |-- SpeedDiff: double (nullable = true)
    #  |    |-- State: string (nullable = true)
    #  |    |-- Time: double (nullable = true)
    df1 = df.select([df.RoadSegState.DateTimeDelay.alias("DateTimeDelay"),
                     df.RoadSegState.DateTime.alias("DateTime"),
                     df.RoadSegState.Description.alias("Description"),
                     df.RoadSegState.IntersectionDelay.alias("IntersectionDelay"),
                     df.RoadSegState.IsRoadIntersection.alias("IsRoadIntersection"),
                     df.RoadSegState.MobileNumber.alias("MobileNumber"),
                     df.RoadSegState.Number.alias("Number"),
                     df.RoadSegState.RoadSegID.alias("RoadSegID"),
                     df.RoadSegState.SigNumber.alias("SigNumber"),
                     df.RoadSegState.Speed.alias("Speed"),
                     df.RoadSegState.SpeedDiff.alias("SpeedDiff"),
                     df.RoadSegState.State.alias("State"),
                     df.RoadSegState.Time.alias("Time")])
    df1.printSchema()
    # root
    #  |-- DateTimeDelay: long (nullable = true)
    #  |-- DateTime: string (nullable = true)
    #  |-- Description: string (nullable = true)
    #  |-- IntersectionDelay: long (nullable = true)
    #  |-- IsRoadIntersection: string (nullable = true)
    #  |-- MobileNumber: long (nullable = true)
    #  |-- Number: long (nullable = true)
    #  |-- RoadSegID: string (nullable = true)
    #  |-- SigNumber: long (nullable = true)
    #  |-- Speed: double (nullable = true)
    #  |-- SpeedDiff: double (nullable = true)
    #  |-- State: string (nullable = true)
    #  |-- Time: double (nullable = true)
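A shorter way to flatten a single struct column is the star path, which expands every nested field at once. A sketch on the same df (this form is most reliable on Spark 2.x; column names keep their original case):

    # expand all fields of the RoadSegState struct in one select
    df_flat = df.select("RoadSegState.*")
    df_flat.printSchema()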

11-Converting-str-to-TimestampType()

    import pyspark.sql.functions as func
    from pyspark.sql.types import TimestampType
    from datetime import datetime

    df_y = sqlContext.read.json("/user/test.json")
    udf_dt = func.udf(lambda x: datetime.strptime(x, '%Y%m%d%H%M%S'), TimestampType())
    df = df_y.withColumn('datetime', udf_dt(df_y.date))
    df_g = df_y.groupby(func.hour(df_y.date))
    df_y.groupby(df_y.name).agg(func.countDistinct('address')).show()
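A UDF-free alternative uses the built-in unix_timestamp with a format string plus a cast. A sketch, assuming the same df_y and a date column formatted as yyyyMMddHHmmss:

    from pyspark.sql.functions import unix_timestamp

    # parse the string column and cast the resulting epoch seconds to a timestamp
    df = df_y.withColumn(
        'datetime',
        unix_timestamp(df_y.date, 'yyyyMMddHHmmss').cast('timestamp'))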

12-Counting-Distinct-Values:-countDistinct-and-approxCountDistinct

Exact counts: countDistinct

    from pyspark.sql.functions import col, countDistinct

    df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))
    df.agg(countDistinct(col("colName")).alias("count")).show()
    df1.agg(countDistinct(df1.msid).alias('count')).collect()

Approximate counts: approxCountDistinct

If you want faster counts and can tolerate some loss of precision, use approxCountDistinct().

    from pyspark.sql.functions import approxCountDistinct

    df1.agg(approxCountDistinct(df1.msid).alias('count')).collect()

Unsurprisingly, countDistinct does not perform as well as approxCountDistinct on large data.
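approxCountDistinct also accepts an optional maximum relative standard deviation, so the accuracy/speed trade-off can be tuned. A sketch on the same df1:

    from pyspark.sql.functions import approxCountDistinct

    # rsd: maximum estimation error allowed (the default is 0.05)
    df1.agg(approxCountDistinct(df1.msid, rsd=0.1).alias('count')).collect()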

13-Adding-a-New-Column-in-PySpark

    from pyspark.sql.functions import col, expr, when

Method 1

    new_column_1 = expr(
        """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
    )

Method 2: clear and readable, recommended

    new_column_2 = when(
        col("fruit1").isNull() | col("fruit2").isNull(), 3
    ).when(col("fruit1") == col("fruit2"), 1).otherwise(0)

Method 3: less commonly used

    from pyspark.sql.functions import coalesce, lit

    new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))

    df = sc.parallelize([
        ("orange", "apple"), ("kiwi", None), (None, "banana"),
        ("mango", "mango"), (None, None)
    ]).toDF(["fruit1", "fruit2"])

    df.withColumn("new_column_1", new_column_1).\
        withColumn("new_column_2", new_column_2).\
        withColumn("new_column_3", new_column_3).show()
    # +------+------+------------+------------+------------+
    # |fruit1|fruit2|new_column_1|new_column_2|new_column_3|
    # +------+------+------------+------------+------------+
    # |orange| apple|           0|           0|           0|
    # |  kiwi|  null|           3|           3|           3|
    # |  null|banana|           3|           3|           3|
    # | mango| mango|           1|           1|           1|
    # |  null|  null|           3|           3|           3|
    # +------+------+------------+------------+------------+

Method 4: a user-defined function (UDF)

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf

    def func(fruit1, fruit2):
        if fruit1 is None or fruit2 is None:
            return 3
        if fruit1 == fruit2:
            return 1
        return 0

    func_udf = udf(func, IntegerType())
    df = df.withColumn('new_column', func_udf(df['fruit1'], df['fruit2']))

14-Padding-and-Filtering-Data-in-PySpark

    # input:
    #  id  Value
    #  1   103
    #  2   1504
    #  3   1
    from pyspark.sql.functions import lpad

    df.select('id', lpad(df['value'], 4, '0').alias('value')).show()
    # +---+-----+
    # | id|value|
    # +---+-----+
    # |  1| 0103|
    # |  2| 1504|
    # |  3| 0001|
    # +---+-----+
    # filter/where
    from pyspark.sql.functions import col

    df.where(col("dt_mvmt").isNull())
    df.where(col("dt_mvmt").isNotNull())
    df.na.drop(subset=["dt_mvmt"])
    df.filter(df.dt_mvmt.isNotNull()).count()

    # keep rows where every column is >= 10: combine the per-column conditions with &
    from functools import reduce
    col_list = df.schema.names
    df_filtered = df.where(reduce(lambda a, b: a & b, [col(c) >= 10 for c in col_list]))

15-Renaming-Columns-for-PySpark-DataFrames-Aggregates

    # Method 1: you must know the post-aggregation column name SUM(money) in advance, which is fragile
    df.groupBy("group")\
        .agg({"money": "sum"})\
        .withColumnRenamed("SUM(money)", "money")\
        .show(100)

    # Method 2: recommended
    import pyspark.sql.functions as func
    df.groupBy("group")\
        .agg(func.sum('money').alias('money'))\
        .show(100)
    #####
    cols = [i.name for i in df.schema.fields if "StructType" in str(i.dataType)]
    df.select(cols)

    #####
    from pyspark.sql.functions import sum

    df = sc.\
        parallelize([(1, 'female', 233), (None, 'female', 314), (0, 'female', 81), (1, None, 342), (1, 'male', 109)]).\
        toDF().\
        withColumnRenamed("_1", "survived").\
        withColumnRenamed("_2", "sex").\
        withColumnRenamed("_3", "count")
    total = df.select("count").agg(sum('count').alias('sum_count')).collect().pop()['sum_count']
    result = df.withColumn('percent', (df['count'] / total) * 100)
    result.show()
    # +--------+------+-----+------------------+
    # |survived|   sex|count|           percent|
    # +--------+------+-----+------------------+
    # |       1|female|  233| 21.59406858202039|
    # |    null|female|  314|29.101019462465246|
    # |       0|female|   81| 7.506950880444857|
    # |       1|  null|  342| 31.69601482854495|
    # |       1|  male|  109|10.101946246524559|
    # +--------+------+-----+------------------+
    df = sc.parallelize([
        ("XYZ12", "B1, B44, B66", "A, C", 59),
        ("ZYY3 ", "B8, B3, B7", "J, Z", 66)
    ]).toDF(["dbn", "bus", "subway", "score"])

    from pyspark.sql.functions import col, explode, split, trim
    with_bus_exploded = df.withColumn("bus", explode(split("bus", ",")))
    with_bus_trimmed = with_bus_exploded.withColumn("bus", trim(col("bus")))
    with_bus_trimmed.show()
    # +-----+---+------+-----+
    # |  dbn|bus|subway|score|
    # +-----+---+------+-----+
    # |XYZ12| B1|  A, C|   59|
    # |XYZ12|B44|  A, C|   59|
    # |XYZ12|B66|  A, C|   59|
    # |ZYY3 | B8|  J, Z|   66|
    # |ZYY3 | B3|  J, Z|   66|
    # |ZYY3 | B7|  J, Z|   66|
    # +-----+---+------+-----+

16-Saving-a-PySpark-DataFrame-to-HBase

    df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1'])
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"testtable"},
        "rowkey":"key",
        "columns":{
            "col0":{"cf":"rowkey", "col":"key", "type":"string"},
            "col1":{"cf":"cf", "col":"col1", "type":"string"}
        }
    }""".split())

    # write to hbase
    df.write \
        .options(catalog=catalog) \
        .format('org.apache.spark.sql.execution.datasources.hbase') \
        .mode("overwrite") \
        .option("zkUrl", "host1,host2,host3:2181") \
        .save()

    # reading
    df_read = spark.read.options(catalog=catalog).format('org.apache.spark.sql.execution.datasources.hbase').load()
    df_read.show()

    "PYSPARK_SUBMIT_ARGS": "--master yarn \
        --jars hbase_spark_jar/hbase-0.90.2.jar,\
        /hbase_spark_jar/hbase-client-1.3.1.jar,\
        hbase_spark_jar/spark-avro_2.11-3.0.1.jar,\
        /hbase_spark_jar/hbase-spark-1.2.0-cdh5.7.3.jar,\
        /hbase_spark_jar/shc-1.0.0-2.0-s_2.11.jar \
        --files /etc/hbase/2.5.0.0-1245/0/hbase-site.xml \
        --executor-memory 8g \
        --executor-cores 4 \
        --num-executors 4 \
        pyspark-shell"

17-Using-when-otherwise-When-Adding-a-Column

when/otherwise plays the same role as CASE WHEN ... END in SQL.

    # Input DataFrame (col_check is assumed to already exist, initialised to -1,
    # since update_col_check refers to df['col_check'] in otherwise()):
    #  id  col1  col2  col3  col4
    # 101     1     0     1     1
    # 102     0     1     1     0
    # 103     1     1     0     1
    # 104     0     0     1     1

    from pyspark.sql.functions import when, lit

    def update_col_check(df, col_name):
        return df.withColumn('col_check', when(df[col_name] == 1, lit(col_name)).otherwise(df['col_check']))

    update_col_check(df, 'col1').show()
    # +---+----+----+----+----+---------+
    # | id|col1|col2|col3|col4|col_check|
    # +---+----+----+----+----+---------+
    # |101|   1|   0|   1|   1|     col1|
    # |102|   0|   1|   1|   0|       -1|
    # |103|   1|   1|   0|   1|     col1|
    # |104|   0|   0|   1|   1|       -1|
    # +---+----+----+----+----+---------+

    update_col_check(df, 'col2').show()
    # +---+----+----+----+----+---------+
    # | id|col1|col2|col3|col4|col_check|
    # +---+----+----+----+----+---------+
    # |101|   1|   0|   1|   1|       -1|
    # |102|   0|   1|   1|   0|     col2|
    # |103|   1|   1|   0|   1|     col2|
    # |104|   0|   0|   1|   1|       -1|
    # +---+----+----+----+----+---------+
    from pyspark.sql.functions import when

    df.select(df.name, when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
    # +-----+-------------------------------------------------------------+
    # | name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
    # +-----+-------------------------------------------------------------+
    # |Alice|                                                           -1|
    # |  Bob|                                                            1|
    # +-----+-------------------------------------------------------------+

18-Unzipping-Lists-of-Tuples-in-a-PySpark-DataFrame

    from pyspark.sql.functions import udf, col

    # create the dataframe
    df = sqlContext.createDataFrame(
        [
            (1, [('blue', 0.5), ('red', 0.1), ('green', 0.7)]),
            (2, [('red', 0.9), ('cyan', 0.5), ('white', 0.4)])
        ],
        ('Topic', 'Tokens')
    )
    df.show()
    # +-----+-------------------------------------------+
    # |Topic|                                     Tokens|
    # +-----+-------------------------------------------+
    # |    1| ('blue', 0.5),('red', 0.1),('green', 0.7)|
    # |    2| ('red', 0.9),('cyan', 0.5),('white', 0.4)|
    # +-----+-------------------------------------------+
    from pyspark.sql.types import ArrayType, StringType, FloatType

    def get_colors(l):
        return [x[0] for x in l]

    def get_weights(l):
        return [x[1] for x in l]

    # make udfs from the above functions - note the return types
    get_colors_udf = udf(get_colors, ArrayType(StringType()))
    get_weights_udf = udf(get_weights, ArrayType(FloatType()))

    # use withColumn and apply the udfs
    df.withColumn('Weights', get_weights_udf(col('Tokens')))\
        .withColumn('Tokens', get_colors_udf(col('Tokens')))\
        .select(['Topic', 'Tokens', 'Weights'])\
        .show()
    # +-----+------------------+---------------+
    # |Topic|            Tokens|        Weights|
    # +-----+------------------+---------------+
    # |    1|[blue, red, green]|[0.5, 0.1, 0.7]|
    # |    2|[red, cyan, white]|[0.9, 0.5, 0.4]|
    # +-----+------------------+---------------+
    # df.printSchema()
    # root
    #  |-- Topic: long (nullable = true)
    #  |-- Tokens: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- _1: string (nullable = true)
    #  |    |    |-- _2: double (nullable = true)

    # alternative without UDFs: access the struct fields directly
    from pyspark.sql.functions import col

    df.select(
        col("Topic"),
        col("Tokens._1").alias("Tokens"), col("Tokens._2").alias("weights")
    ).show()
    # +-----+------------------+---------------+
    # |Topic|            Tokens|        weights|
    # +-----+------------------+---------------+
    # |    1|[blue, red, green]|[0.5, 0.1, 0.7]|
    # |    2|[red, cyan, white]|[0.9, 0.5, 0.4]|
    # +-----+------------------+---------------+

    # or build the column list from the element type's field names
    cols = [
        col("Tokens.{}".format(n)) for n in
        df.schema["Tokens"].dataType.elementType.names]
    df.select("Topic", *cols)

19-Spark-Shuffle-Write

When is a shuffle write needed?

[Figure 1]

Abstracting out the RDDs and their dependency relationships:
[Figure 2]

After stage division, the corresponding RDD structure is:
[Figure 3]

Finally we get the whole execution flow:
[Figure 4]

The shuffle process sits in the middle: the ShuffleMapTasks of the previous stage perform the shuffle write, storing their output in the BlockManager and reporting the data-location metadata to the MapOutputTracker component on the driver. The next stage then uses that metadata to perform the shuffle read and pull the previous stage's output. A small illustration of where this boundary appears is shown below.
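A sketch in which reduceByKey introduces the wide dependency, so the lineage shows the shuffle boundary (assuming a plain pyspark shell with `sc` available):

    pairs = sc.parallelize(range(100)).map(lambda x: (x % 10, 1))
    counts = pairs.reduceByKey(lambda a, b: a + b)   # wide dependency -> shuffle write / shuffle read
    print(counts.toDebugString())                    # lineage shows the ShuffledRDD, i.e. a new stage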

UnsafeShuffleWriter

UnsafeShuffleWriter maintains a ShuffleExternalSorter for external sorting: the data is partially sorted and written out to disk, and a final merge produces the global order. Since this is also external sorting, how does it differ from SortShuffleWriter? Here records are sorted only by partition id, in the in-memory ShuffleInMemorySorter; the sorted records are serialized, compressed and written to a segment of a temporary file, and the seek position of each partition segment is recorded so that each partition's data can later be read on its own. The read stream is decompressed and deserialized and can then be consumed normally.

The whole process keeps inserting data into the ShuffleInMemorySorter, requesting more memory as needed; when memory cannot be acquired, the data is spilled to a file, and in the end everything is merged into one large file that is globally ordered by partition id.

[Figure 5]
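Shuffle behaviour can also be tuned through SparkConf. A sketch for a standalone application with a few commonly adjusted knobs (the values are illustrative, not recommendations):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("shuffle-demo")
            .set("spark.shuffle.compress", "true")         # compress shuffle write output
            .set("spark.shuffle.file.buffer", "64k")       # per-writer in-memory buffer before writing to disk
            .set("spark.reducer.maxSizeInFlight", "96m"))  # data fetched concurrently during shuffle read
    sc = SparkContext(conf=conf)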