方式一:SQL语句
- 连接phoenix: 将phoenix-client.jar拷贝到kettle_home/lib目录下;然后在建立数据库连接界面选择Generic database,如下图所示:
填入连接URL: jdbc:phoenix:hdp03
驱动类:org.apache.phoenix.jdbc.PhoenixDriver
这样还不行,还需要设置一些选项。
设置选项:
phoenix.schema.isNamespaceMappingEnabled: true
autoCommit: true —否则不会自动提交
插入SQL语句示例:
选择Bind Parameters后,下面参数列表中的值会依次替换SQL语句中的?占位符。
点击Run运行,在phoenix中可以看到数据成功插入了!
方式二 :Java 代码:
import java.sql .*;
import java.util.StringJoiner;
import java.util.stream.IntStream;
/**
 * Reads one input row and upserts its ID, NAME and ADDRESS fields into the
 * Phoenix table SPRING1.
 *
 * <p>The values are passed as bind parameters rather than concatenated into the
 * SQL text, so quotes or other special characters in the data cannot break or
 * alter the statement. A null field value is bound as SQL NULL.
 *
 * @param smi step metadata supplied by the caller (not used here)
 * @param sdi step data supplied by the caller (not used here)
 * @return {@code true} after a row has been written, {@code false} once
 *         {@code getRow()} returns {@code null} and output has been marked done
 * @throws KettleException if no connection can be opened or the upsert fails
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    if (first) {
        // No one-time initialisation is needed; just clear the flag.
        first = false;
    }

    Object[] r = getRow();
    if (r == null) {
        // No more input rows.
        setOutputDone();
        return false;
    }

    // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
    // enough to handle any new fields you are creating in this step.
    r = createOutputRow(r, data.outputRowMeta.size());

    String id = get(Fields.In, "ID").getString(r);
    String name = get(Fields.In, "NAME").getString(r);
    String address = get(Fields.In, "ADDRESS").getString(r);

    upsertRow(id, name, address);

    // The row is written to Phoenix only; it is not passed on to a next step
    // (no putRow call).
    return true;
}

/**
 * Upserts a single (ID, NAME, ADDRESS) record into SPRING1 using a parameterized
 * statement, commits, and releases the statement and connection.
 *
 * @throws KettleException if the connection is unavailable or the statement fails
 */
private void upsertRow(String id, String name, String address) throws KettleException {
    Connection conn = getConnection();
    if (conn == null) {
        // getConnection() signals failure by returning null.
        throw new KettleException("Unable to open a Phoenix connection for upsert into SPRING1");
    }

    PreparedStatement statement = null;
    try {
        statement = conn.prepareStatement("upsert into SPRING1 values (?, ?, ?)");
        statement.setString(1, id);
        statement.setString(2, name);
        statement.setString(3, address);
        statement.executeUpdate();
        conn.commit();
    } catch (SQLException e) {
        throw new KettleException("Failed to upsert row with ID '" + id + "' into SPRING1", e);
    } finally {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        close(conn);
    }
}
/**
 * Opens a new JDBC connection to Phoenix at {@code jdbc:phoenix:hdp03} with an
 * empty user name and password.
 *
 * @return the opened connection, or {@code null} if the driver class cannot be
 *         loaded or the connection attempt fails (the stack trace is printed)
 */
public Connection getConnection() {
    Connection opened = null;
    try {
        // Load the Phoenix driver class before asking DriverManager for a connection.
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        opened = DriverManager.getConnection("jdbc:phoenix:hdp03", "", "");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return opened;
}
/**
 * Closes the given connection, tolerating {@code null}.
 *
 * @param connection the connection to close; nothing happens when it is {@code null}.
 *                   A {@link SQLException} raised while closing is printed, not rethrown.
 */
public void close(Connection connection) {
    if (connection == null) {
        return;
    }
    try {
        connection.close();
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
/**
 * Executes a single update statement on a fresh connection and commits it.
 *
 * <p>The SQL text is executed as given; callers must not build it by
 * concatenating untrusted values.
 *
 * @param sql the update statement to run
 * @return the update count reported by {@code executeUpdate()}, or {@code -1}
 *         if no connection could be obtained or the statement failed
 */
public int exec(String sql) {
    Connection conn = getConnection();
    if (conn == null) {
        // getConnection() returns null on failure; avoid a NullPointerException below.
        return -1;
    }

    PreparedStatement statement = null;
    try {
        statement = conn.prepareStatement(sql);
        int affected = statement.executeUpdate();
        conn.commit();
        return affected;
    } catch (SQLException e) {
        e.printStackTrace();
        return -1;
    } finally {
        // Release the statement before the connection that owns it.
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        close(conn);
    }
}