对象池化机制(二)数据库连接池
前面已经讨论过通用池设计、线程池的设计,本文探讨池化机制在数据库连接复用上的应用:数据库连接池。
JDBC API是一个高度抽象的API,每个厂商自己有着自己的实现。大多数针对JDBC层的框架都是对JDBC API的包装,包装成一些高级功能,比如可以对DataSource和Connection进行包装支持连接池功能,可以对Statement进行包装,支持多数据源的分库分表功能。关于JDBC可以参考我之前的文章 JDBC和DBUtils的架构设计。
开发者接触的数据库连接池配置大多数都是在配置数据源,因为数据库连接池通常是在DataSource抽象基础上实现的。
DataSource
我们先来看看数据源javax.sql.DataSource接口的定义:
public interface DataSource extends CommonDataSource, Wrapper {
Connection getConnection() throws SQLException;
Connection getConnection(String username, String password)
throws SQLException;
}
DataSource是官方推荐建立连接的方式,提供了getConnection方法获取连接,很显然,我们可以在连接获取时加入池机制。
commons-dbcp2:基于commons-pool2实现
架构设计

commons-dbcp2是Apache提供的数据库连接池,底层池是基于commons-pool2实现的,上一篇文章《对象池化机制(一)池设计和commons-pool原理》已经详细介绍过commons-pool的原理,从commons-dpcp2的架构设计图可以看出,它的核心功能就是扩展commons-pool提供的PooledObjectFactory接口,定义Connection的创建、初始化、校验等操作。
PoolableConnectionFactory是PooledObjectFactory的具体实现,将创建Connection这部分工作交给了另一个工厂ConnectionFactory去处理。
PoolingDataSource是对数据源接口最基础的实现,对连接池进行了包装。
BasicDataSource是一个更高层的封装,将连接池、数据源等配置都集中在这个类中暴露。
示例:PoolingDataSource
我们先来看一个创建数据源的示例:
@Test
public void testDBCPPool() throws SQLException {
// 创建PooledObjectFactory
ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(url, user,
password);
PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory, null);
// 创建池子
GenericObjectPool<PoolableConnection> connectionPool = new GenericObjectPool<>(
poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);
// 包装池子
PoolingDataSource<PoolableConnection> dataSource = new PoolingDataSource<>(connectionPool);
// process with dataSource
}
BasicDataSource的示例会更简单,这里我们使用更基础的实现PoolingDataSource可以更清楚的看出底层的逻辑与commons-pool2是一致的。
核心组件实现:数据源PoolingDataSource
PoolingDataSource实现了DataSource接口,构造函数要求指定连接池ObjectPool的实现,提供了从连接池获取连接的功能。
public class PoolingDataSource<C extends Connection> implements DataSource, AutoCloseable {
private final ObjectPool<C> pool;
@Override
public Connection getConnection() throws SQLException {
try {
final C conn = pool.borrowObject();
if (conn == null) {
return null;
}
return new PoolGuardConnectionWrapper<>(conn);
} catch (final SQLException e) {
throw e;
} catch (final NoSuchElementException e) {
throw new SQLException("Cannot get a connection, pool error " + e.getMessage(), e);
} catch (final RuntimeException e) {
throw e;
} catch (final InterruptedException e) {
// Reset the interrupt status so it is visible to callers
Thread.currentThread().interrupt();
throw new SQLException("Cannot get a connection, general error", e);
} catch (final Exception e) {
throw new SQLException("Cannot get a connection, general error", e);
}
}
// 略
」
getConnection的实现就是从连接池中借取连接:pool.borrowObject()。
那么,什么时候会将连接归还给连接池呢?答案是在Connection关闭的时候,因为连接池的存在,关闭的时候并不是真的去关闭,而是放回连接池,Connection的封装实现是PoolableConnection。
核心组件实现:连接对象PoolableConnection
PoolableConnection实现了java.sql.Connection接口,它继承DelegatingConnection<Connection>类,真正委托的对象是内部一个具体的Connection实现。
PoolableConnection的close方法实现就是将Connection归还到pool中。
public synchronized void close() throws SQLException {
if (isClosedInternal()) {
// already closed
return;
}
boolean isUnderlyingConnectionClosed;
try {
isUnderlyingConnectionClosed = getDelegateInternal().isClosed();
} catch (final SQLException e) {
try {
pool.invalidateObject(this);
} catch (final IllegalStateException ise) {
// pool is closed, so close the connection
passivate();
getInnermostDelegate().close();
} catch (final Exception ie) {
// DO NOTHING the original exception will be rethrown
}
throw new SQLException("Cannot close connection (isClosed check failed)", e);
}
/*
* Can't set close before this code block since the connection needs to be open when validation runs. Can't set
* close after this code block since by then the connection will have been returned to the pool and may have
* been borrowed by another thread. Therefore, the close flag is set in passivate().
*/
if (isUnderlyingConnectionClosed) {
// Abnormal close: underlying connection closed unexpectedly, so we
// must destroy this proxy
try {
pool.invalidateObject(this);
} catch (final IllegalStateException e) {
// pool is closed, so close the connection
passivate();
getInnermostDelegate().close();
} catch (final Exception e) {
throw new SQLException("Cannot close connection (invalidating pooled object failed)", e);
}
} else {
// Normal close: underlying connection is still open, so we
// simply need to return this proxy to the pool
try {
pool.returnObject(this);
} catch (final IllegalStateException e) {
// pool is closed, so close the connection
passivate();
getInnermostDelegate().close();
} catch (final SQLException e) {
throw e;
} catch (final RuntimeException e) {
throw e;
} catch (final Exception e) {
throw new SQLException("Cannot close connection (return to pool failed)", e);
}
}
}
PoolableConnection不止重写了close方法,对Connection接口其它的方法比如createStatement使用委托模式都做了一层封装,这里就不展开了。
核心组件实现:对象工厂PoolableConnectionFactory
最核心的代码是PooledObjectFactory的实现,即PoolableConnectionFactory,我们一点点来看里面每个方法的实现。
makeObject
在创建连接对象时,也引来了一些新的配置参数:connectionInitSqls、poolPreparedStatements和maxOpenPreparedStatements。
| 配置 | 作用 |
|---|---|
| connectionInitSqls | List |
| poolPreparedStatements | boolean:是否开启Statement池复用,mysql下建议关闭 |
| maxOpenPreparedStatements | int:Statement池上限 |
@Override
public PooledObject<PoolableConnection> makeObject() throws Exception {
// 创建连接
Connection conn = connectionFactory.createConnection();
if (conn == null) {
throw new IllegalStateException("Connection factory returned null from createConnection");
}
try {
// 初始化连接
initializeConnection(conn);
} catch (final SQLException sqle) {
// Make sure the connection is closed
try {
conn.close();
} catch (final SQLException ignore) {
// ignore
}
// Rethrow original exception so it is visible to caller
throw sqle;
}
final long connIndex = connectionIndex.getAndIncrement();
// 处理poolStatements
if (poolStatements) {
conn = new PoolingConnection(conn);
final GenericKeyedObjectPoolConfig<DelegatingPreparedStatement> config = new GenericKeyedObjectPoolConfig<>();
config.setMaxTotalPerKey(-1);
config.setBlockWhenExhausted(false);
config.setMaxWaitMillis(0);
config.setMaxIdlePerKey(1);
config.setMaxTotal(maxOpenPreparedStatements);
if (dataSourceJmxObjectName != null) {
final StringBuilder base = new StringBuilder(dataSourceJmxObjectName.toString());
base.append(Constants.JMX_CONNECTION_BASE_EXT);
base.append(Long.toString(connIndex));
config.setJmxNameBase(base.toString());
config.setJmxNamePrefix(Constants.JMX_STATEMENT_POOL_PREFIX);
} else {
config.setJmxEnabled(false);
}
final PoolingConnection poolingConn = (PoolingConnection) conn;
final KeyedObjectPool<PStmtKey, DelegatingPreparedStatement> stmtPool = new GenericKeyedObjectPool<>(
poolingConn, config);
poolingConn.setStatementPool(stmtPool);
poolingConn.setCacheState(cacheState);
}
// Register this connection with JMX
ObjectName connJmxName;
if (dataSourceJmxObjectName == null) {
connJmxName = null;
} else {
connJmxName = new ObjectName(
dataSourceJmxObjectName.toString() + Constants.JMX_CONNECTION_BASE_EXT + connIndex);
}
// 生成包装连接对象PoolableConnection
final PoolableConnection pc = new PoolableConnection(conn, pool, connJmxName, disconnectionSqlCodes,
fastFailValidation);
pc.setCacheState(cacheState);
// 创建PooledObject
return new DefaultPooledObject<>(pc);
}
简要说明下创建对象的代码:
- 创建连接的工作交由
ConnectionFactory实现,可选实现有DriverConnectionFactory、DriverManagerConnectionFactory和DataSourceConnectionFactory。 - 由于Statement也可以被pool,代码里面对poolStatements做了处理。
- PoolableConnection构造时传入了具体的Connection实现。
- initializeConnection方法正是执行参数connectionInitSqls语句。
validateObject
数据库连接池的校验工作原理是通过一条sql去校验是否能有返回值,所以这里有两个很重要的参数:validationQuery和validationQueryTimeout。
| 配置 | 作用 |
|---|---|
| validationQuery | string:select语句,必须至少有一行返回值 |
| validationQueryTimeout | int:校验查询超时时间,单位秒 |
几乎所有的数据库连接池框架都会有这两个参数来实现校验功能,我们进入源码,看看这两个重要的参数是如何起作用的:
public void validate(final String sql, int timeoutSeconds) throws SQLException {
if (fastFailValidation && fatalSqlExceptionThrown) {
throw new SQLException(Utils.getMessage("poolableConnection.validate.fastFail"));
}
// 没有指定sql
if (sql == null || sql.length() == 0) {
if (timeoutSeconds < 0) {
timeoutSeconds = 0;
}
if (!isValid(timeoutSeconds)) {
throw new SQLException("isValid() returned false");
}
return;
}
if (!sql.equals(lastValidationSql)) {
lastValidationSql = sql;
// Has to be the innermost delegate else the prepared statement will
// be closed when the pooled connection is passivated.
validationPreparedStatement = getInnermostDelegateInternal().prepareStatement(sql);
}
if (timeoutSeconds > 0) {
validationPreparedStatement.setQueryTimeout(timeoutSeconds);
}
// 执行校验
try (ResultSet rs = validationPreparedStatement.executeQuery()) {
if (!rs.next()) {
throw new SQLException("validationQuery didn't return a row");
}
} catch (final SQLException sqle) {
throw sqle;
}
}
我们可以读出一些关于参数的重要实现:
- 如果没有指定sql,则会调用isValid方法区校验,最终会调用
java.sql.Connection.isValid(int)这个方法。 - 执行校验的时候,这条sql必须有返回值,它的判断逻辑是
(!rs.next())。
activateObject
@Override
public void activateObject(final PooledObject<PoolableConnection> p) throws Exception {
validateLifetime(p);
final PoolableConnection conn = p.getObject();
conn.activate();
if (defaultAutoCommit != null && conn.getAutoCommit() != defaultAutoCommit.booleanValue()) {
conn.setAutoCommit(defaultAutoCommit.booleanValue());
}
if (defaultTransactionIsolation != UNKNOWN_TRANSACTION_ISOLATION
&& conn.getTransactionIsolation() != defaultTransactionIsolation) {
conn.setTransactionIsolation(defaultTransactionIsolation);
}
if (defaultReadOnly != null && conn.isReadOnly() != defaultReadOnly.booleanValue()) {
conn.setReadOnly(defaultReadOnly.booleanValue());
}
if (defaultCatalog != null && !defaultCatalog.equals(conn.getCatalog())) {
conn.setCatalog(defaultCatalog);
}
if (defaultSchema != null && !defaultSchema.equals(Jdbc41Bridge.getSchema(conn))) {
Jdbc41Bridge.setSchema(conn, defaultSchema);
}
conn.setDefaultQueryTimeout(defaultQueryTimeoutSeconds);
}
这段代码的功能是重新初始化复用的连接,与之对应的还有passivateObject重置连接的方法。
小结
commons-dbcp2针对具体复用对象Connection的特性在commons—pools上实现了数据库连接池的功能,很好的结合了JDBC API和Commons-Pool API,并且提供了一些贴合的参数配置,如validateQuery。
tomcat-jdbc
tomcat-jdbc是tomcat用来代替DBCP的连接池,具体优势可以参见官方文档,接下来我们主要分析下面三个优势:
- 极简代码,没有依赖commons-pool去实现连接池的功能,自己设计了池ConnectionPool
- 支持配置拦截器JdbcInterceptor,从而实现更多的扩展功能
- 基于JDBC API提供的
javax.sql.ConnectionPoolDataSource和javax.sql.PooledConnection抽象连接
架构设计

图中可以看出,真实的连接对象被封装成PooledConnection类,并且通过代理的方式支持拦截器的功能,连接池的功能是由ConnectionPool实现,org.apache.tomcat.jdbc.pool.DataSource是数据源的实现,因为重新设计的池子不需要有扩展性,所以减少了很多类。
示例
@Test
public void testBasic() throws SQLException {
PoolProperties p = new PoolProperties();
p.setUrl(url);
p.setDriverClassName(driverName);
p.setUsername(user);
p.setPassword(password);
p.setTestWhileIdle(false);
p.setTestOnBorrow(true);
p.setValidationQuery("SELECT 1");
p.setTestOnReturn(false);
p.setValidationInterval(30000);
p.setTimeBetweenEvictionRunsMillis(30000);
p.setMaxActive(100);
p.setInitialSize(10);
p.setMaxWait(10000);
p.setRemoveAbandonedTimeout(60);
p.setMinEvictableIdleTimeMillis(30000);
p.setMinIdle(10);
p.setLogAbandoned(true);
p.setRemoveAbandoned(true);
p.setJdbcInterceptors(
"org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;"+
"org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer");
DataSource datasource = new org.apache.tomcat.jdbc.pool.DataSource();
datasource.setPoolProperties(p);
// process with datasource
}
从示例代码可以看到,除了使用方式不一样,参数的含义与commons-dbcp2基本是一致的,接下来我们深入几个核心组件,看一看它们的实现原理。
核心组件实现:数据源org.apache.tomcat.jdbc.pool.DataSource
org.apache.tomcat.jdbc.pool.DataSource实现了javax.sql.ConnectionPoolDataSource和javax.sql.DataSource接口,继承自DataSourceProxy。
public Connection getConnection() throws SQLException {
if (pool == null)
return createPool().getConnection();
return pool.getConnection();
}
public javax.sql.PooledConnection getPooledConnection() throws SQLException {
return (javax.sql.PooledConnection) getConnection();
}
public ConnectionPool createPool() throws SQLException {
if (pool != null) {
return pool;
} else {
return pCreatePool();
}
}
private synchronized ConnectionPool pCreatePool() throws SQLException {
if (pool != null) {
return pool;
} else {
pool = new ConnectionPool(poolProperties);
return pool;
}
}
核心方法实现是通过ConnectionPool池子获取连接。需要注意的是javax.sql.PooledConnection和java.sql.Connection并没有继承关系,但是在DataSourceProxy实现中getPooledConnection和getConnection却是返回了同一个对象,可想而知,这个对象的类型同时实现了这两个接口,tomcat-jdbc是通过动态代理实现的,正如架构图中的Connection$Proxy。
核心组件实现:连接对象PooledConnection和代理拦截器ProxyConnection
在讨论具体池子实现前,我们先来看看中转的实体对象PooledConnection,它是对具体实现的java.sql.Connection对象的封装,是连接池中复用的对象,它 内部包含了创建实际连接的功能,这个方法将会在ConnectionPool具体创建连接时调用,我们看看创建连接(可以理解为makeObject)代码PooledConnection.connect():
public void connect() throws SQLException {
if (released.get()) throw new SQLException("A connection once released, can't be reestablished.");
if (connection != null) {
try {
this.disconnect(false);
} catch (Exception x) {
log.debug("Unable to disconnect previous connection.", x);
} //catch
} //end if
//if (poolProperties.getDataSource()==null && poolProperties.getDataSourceJNDI()!=null) {
//TODO lookup JNDI name
//}
if (poolProperties.getDataSource()!=null) {
connectUsingDataSource();
} else {
connectUsingDriver();
}
//set up the default state, unless we expect the interceptor to do it
if (poolProperties.getJdbcInterceptors()==null || poolProperties.getJdbcInterceptors().indexOf(ConnectionState.class.getName())<0 ||
poolProperties.getJdbcInterceptors().indexOf(ConnectionState.class.getSimpleName())<0) {
if (poolProperties.getDefaultTransactionIsolation()!=DataSourceFactory.UNKNOWN_TRANSACTIONISOLATION) connection.setTransactionIsolation(poolProperties.getDefaultTransactionIsolation());
if (poolProperties.getDefaultReadOnly()!=null) connection.setReadOnly(poolProperties.getDefaultReadOnly().booleanValue());
if (poolProperties.getDefaultAutoCommit()!=null) connection.setAutoCommit(poolProperties.getDefaultAutoCommit().booleanValue());
if (poolProperties.getDefaultCatalog()!=null) connection.setCatalog(poolProperties.getDefaultCatalog());
}
this.discarded = false;
this.lastConnected = System.currentTimeMillis();
}
其中关键的两行代码是connectUsingDataSource和connectUsingDriver,里面无非就是通过JDBC API获取连接。
当我们从池子中借取PooledConnection连接后,通过动态代理的方式实现拦截器功能,默认第一个拦截器就是ProxyConnection,这个拦截器委托实体对象PooledConnection执行相关的方法,构造方法传入了实体对象:
protected ProxyConnection(ConnectionPool parent, PooledConnection con,
boolean useEquals) {
pool = parent;
connection = con;
setUseEquals(useEquals);
}
当我们调用Connection的方法时,都会进入ProxyConnection的方法,比如当Connection.close()时,就会在这个拦截器中实现归还回池子的功能:
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (compare(ISCLOSED_VAL,method)) {
return Boolean.valueOf(isClosed());
}
// 归还回池子
if (compare(CLOSE_VAL,method)) {
if (connection==null) return null; //noop for already closed.
PooledConnection poolc = this.connection;
this.connection = null;
pool.returnConnection(poolc);
return null;
} else if (compare(TOSTRING_VAL,method)) {
return this.toString();
} else if (compare(GETCONNECTION_VAL,method) && connection!=null) {
// 如果是javax.sql.PooledConnection的getConnection方法
return connection.getConnection();
} else if (method.getDeclaringClass().isAssignableFrom(XAConnection.class)) {
try {
// XAConnection
return method.invoke(connection.getXAConnection(),args);
}catch (Throwable t) {
if (t instanceof InvocationTargetException) {
throw t.getCause() != null ? t.getCause() : t;
} else {
throw t;
}
}
}
if (isClosed()) throw new SQLException("Connection has already been closed.");
if (compare(UNWRAP_VAL,method)) {
return unwrap((Class<?>)args[0]);
} else if (compare(ISWRAPPERFOR_VAL,method)) {
return Boolean.valueOf(this.isWrapperFor((Class<?>)args[0]));
}
try {
// 委托实际连接处理
PooledConnection poolc = connection;
if (poolc!=null) {
return method.invoke(poolc.getConnection(),args);
} else {
throw new SQLException("Connection has already been closed.");
}
}catch (Throwable t) {
if (t instanceof InvocationTargetException) {
throw t.getCause() != null ? t.getCause() : t;
} else {
throw t;
}
}
}
ProxyConnection拦截器是一个非常巧妙的设计,下文会更深入到介绍拦截器机制,这个拦截器代码的实现可以对java.sql.Connection.class,javax.sql.PooledConnection.class, javax.sql.XAConnection.class三种接口的动态代理类方法进行处理,接下来我们就去看看池子的实现机制,以及它是如何生成动态代理,绑定ProxyConnection拦截器的。
核心组件实现:池子ConnectionPool
在通用池设计中讲过,池子在内部实现都是某种集合的表示,ConnectionPool内部包含了两个集合(被使用集合和空闲集合)表示池子元素:
/**
* Contains all the connections that are in use
* TODO - this shouldn't be a blocking queue, simply a list to hold our objects
*/
private BlockingQueue<PooledConnection> busy;
/**
* Contains all the idle connections
*/
private BlockingQueue<PooledConnection> idle;
接下来我们直接进入一些核心方法的实现。
borrowConnection
private PooledConnection borrowConnection(int wait, String username, String password) throws SQLException {
if (isClosed()) {
throw new SQLException("Connection pool closed.");
} //end if
//get the current time stamp
long now = System.currentTimeMillis();
//see if there is one available immediately
// 从空闲集合中获取连接
PooledConnection con = idle.poll();
while (true) {
if (con!=null) {
// 初始化和校验连接testOnBorrow
//configure the connection and return it
PooledConnection result = borrowConnection(now, con, username, password);
borrowedCount.incrementAndGet();
if (result!=null) return result;
}
//if we get here, see if we need to create one
//this is not 100% accurate since it doesn't use a shared
//atomic variable - a connection can become idle while we are creating
//a new connection
// 上限判断
if (size.get() < getPoolProperties().getMaxActive()) {
//atomic duplicate check
if (size.addAndGet(1) > getPoolProperties().getMaxActive()) {
//if we got here, two threads passed through the first if
size.decrementAndGet();
} else {
//create a connection, we're below the limit
// 创建新连接
return createConnection(now, con, username, password);
}
} //end if
//calculate wait time for this iteration
long maxWait = wait;
//if the passed in wait time is -1, means we should use the pool property value
if (wait==-1) {
maxWait = (getPoolProperties().getMaxWait()<=0)?Long.MAX_VALUE:getPoolProperties().getMaxWait();
}
// 池子已满时最大等待时间
long timetowait = Math.max(0, maxWait - (System.currentTimeMillis() - now));
waitcount.incrementAndGet();
try {
//retrieve an existing connection
con = idle.poll(timetowait, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
if (getPoolProperties().getPropagateInterruptState()) {
Thread.currentThread().interrupt();
}
SQLException sx = new SQLException("Pool wait interrupted.");
sx.initCause(ex);
throw sx;
} finally {
waitcount.decrementAndGet();
}
// 略
} //while
}
这段代码表明了借取对象时如何与配置参数配合的实现,首先从空闲队列获取连接,获取到就进行初始化工作,没有获取到则判断池子是否已满,满了就阻塞等待,否则创建新连接,几点说明如下:
borrowConnection(now, con, username, password)实现了初始化和校验功能createConnection实现是调用了上文说的PooledConnection.connect()方法
getConnection和setupConnection
ConnectionPool除了实现池子功能以外,还实现了池中对象PooledConnection到Connection的转化,具体方法在getConnection中实现,DataSourceProxy正是调用了这方法获取连接。
public Connection getConnection() throws SQLException {
//check out a connection
PooledConnection con = borrowConnection(-1,null,null);
return setupConnection(con);
}
核心代码在setupConnection中:
protected Connection setupConnection(PooledConnection con) throws SQLException {
//fetch previously cached interceptor proxy - one per connection
JdbcInterceptor handler = con.getHandler();
// 初始化拦截器
if (handler==null) {
if (jmxPool != null) con.createMBean();
//build the proxy handler
// 默认都会有ProxyConnection拦截器,实现具体连接方法的调用
handler = new ProxyConnection(this,con,getPoolProperties().isUseEquals());
//set up the interceptor chain
// 获取拦截器配置
PoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray();
for (int i=proxies.length-1; i>=0; i--) {
try {
//create a new instance
JdbcInterceptor interceptor = proxies[i].getInterceptorClass().getConstructor().newInstance();
//configure properties
interceptor.setProperties(proxies[i].getProperties());
//setup the chain
interceptor.setNext(handler);
//call reset
interceptor.reset(this, con);
//configure the last one to be held by the connection
handler = interceptor;
}catch(Exception x) {
SQLException sx = new SQLException("Unable to instantiate interceptor chain.");
sx.initCause(x);
throw sx;
}
}
//cache handler for the next iteration
con.setHandler(handler);
} else {
JdbcInterceptor next = handler;
//we have a cached handler, reset it
while (next!=null) {
next.reset(this, con);
next = next.getNext();
}
}
// setup statement proxy
if (getPoolProperties().getUseStatementFacade()) {
handler = new StatementFacade(handler);
}
try {
// 生成动态代理Class
getProxyConstructor(con.getXAConnection() != null);
//create the proxy
//TODO possible optimization, keep track if this connection was returned properly, and don't generate a new facade
Connection connection = null;
// 设置拦截器
if (getPoolProperties().getUseDisposableConnectionFacade() ) {
connection = (Connection)proxyClassConstructor.newInstance(new Object[] { new DisposableConnectionFacade(handler) });
} else {
connection = (Connection)proxyClassConstructor.newInstance(new Object[] {handler});
}
//return the connection
return connection;
}catch (Exception x) {
SQLException s = new SQLException();
s.initCause(x);
throw s;
}
}
这是一段非常重要的代码,实现了拦截器机制,默认拦截器ProxyConnection也在这里体现了它的用法,它还实现了javax.sql.PooledConnection和javax.sql.XAConnection接口的代理,还我们进入getProxyConstructor方法验证下究竟是不是如此:
public Constructor<?> getProxyConstructor(boolean xa) throws NoSuchMethodException {
//cache the constructor
if (proxyClassConstructor == null ) {
Class<?> proxyClass = xa ?
Proxy.getProxyClass(ConnectionPool.class.getClassLoader(), new Class[] {java.sql.Connection.class,javax.sql.PooledConnection.class, javax.sql.XAConnection.class}) :
Proxy.getProxyClass(ConnectionPool.class.getClassLoader(), new Class[] {java.sql.Connection.class,javax.sql.PooledConnection.class});
proxyClassConstructor = proxyClass.getConstructor(new Class[] { InvocationHandler.class });
}
return proxyClassConstructor;
}
所以setupConnection方法实现了PooledConnection到代理类的转换,而这个代理类通过拦截器ProxyConnection实现了具体方法的调用,我们还可以定义和配置更多的拦截器。
核心组件实现:拦截器JdbcInterceptor
ProxyConnection继承了抽象拦截器JdbcInterceptor,实现了接口InvocationHandler。
public abstract class JdbcInterceptor implements InvocationHandler {
/**
* {@link java.sql.Connection#close()} method name
*/
public static final String CLOSE_VAL = "close";
/**
* {@link Object#toString()} method name
*/
public static final String TOSTRING_VAL = "toString";
/**
* {@link java.sql.Connection#isClosed()} method name
*/
public static final String ISCLOSED_VAL = "isClosed";
/**
* {@link javax.sql.PooledConnection#getConnection()} method name
*/
public static final String GETCONNECTION_VAL = "getConnection";
/**
* {@link java.sql.Wrapper#unwrap(Class)} method name
*/
public static final String UNWRAP_VAL = "unwrap";
/**
* {@link java.sql.Wrapper#isWrapperFor(Class)} method name
*/
public static final String ISWRAPPERFOR_VAL = "isWrapperFor";
/**
* {@link java.sql.Connection#isValid(int)} method name
*/
public static final String ISVALID_VAL = "isValid";
/**
* {@link java.lang.Object#equals(Object)}
*/
public static final String EQUALS_VAL = "equals";
/**
* {@link java.lang.Object#hashCode()}
*/
public static final String HASHCODE_VAL = "hashCode";
/**
* Properties for this interceptor.
*/
protected Map<String,InterceptorProperty> properties = null;
/**
* The next interceptor in the chain
*/
private volatile JdbcInterceptor next = null;
/**
* Property that decides how we do string comparison, default is to use
* {@link String#equals(Object)}. If set to <code>false</code> then the
* equality operator (==) is used.
*/
private boolean useEquals = true;
public JdbcInterceptor() {
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (getNext()!=null) return getNext().invoke(proxy,method,args);
else throw new NullPointerException();
}
public JdbcInterceptor getNext() {
return next;
}
public void setNext(JdbcInterceptor next) {
this.next = next;
}
public boolean compare(String name1, String name2) {
if (isUseEquals()) {
return name1.equals(name2);
} else {
return name1==name2;
}
}
public boolean compare(String methodName, Method method) {
return compare(methodName, method.getName());
}
/**
* Gets called each time the connection is borrowed from the pool
* This means that if an interceptor holds a reference to the connection
* the interceptor can be reused for another connection.
* <br>
* This method may be called with null as both arguments when we are closing down the connection.
* @param parent - the connection pool owning the connection
* @param con - the pooled connection
*/
public abstract void reset(ConnectionPool parent, PooledConnection con);
public void disconnected(ConnectionPool parent, PooledConnection con, boolean finalizing) {
}
public Map<String,InterceptorProperty> getProperties() {
return properties;
}
public void setProperties(Map<String,InterceptorProperty> properties) {
this.properties = properties;
final String useEquals = "useEquals";
InterceptorProperty p = properties.get(useEquals);
if (p!=null) {
setUseEquals(Boolean.parseBoolean(p.getValue()));
}
}
public boolean isUseEquals() {
return useEquals;
}
public void setUseEquals(boolean useEquals) {
this.useEquals = useEquals;
}
public void poolClosed(ConnectionPool pool) {
// NOOP
}
public void poolStarted(ConnectionPool pool) {
// NOOP
}
}
JdbcInterceptor定义了一些方法名的常量,其中有个很重要的方法reset(ConnectionPool parent, PooledConnection con)用来重置拦截器的状态,当从池子中借取对象执行setupConnection方法时都会被调用。
tomcat-jdbc提供了一些默认拦截器实现,具体用法参见Java docs:
- org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer
- org.apache.tomcat.jdbc.pool.interceptor.QueryTimeoutInterceptor
- org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReport
- org.apache.tomcat.jdbc.pool.interceptor.ConnectionState
总结和其它:Druid、HikariCP
我们通过设计和源码比较了commons-dbcp2和tomcat-jdbc在池化机制技术上的实现,tomcat-jdbc通过自己实现池将核心类减少到一定数量。
对于连接池来说,连接池本身的性能消耗在整个调用链路中通常占比不大,现在更加流行的一些连接池(Druid、HikariCP)在性能和监控方面作了很多考量。本篇主要介绍池化机制,关于这些连接池的原理会在未来专题描述。