问题描述
报错信息
在数据收集阶段,flume自定义source会通过查询本地mysql库解析用户ip地址;运行一段时间后,会出现如下报错,flume重启(重新建立数据库连接池)后又能正常一段时间
druid-1.1.0版本
mysql-connector-5.1.35
catch getAddressOfIp error JSON:
{"@type":"java.sql.SQLException",
"cause":{"@type":"com.mysql.jdbc.exceptions.jdbc4.CommunicationsException",
"cause":{"@type":"java.io.EOFException","localizedMessage":"Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.","message":"Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.",
"stackTrace":
{"className":"com.mysql.jdbc.MysqlIO","lineNumber":2914,"methodName":"readFully"},
{"className":"com.mysql.jdbc.MysqlIO","lineNumber":3332,"methodName":"reuseAndReadPacket"},
{"className":"com.mysql.jdbc.MysqlIO","lineNumber":3322,"methodName":"reuseAndReadPacket"},
{"className":"com.mysql.jdbc.MysqlIO","lineNumber":3762,"methodName":"checkErrorPacket"},
{"className":"com.mysql.jdbc.MysqlIO","lineNumber":2435,"methodName":"sendCommand"},
{"className":"com.mysql.jdbc.MysqlIO","lineNumber":2582,"methodName":"sqlQueryDirect"},
{"className":"com.mysql.jdbc.ConnectionImpl","lineNumber":2535,"methodName":"execSQL"},
{"className":"com.mysql.jdbc.PreparedStatement","lineNumber":1911,"methodName":"executeInternal"},
{"className":"com.mysql.jdbc.PreparedStatement","lineNumber":2034,"methodName":"executeQuery"},
{"className":"com.alibaba.druid.pool.DruidPooledPreparedStatement","lineNumber":228,"methodName":"executeQuery"},
{"className":"net.xinhuamm.sinks.MySinkKafka","lineNumber":245,"methodName":"getAddressOfIp"},
{"className":"net.xinhuamm.sinks.MySinkKafka","lineNumber":119,"methodName":"process"}
我的代码
/**
 * Singleton wrapper around a Druid connection pool for the local MySQL
 * ip-geolocation database.
 *
 * NOTE(review): credentials and the JDBC URL are hard-coded; consider moving
 * them to external configuration.
 */
public class DruidManager {

    private static DruidManager single = null;

    private DruidDataSource dataSource;

    private DruidManager() {
    }

    /**
     * Lazily builds the singleton and its pool.
     * Synchronized so concurrent first callers cannot create two pools.
     */
    public synchronized static DruidManager getInstance() {
        if (single == null) {
            single = new DruidManager();
            single.initPool();
        }
        return single;
    }

    /** Configures the Druid pool; values mirror the original deployment. */
    private void initPool() {
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://hadoop-master:3306/noah_xinhua?characterEncoding=UTF-8");
        dataSource.setUsername("noah_dba");
        dataSource.setPassword("xinhua098");
        dataSource.setInitialSize(50);
        dataSource.setMinIdle(20);
        // Max pool size (0 would mean unlimited); sized to the expected concurrency.
        dataSource.setMaxActive(800);
        // Max time (ms) to wait for a free connection before throwing.
        dataSource.setMaxWait(5000);
        dataSource.setPoolPreparedStatements(true);
        dataSource.setMaxPoolPreparedStatementPerConnectionSize(10);
        // Eviction thread runs every 60s; connections idle longer than 5min are
        // destroyed and the pool re-creates connections to keep at least minIdle.
        dataSource.setTimeBetweenEvictionRunsMillis(60000);
        dataSource.setMinEvictableIdleTimeMillis(300000);
        dataSource.setTestWhileIdle(true);
        // Run validationQuery on every borrow. Note this only protects
        // connections that are actually returned to the pool between uses.
        dataSource.setTestOnBorrow(true);
        dataSource.setTestOnReturn(false);
        dataSource.setValidationQuery("SELECT 1");
    }

    /**
     * Borrows a connection from the pool.
     *
     * Druid's {@code getConnection()} is already thread-safe, so no external
     * locking is needed — the original {@code synchronized (dataSource)} block
     * serialized every borrow across all sink threads for no benefit.
     *
     * @return a pooled connection, or {@code null} if the borrow failed
     *         (the exception is printed; callers must handle null)
     */
    public Connection getConnection() {
        try {
            return dataSource.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        }
    }
}
public class MySinkKafka extends AbstractSink implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(MySinkKafka.class);
KafkaProducer producer;
Connection conn;
private JedisUtil jedisUtil;
private Boolean enableLog;
@Override
public void start() {
// Create the Kafka producer used by process() to forward events.
producer = getProducer();
DruidManager druidManager = DruidManager.getInstance();
// NOTE(review): a single Connection is borrowed here and cached for the
// sink's entire lifetime. It is never returned to the pool, so Druid's
// testOnBorrow/idle-eviction never applies to it; once MySQL's
// wait_timeout elapses it goes stale, producing the CommunicationsException
// / "connection holder is null" errors described above. Prefer borrowing a
// connection per query (and closing it to return it to the pool) instead.
conn = druidManager.getConnection();
// Redis client used as a lookup cache for resolved IP addresses.
jedisUtil = JedisUtil.getInstance();
}
/**
 * Disconnects from external resources held by this sink — the Kafka producer
 * and the cached JDBC connection. Safe to call when either was never
 * initialized.
 */
@Override
public void stop() {
    System.out.println("MySinkKafka stop()------->");
    LOG.info("MySinkKafka stop()------->");

    KafkaProducer activeProducer = producer;
    if (activeProducer != null) {
        activeProducer.close();
    }

    Connection activeConn = conn;
    if (activeConn == null) {
        return;
    }
    try {
        activeConn.close();
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
@Override
public Status process() {
...
}
private static void assembleMap(JSONObject container, JSONObject jsonObject) {
...
}
/**
* 获取生产者
*
* @author liujiapeng
* @date 2018/10/9 9:42
* @returnType org.apache.kafka.clients.producer.KafkaProducer
*/
private KafkaProducer getProducer() {
...
}
@Override
public void configure(Context context) {
...
}
/**
* 使用淘宝的开放接口解析IP
*
* @param ip
* @return
*/
private AddressInfo parsingIP(String ip) {
...
}
/**
 * Resolves an IP address to country/province/city, first via the Redis cache
 * (key {@code "mysql_" + ip}), then by querying the local MySQL ip table.
 *
 * Fixes over the original:
 * - Borrows a pooled connection per call and closes it (returning it to the
 *   pool) instead of reusing the single connection cached in {@code start()},
 *   which went stale after MySQL's wait_timeout.
 * - Uses a parameterized query instead of concatenating {@code ip} into the
 *   SQL string (SQL injection; also lets Druid cache the PreparedStatement).
 * - try-with-resources replaces the old finally block, which closed the
 *   statement BEFORE the result set and could skip closes on exception.
 *
 * @param ip dotted-quad IP address to resolve
 * @return the resolved AddressInfo; fields stay null when no row matched or
 *         the lookup failed (errors are logged, not rethrown)
 * @throws SQLException retained for signature compatibility with callers
 */
private AddressInfo getAddressOfIp(String ip) throws SQLException {
    String key = "mysql_" + ip;
    String address = jedisUtil.get(key);
    LOG.info("getAddressOfIp: key-" + key + ",address-" + address);
    if (StringUtils.isEmpty(address) || StringUtils.equals("{}", address)) {
        AddressInfo addressInfo = new AddressInfo();
        String sql = "select country, province, city from ip where INET_ATON(?) BETWEEN ip_start_num AND ip_end_num LIMIT 1";
        LOG.info("getAddressOfIp: sql:" + sql);
        // Per-call borrow: close() returns the connection to the Druid pool,
        // so testOnBorrow/eviction keep connections fresh between calls.
        try (Connection connection = DruidManager.getInstance().getConnection();
             PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, ip);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    addressInfo.setCountry(rs.getString(1));
                    addressInfo.setProvince(rs.getString(2));
                    addressInfo.setCity(rs.getString(3));
                }
            }
        } catch (Exception e) {
            // Pass the throwable to the logger so the full stack trace lands
            // in the log instead of stderr.
            LOG.error("catch getAddressOfIp error:" + e.getMessage(), e);
            LOG.error("catch getAddressOfIp error JSON:" + JSON.toJSONString(e));
        }
        if (StringUtils.isNotEmpty(addressInfo.getProvince())) {
            LOG.info("result:" + JSON.toJSONString(addressInfo));
        }
        return addressInfo;
    }
    return JSON.parseObject(address, AddressInfo.class);
}
}
懵逼的摸索
first of all,打开度娘搜一搜~,各种修改连接池配置,以为找到银弹了,很兴奋的一个个试了过来,发现还是不行(吐槽下这个bug真难复现,本地还想不到调试的办法);
只能硬着头皮装模作样瞅瞅源码咯~,简化了下代码,报错出在rs = ps.executeQuery()
,跟踪里面的代码没看出啥问题(主要还是自己菜,看懂不多),最下面的异常在MysqlIO
类检查异常方法checkErrorPacket
,发送连接拿到的inputStream
长度为0导致的,报错是catch getAddressOfIp error:connection holder is null (最后的异常层层外抛也没找到这个异常)。看不下去了,我还是想直接点,找holder!虽然异常是代码爆出来的,但是,这个PreparedStatement
是上面初始化的,感觉还是这个鬼有问题;往里一翻,又懵逼了,草草翻了下,大概了解了为啥度娘搜到的解决方案是改配置了,直接看拿到的连接DruidDataSource.getConnection()
,看到了一堆数据库连接配置,根据代码的逻辑进行配置,试了下还是不行(其实根据代码逻辑配置和度娘上面差不多的,笑~);咋办,脑壳疼。
最最最后,抱着一丝希望,希望他是个bbbbbug,去github上搜了下issue,发现遇到这个问题的人不少,看了下也没咋看懂,但的确有相关的一个bug不知道是不是我踩的这个。修复版本是druid-1.1.15,我就换成了次新版本druid-1.1.22,后续看试验情况等bug复现吧(希望别出来啊~)
观察了两天,emmmm… 还是没有解决(无能怒吼),但是好消息是报错换了
java.sql.SQLException: connection disabled
at com.alibaba.druid.pool.DruidPooledConnection.checkStateInternal(DruidPooledConnection.java:1169)
at com.alibaba.druid.pool.DruidPooledConnection.checkState(DruidPooledConnection.java:1154)
at com.alibaba.druid.pool.DruidPooledConnection.prepareStatement(DruidPooledConnection.java:337)
at net.xinhuamm.sinks.MySinkKafka.getAddressOfIp(MySinkKafka.java:246)
at net.xinhuamm.sinks.MySinkKafka.process(MySinkKafka.java:121)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet successfully received from the server was 31,749,415 milliseconds ago. The last packet sent successfully to the server was 31,749,971 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3609)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2417)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeQuery(DruidPooledPreparedStatement.java:227)
at net.xinhuamm.sinks.MySinkKafka.getAddressOfIp(MySinkKafka.java:247)
... 4 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3591)
... 11 more
继续查呗,换个思路,我去server端也就是mysql日志里找找。
此处推荐下这篇文章,简单清晰:《MySQL令人头疼的Aborted告警案例分析》。查看服务端日志:cat /var/log/mysql/error.log
2020-09-24T03:30:38.097616Z 15839 [Note] Aborted connection 15839 to db: 'azkaban' user: 'root' host: 'hadoop-slave1' (Got timeout reading communication packets)
2020-09-24T03:30:38.127735Z 15840 [Note] Aborted connection 15840 to db: 'azkaban' user: 'root' host: 'hadoop-slave1' (Got timeout reading communication packets)
2020-09-24T03:37:41.710451Z 15841 [Note] Aborted connection 15841 to db: 'azkaban' user: 'root' host: 'hadoop-slave1' (Got timeout reading communication packets)
2020-09-24T09:27:48.004925Z 16285 [Note] Aborted connection 16285 to db: 'noah_xinhua' user: 'root' host: '115.236.34.2' (Got an error reading communication packets)
2020-09-24T09:54:27.068765Z 16286 [Note] Aborted connection 16286 to db: 'azkaban' user: 'root' host: '115.236.34.2' (Got an error reading communication packets)
2020-09-24T09:54:27.068782Z 16287 [Note] Aborted connection 16287 to db: 'azkaban' user: 'root' host: '115.236.34.2' (Got an error reading communication packets)
这错误信息对不上啊…emmm郁闷了,115.236.34.2(我的网络ip),我预期是能看到hadoop-slave2的host连接报错信息。gg 这日志没价值
看了下mysql配置,max_allowed_packet、interactive_timeout和wait_timeout 正常;
回头想了下,wait_timeout 是28800(即8小时),生产环境理论上不会出现8小时没用到连接,但是测试环境的确更容易复现出这个bug,所需要的时间更短,所以我先将等待时长设置成2628000(即1个月),坐等再观察观察…
如果bug还存在,考虑替换掉连接池,确保是否是db.close()调用问题;
还是gg了,调整mysql连接配置失败,我对conn加了行conn.close()
先试试,如果不行,就把druid换成dbcp。