JDBC读取数据的缺点
jdbc默认的读取数据的时候,会将要查询的数据一次性读到内存中,再通过resultSet循环读取出来,这样子百万条数据很容易就撑爆内存,再执行ResultSet的next方法,在数据全部返回之前,next方法是阻塞的,设置了之后(使用stream),ResultSet的next方法就会立即返回server端返回的数据,这样客户端就不用使用大量的内存存储返回的数据了
然后调研了下发现,其实可以通过jdbc流的方式读取数据,这种方式可以将读取出来的每一条数据查询到客户端,再执行insert操作,这样机器负载就会轻很多,实际操作了下,确实是这样的。
代码
import com.alibaba.fastjson.JSONObject;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.*;import java.util.Properties;public class MysqlSource extends RichSourceFunction<JSONObject>{private Properties properties;private Connection connection;PreparedStatement preparedStatement;public MysqlSource() {super();}public MysqlSource(Properties properties) {this.properties = properties;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);String jdbcUri = this.properties.getProperty("jdbcUri");this.connection = DriverManager.getConnection(jdbcUri, "db_name", "passwrod");}@Overridepublic void run(SourceContext<JSONObject> ctx) throws Exception {String sql = "select * from table_name";// 表示流式读取数据preparedStatement = this.connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);// 一定要设置成Integer.MIN_VALUEpreparedStatement.setFetchSize(Integer.MIN_VALUE);ResultSet resultSet = preparedStatement.executeQuery();while (resultSet.next()) {JSONObject jsonObject = new JSONObject();ctx.collect(jsonObject);}}@Overridepublic void cancel() {try {this.preparedStatement.close();this.connection.close();} catch (SQLException e) {e.printStackTrace();}}}
具体使用stream来读取数据,只是需要在生成prepareStatement时加上ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY这两个参数,再设置一下FetchSize为Integer.MIN_VALUE就可以了,mysql判断是否开启流式读取结果就是看这三个条件forward-only,read-only,fatch size=Integer.MIN_VALUE有没有被设置
[
](https://blog.csdn.net/qq_22912803/article/details/88998415)
