java版

    1. /**
    2. * 以编程方式动态指定元数据,将RDD转换为DataFrame
    3. * @author Administrator
    4. *
    5. */
    6. public class RDD2DataFrameProgrammatically {
    7. public static void main(String[] args) {
    8. // 创建SparkConf、JavaSparkContext、SQLContext
    9. SparkConf conf = new SparkConf()
    10. .setMaster("local")
    11. .setAppName("RDD2DataFrameProgrammatically");
    12. JavaSparkContext sc = new JavaSparkContext(conf);
    13. SQLContext sqlContext = new SQLContext(sc);
    14. // 第一步,创建一个普通的RDD,但是,必须将其转换为RDD<Row>的这种格式
    15. JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//students.txt");
    16. // 分析一下
    17. // 它报了一个,不能直接从String转换为Integer的一个类型转换的错误
    18. // 就说明什么,说明有个数据,给定义成了String类型,结果使用的时候,要用Integer类型来使用
    19. // 而且,错误报在sql相关的代码中
    20. // 所以,基本可以断定,就是说,在sql中,用到age<=18的语法,所以就强行就将age转换为Integer来使用
    21. // 但是,肯定是之前有些步骤,将age定义为了String
    22. // 所以就往前找,就找到了这里
    23. // 往Row中塞数据的时候,要注意,什么格式的数据,就用什么格式转换一下,再塞进去
    24. JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
    25. private static final long serialVersionUID = 1L;
    26. @Override
    27. public Row call(String line) throws Exception {
    28. String[] lineSplited = line.split(",");
    29. return RowFactory.create(
    30. Integer.valueOf(lineSplited[0]),
    31. lineSplited[1],
    32. Integer.valueOf(lineSplited[2]));
    33. }
    34. });
    35. // 第二步,动态构造元数据
    36. // 比如说,id、name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql db里
    37. // 或者是配置文件中,加载出来的,是不固定的
    38. // 所以特别适合用这种编程的方式,来构造元数据
    39. List<StructField> structFields = new ArrayList<StructField>();
    40. structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
    41. structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
    42. structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
    43. StructType structType = DataTypes.createStructType(structFields);
    44. // 第三步,使用动态构造的元数据,将RDD转换为DataFrame
    45. DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);
    46. // 后面,就可以使用DataFrame了
    47. studentDF.registerTempTable("students");
    48. DataFrame teenagerDF = sqlContext.sql("select * from students where age<=18");
    49. List<Row> rows = teenagerDF.javaRDD().collect();
    50. for(Row row : rows) {
    51. System.out.println(row);
    52. }
    53. }
    54. }

    scala 版

    1. /**
    2. * @author Administrator
    3. */
    4. object RDD2DataFrameProgrammatically extends App {
    5. val conf = new SparkConf()
    6. .setMaster("local")
    7. .setAppName("RDD2DataFrameProgrammatically")
    8. val sc = new SparkContext(conf)
    9. val sqlContext = new SQLContext(sc)
    10. // 第一步,构造出元素为Row的普通RDD
    11. val studentRDD = sc.textFile("C://Users//Administrator//Desktop//students.txt", 1)
    12. .map { line => Row(line.split(",")(0).toInt, line.split(",")(1), line.split(",")(2).toInt) }
    13. // 第二步,编程方式动态构造元数据
    14. val structType = StructType(Array(
    15. StructField("id", IntegerType, true),
    16. StructField("name", StringType, true),
    17. StructField("age", IntegerType, true)))
    18. // 第三步,进行RDD到DataFrame的转换
    19. val studentDF = sqlContext.createDataFrame(studentRDD, structType)
    20. // 继续正常使用
    21. studentDF.registerTempTable("students")
    22. val teenagerDF = sqlContext.sql("select * from students where age<=18")
    23. val teenagerRDD = teenagerDF.rdd.collect().foreach { row => println(row) }
    24. }