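An Analysis of BoneCP's Connection Pool Reconnection Mechanism
I. Background
A friend's company uses BoneCP as its MySQL connection pool, and the applications reach MySQL through a domain name, configured as follows: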
jdbc:mysql://order.mysql.xx.cn:3306/order?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true
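All middleware is accessed through domain names resolved by the internal DNS server.
Recently, the machine hosting one application's MySQL instance has deteriorated badly and crashed several times, so the plan is to migrate the database to a new machine. To keep the operation simple, they did not want to change the application configuration (there are dozens of production servers), and asked whether it would be enough to change only the DNS record, without restarting anything.
The procedure would be:
1. Point the domain name at the new machine;
2. Shut down the old instance.
Data consistency and data migration are not discussed here.
II. Problem Analysis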
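Two things need to be confirmed:
1. Does BoneCP reconnect after a failure?
That is, can BoneCP catch connection failures and execution errors, and then close the connections that are open in the pool?
2. Is DNS resolution cached?
The program is configured with a domain name, but the actual connection to MySQL is made over TCP (layer 4), so a DNS resolution step sits in between, and DNS results are usually cached.
For the first question, BoneCP officially claims to support reconnection. To see how, let's read the code. The key method is PreparedStatementHandle::execute: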
/**
 * {@inheritDoc}
 *
 * @see java.sql.PreparedStatement#execute()
 */
// @Override
public boolean execute() throws SQLException {
    checkClosed();
    try {
        if (this.logStatementsEnabled){
            logger.debug(PoolUtil.fillLogParams(this.sql, this.logParams));
        }
        long queryStartTime = queryTimerStart();
        if (this.connectionHook != null){
            this.connectionHook.onBeforeStatementExecute(this.connectionHandle, this, this.sql, this.logParams);
        }
        boolean result = this.internalPreparedStatement.execute();
        if (this.connectionHook != null){
            this.connectionHook.onAfterStatementExecute(this.connectionHandle, this, this.sql, this.logParams);
        }
        queryTimerEnd(this.sql, queryStartTime);
        return result;
    } catch (SQLException e) {
        throw this.connectionHandle.markPossiblyBroken(e);
    }
}
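Structurally, execute() is a plain interception pattern. The pooled statement delegates to the driver's statement and routes any SQLException through the owning connection handle before rethrowing it. Below is a minimal sketch of that pattern. SqlCall, Handle and the simplified markPossiblyBroken are hypothetical stand-ins, not BoneCP's API. Only the 08 (connection exception) SQL-state class is checked.

```java
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified types illustrating the interception pattern in execute().
class InterceptSketch {

    /** A JDBC call that may fail, standing in for internalPreparedStatement.execute(). */
    interface SqlCall<T> {
        T call() throws SQLException;
    }

    /** Stand-in for BoneCP's ConnectionHandle; only tracks the possiblyBroken flag. */
    static class Handle {
        final AtomicBoolean possiblyBroken = new AtomicBoolean(false);

        // Simplified: a missing SQL state is treated as "08999" (as BoneCP does),
        // and only SQL-state class 08 (connection exception) is flagged.
        SQLException markPossiblyBroken(SQLException e) {
            String state = e.getSQLState() == null ? "08999" : e.getSQLState();
            if (state.startsWith("08")) {
                possiblyBroken.set(true);
            }
            return e; // returned so the caller can write "throw handle.markPossiblyBroken(e)"
        }
    }

    /** Runs the call; any SQLException goes through the handle, then back to the caller. */
    static <T> T execute(Handle handle, SqlCall<T> call) throws SQLException {
        try {
            return call.call();
        } catch (SQLException e) {
            throw handle.markPossiblyBroken(e);
        }
    }
}
```

The application still sees the original exception. The pool simply gets a chance to look at it first.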
The part to focus on is the exception handling, i.e. the call to markPossiblyBroken (a method of ConnectionHandle):
/**
 * Given an exception, flag the connection (or database) as being potentially broken. If the exception is a data-specific exception,
 * do nothing except throw it back to the application.
 *
 * @param e SQLException e
 * @return SQLException for further processing
 */
protected SQLException markPossiblyBroken(SQLException e) {
    String state = e.getSQLState();
    boolean alreadyDestroyed = false;
    ConnectionState connectionState = this.getConnectionHook() != null ? this.getConnectionHook().onMarkPossiblyBroken(this, state, e) : ConnectionState.NOP;
    if (state == null){ // safety;
        state = "08999";
    }
    // Database-level failure (or the hook requests it): kill every connection in the pool, once
    if (((sqlStateDBFailureCodes.contains(state) || connectionState.equals(ConnectionState.TERMINATE_ALL_CONNECTIONS)) && this.pool != null) && this.pool.getDbIsDown().compareAndSet(false, true) ){
        logger.error("Database access problem. Killing off this connection and all remaining connections in the connection pool. SQL State = " + state);
        this.pool.connectionStrategy.terminateAllConnections();
        this.pool.destroyConnection(this);
        this.logicallyClosed.set(true);
        alreadyDestroyed = true;
        for (int i=0; i < this.pool.partitionCount; i++) {
            // send a signal to try re-populating again.
            this.pool.partitions[i].getPoolWatchThreadSignalQueue().offer(new Object()); // item being pushed is not important.
        }
    }
    // Connection-level failure (08003, a database-failure code, or a SocketException cause,
    // typically a timeout or a dropped link): destroy this connection and signal the
    // partition's pool watch thread to open a replacement
    if (state.equals("08003") || sqlStateDBFailureCodes.contains(state) || e.getCause() instanceof SocketException) {
        if (!alreadyDestroyed) {
            this.pool.destroyConnection(this);
            this.logicallyClosed.set(true);
            getOriginatingPartition().getPoolWatchThreadSignalQueue().offer(new Object()); // item being pushed is not important.
        }
    }
    char firstChar = state.charAt(0);
    if (connectionState.equals(ConnectionState.CONNECTION_POSSIBLY_BROKEN) || state.equals("40001") ||
            state.startsWith("08") || (firstChar >= '5' && firstChar <='9') /*|| (firstChar >='I' && firstChar <= 'Z')*/){
        this.possiblyBroken = true;
    }
    // Notify anyone who's interested
    if (this.possiblyBroken && (this.getConnectionHook() != null)){
        this.possiblyBroken = this.getConnectionHook().onConnectionException(this, state, e);
    }
    return e;
}
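The method works from the SQL state in three tiers:

- **Whole-pool teardown.** If the state is one of the database-failure codes in sqlStateDBFailureCodes, or a ConnectionHook requests TERMINATE_ALL_CONNECTIONS, the pool is marked as down. Every pooled connection is terminated, and each partition's pool watch thread is signalled to create new connections.
- **Single-connection teardown.** Failing that, a state of 08003, a database-failure code, or a SocketException cause destroys only the current connection, and its partition is refilled.
- **Flag only.** Other connection-class states (08xxx, 40001, and classes 5 to 9) just set the possiblyBroken flag.

In the first two tiers the broken connections are discarded, so the next request gets a newly established connection.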
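The decision logic can be restated as a side-effect-free function, which makes the order of the checks easier to follow. This is a simplified sketch under stated assumptions:

- No ConnectionHook is installed, so connectionState is NOP.
- The pool is non-null.
- The set of database-failure SQL states is passed in, because the contents of sqlStateDBFailureCodes are not shown above.
- Only the most drastic action is returned. In BoneCP, a connection torn down with an 08xxx state is also flagged possiblyBroken.

```java
import java.util.Set;

// Hypothetical helper: a simplified restatement of markPossiblyBroken's decisions.
class BrokenConnectionClassifier {

    enum Action { TERMINATE_ALL, DESTROY_THIS, MARK_POSSIBLY_BROKEN, NONE }

    /**
     * @param sqlState               SQLException.getSQLState(), may be null
     * @param causeIsSocketException whether e.getCause() is a SocketException
     * @param dbFailureCodes         assumed contents of sqlStateDBFailureCodes
     * @param dbAlreadyMarkedDown    whether pool.getDbIsDown() is already true
     * @return the most drastic action markPossiblyBroken would take
     */
    static Action classify(String sqlState, boolean causeIsSocketException,
                           Set<String> dbFailureCodes, boolean dbAlreadyMarkedDown) {
        String state = sqlState == null ? "08999" : sqlState; // same fallback as BoneCP
        if (dbFailureCodes.contains(state) && !dbAlreadyMarkedDown) {
            return Action.TERMINATE_ALL;        // close every pooled connection, refill all partitions
        }
        if (state.equals("08003") || dbFailureCodes.contains(state) || causeIsSocketException) {
            return Action.DESTROY_THIS;         // close this connection, refill its partition
        }
        char first = state.charAt(0);
        if (state.equals("40001") || state.startsWith("08") || (first >= '5' && first <= '9')) {
            return Action.MARK_POSSIBLY_BROKEN; // keep it, but flag it as suspect
        }
        return Action.NONE;                     // data-specific error: just rethrow
    }
}
```

For example, with an illustrative failure-code set containing 08S01 (MySQL's communication link failure state), the first 08S01 error tears down the whole pool. Later ones, arriving while the pool is still marked down, destroy only their own connection. A 23000 duplicate-key error changes nothing.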
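The second question concerns DNS resolution. Reading the code shows that BoneCP does not open sockets itself; it obtains connections through the JDBC driver. In the MySQL driver, the socket is created by the StandardSocketFactory class: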
/**
 * @see com.mysql.jdbc.SocketFactory#createSocket(Properties)
 */
public Socket connect(String hostname, int portNumber, Properties props)
        throws SocketException, IOException {
    if (props != null) {
        // ... (omitted in this excerpt: this.host/this.port are set from hostname/portNumber,
        //      and the connection properties that define the flags below are read)
        if (this.host != null) {
            if (!(wantsLocalBind || wantsTimeout || needsConfigurationBeforeConnect)) {
                // Resolve the host name to its IP addresses
                InetAddress[] possibleAddresses = InetAddress
                        .getAllByName(this.host);
                Throwable caughtWhileConnecting = null;
                // Need to loop through all possible addresses, in case
                // someone has IPV6 configured (SuSE, for example...)
                for (int i = 0; i < possibleAddresses.length; i++) {
                    try {
                        this.rawSocket = new Socket(possibleAddresses[i],
                                port);
                        configureSocket(this.rawSocket, props);
                        break;
                    } catch (Exception ex) {
                        caughtWhileConnecting = ex;
                    }
                }
                if (rawSocket == null) {
                    unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                }
            } else {
                // ... (omitted in this excerpt: the path used when a local bind address,
                //      a connect timeout, or pre-connect socket configuration is requested)
            }
            return this.rawSocket;
        }
    }
    throw new SocketException("Unable to create socket");
}
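The essential pattern in connect() is: resolve the host name to every address the resolver returns, then try them in order until one accepts the TCP connection. Because resolution goes through InetAddress.getAllByName, it is subject to the JVM's address cache. Below is a standalone sketch of that pattern. ResolveAndConnect is a hypothetical name, and the connect timeout is an addition for illustration; the branch shown above calls new Socket(address, port) with no timeout.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical sketch of "resolve all addresses, connect to the first that answers".
class ResolveAndConnect {

    static Socket connect(String host, int port, int timeoutMillis) throws IOException {
        // Served from the JVM's address cache while the entry is fresh,
        // otherwise resolved through the configured name service (DNS).
        InetAddress[] candidates = InetAddress.getAllByName(host);
        IOException last = null;
        for (InetAddress address : candidates) {
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(address, port), timeoutMillis);
                return socket;                // first reachable address wins
            } catch (IOException e) {
                last = e;                     // remember the failure, try the next address
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // nothing useful to do if closing the failed socket fails
                }
            }
        }
        throw last != null ? last : new IOException("No addresses for " + host);
    }
}
```

The consequence for this migration: a new connection only reaches the new machine once getAllByName returns the new address.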
getAllByName eventually calls getAddressesFromNameService to look up the IP addresses:
private static InetAddress[] getAddressesFromNameService(String host, InetAddress reqAddr)
        throws UnknownHostException
{
    InetAddress[] addresses = null;
    boolean success = false;
    UnknownHostException ex = null;
    if ((addresses = checkLookupTable(host)) == null) {
        try {
            // This is the first thread which looks up the addresses
            // this host or the cache entry for this host has been
            // expired so this thread should do the lookup.
            for (NameService nameService : nameServices) {
                try {
                    /*
                     * Do not put the call to lookup() inside the
                     * constructor. if you do you will still be
                     * allocating space when the lookup fails.
                     */
                    addresses = nameService.lookupAllHostAddr(host);
                    success = true;
                    break;
                } catch (UnknownHostException uhe) {
                    if (host.equalsIgnoreCase("localhost")) {
                        InetAddress[] local = new InetAddress[] { impl.loopbackAddress() };
                        addresses = local;
                        success = true;
                        break;
                    }
                    else {
                        addresses = unknown_array;
                        success = false;
                        ex = uhe;
                    }
                }
            }
            // More to do?
            if (reqAddr != null && addresses.length > 1 && !addresses[0].equals(reqAddr)) {
                // Find it?
                int i = 1;
                for (; i < addresses.length; i++) {
                    if (addresses[i].equals(reqAddr)) {
                        break;
                    }
                }
                // Rotate
                if (i < addresses.length) {
                    InetAddress tmp, tmp2 = reqAddr;
                    for (int j = 0; j < i; j++) {
                        tmp = addresses[j];
                        addresses[j] = tmp2;
                        tmp2 = tmp;
                    }
                    addresses[i] = tmp2;
                }
            }
            // Key line: cache the lookup result for this host
            cacheAddresses(host, addresses, success);
            if (!success && ex != null)
                throw ex;
        } finally {
            // Delete host from the lookupTable and notify
            // all threads waiting on the lookupTable monitor.
            updateLookupTable(host);
        }
    }
    return addresses;
}
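Conceptually, the positive cache behaves like a map from host name to addresses with an expiry time. While an entry is fresh, lookups never reach the name service, so a changed DNS record stays invisible to the JVM until the entry expires.

The sketch below is a hypothetical simplification, not the JDK implementation: TtlAddressCache, an injected resolver function and clock, and addresses as plain strings. The TTL convention (negative means cache forever, 0 means never cache) follows InetAddressCachePolicy.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical simplification of a positive DNS cache with a TTL.
class TtlAddressCache {

    private static final class Entry {
        final List<String> addresses;
        final long expiresAtMillis; // Long.MAX_VALUE means "never expires"

        Entry(List<String> addresses, long expiresAtMillis) {
            this.addresses = addresses;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final Function<String, List<String>> nameService; // stand-in for the DNS lookup
    private final LongSupplier clockMillis;                    // injected so expiry can be tested
    private final int ttlSeconds;                              // < 0: forever, 0: never cache

    TtlAddressCache(Function<String, List<String>> nameService, LongSupplier clockMillis, int ttlSeconds) {
        this.nameService = nameService;
        this.clockMillis = clockMillis;
        this.ttlSeconds = ttlSeconds;
    }

    synchronized List<String> lookup(String host) {
        long now = clockMillis.getAsLong();
        Entry cached = cache.get(host);
        if (cached != null && now < cached.expiresAtMillis) {
            return cached.addresses;                  // fresh entry: the name service is not asked
        }
        List<String> fresh = nameService.apply(host); // expired or missing: do a real lookup
        if (ttlSeconds == 0) {
            cache.remove(host);                       // never cache
        } else {
            long expiresAt = ttlSeconds < 0 ? Long.MAX_VALUE : now + ttlSeconds * 1000L;
            cache.put(host, new Entry(fresh, expiresAt));
        }
        return fresh;
    }
}
```

With a 30-second TTL, a record changed one second after a lookup is still served from the cache for the remaining 29 seconds. The change is picked up by the first lookup after the entry expires.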
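The rest of the JDK code is not reproduced here. cacheAddresses takes the cache lifetime (TTL) from InetAddressCachePolicy, which sets it in a static initializer (shown as decompiled code):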
static {
    // Runs once, when the class is initialized
    Integer var0 = (Integer)AccessController.doPrivileged(new PrivilegedAction<Integer>() {
        public Integer run() {
            String var1;
            try {
                // 1. Security property networkaddress.cache.ttl (e.g. in the java.security file)
                var1 = Security.getProperty("networkaddress.cache.ttl");
                if(var1 != null) {
                    return Integer.valueOf(var1);
                }
            } catch (NumberFormatException var3) {
                ;
            }
            try {
                // 2. System property sun.net.inetaddr.ttl (e.g. -Dsun.net.inetaddr.ttl=10)
                var1 = System.getProperty("sun.net.inetaddr.ttl");
                if(var1 != null) {
                    return Integer.decode(var1);
                }
            } catch (NumberFormatException var2) {
                ;
            }
            return null;
        }
    });
    if(var0 != null) {
        cachePolicy = var0.intValue();
        if(cachePolicy < 0) {
            cachePolicy = -1; // cache forever
        }
        propertySet = true;
    } else if(System.getSecurityManager() == null) {
        cachePolicy = 30; // default: 30 seconds
    }
    // ... (remainder of the static initializer omitted)
}
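InetAddressCachePolicy first reads the security property networkaddress.cache.ttl, then the system property sun.net.inetaddr.ttl. If neither is set and no SecurityManager is installed, the TTL defaults to 30 seconds, which is the situation in this case. In other words, after the DNS record changes, a running application picks up the new address within about 30 seconds, provided the internal DNS server is already returning the new record.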
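The precedence rules of the initializer can be captured in a small pure function. This is useful for reasoning about what TTL a given deployment ends up with.

DnsTtlPolicy is a hypothetical name. One case is an assumption not shown in the excerpt: when nothing is configured but a SecurityManager is installed, the field keeps its initial value, which in the JDK 8 source is FOREVER (-1).

```java
// Hypothetical restatement of InetAddressCachePolicy's TTL selection.
class DnsTtlPolicy {

    static final int FOREVER = -1;          // same meaning as cachePolicy = -1 above
    static final int DEFAULT_POSITIVE = 30; // the hard-coded 30 in the initializer

    static int effectiveTtlSeconds(String securityProp, String systemProp, boolean hasSecurityManager) {
        Integer configured = null;
        // 1. networkaddress.cache.ttl (security property); a malformed value is ignored
        if (securityProp != null) {
            try {
                configured = Integer.valueOf(securityProp);
            } catch (NumberFormatException ignored) {
                // fall through to the system property, as the JDK code does
            }
        }
        // 2. sun.net.inetaddr.ttl (system property); Integer.decode also accepts hex such as "0x1E"
        if (configured == null && systemProp != null) {
            try {
                configured = Integer.decode(systemProp);
            } catch (NumberFormatException ignored) {
                // treated as not set
            }
        }
        if (configured != null) {
            return configured < 0 ? FOREVER : configured;
        }
        // 3. Nothing configured: 30 s without a SecurityManager; otherwise the initial
        //    value of cachePolicy, assumed here to be FOREVER (JDK 8 source)
        return hasSecurityManager ? FOREVER : DEFAULT_POSITIVE;
    }
}
```

To make a DNS switch take effect sooner, the TTL can be set with the JVM option -Dsun.net.inetaddr.ttl=10, or with networkaddress.cache.ttl in the JRE's java.security file. The static initializer reads the value only once, so it has to be in place before the application starts doing lookups.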
III. Verification in Practice
That is the analysis. How can it be verified?
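1. Start the application;
2. Point the domain name order.mysql.xx.cn at the new machine;
3. On the old MySQL machine, list the connections with show processlist and terminate them with kill;
4. Check whether connections arrive at the new MySQL machine and whether the application reports errors.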
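If all goes as expected, the application logs a brief burst of errors and then recovers, with all MySQL connections now going to the new machine.

Keep in mind that the JVM may keep returning the old address for up to 30 seconds after the DNS change. Connections re-created within that window can still reach the old machine, so it is safer to wait at least that long between steps 2 and 3.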
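For step 4, it can help to watch name resolution directly. Below is a hypothetical probe, DnsSwitchProbe, that resolves the host once per second and prints a line whenever the answer changes. It takes the host name from the command line and defaults to the domain used in this article.

It runs in its own JVM with its own address cache. It therefore shows when a fresh JVM-level lookup sees the new record, not the exact moment each application instance switches.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical probe: prints the resolved addresses of a host whenever they change.
class DnsSwitchProbe {

    static List<String> resolve(String host) throws UnknownHostException {
        List<String> ips = new ArrayList<>();
        for (InetAddress a : InetAddress.getAllByName(host)) { // subject to the JVM DNS cache
            ips.add(a.getHostAddress());
        }
        return ips;
    }

    public static void main(String[] args) throws InterruptedException {
        String host = args.length > 0 ? args[0] : "order.mysql.xx.cn";
        String previous = null;
        for (int i = 0; i < 120; i++) {               // about two minutes at one probe per second
            String current;
            try {
                current = resolve(host).toString();
            } catch (UnknownHostException e) {
                current = "unresolved: " + e.getMessage();
            }
            if (!current.equals(previous)) {          // only print transitions
                System.out.println(LocalTime.now() + " " + host + " -> " + current);
                previous = current;
            }
            Thread.sleep(1000);
        }
    }
}
```

Running it with the same JVM options as the application (for example the same sun.net.inetaddr.ttl) makes its caching behaviour comparable.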