JDBC读取数据的缺点

jdbc默认的读取数据的时候,会将要查询的数据一次性读到内存中,再通过resultSet循环读取出来,这样子百万条数据很容易就撑爆内存,再执行ResultSet的next方法,在数据全部返回之前,next方法是阻塞的,设置了之后(使用stream),ResultSet的next方法就会立即返回server端返回的数据,这样客户端就不用使用大量的内存存储返回的数据了
然后调研了下发现,其实可以通过jdbc流的方式读取数据,这种方式可以将读取出来的每一条数据查询到客户端,再执行insert操作,这样机器负载就会轻很多,实际操作了下,确实是这样的。

代码

  1. import com.alibaba.fastjson.JSONObject;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  4. import java.sql.*;
  5. import java.util.Properties;
  6. public class MysqlSource extends RichSourceFunction<JSONObject>{
  7. private Properties properties;
  8. private Connection connection;
  9. PreparedStatement preparedStatement;
  10. public MysqlSource() {
  11. super();
  12. }
  13. public MysqlSource(Properties properties) {
  14. this.properties = properties;
  15. }
  16. @Override
  17. public void open(Configuration parameters) throws Exception {
  18. super.open(parameters);
  19. String jdbcUri = this.properties.getProperty("jdbcUri");
  20. this.connection = DriverManager.getConnection(jdbcUri, "db_name", "passwrod");
  21. }
  22. @Override
  23. public void run(SourceContext<JSONObject> ctx) throws Exception {
  24. String sql = "select * from table_name";
  25. // 表示流式读取数据
  26. preparedStatement = this.connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
  27. // 一定要设置成Integer.MIN_VALUE
  28. preparedStatement.setFetchSize(Integer.MIN_VALUE);
  29. ResultSet resultSet = preparedStatement.executeQuery();
  30. while (resultSet.next()) {
  31. JSONObject jsonObject = new JSONObject();
  32. ctx.collect(jsonObject);
  33. }
  34. }
  35. @Override
  36. public void cancel() {
  37. try {
  38. this.preparedStatement.close();
  39. this.connection.close();
  40. } catch (SQLException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }

具体使用stream来读取数据,只是需要在生成prepareStatement时加上ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY这两个参数,再设置一下FetchSize为Integer.MIN_VALUE就可以了,mysql判断是否开启流式读取结果就是看这三个条件forward-only,read-only,fatch size=Integer.MIN_VALUE有没有被设置
[

](https://blog.csdn.net/qq_22912803/article/details/88998415)