Spark Sql

一、Spark SQL

1. sqlContext

  • SparkContext 创建上下文
  • 基于 SparkContext 创建 sqlContext 上下文
  • sqlContext 操作源数据

1.1 测试数据源

  1. vim /data/log/customers.txt
  2. 100, John Smith, Austin, TX, 78727
  3. 200, Joe Johnson, Dallas, TX, 75201
  4. 300, Bob Jones, Houston, TX, 77028
  5. 400, Andy Davis, San Antonio, TX, 78227
  6. 500, James Williams, Austin, TX, 78727

1.2 sqlContext 编程 - 指定模式 操作 sql

  1. import org.apache.spark.{SparkConf, SparkContext};
  2. val sparkConf = new SparkConf()
  3. sparkConf.setAppName("SparkSQLHiveOnYarn")
  4. sparkConf.setMaster("local[2]")
  5. val sc = new SparkContext(sparkConf)
  6. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  7. // 创建RDD对象
  8. val rddCustomers = sc.textFile("/data/log/customers.txt")
  9. // 导入 Spark SQL Row
  10. import org.apache.spark.sql.Row;
  11. // 导入 Spark SQL 数据类型
  12. import org.apache.spark.sql.types.{StructType, StructField, StringType};
  13. // 用字符串编码模式
  14. val schemaString = "customer_id name city state zip_code"
  15. // 用模式字符串生成模式对象
  16. val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
  17. // 将RDD(rddCustomers)记录转化成Row。
  18. val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))
  19. // 将模式应用于RDD对象。
  20. val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)
  21. // 将DataFrame注册为表
  22. dfCustomers.registerTempTable("customers")
  23. // 用sqlContext对象提供的sql方法执行SQL语句。
  24. val custNames = sqlContext.sql("SELECT name FROM customers")
  25. // SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。
  26. // 可以按照顺序访问结果行的各个列。
  27. custNames.map(t => "Name: " + t(0)).collect().foreach(println)
  28. // 用sqlContext对象提供的sql方法执行SQL语句。
  29. val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
  30. // SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。
  31. // 可以按照顺序访问结果行的各个列。
  32. customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)

2. SparkContext -> HiveContext 操作 hive table

  1. import org.apache.spark.{SparkConf, SparkContext};
  2. import org.apache.spark.sql._;
  3. import org.apache.spark.sql.hive.HiveContext;
  4. // spark 设置配置与上线文
  5. val sparkConf = new SparkConf()
  6. sparkConf.setAppName("SparkSQLHiveOnYarn")
  7. sparkConf.setMaster("local[2]")
  8. val sc = new SparkContext(sparkConf)
  9. // 基于 SparkContext 上下文配置 HiveContext
  10. val hiveContext = new HiveContext(sc)
  11. // 配置 hive 元数据服务
  12. hiveContext.setConf("hive.metastore.uris", "thrift://NameNode:9083")
  13. // 通过 hiveContext 操作 hive table 数据, 写法一
  14. hiveContext.sql("SHOW DATABASES").collect().foreach { x => println(x) }
  15. hiveContext.sql("SHOW TABLES").collect().foreach { x => println(x) }
  16. hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
  17. hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
  18. // Queries are expressed in HiveQL
  19. hiveContext.sql("FROM src SELECT key, value").collect().foreach(println)
  20. // 通过 hiveContext 操作 hive 的数据, 写法二
  21. // 导入语句,可以隐式地将 RDD 转化成 DataFrame
  22. import hiveContext.implicits._ ;
  23. import hiveContext.sql;
  24. sql("SHOW DATABASES").collect().foreach { x => println(x) }
  25. sql("SHOW TABLES").collect().foreach { x => println(x) }

3. 通过 Thriftserver 连接 Hive

  • 启动 spark 的 Thriftserver
  • 通过 hive 客户端连接 Thriftserver 服务器
  • 操作 hive 数据
  1. import java.util.Map;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.ArrayList;
  5. import java.util.Arrays;
  6. import java.sql.SQLException;
  7. import java.sql.Connection;
  8. import java.sql.ResultSet;
  9. import java.sql.Statement;
  10. import java.sql.DriverManager;
  11. import org.apache.hive.jdbc.HiveDriver;
  12. public class HiveClient {
  13. private static String driverName = "org.apache.hive.jdbc.HiveDriver";
  14. /**
  15. * 获取连接
  16. */
  17. private Connection connection;
  18. public Connection getConnection() {
  19. return connection;
  20. }
  21. public void setConnection(Connection conn) {
  22. this.connection = conn;
  23. }
  24. /**
  25. * 获取连接句柄
  26. */
  27. private Statement stmt;
  28. public Statement getStmt() {
  29. return stmt;
  30. }
  31. public void setStmt(Statement stmt) {
  32. this.stmt = stmt;
  33. }
  34. public HiveClient(String url, String user, String password) throws SQLException {
  35. // 导入类
  36. try {
  37. Class.forName(driverName);
  38. } catch (ClassNotFoundException e) {
  39. e.printStackTrace();
  40. }
  41. // 创建连接
  42. Connection con = DriverManager.getConnection(url, user, password);
  43. this.setConnection(con);
  44. // 创建连接句柄语句
  45. Statement stmt = con.createStatement();
  46. this.setStmt(stmt);
  47. }
  48. /**
  49. * 执行指定 Sql
  50. * @param sql
  51. * @return 布尔值
  52. */
  53. public Boolean execute (String sql) {
  54. Boolean rs = false;
  55. try {
  56. rs = this.getStmt().execute(sql);
  57. } catch (SQLException e) {
  58. e.printStackTrace();
  59. }
  60. return rs;
  61. }
  62. /**
  63. * 查询数据
  64. * @param sql
  65. * @param fields 字段
  66. * @return List<Map<String, String>>
  67. * @throws SQLException
  68. */
  69. public List<Map<String, String>> select(String sql, String fields) throws SQLException {
  70. // 保存结果数据
  71. List<Map<String, String>> listResult = new ArrayList<Map<String, String>>();
  72. ResultSet res = null;
  73. try {
  74. res = this.getStmt().executeQuery(sql);
  75. } catch (SQLException e) {
  76. e.printStackTrace();
  77. }
  78. // 转换为数组
  79. String[] arrFields = fields.split(",");
  80. // 遍历每一行
  81. while (res.next()) {
  82. // 保存一行数据
  83. Map<String, String> mapRowData = new HashMap<String, String>();
  84. // 拼接字段值
  85. for (String field : arrFields) {
  86. mapRowData.put(field, res.getString("broker_id"));
  87. }
  88. // 追加到 list 中
  89. listResult.add(mapRowData);
  90. }
  91. /**
  92. for (Map<String, String> rs : listResult) {
  93. System.out.println(rs.get("broker_id"));
  94. }
  95. */
  96. return listResult;
  97. }
  98. /**
  99. * 关闭连接
  100. */
  101. public void closeConnection() {
  102. try {
  103. this.getConnection().close();
  104. } catch (SQLException e) {
  105. e.printStackTrace();
  106. }
  107. }
  108. }