BoneCP连接池重连机制分析
一、背景
朋友公司Mysql连接池用的BoneCP,应用程序访问Mysql以域名方式,配置如下:
jdbc:mysql://order.mysql.xx.cn:3306/order?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true
所有中间件访问都是通过内网的Dns服务器进行访问。
最近一个应用的Mysql所在机器已经老化严重,宕机几次,准备将数据库迁移到新机器上,为了减化操作,不想修改应用配置(线上机器几十台),问能不能只修改域名,而不用重启机器。
具体操作步骤如下:
1、将域名指向新机器;
2、将老实例关掉。
这里不讨论数据一致性和迁移数据的问题。
二、问题分析
这里有两点需要确认:
1、BoneCP失败了会重连吗?
即BoneCP能否捕捉连接失败/执行异常的错误,然后将连接池中打开的连接关闭。
2、DNS有缓存吗?
因为程序中配的是域名,实际上要通过4层的TCP协议连接Mysql,中间有个DNS解析,但DNS一般是有缓存的。
对于第一个问题,官方说是支持的,倒底是如何支持的,看下代码,关键代码为PreparedStatementHandle::execute方法:
/*** {@inheritDoc}** @see java.sql.PreparedStatement#execute()*/// @Overridepublic boolean execute() throws SQLException {checkClosed();try {if (this.logStatementsEnabled){logger.debug(PoolUtil.fillLogParams(this.sql, this.logParams));}long queryStartTime = queryTimerStart();if (this.connectionHook != null){this.connectionHook.onBeforeStatementExecute(this.connectionHandle, this, this.sql, this.logParams);}boolean result = this.internalPreparedStatement.execute();if (this.connectionHook != null){this.connectionHook.onAfterStatementExecute(this.connectionHandle, this, this.sql, this.logParams);}queryTimerEnd(this.sql, queryStartTime);return result;} catch (SQLException e) {throw this.connectionHandle.markPossiblyBroken(e);}}
重点关注对异常的处理,即markPossiblyBroken的调用
/*** Given an exception, flag the connection (or database) as being potentially broken. If the exception is a data-specific exception,* do nothing except throw it back to the application.** @param e SQLException e* @return SQLException for further processing*/protected SQLException markPossiblyBroken(SQLException e) {String state = e.getSQLState();boolean alreadyDestroyed = false;ConnectionState connectionState = this.getConnectionHook() != null ? this.getConnectionHook().onMarkPossiblyBroken(this, state, e) : ConnectionState.NOP;if (state == null){ // safety;state = "08999";}if (((sqlStateDBFailureCodes.contains(state) || connectionState.equals(ConnectionState.TERMINATE_ALL_CONNECTIONS)) && this.pool != null) && this.pool.getDbIsDown().compareAndSet(false, true) ){logger.error("Database access problem. Killing off this connection and all remaining connections in the connection pool. SQL State = " + state);this.pool.connectionStrategy.terminateAllConnections();this.pool.destroyConnection(this);this.logicallyClosed.set(true);alreadyDestroyed = true;for (int i=0; i < this.pool.partitionCount; i++) {// send a signal to try re-populating again.this.pool.partitions[i].getPoolWatchThreadSignalQueue().offer(new Object()); // item being pushed is not important.}}//如果是指定错误码,一般来说是连接超时之类的,会关闭连接池中的连接if (state.equals("08003") || sqlStateDBFailureCodes.contains(state) || e.getCause() instanceof SocketException) {if (!alreadyDestroyed) {this.pool.destroyConnection(this);this.logicallyClosed.set(true);getOriginatingPartition().getPoolWatchThreadSignalQueue().offer(new Object()); // item being pushed is not important.}}char firstChar = state.charAt(0);if (connectionState.equals(ConnectionState.CONNECTION_POSSIBLY_BROKEN) || state.equals("40001") ||state.startsWith("08") || (firstChar >= '5' && firstChar <='9') /*|| (firstChar >='I' && firstChar <= 'Z')*/){this.possiblyBroken = true;}// Notify anyone who's interestedif (this.possiblyBroken && (this.getConnectionHook() != null)){this.possiblyBroken = this.getConnectionHook().onConnectionException(this, state, e);}return e;}
可以看到,代码中会检测是否为连接超时之类的错误,如是则关闭连接池的连接,这样下次就会重新建立新连接了。
第二个是关于DNS解析的问题,通过分析代码,BoneCP的连接复用的Jdbc的代码,连接的建立是由Jdbc包里的StandardSocketFactory类来完成的:
/*** @see com.mysql.jdbc.SocketFactory#createSocket(Properties)*/public Socket connect(String hostname, int portNumber, Properties props)throws SocketException, IOException {if (this.host != null) {if (!(wantsLocalBind || wantsTimeout || needsConfigurationBeforeConnect)) {//根据域名查找IPInetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);Throwable caughtWhileConnecting = null;// Need to loop through all possible addresses, in case// someone has IPV6 configured (SuSE, for example...)for (int i = 0; i < possibleAddresses.length; i++) {try {this.rawSocket = new Socket(possibleAddresses[i],port);configureSocket(this.rawSocket, props);break;} catch (Exception ex) {caughtWhileConnecting = ex;}}if (rawSocket == null) {unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);}} else {}return this.rawSocket;}}throw new SocketException("Unable to create socket");}
getAllByName会调用getAddressesFromNameService来获取IP:
private static InetAddress[] getAddressesFromNameService(String host, InetAddress reqAddr)throws UnknownHostException{InetAddress[] addresses = null;boolean success = false;UnknownHostException ex = null;if ((addresses = checkLookupTable(host)) == null) {try {// This is the first thread which looks up the addresses// this host or the cache entry for this host has been// expired so this thread should do the lookup.for (NameService nameService : nameServices) {try {/** Do not put the call to lookup() inside the* constructor. if you do you will still be* allocating space when the lookup fails.*/addresses = nameService.lookupAllHostAddr(host);success = true;break;} catch (UnknownHostException uhe) {if (host.equalsIgnoreCase("localhost")) {InetAddress[] local = new InetAddress[] { impl.loopbackAddress() };addresses = local;success = true;break;}else {addresses = unknown_array;success = false;ex = uhe;}}}// More to do?if (reqAddr != null && addresses.length > 1 && !addresses[0].equals(reqAddr)) {// Find it?int i = 1;for (; i < addresses.length; i++) {if (addresses[i].equals(reqAddr)) {break;}}// Rotateif (i < addresses.length) {InetAddress tmp, tmp2 = reqAddr;for (int j = 0; j < i; j++) {tmp = addresses[j];addresses[j] = tmp2;tmp2 = tmp;}addresses[i] = tmp2;}}//关键代码,缓存查询结果cacheAddresses(host, addresses, success);if (!success && ex != null)throw ex;} finally {// Delete host from the lookupTable and notify// all threads waiting on the lookupTable monitor.updateLookupTable(host);}}return addresses;}
代码就不再贴了,是通过调用InetAddressCachePolicy来获取缓存时间,
static {Integer var0 = (Integer)AccessController.doPrivileged(new PrivilegedAction<Integer>() {public Integer run() {String var1;try {var1 = Security.getProperty("networkaddress.cache.ttl");if(var1 != null) {return Integer.valueOf(var1);}} catch (NumberFormatException var3) {;}try {var1 = System.getProperty("sun.net.inetaddr.ttl");if(var1 != null) {return Integer.decode(var1);}} catch (NumberFormatException var2) {;}return null;}});if(var0 != null) {cachePolicy = var0.intValue();if(cachePolicy < 0) {cachePolicy = -1;}propertySet = true;} else if(System.getSecurityManager() == null) {cachePolicy = 30;}
可以看到InetAddressCachePolicy会从几个配置中读,如果读不到,也没有配置SecurityManager,则默认是30秒,也就是本例中的情况。
四、实际验证
上面是我们的分析过程,如何验证呢?
1、将程序跑起来;
2、将域名order.mysql.xx.cn指向新机器;
3、在老的mysql机器上用show processlist显示连接,然后用kill杀掉这些连接;
4、观察新的mysql机器上有没连接过来,程序有没报错
不出意外的话,程序会有一段小报错,然后恢复正常了,所有mysql连接都指向新机器了。
