Method 1: SQL statement

  1. Connect to Phoenix: copy phoenix-client.jar into the kettle_home/lib directory, then select Generic database in the database connection dialog, as shown below:

image.png

Connection URL: jdbc:phoenix:hdp03
Driver class: org.apache.phoenix.jdbc.PhoenixDriver

This alone is not enough; a few options also have to be set.

  2. Set the options:

    image.png

phoenix.schema.isNamespaceMappingEnabled: true
autoCommit: true (otherwise the changes are not committed automatically)
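
For comparison, the same two options can be applied when connecting through plain JDBC. This is only a sketch: the class `PhoenixOptions` and its methods are hypothetical names, the URL is the one entered above, and `open()` assumes phoenix-client.jar is on the classpath and the cluster is reachable.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kettle or Phoenix.
class PhoenixOptions {
    // URL taken from the connection settings above.
    static final String URL = "jdbc:phoenix:hdp03";

    // Builds the client-side property listed in the options above.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
        return props;
    }

    // Opens a connection with the options applied; needs a reachable Phoenix cluster.
    static Connection open() throws SQLException {
        Connection conn = DriverManager.getConnection(URL, build());
        // Standard JDBC counterpart of the autoCommit option: commit each statement immediately.
        conn.setAutoCommit(true);
        return conn;
    }
}
```

Keeping the property construction separate from `open()` makes it possible to inspect the options without a live cluster.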

  3. Example SQL statement for inserting data:

    image.png

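Enable the Bind Parameters… option; the fields in the parameter list below are then substituted for the ? placeholders in the statement.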

Click Run; the inserted data should then be visible in Phoenix.
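
Since the screenshot of the statement is not reproduced here, the following is a minimal sketch of what a statement with ? placeholders might look like. It assumes the table `SPRING1` with three columns, as in the Java code later in this post; the class `UpsertSql` is a hypothetical helper, not a Kettle or Phoenix API.

```java
import java.util.StringJoiner;

// Hypothetical helper for illustration only.
class UpsertSql {
    // Builds "upsert into <table> values (?, ?, ...)" with one placeholder per column.
    static String build(String table, int columnCount) {
        StringJoiner placeholders = new StringJoiner(", ", "(", ")");
        for (int i = 0; i < columnCount; i++) {
            placeholders.add("?");
        }
        return "upsert into " + table + " values " + placeholders;
    }

    public static void main(String[] args) {
        // Prints: upsert into SPRING1 values (?, ?, ?)
        System.out.println(build("SPRING1", 3));
    }
}
```

With bind parameters enabled, each ? would be filled from the corresponding entry in the parameter list, presumably in the order the fields are listed (for example ID, NAME, ADDRESS).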


Method 2: Java code

    import java.sql.*;

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        if (first) {
            first = false;
        }

        Object[] r = getRow();
        if (r == null) {
            // No more input rows: signal that this step is finished.
            setOutputDone();
            return false;
        }

        // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
        // enough to handle any new fields you are creating in this step.
        r = createOutputRow(r, data.outputRowMeta.size());

        String id = get(Fields.In, "ID").getString(r);
        String name = get(Fields.In, "NAME").getString(r);
        String address = get(Fields.In, "ADDRESS").getString(r);

        // Bind the values as parameters instead of concatenating them into the SQL text,
        // so that quotes in the data cannot break or alter the statement.
        String sql = "upsert into SPRING1 values (?, ?, ?)";
        exec(sql, new String[] { id, name, address });

        // Send the row on to the next step.
        //putRow(data.outputRowMeta, r);
        return true;
    }

    // Opens a new Phoenix connection; returns null if the driver is missing or the connection fails.
    public Connection getConnection() {
        try {
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            String url = "jdbc:phoenix:hdp03";
            return DriverManager.getConnection(url, "", "");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void close(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    // Runs one parameterized statement on a fresh connection and commits it.
    // Returns the number of affected rows, or -1 on failure.
    public int exec(String sql, String[] params) {
        Connection conn = getConnection();
        if (conn == null) {
            return -1;
        }
        PreparedStatement statement = null;
        try {
            statement = conn.prepareStatement(sql);
            for (int i = 0; i < params.length; i++) {
                statement.setString(i + 1, params[i]);
            }
            int count = statement.executeUpdate();
            conn.commit();
            return count;
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            close(conn);
        }
        return -1;
    }