vlambda Blog

An Analysis of the BoneCP Connection Pool's Reconnection Mechanism

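I. Background

A friend's company uses BoneCP as its MySQL connection pool. The applications reach MySQL by domain name, with a configuration like this:

jdbc:mysql://order.mysql.xx.cn:3306/order?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true

All middleware is accessed through the internal DNS server.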


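Recently, the machine hosting one application's MySQL instance has aged badly and crashed several times, so the database is going to be migrated to a new machine. To keep the operation simple, they would rather not modify the application configuration (there are dozens of machines in production), and asked whether it is enough to change only the DNS record, without restarting the applications.


The planned steps are:

1. Point the domain name at the new machine;

2. Shut down the old instance.


Data consistency and data migration are out of scope here.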



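II. Problem Analysis

Two points need to be confirmed:

1. Does BoneCP reconnect after a failure?

That is, can BoneCP catch connection failures and execution errors, and then close the connections currently open in the pool?


2. Is DNS cached?

The application is configured with a domain name, but the actual connection to MySQL is made over TCP (layer 4), so a DNS lookup sits in between, and DNS results are usually cached.


On the first question, the official documentation says reconnection is supported. To see how it is actually supported, look at the code. The key piece is the PreparedStatementHandle::execute method: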

/**
 * {@inheritDoc}
 *
 * @see java.sql.PreparedStatement#execute()
 */
// @Override
public boolean execute() throws SQLException {
    checkClosed();
    try {
        if (this.logStatementsEnabled){
            logger.debug(PoolUtil.fillLogParams(this.sql, this.logParams));
        }
        long queryStartTime = queryTimerStart();

        if (this.connectionHook != null){
            this.connectionHook.onBeforeStatementExecute(this.connectionHandle, this, this.sql, this.logParams);
        }
        boolean result = this.internalPreparedStatement.execute();
        if (this.connectionHook != null){
            this.connectionHook.onAfterStatementExecute(this.connectionHandle, this, this.sql, this.logParams);
        }

        queryTimerEnd(this.sql, queryStartTime);
        return result;
    } catch (SQLException e) {
        throw this.connectionHandle.markPossiblyBroken(e);
    }
}


The part to focus on is the exception handling, namely the call to markPossiblyBroken:


/**
 * Given an exception, flag the connection (or database) as being potentially broken. If the exception is a data-specific exception,
 * do nothing except throw it back to the application.
 *
 * @param e SQLException e
 * @return SQLException for further processing
 */
protected SQLException markPossiblyBroken(SQLException e) {
    String state = e.getSQLState();
    boolean alreadyDestroyed = false;

    ConnectionState connectionState = this.getConnectionHook() != null ? this.getConnectionHook().onMarkPossiblyBroken(this, state, e) : ConnectionState.NOP;
    if (state == null){ // safety;
        state = "08999";
    }

    if (((sqlStateDBFailureCodes.contains(state) || connectionState.equals(ConnectionState.TERMINATE_ALL_CONNECTIONS)) && this.pool != null) && this.pool.getDbIsDown().compareAndSet(false, true) ){
        logger.error("Database access problem. Killing off this connection and all remaining connections in the connection pool. SQL State = " + state);
        this.pool.connectionStrategy.terminateAllConnections();
        this.pool.destroyConnection(this);
        this.logicallyClosed.set(true);
        alreadyDestroyed = true;

        for (int i=0; i < this.pool.partitionCount; i++) {
            // send a signal to try re-populating again.
            this.pool.partitions[i].getPoolWatchThreadSignalQueue().offer(new Object()); // item being pushed is not important.
        }
    }

    // For the listed SQL states (typically connection-level failures such as timeouts),
    // the connection is removed from the pool
    if (state.equals("08003") || sqlStateDBFailureCodes.contains(state) || e.getCause() instanceof SocketException) {
        if (!alreadyDestroyed) {
            this.pool.destroyConnection(this);
            this.logicallyClosed.set(true);
            getOriginatingPartition().getPoolWatchThreadSignalQueue().offer(new Object()); // item being pushed is not important.
        }
    }

    char firstChar = state.charAt(0);
    if (connectionState.equals(ConnectionState.CONNECTION_POSSIBLY_BROKEN) || state.equals("40001") || state.startsWith("08") || (firstChar >= '5' && firstChar <='9') /*|| (firstChar >='I' && firstChar <= 'Z')*/){
        this.possiblyBroken = true;
    }

    // Notify anyone who's interested
    if (this.possiblyBroken && (this.getConnectionHook() != null)){
        this.possiblyBroken = this.getConnectionHook().onConnectionException(this, state, e);
    }

    return e;
}


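As the code shows, BoneCP inspects the SQLState of the exception (and whether its cause is a SocketException) to detect connection-level failures such as timeouts. When it finds one, it destroys the affected connection, or every connection in the pool for the codes in sqlStateDBFailureCodes, and signals the pool watch thread to repopulate. The next request therefore gets a freshly established connection.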
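The three checks in markPossiblyBroken can be restated as a small pure function. This is only a sketch under stated assumptions: SqlStateSketch, Outcome and classify are hypothetical names, the connection hook and the dbIsDown flag are ignored, and the failure-code set is passed in as a parameter because the excerpt above does not show what sqlStateDBFailureCodes contains.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical, simplified restatement of the checks in markPossiblyBroken. */
final class SqlStateSketch {

    /** What the pool would do for one failed statement. */
    static final class Outcome {
        final boolean terminateAll;   // kill every pooled connection
        final boolean destroyThis;    // destroy the failing connection
        final boolean possiblyBroken; // flag the connection as suspect

        Outcome(boolean terminateAll, boolean destroyThis, boolean possiblyBroken) {
            this.terminateAll = terminateAll;
            this.destroyThis = destroyThis;
            this.possiblyBroken = possiblyBroken;
        }
    }

    /**
     * @param state                   SQLState of the exception, may be null
     * @param causedBySocketException true if the exception's cause is a SocketException
     * @param dbFailureCodes          stand-in for sqlStateDBFailureCodes (contents assumed by the caller)
     */
    static Outcome classify(String state, boolean causedBySocketException, Set<String> dbFailureCodes) {
        if (state == null) {
            state = "08999"; // same fallback as in the excerpt
        }
        boolean terminateAll = dbFailureCodes.contains(state);
        boolean destroyThis = state.equals("08003") || terminateAll || causedBySocketException;
        char first = state.charAt(0);
        boolean possiblyBroken = state.equals("40001")
                || state.startsWith("08")
                || (first >= '5' && first <= '9');
        return new Outcome(terminateAll, destroyThis, possiblyBroken);
    }

    public static void main(String[] args) {
        // "08S01" is used here purely as an example member of the failure-code set.
        Set<String> codes = Collections.singleton("08S01");
        Outcome o = classify("08S01", false, codes);
        System.out.println(o.terminateAll + " " + o.destroyThis + " " + o.possiblyBroken);
    }
}
```

Data-level errors such as a constraint violation (SQLState class 23) fall through all three checks, which matches the Javadoc comment: they are simply thrown back to the application.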



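The second question concerns DNS resolution. Reading the code shows that BoneCP delegates connection creation to the JDBC driver, and in the MySQL driver the socket is opened by the StandardSocketFactory class: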


/**
 * @see com.mysql.jdbc.SocketFactory#createSocket(Properties)
 */
public Socket connect(String hostname, int portNumber, Properties props) throws SocketException, IOException {

    // ... (excerpt abridged: the opening line of the enclosing block is not shown)

        if (this.host != null) {
            if (!(wantsLocalBind || wantsTimeout || needsConfigurationBeforeConnect)) {
                // Resolve the domain name to IP addresses
                InetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);

                Throwable caughtWhileConnecting = null;

                // Need to loop through all possible addresses, in case
                // someone has IPV6 configured (SuSE, for example...)
                for (int i = 0; i < possibleAddresses.length; i++) {
                    try {
                        this.rawSocket = new Socket(possibleAddresses[i], port);

                        configureSocket(this.rawSocket, props);

                        break;
                    } catch (Exception ex) {
                        caughtWhileConnecting = ex;
                    }
                }

                if (rawSocket == null) {
                    unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                }
            } else {
            }

            return this.rawSocket;
        }
    }

    throw new SocketException("Unable to create socket");
}



On a cache miss, getAllByName ends up calling getAddressesFromNameService to obtain the IP addresses:

private static InetAddress[] getAddressesFromNameService(String host, InetAddress reqAddr) throws UnknownHostException {
    InetAddress[] addresses = null;
    boolean success = false;
    UnknownHostException ex = null;

    if ((addresses = checkLookupTable(host)) == null) {
        try {
            // This is the first thread which looks up the addresses
            // this host or the cache entry for this host has been
            // expired so this thread should do the lookup.
            for (NameService nameService : nameServices) {
                try {
                    /*
                     * Do not put the call to lookup() inside the
                     * constructor. if you do you will still be
                     * allocating space when the lookup fails.
                     */

                    addresses = nameService.lookupAllHostAddr(host);
                    success = true;
                    break;
                } catch (UnknownHostException uhe) {
                    if (host.equalsIgnoreCase("localhost")) {
                        InetAddress[] local = new InetAddress[] { impl.loopbackAddress() };
                        addresses = local;
                        success = true;
                        break;
                    } else {
                        addresses = unknown_array;
                        success = false;
                        ex = uhe;
                    }
                }
            }

            // More to do?
            if (reqAddr != null && addresses.length > 1 && !addresses[0].equals(reqAddr)) {
                // Find it?
                int i = 1;
                for (; i < addresses.length; i++) {
                    if (addresses[i].equals(reqAddr)) {
                        break;
                    }
                }
                // Rotate
                if (i < addresses.length) {
                    InetAddress tmp, tmp2 = reqAddr;
                    for (int j = 0; j < i; j++) {
                        tmp = addresses[j];
                        addresses[j] = tmp2;
                        tmp2 = tmp;
                    }
                    addresses[i] = tmp2;
                }
            }
            // Key line: cache the lookup result
            cacheAddresses(host, addresses, success);

            if (!success && ex != null)
                throw ex;

        } finally {
            // Delete host from the lookupTable and notify
            // all threads waiting on the lookupTable monitor.
            updateLookupTable(host);
        }
    }

    return addresses;
}
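The practical effect of cacheAddresses can be pictured with a small time-based cache. This is a minimal sketch, not the JDK's implementation: TtlCacheSketch and its lookup method are hypothetical, a single address string stands in for the InetAddress[] the JDK stores, the clock is passed in explicitly so the behaviour is easy to follow, and the IP addresses in the usage note are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical TTL cache illustrating why a DNS change is not seen immediately. */
final class TtlCacheSketch {

    private static final class Entry {
        final String address;
        final long expiresAtMillis;

        Entry(String address, long expiresAtMillis) {
            this.address = address;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final long ttlMillis;

    TtlCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /**
     * Returns the cached address while the entry is still valid;
     * otherwise asks the resolver and caches the fresh answer.
     */
    String lookup(String host, long nowMillis, Function<String, String> resolver) {
        Entry entry = cache.get(host);
        if (entry != null && nowMillis < entry.expiresAtMillis) {
            return entry.address; // cache hit: the resolver is not consulted
        }
        String address = resolver.apply(host);
        cache.put(host, new Entry(address, nowMillis + ttlMillis));
        return address;
    }
}
```

With a 30-second TTL, a host first resolved to 10.0.0.1 at time 0 keeps returning 10.0.0.1 until 30 seconds have passed, even if the DNS record has been changed to 10.0.0.2 in the meantime. New connections opened during that window would still go to the old machine.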


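I won't paste the caching code itself. The cache lifetime is obtained from InetAddressCachePolicy, whose static initializer (decompiled) looks like this: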

static {
    Integer var0 = (Integer)AccessController.doPrivileged(new PrivilegedAction<Integer>() {
        public Integer run() {
            String var1;
            try {
                var1 = Security.getProperty("networkaddress.cache.ttl");
                if(var1 != null) {
                    return Integer.valueOf(var1);
                }
            } catch (NumberFormatException var3) {
                ;
            }

            try {
                var1 = System.getProperty("sun.net.inetaddr.ttl");
                if(var1 != null) {
                    return Integer.decode(var1);
                }
            } catch (NumberFormatException var2) {
                ;
            }

            return null;
        }
    });
    if(var0 != null) {
        cachePolicy = var0.intValue();
        if(cachePolicy < 0) {
            cachePolicy = -1;
        }

        propertySet = true;
    } else if(System.getSecurityManager() == null) {
        cachePolicy = 30;
    }
    // ...


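As shown, InetAddressCachePolicy first reads the security property networkaddress.cache.ttl and then the system property sun.net.inetaddr.ttl. If neither is set and no SecurityManager is installed, the default is 30 seconds, which is the situation in this case. (With a SecurityManager installed, the documented JDK default is to cache successful lookups forever.)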
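The decision order in that static block can be written as a small function for clarity. This is a sketch: DnsTtlSketch, effectiveTtl and parse are hypothetical names, the two property values and the SecurityManager flag are passed in as parameters, and the "forever" branch for the SecurityManager case follows the documented JDK default rather than anything shown in the excerpt.

```java
import java.security.Security;

/** Hypothetical restatement of how the positive DNS cache TTL is chosen. */
final class DnsTtlSketch {

    static final int FOREVER = -1;

    /** Parses a property value; returns null when it is absent or not a number. */
    private static Integer parse(String value, boolean decode) {
        if (value == null) {
            return null;
        }
        try {
            return decode ? Integer.decode(value) : Integer.valueOf(value);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * @param securityProp       value of the security property networkaddress.cache.ttl
     * @param systemProp         value of the system property sun.net.inetaddr.ttl
     * @param hasSecurityManager whether a SecurityManager is installed
     * @return TTL in seconds, or FOREVER (-1)
     */
    static int effectiveTtl(String securityProp, String systemProp, boolean hasSecurityManager) {
        Integer configured = parse(securityProp, false);
        if (configured == null) {
            configured = parse(systemProp, true);
        }
        if (configured != null) {
            return configured < 0 ? FOREVER : configured;
        }
        // Assumption taken from the JDK documentation: cache forever under a SecurityManager.
        return hasSecurityManager ? FOREVER : 30;
    }

    public static void main(String[] args) {
        String securityProp = Security.getProperty("networkaddress.cache.ttl");
        String systemProp = System.getProperty("sun.net.inetaddr.ttl");
        // Assumes no SecurityManager is installed, as in the case discussed here.
        System.out.println("effective TTL (s): " + effectiveTtl(securityProp, systemProp, false));
    }
}
```

To shorten the window during a migration, the TTL can be lowered either with Security.setProperty("networkaddress.cache.ttl", "10") before the first lookup, or with -Dsun.net.inetaddr.ttl=10 on the command line. The value 10 is only an example.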



III. Verification in Practice

That was the analysis. How can it be verified?


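1. Start the program;

2. Point the domain name order.mysql.xx.cn at the new machine;

3. On the old MySQL machine, list the connections with show processlist, then terminate them with kill;

4. Watch whether connections arrive on the new MySQL machine, and whether the program reports errors.


If all goes as expected, the program reports errors for a short while and then returns to normal, with all MySQL connections now pointing at the new machine.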
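While running the steps above, it can help to watch what the JVM itself resolves the name to, since that is subject to the cache discussed earlier. The following is a minimal sketch: ResolveWatcher is a hypothetical helper, and the 5-second interval and 12 iterations are arbitrary choices.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that prints what a host name currently resolves to inside this JVM. */
final class ResolveWatcher {

    /** Returns the textual IP addresses for the host, or an empty list if it cannot be resolved. */
    static List<String> resolve(String host) {
        List<String> result = new ArrayList<>();
        try {
            for (InetAddress address : InetAddress.getAllByName(host)) {
                result.add(address.getHostAddress());
            }
        } catch (UnknownHostException e) {
            // Leave the list empty; the caller sees "no addresses".
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // Pass the database host name as the first argument, e.g. order.mysql.xx.cn
        String host = args.length > 0 ? args[0] : "localhost";
        for (int i = 0; i < 12; i++) {
            System.out.println(System.currentTimeMillis() + " " + host + " -> " + resolve(host));
            Thread.sleep(5_000L);
        }
    }
}
```

Because the loop runs in a single JVM, the printed address should only change after the cached entry expires, which gives a rough idea of how long the application may keep connecting to the old machine.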