Method 1: SQL statement
- Connect to Phoenix: copy phoenix-client.jar into the kettle_home/lib directory, then choose
Generic database in the database connection dialog, as shown in the figure below:

Enter the connection URL: jdbc:phoenix:hdp03
Driver class: org.apache.phoenix.jdbc.PhoenixDriver
This alone is not enough. The following options must also be set:

phoenix.schema.isNamespaceMappingEnabled: true
autoCommit: true (otherwise changes are not committed automatically)
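For readers who want to check the same settings outside the Kettle dialog, the following is a minimal sketch of an equivalent plain JDBC connection. It assumes phoenix-client.jar is on the classpath and that the host hdp03 from the URL above is reachable. The class name PhoenixConnectionSketch and its methods are hypothetical, and auto-commit is enabled here through the standard JDBC setAutoCommit call rather than through a connection option.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical helper class, for illustration only.
class PhoenixConnectionSketch {
    // Connection URL taken from the text above.
    static final String URL = "jdbc:phoenix:hdp03";

    // Builds connection properties matching the namespace-mapping option listed above.
    static Properties buildProperties() {
        Properties props = new Properties();
        props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
        return props;
    }

    // Opens a connection (requires the Phoenix driver and a reachable cluster)
    // and switches on auto-commit with the standard JDBC method.
    static Connection open() throws SQLException {
        Connection conn = DriverManager.getConnection(URL, buildProperties());
        conn.setAutoCommit(true);
        return conn;
    }
}
```

Only open() needs a running cluster; buildProperties() can be inspected on its own to confirm which options are being passed.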
Example insert SQL statement:

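Select Bind Parameters… The parameter list below it supplies the values that replace each ? in the statement.
Click Run to execute it; in Phoenix you can then see that the data was inserted successfully!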
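The parameter binding described above corresponds to a JDBC PreparedStatement with ? placeholders. The sketch below shows the idea in plain Java. The class PhoenixUpsertSketch and its methods are hypothetical names, the table name SPRING1 is borrowed from the Java code later in this document, and the code assumes every column is bound as a string.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper class, for illustration only.
class PhoenixUpsertSketch {
    // Builds an UPSERT statement with one ? placeholder per column.
    static String buildUpsert(String table, int columnCount) {
        StringBuilder sql = new StringBuilder("UPSERT INTO ").append(table).append(" VALUES (");
        for (int i = 0; i < columnCount; i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append('?');
        }
        return sql.append(')').toString();
    }

    // Binds each value to its placeholder (JDBC parameters are 1-based) and runs the statement.
    static int upsert(Connection conn, String table, String[] values) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(buildUpsert(table, values.length))) {
            for (int i = 0; i < values.length; i++) {
                ps.setString(i + 1, values[i]);
            }
            return ps.executeUpdate();
        }
    }
}
```

Binding values this way, instead of concatenating them into the SQL text, avoids broken statements when a value contains a quote character.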
Method 2: Java code
import java.sql.*;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    if (first) {
        first = false;
    }

    Object[] r = getRow();
    if (r == null) {
        // No more input rows: signal that this step is finished.
        setOutputDone();
        return false;
    }

    // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
    // enough to handle any new fields you are creating in this step.
    r = createOutputRow(r, data.outputRowMeta.size());

    String ID = get(Fields.In, "ID").getString(r);
    String NAME = get(Fields.In, "NAME").getString(r);
    String address = get(Fields.In, "ADDRESS").getString(r);

    // Bind the values as parameters instead of concatenating them into the SQL text,
    // so that a quote character inside a value cannot break the statement.
    String sql = "upsert into SPRING1 values (?, ?, ?)";
    exec(sql, new String[] { ID, NAME, address });

    // Send the row on to the next step.
    //putRow(data.outputRowMeta, r);
    return true;
}

// Opens a Phoenix connection; returns null if the driver is missing or the connection fails.
public Connection getConnection() {
    try {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        String url = "jdbc:phoenix:hdp03";
        Connection conn = DriverManager.getConnection(url, "", "");
        return conn;
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return null;
}

public void close(Connection connection) {
    if (connection != null) {
        try {
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

// Runs one parameterized statement and commits it; returns the affected row count, or -1 on failure.
public int exec(String sql, String[] params) {
    Connection conn = getConnection();
    if (conn == null) {
        // getConnection() has already printed the cause.
        return -1;
    }
    try {
        PreparedStatement statement = conn.prepareStatement(sql);
        for (int i = 0; i < params.length; i++) {
            statement.setString(i + 1, params[i]);
        }
        int i = statement.executeUpdate();
        conn.commit();
        return i;
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        close(conn);
    }
    return -1;
}
