Reading data
# Step 1: connect to Spark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('my_first_app_name') \
    .getOrCreate()
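The builder accepts further settings before getOrCreate(); a minimal sketch, where the master URL and the config value below are illustrative additions rather than anything required by the original example:

# Optional builder settings (values are placeholders for illustration)
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('my_first_app_name') \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()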
Creating data
Creating from variables
#############################################
# Specify the schema explicitly
#############################################
# Generate comma-separated data
stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")])

# Define the schema, StructField(name, dataType, nullable), where:
#   name: the field's name
#   dataType: the field's data type
#   nullable: whether the field may contain nulls
from pyspark.sql.types import StructType, StructField, LongType, StringType  # import the types

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)])

# Apply the schema to the RDD and create the DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Register the DataFrame as a temporary view
swimmers.registerTempTable("swimmers")

# Count the rows of the DataFrame
swimmers.count()

#############################################
# Infer the schema
#############################################
# Create the dataframe using automatic type inference
data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyeColor'])
df.show()
df.count()
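Once the temporary view is registered it can be queried with plain SQL (in Spark 2.x, createOrReplaceTempView is the non-deprecated equivalent of registerTempTable). A small follow-up sketch:

# Query the temp view registered above
spark.sql("select id, name from swimmers where age >= 22").show()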
Creating from JSON
# Read the sample data shipped with Spark
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
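The JSON reader infers the schema from the file; printing it is a quick sanity check:

# Inspect the inferred schema
df.printSchema()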
Creating from CSV
# First create a csv file with pandas
import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.rand(5, 5), columns=['a', 'b', 'c', 'd', 'e']).\
    applymap(lambda x: int(x * 10))
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file, index=False)

# Then read the csv file back
monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()
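inferSchema costs an extra pass over the file; for larger files an explicit schema can be supplied instead. A sketch, reusing the column names written above:

from pyspark.sql.types import StructType, StructField, IntegerType

# An explicit schema avoids the extra inference pass
csv_schema = StructType([StructField(c, IntegerType(), True) for c in ['a', 'b', 'c', 'd', 'e']])
monthlySales = spark.read.csv(file, header=True, schema=csv_schema)
monthlySales.show()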
Creating from MySQL
# The mysql JDBC driver jar must first be placed under spark-2.2.0-bin-hadoop2.7\jars
# (this works on a single machine; on a cluster the jar has to be available on every node)
# Then re-run:
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456'
    ).load()
df.show()

# A SQL statement can also be passed in as the table
sql = "(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456'
    ).load()
df.show()
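The same read can be expressed with the spark.read.jdbc shortcut; a sketch, assuming MySQL listens on the default port 3306:

# Equivalent read via the jdbc() helper; port 3306 is an assumption
df = spark.read.jdbc(
    url='jdbc:mysql://127.0.0.1:3306',
    table='mysql.db',
    properties={'user': 'root', 'password': '123456'})
df.show()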
Creating from a pandas.DataFrame
# If no schema is given, the pandas column names are used
df = pd.DataFrame(np.random.random((4, 4)))
spark_df = spark.createDataFrame(df, schema=['a', 'b', 'c', 'd'])
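The conversion also works in the other direction; toPandas() collects the whole DataFrame onto the driver, so it should only be used on small results:

# Back to pandas (collects everything onto the driver)
pandas_df = spark_df.toPandas()
print(pandas_df.head())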
Creating from Parquet
# Read the parquet file under examples
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df = spark.read.parquet(file)
df.show()
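Column pruning is where parquet shines; a sketch, assuming the bundled users.parquet exposes the name and favorite_color columns as in the Spark documentation examples:

# Only the selected columns are read from the parquet file
df.select("name", "favorite_color").show()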
Creating from Hive
# If the parameters for connecting Spark to Hive are already configured,
# the Hive data can be read directly
spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .master("spark://172.31.100.170:7077") \
    .appName("my_first_app_name") \
    .getOrCreate()

df = spark.sql("select * from hive_tb_name")
df.show()
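With Hive support enabled, a whole table can also be loaded without writing SQL via spark.table; the table name below is the same placeholder as above:

# Equivalent to "select * from hive_tb_name"
df = spark.table("hive_tb_name")
df.show()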
Creating from HDFS
# Read directly, no ip or port needed
data = spark.read.csv('hdfs:///tmp/_da_exdata_path/data.csv', header=True)
data.show()

# In some setups the ip and port do have to be specified
data = spark.read.csv('hdfs://localhost:9000/tmp/_da_exdata_path/data.csv', header=True)
data.show()
Saving data
Writing to CSV
# Create a dataframe
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write it to csv
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=",", mode='overwrite')
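write.csv produces a directory of part files, one per partition; if a single output file is wanted, the data can be coalesced first (only sensible for small data):

# Collapse to one partition so the output directory holds a single part file
spark_df.coalesce(1).write.csv(path=file, header=True, sep=",", mode='overwrite')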
Writing to Parquet
# Create a dataframe
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write it to parquet
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file, mode='overwrite')
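Parquet output can also be partitioned by one or more columns; a sketch partitioning by column 'a' (chosen arbitrarily here):

# One sub-directory per distinct value of column 'a'
spark_df.write.parquet(path=file, mode='overwrite', partitionBy=['a'])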
Writing to Hive
df.registerTempTable('test_hive')
spark.sql("create table default.write_test select * from test_hive")

# Or:
# mode("overwrite") rewrites the table: if it exists its data is replaced,
# otherwise a new table is created
# mode("append") appends the data to the existing table
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# Write into the partitioned table with plain hive-sql
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive partition (saledate)
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
    from szy_aipurchase_tmp_szy_dailysale distribute by saledate
    """)

# Or rebuild the partitioned table on every run
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')

# Without partitions, simply import into the hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)
Writing to HDFS
# Write the data to hdfs in csv format
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")
Writing to MySQL
# Columns are matched by name, so spark_df does not have to contain every column of the MySQL table
# overwrite: truncate the table, then load the data
spark_df.write.mode("overwrite").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()

# append: add the rows to the existing table
spark_df.write.mode("append").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()
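If Spark reports that no suitable JDBC driver was found, the driver class can be passed explicitly; a sketch using the Connector/J 5.x class name (for Connector/J 8.x it is com.mysql.cj.jdbc.Driver):

# Explicit driver class, useful when the jar is on the classpath but not auto-detected
spark_df.write.mode("append").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    driver='com.mysql.jdbc.Driver',
    user='root',
    password='123456',
    dbtable="test.test",
).save()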
Source
https://zhuanlan.zhihu.com/p/34901558