banner.webp

1.Spark SQL

  1. Spark SQL的前身是Shark,目的是为了给熟悉RDBMS但不理解MapReduce的技术人员提供快速上手的工具。Spark SQL类似于Hive,但Spark SQL可以不依赖与Hadoop,而且运算速度比Hive快。

2.Spark SQL命令

  1. 开启Spark SQL,既可以使用和类似Hive的方法进行操作。不同的是,Hive的数据操作被翻译成了MapReduce任务,而Spark SQL的数据操作在Spark中执行。
  2. Hive类似,我们需要将MySQL连接器的jar包放到/usr/local/spark/jars目录下,经过我测试这里其实操作的就是Hive。<br />![1.png](https://cdn.nlark.com/yuque/0/2022/png/1751730/1655714979450-5dfa5ad1-10d6-46f3-8305-dea9de578fdd.png#clientId=ufa544f79-23b5-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=ue376e06a&margin=%5Bobject%20Object%5D&name=1.png&originHeight=216&originWidth=905&originalType=binary&ratio=1&rotation=0&showTitle=false&size=252205&status=done&style=none&taskId=u447b15cd-c1ef-4bde-a8b7-5b65139e9d4&title=)<br /> 可以发现库与表还有内容与我们当初搭建Hive的测试数据是一样的,这里操作什么的都类似,不再多列举了。

3.Java操作Spark SQL

接下来我们通过官方实例演示一下通过Spark操作Hive,提前将hive-site.xml、core-site.xml、hdfs-xml文件导入到spark目录的conf中,当然也可以操作JSON、CSV、Text、JDBC等,这里就不一一列举了

参考文档:https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

导入下面的依赖:

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-core_2.12</artifactId>
  4. <version>3.2.1</version>
  5. <exclusions>
  6. <exclusion>
  7. <groupId>org.slf4j</groupId>
  8. <artifactId>slf4j-log4j12</artifactId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.spark</groupId>
  14. <artifactId>spark-sql_2.12</artifactId>
  15. <version>3.2.1</version>
  16. </dependency>

编写代码:

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.function.MapFunction;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Encoders;
  5. import org.apache.spark.sql.Row;
  6. import org.apache.spark.sql.SparkSession;
  7. public class SparkSql {
  8. public static void main(String[] args) {
  9. SparkConf sparkConf = new SparkConf()
  10. .setMaster("local[2]")
  11. .setAppName("SparkSQL");
  12. SparkSession spark = SparkSession
  13. .builder()
  14. .enableHiveSupport()
  15. .appName("sql")
  16. .config(sparkConf)
  17. .getOrCreate();
  18. spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
  19. spark.sql("LOAD DATA LOCAL INPATH '/usr/local/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");
  20. spark.sql("SELECT * FROM src").show();
  21. Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
  22. // The items in DataFrames are of type Row, which lets you to access each column by ordinal.
  23. Dataset<String> stringsDS = sqlDF.map(
  24. (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
  25. Encoders.STRING());
  26. stringsDS.show();
  27. //切换到我们之前学习Hive时候建立的数据库与数据表
  28. spark.sql("use test");
  29. spark.sql("select * from emp").show();
  30. spark.stop();
  31. }
  32. }

然后打包,上传服务器,执行

  1. spark-submit --class SparkSql --master local[2] /usr/local/spark_sql-1.0.jar

结果:
2.png 3.png (%LF88AAJOU2G9)_@R$WK(2.png