数据读取

  1. # 第一步先连接spark
  2. from pyspark.sql import SparkSession
  3. spark=SparkSession \
  4. .builder \
  5. .appName('my_first_app_name') \
  6. .getOrCreate()

创建数据

从变量创建

  1. #############################################
  2. # 指定类型
  3. #############################################
  4. # 生成以逗号分隔的数据
  5. stringCSVRDD = spark.sparkContext.parallelize([
  6. (123, "Katie", 19, "brown"),
  7. (234, "Michael", 22, "green"),
  8. (345, "Simone", 23, "blue")
  9. ])
  10. # 指定模式, StructField(name,dataType,nullable)
  11. # 其中:
  12. # name: 该字段的名字,
  13. # dataType:该字段的数据类型,
  14. # nullable: 指示该字段的值是否为空
  15. from pyspark.sql.types import StructType, StructField, LongType, StringType # 导入类型
  16. schema = StructType([
  17. StructField("id", LongType(), True),
  18. StructField("name", StringType(), True),
  19. StructField("age", LongType(), True),
  20. StructField("eyeColor", StringType(), True)
  21. ])
  22. # 对RDD应用该模式并且创建DataFrame
  23. swimmers = spark.createDataFrame(stringCSVRDD,schema)
  24. # 利用DataFrame创建一个临时视图
  25. swimmers.registerTempTable("swimmers")
  26. # 查看DataFrame的行数
  27. swimmers.count()
  28. #############################################
  29. # 推断类型
  30. #############################################
  31. # 使用自动类型推断的方式创建dataframe
  32. data = [(123, "Katie", 19, "brown"),
  33. (234, "Michael", 22, "green"),
  34. (345, "Simone", 23, "blue")]
  35. df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyccolor'])
  36. df.show()
  37. df.count()

从JSON创建

  1. # 读取spark下面的示例数据
  2. file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
  3. df = spark.read.json(file)
  4. df.show()

从CSV创建

  1. # 先创建csv文件
  2. import pandas as pd
  3. import numpy as np
  4. df=pd.DataFrame(np.random.rand(5,5),columns=['a','b','c','d','e']).\
  5. applymap(lambda x: int(x*10))
  6. file=r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
  7. df.to_csv(file,index=False)
  8. # 再读取csv文件
  9. monthlySales = spark.read.csv(file, header=True, inferSchema=True)
  10. monthlySales.show()

从MySql创建

  1. # 此时需要将mysql-jar驱动放到spark-2.2.0-bin-hadoop2.7\jars下面
  2. # 单机环境可行,集群环境不行
  3. # 重新执行
  4. df = spark.read.format('jdbc').options(
  5. url='jdbc:mysql://127.0.0.1',
  6. dbtable='mysql.db',
  7. user='root',
  8. password='123456'
  9. ).load()
  10. df.show()
  11. # 也可以传入SQL语句
  12. sql="(select * from mysql.db where db='wp230') t"
  13. df = spark.read.format('jdbc').options(
  14. url='jdbc:mysql://127.0.0.1',
  15. dbtable=sql,
  16. user='root',
  17. password='123456'
  18. ).load()
  19. df.show()

从pandas.dataframe创建

  1. # 如果不指定schema则用pandas的列名
  2. df = pd.DataFrame(np.random.random((4,4)))
  3. spark_df = spark.createDataFrame (df,schema=['a','b','c','d'])

从parquet创建

  1. # 读取example下面的parquet文件
  2. file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
  3. df=spark.read.parquet(file)
  4. df.show()

从hive创建

  1. # 如果已经配置spark连接hive的参数,可以直接读取hive数据
  2. spark = SparkSession \
  3. .builder \
  4. .enableHiveSupport() \
  5. .master("172.31.100.170:7077") \
  6. .appName("my_first_app_name") \
  7. .getOrCreate()
  8. df=spark.sql("select * from hive_tb_name")
  9. df.show()

从hdfs创建

  1. # 直接读取,不需要指定ip和port
  2. data= spark.read.csv('hdfs:///tmp/_da_exdata_path/data.csv', header=True)
  3. data.show()
  4. # 有些情况下是需要指定ip和端口的
  5. data= spark.read.csv('hdfs://localhost:9000/tmp/_da_exdata_path/data.csv', header=True)
  6. data.show()

保存数据

写到csv

  1. # 创建dataframe
  2. import numpy as np
  3. df = pd.DataFrame(np.random.random((4, 4)),columns=['a', 'b', 'c', 'd'])
  4. spark_df = spark.createDataFrame(df)
  5. # 写到csv
  6. file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
  7. spark_df.write.csv(path=file, header=True, sep=",", mode='overwrite')

写到parquet

  1. # 创建dataframe
  2. import numpy as np
  3. df = pd.DataFrame(np.random.random((4, 4)),columns=['a', 'b', 'c', 'd'])
  4. spark_df = spark.createDataFrame(df)
  5. # 写到parquet
  6. file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
  7. spark_df.write.parquet(path=file,mode='overwrite')

写到hive

  1. df.registerTempTable('test_hive')
  2. sqlContext.sql("create table default.write_test select * from test_hive")
  3. # 或者
  4. # "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
  5. # mode("append")是在原有表的基础上进行添加数据
  6. df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
  1. # 打开动态分区
  2. spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
  3. spark.sql("set hive.exec.dynamic.partition=true")
  4. # 使用普通的hive-sql写入分区表
  5. spark.sql("""
  6. insert overwrite table ai.da_aipurchase_dailysale_hive
  7. partition (saledate)
  8. select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
  9. from szy_aipurchase_tmp_szy_dailysale distribute by saledate
  10. """)
  11. # 或者使用每次重建分区表的方式
  12. jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
  13. jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')
  14. # 不写分区表,只是简单的导入到hive表
  15. jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)

写到hdfs

  1. # 数据写到hdfs,而且以csv格式保存
  2. jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")

写到MySql

  1. # 会自动对齐字段,也就是说,spark_df 的列不一定要全部包含MySQL的表的全部列才行
  2. # overwrite 清空表再导入
  3. spark_df.write.mode("overwrite").format("jdbc").options(
  4. url='jdbc:mysql://127.0.0.1',
  5. user='root',
  6. password='123456',
  7. dbtable="test.test",
  8. batchsize="1000",
  9. ).save()
  10. # append 追加方式
  11. spark_df.write.mode("append").format("jdbc").options(
  12. url='jdbc:mysql://127.0.0.1',
  13. user='root',
  14. password='123456',
  15. dbtable="test.test",
  16. batchsize="1000",
  17. ).save()

Source

https://zhuanlan.zhihu.com/p/34901558