sayi.github.com icon indicating copy to clipboard operation
sayi.github.com copied to clipboard

对象池化机制(二)数据库连接池

Open Sayi opened this issue 7 years ago • 0 comments

前面已经讨论过通用池设计、线程池的设计,本文探讨池化机制在数据库连接复用上的应用:数据库连接池。

JDBC API是一个高度抽象的API,每个厂商自己有着自己的实现。大多数针对JDBC层的框架都是对JDBC API的包装,包装成一些高级功能,比如可以对DataSource和Connection进行包装支持连接池功能,可以对Statement进行包装,支持多数据源的分库分表功能。关于JDBC可以参考我之前的文章 JDBC和DBUtils的架构设计

开发者接触的数据库连接池配置大多数都是在配置数据源,因为数据库连接池通常是在DataSource抽象基础上实现的。

DataSource

我们先来看看数据源javax.sql.DataSource接口的定义:

public interface DataSource  extends CommonDataSource, Wrapper {
  Connection getConnection() throws SQLException;

  Connection getConnection(String username, String password)
    throws SQLException;
}

DataSource是官方推荐建立连接的方式,提供了getConnection方法获取连接,很显然,我们可以在连接获取时加入池机制。

commons-dbcp2:基于commons-pool2实现

架构设计

image

commons-dbcp2是Apache提供的数据库连接池,底层池是基于commons-pool2实现的,上一篇文章《对象池化机制(一)池设计和commons-pool原理》已经详细介绍过commons-pool的原理,从commons-dpcp2的架构设计图可以看出,它的核心功能就是扩展commons-pool提供的PooledObjectFactory接口,定义Connection的创建、初始化、校验等操作。

PoolableConnectionFactory是PooledObjectFactory的具体实现,将创建Connection这部分工作交给了另一个工厂ConnectionFactory去处理。

PoolingDataSource是对数据源接口最基础的实现,对连接池进行了包装。

BasicDataSource是一个更高层的封装,将连接池、数据源等配置都集中在这个类中暴露。

示例:PoolingDataSource

我们先来看一个创建数据源的示例:

@Test
public void testDBCPPool() throws SQLException {
  // 创建PooledObjectFactory
  ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(url, user,
      password);
  PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
      connectionFactory, null);

  // 创建池子
  GenericObjectPool<PoolableConnection> connectionPool = new GenericObjectPool<>(
      poolableConnectionFactory);
  poolableConnectionFactory.setPool(connectionPool);

  // 包装池子
  PoolingDataSource<PoolableConnection> dataSource = new PoolingDataSource<>(connectionPool);

  // process with dataSource
}

BasicDataSource的示例会更简单,这里我们使用更基础的实现PoolingDataSource可以更清楚的看出底层的逻辑与commons-pool2是一致的。

核心组件实现:数据源PoolingDataSource

PoolingDataSource实现了DataSource接口,构造函数要求指定连接池ObjectPool的实现,提供了从连接池获取连接的功能。

public class PoolingDataSource<C extends Connection> implements DataSource, AutoCloseable {
  private final ObjectPool<C> pool;

  @Override
  public Connection getConnection() throws SQLException {
    try {
      final C conn = pool.borrowObject();
      if (conn == null) {
        return null;
    }
      return new PoolGuardConnectionWrapper<>(conn);
    } catch (final SQLException e) {
      throw e;
    } catch (final NoSuchElementException e) {
      throw new SQLException("Cannot get a connection, pool error " + e.getMessage(), e);
    } catch (final RuntimeException e) {
      throw e;
    } catch (final InterruptedException e) {
      // Reset the interrupt status so it is visible to callers
      Thread.currentThread().interrupt();
      throw new SQLException("Cannot get a connection, general error", e);
    } catch (final Exception e) {
      throw new SQLException("Cannot get a connection, general error", e);
    }
  }
  // 略
」

getConnection的实现就是从连接池中借取连接:pool.borrowObject()

那么,什么时候会将连接归还给连接池呢?答案是在Connection关闭的时候,因为连接池的存在,关闭的时候并不是真的去关闭,而是放回连接池,Connection的封装实现是PoolableConnection。

核心组件实现:连接对象PoolableConnection

PoolableConnection实现了java.sql.Connection接口,它继承DelegatingConnection<Connection>类,真正委托的对象是内部一个具体的Connection实现。

PoolableConnection的close方法实现就是将Connection归还到pool中。

public synchronized void close() throws SQLException {
  if (isClosedInternal()) {
    // already closed
    return;
  }

  boolean isUnderlyingConnectionClosed;
  try {
    isUnderlyingConnectionClosed = getDelegateInternal().isClosed();
  } catch (final SQLException e) {
    try {
      pool.invalidateObject(this);
    } catch (final IllegalStateException ise) {
      // pool is closed, so close the connection
      passivate();
      getInnermostDelegate().close();
    } catch (final Exception ie) {
      // DO NOTHING the original exception will be rethrown
    }
    throw new SQLException("Cannot close connection (isClosed check failed)", e);
  }

  /*
   * Can't set close before this code block since the connection needs to be open when validation runs. Can't set
   * close after this code block since by then the connection will have been returned to the pool and may have
   * been borrowed by another thread. Therefore, the close flag is set in passivate().
   */
  if (isUnderlyingConnectionClosed) {
    // Abnormal close: underlying connection closed unexpectedly, so we
    // must destroy this proxy
    try {
      pool.invalidateObject(this);
    } catch (final IllegalStateException e) {
      // pool is closed, so close the connection
      passivate();
      getInnermostDelegate().close();
    } catch (final Exception e) {
      throw new SQLException("Cannot close connection (invalidating pooled object failed)", e);
    }
  } else {
    // Normal close: underlying connection is still open, so we
    // simply need to return this proxy to the pool
    try {
      pool.returnObject(this);
    } catch (final IllegalStateException e) {
      // pool is closed, so close the connection
      passivate();
      getInnermostDelegate().close();
    } catch (final SQLException e) {
      throw e;
    } catch (final RuntimeException e) {
      throw e;
    } catch (final Exception e) {
      throw new SQLException("Cannot close connection (return to pool failed)", e);
    }
  }
}

PoolableConnection不止重写了close方法,对Connection接口其它的方法比如createStatement使用委托模式都做了一层封装,这里就不展开了。

核心组件实现:对象工厂PoolableConnectionFactory

最核心的代码是PooledObjectFactory的实现,即PoolableConnectionFactory,我们一点点来看里面每个方法的实现。

makeObject

在创建连接对象时,也引来了一些新的配置参数:connectionInitSqls、poolPreparedStatements和maxOpenPreparedStatements。

配置 作用
connectionInitSqls List:创建连接时执行的sql语句
poolPreparedStatements boolean:是否开启Statement池复用,mysql下建议关闭
maxOpenPreparedStatements int:Statement池上限
@Override
public PooledObject<PoolableConnection> makeObject() throws Exception {
  // 创建连接
  Connection conn = connectionFactory.createConnection();
  if (conn == null) {
    throw new IllegalStateException("Connection factory returned null from createConnection");
  }
  try {
    // 初始化连接
    initializeConnection(conn);
  } catch (final SQLException sqle) {
    // Make sure the connection is closed
    try {
      conn.close();
    } catch (final SQLException ignore) {
      // ignore
    }
    // Rethrow original exception so it is visible to caller
    throw sqle;
  }

  final long connIndex = connectionIndex.getAndIncrement();

  // 处理poolStatements
  if (poolStatements) {
    conn = new PoolingConnection(conn);
    final GenericKeyedObjectPoolConfig<DelegatingPreparedStatement> config = new GenericKeyedObjectPoolConfig<>();
    config.setMaxTotalPerKey(-1);
    config.setBlockWhenExhausted(false);
    config.setMaxWaitMillis(0);
    config.setMaxIdlePerKey(1);
    config.setMaxTotal(maxOpenPreparedStatements);
    if (dataSourceJmxObjectName != null) {
      final StringBuilder base = new StringBuilder(dataSourceJmxObjectName.toString());
      base.append(Constants.JMX_CONNECTION_BASE_EXT);
      base.append(Long.toString(connIndex));
      config.setJmxNameBase(base.toString());
      config.setJmxNamePrefix(Constants.JMX_STATEMENT_POOL_PREFIX);
    } else {
      config.setJmxEnabled(false);
    }
    final PoolingConnection poolingConn = (PoolingConnection) conn;
    final KeyedObjectPool<PStmtKey, DelegatingPreparedStatement> stmtPool = new GenericKeyedObjectPool<>(
        poolingConn, config);
    poolingConn.setStatementPool(stmtPool);
    poolingConn.setCacheState(cacheState);
  }

  // Register this connection with JMX
  ObjectName connJmxName;
  if (dataSourceJmxObjectName == null) {
    connJmxName = null;
  } else {
    connJmxName = new ObjectName(
        dataSourceJmxObjectName.toString() + Constants.JMX_CONNECTION_BASE_EXT + connIndex);
  }

  // 生成包装连接对象PoolableConnection
  final PoolableConnection pc = new PoolableConnection(conn, pool, connJmxName, disconnectionSqlCodes,
      fastFailValidation);
  pc.setCacheState(cacheState);

  // 创建PooledObject
  return new DefaultPooledObject<>(pc);
}

简要说明下创建对象的代码:

  1. 创建连接的工作交由ConnectionFactory实现,可选实现有DriverConnectionFactoryDriverManagerConnectionFactoryDataSourceConnectionFactory
  2. 由于Statement也可以被pool,代码里面对poolStatements做了处理。
  3. PoolableConnection构造时传入了具体的Connection实现。
  4. initializeConnection方法正是执行参数connectionInitSqls语句。

validateObject

数据库连接池的校验工作原理是通过一条sql去校验是否能有返回值,所以这里有两个很重要的参数:validationQuery和validationQueryTimeout。

配置 作用
validationQuery string:select语句,必须至少有一行返回值
validationQueryTimeout int:校验查询超时时间,单位秒

几乎所有的数据库连接池框架都会有这两个参数来实现校验功能,我们进入源码,看看这两个重要的参数是如何起作用的:

public void validate(final String sql, int timeoutSeconds) throws SQLException {
  if (fastFailValidation && fatalSqlExceptionThrown) {
    throw new SQLException(Utils.getMessage("poolableConnection.validate.fastFail"));
  }

  // 没有指定sql
  if (sql == null || sql.length() == 0) {
    if (timeoutSeconds < 0) {
      timeoutSeconds = 0;
    }
    if (!isValid(timeoutSeconds)) {
      throw new SQLException("isValid() returned false");
    }
    return;
  }

  if (!sql.equals(lastValidationSql)) {
    lastValidationSql = sql;
    // Has to be the innermost delegate else the prepared statement will
    // be closed when the pooled connection is passivated.
    validationPreparedStatement = getInnermostDelegateInternal().prepareStatement(sql);
  }

  if (timeoutSeconds > 0) {
    validationPreparedStatement.setQueryTimeout(timeoutSeconds);
  }

  // 执行校验
  try (ResultSet rs = validationPreparedStatement.executeQuery()) {
    if (!rs.next()) {
      throw new SQLException("validationQuery didn't return a row");
    }
  } catch (final SQLException sqle) {
    throw sqle;
  }
}

我们可以读出一些关于参数的重要实现:

  1. 如果没有指定sql,则会调用isValid方法区校验,最终会调用java.sql.Connection.isValid(int)这个方法。
  2. 执行校验的时候,这条sql必须有返回值,它的判断逻辑是 (!rs.next())

activateObject

@Override
public void activateObject(final PooledObject<PoolableConnection> p) throws Exception {

  validateLifetime(p);

  final PoolableConnection conn = p.getObject();
  conn.activate();

  if (defaultAutoCommit != null && conn.getAutoCommit() != defaultAutoCommit.booleanValue()) {
    conn.setAutoCommit(defaultAutoCommit.booleanValue());
  }
  if (defaultTransactionIsolation != UNKNOWN_TRANSACTION_ISOLATION
      && conn.getTransactionIsolation() != defaultTransactionIsolation) {
    conn.setTransactionIsolation(defaultTransactionIsolation);
  }
  if (defaultReadOnly != null && conn.isReadOnly() != defaultReadOnly.booleanValue()) {
    conn.setReadOnly(defaultReadOnly.booleanValue());
  }
  if (defaultCatalog != null && !defaultCatalog.equals(conn.getCatalog())) {
    conn.setCatalog(defaultCatalog);
  }
  if (defaultSchema != null && !defaultSchema.equals(Jdbc41Bridge.getSchema(conn))) {
    Jdbc41Bridge.setSchema(conn, defaultSchema);
  }
  conn.setDefaultQueryTimeout(defaultQueryTimeoutSeconds);
}

这段代码的功能是重新初始化复用的连接,与之对应的还有passivateObject重置连接的方法。

小结

commons-dbcp2针对具体复用对象Connection的特性在commons—pools上实现了数据库连接池的功能,很好的结合了JDBC API和Commons-Pool API,并且提供了一些贴合的参数配置,如validateQuery。

tomcat-jdbc

tomcat-jdbc是tomcat用来代替DBCP的连接池,具体优势可以参见官方文档,接下来我们主要分析下面三个优势:

  1. 极简代码,没有依赖commons-pool去实现连接池的功能,自己设计了池ConnectionPool
  2. 支持配置拦截器JdbcInterceptor,从而实现更多的扩展功能
  3. 基于JDBC API提供的javax.sql.ConnectionPoolDataSourcejavax.sql.PooledConnection抽象连接

官方文档:The Tomcat JDBC Connection Pool

架构设计

image

图中可以看出,真实的连接对象被封装成PooledConnection类,并且通过代理的方式支持拦截器的功能,连接池的功能是由ConnectionPool实现,org.apache.tomcat.jdbc.pool.DataSource是数据源的实现,因为重新设计的池子不需要有扩展性,所以减少了很多类。

示例

@Test
public void testBasic() throws SQLException {
  PoolProperties p = new PoolProperties();
  p.setUrl(url);
  p.setDriverClassName(driverName);
  p.setUsername(user);
  p.setPassword(password);
  p.setTestWhileIdle(false);
  p.setTestOnBorrow(true);
  p.setValidationQuery("SELECT 1");
  p.setTestOnReturn(false);
  p.setValidationInterval(30000);
  p.setTimeBetweenEvictionRunsMillis(30000);
  p.setMaxActive(100);
  p.setInitialSize(10);
  p.setMaxWait(10000);
  p.setRemoveAbandonedTimeout(60);
  p.setMinEvictableIdleTimeMillis(30000);
  p.setMinIdle(10);
  p.setLogAbandoned(true);
  p.setRemoveAbandoned(true);
  p.setJdbcInterceptors(
    "org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;"+
    "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer");
  DataSource datasource = new org.apache.tomcat.jdbc.pool.DataSource();
  datasource.setPoolProperties(p);

  // process with datasource
}

从示例代码可以看到,除了使用方式不一样,参数的含义与commons-dbcp2基本是一致的,接下来我们深入几个核心组件,看一看它们的实现原理。

核心组件实现:数据源org.apache.tomcat.jdbc.pool.DataSource

org.apache.tomcat.jdbc.pool.DataSource实现了javax.sql.ConnectionPoolDataSourcejavax.sql.DataSource接口,继承自DataSourceProxy

public Connection getConnection() throws SQLException {
  if (pool == null)
    return createPool().getConnection();
  return pool.getConnection();
}

public javax.sql.PooledConnection getPooledConnection() throws SQLException {
  return (javax.sql.PooledConnection) getConnection();
}

public ConnectionPool createPool() throws SQLException {
  if (pool != null) {
    return pool;
  } else {
    return pCreatePool();
  }
}

private synchronized ConnectionPool pCreatePool() throws SQLException {
  if (pool != null) {
    return pool;
  } else {
    pool = new ConnectionPool(poolProperties);
    return pool;
  }
}

核心方法实现是通过ConnectionPool池子获取连接。需要注意的是javax.sql.PooledConnectionjava.sql.Connection并没有继承关系,但是在DataSourceProxy实现中getPooledConnection和getConnection却是返回了同一个对象,可想而知,这个对象的类型同时实现了这两个接口,tomcat-jdbc是通过动态代理实现的,正如架构图中的Connection$Proxy

核心组件实现:连接对象PooledConnection和代理拦截器ProxyConnection

在讨论具体池子实现前,我们先来看看中转的实体对象PooledConnection,它是对具体实现的java.sql.Connection对象的封装,是连接池中复用的对象,它 内部包含了创建实际连接的功能,这个方法将会在ConnectionPool具体创建连接时调用,我们看看创建连接(可以理解为makeObject)代码PooledConnection.connect():

public void connect() throws SQLException {
  if (released.get()) throw new SQLException("A connection once released, can't be reestablished.");
  if (connection != null) {
    try {
      this.disconnect(false);
    } catch (Exception x) {
      log.debug("Unable to disconnect previous connection.", x);
    } //catch
  } //end if
  //if (poolProperties.getDataSource()==null && poolProperties.getDataSourceJNDI()!=null) {
    //TODO lookup JNDI name
  //}

  if (poolProperties.getDataSource()!=null) {
    connectUsingDataSource();
  } else {
    connectUsingDriver();
  }

  //set up the default state, unless we expect the interceptor to do it
  if (poolProperties.getJdbcInterceptors()==null || poolProperties.getJdbcInterceptors().indexOf(ConnectionState.class.getName())<0 ||
      poolProperties.getJdbcInterceptors().indexOf(ConnectionState.class.getSimpleName())<0) {
    if (poolProperties.getDefaultTransactionIsolation()!=DataSourceFactory.UNKNOWN_TRANSACTIONISOLATION) connection.setTransactionIsolation(poolProperties.getDefaultTransactionIsolation());
    if (poolProperties.getDefaultReadOnly()!=null) connection.setReadOnly(poolProperties.getDefaultReadOnly().booleanValue());
    if (poolProperties.getDefaultAutoCommit()!=null) connection.setAutoCommit(poolProperties.getDefaultAutoCommit().booleanValue());
    if (poolProperties.getDefaultCatalog()!=null) connection.setCatalog(poolProperties.getDefaultCatalog());
  }
  this.discarded = false;
  this.lastConnected = System.currentTimeMillis();
}

其中关键的两行代码是connectUsingDataSource和connectUsingDriver,里面无非就是通过JDBC API获取连接。

当我们从池子中借取PooledConnection连接后,通过动态代理的方式实现拦截器功能,默认第一个拦截器就是ProxyConnection,这个拦截器委托实体对象PooledConnection执行相关的方法,构造方法传入了实体对象:

protected ProxyConnection(ConnectionPool parent, PooledConnection con,
    boolean useEquals) {
  pool = parent;
  connection = con;
  setUseEquals(useEquals);
}

当我们调用Connection的方法时,都会进入ProxyConnection的方法,比如当Connection.close()时,就会在这个拦截器中实现归还回池子的功能:

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  if (compare(ISCLOSED_VAL,method)) {
    return Boolean.valueOf(isClosed());
  }
  // 归还回池子
  if (compare(CLOSE_VAL,method)) {
    if (connection==null) return null; //noop for already closed.
    PooledConnection poolc = this.connection;
    this.connection = null;
    pool.returnConnection(poolc);
    return null;
  } else if (compare(TOSTRING_VAL,method)) {
    return this.toString();
  } else if (compare(GETCONNECTION_VAL,method) && connection!=null) {
    // 如果是javax.sql.PooledConnection的getConnection方法
    return connection.getConnection();
  } else if (method.getDeclaringClass().isAssignableFrom(XAConnection.class)) {
    try {
      // XAConnection
      return method.invoke(connection.getXAConnection(),args);
    }catch (Throwable t) {
      if (t instanceof InvocationTargetException) {
        throw t.getCause() != null ? t.getCause() : t;
      } else {
        throw t;
      }
    }
  }
  if (isClosed()) throw new SQLException("Connection has already been closed.");
  if (compare(UNWRAP_VAL,method)) {
    return unwrap((Class<?>)args[0]);
  } else if (compare(ISWRAPPERFOR_VAL,method)) {
    return Boolean.valueOf(this.isWrapperFor((Class<?>)args[0]));
  }
  try {
    // 委托实际连接处理
    PooledConnection poolc = connection;
    if (poolc!=null) {
      return method.invoke(poolc.getConnection(),args);
    } else {
      throw new SQLException("Connection has already been closed.");
    }
  }catch (Throwable t) {
    if (t instanceof InvocationTargetException) {
      throw t.getCause() != null ? t.getCause() : t;
    } else {
      throw t;
    }
  }
}

ProxyConnection拦截器是一个非常巧妙的设计,下文会更深入到介绍拦截器机制,这个拦截器代码的实现可以对java.sql.Connection.class,javax.sql.PooledConnection.class, javax.sql.XAConnection.class三种接口的动态代理类方法进行处理,接下来我们就去看看池子的实现机制,以及它是如何生成动态代理,绑定ProxyConnection拦截器的。

核心组件实现:池子ConnectionPool

在通用池设计中讲过,池子在内部实现都是某种集合的表示,ConnectionPool内部包含了两个集合(被使用集合和空闲集合)表示池子元素:

/**
 * Contains all the connections that are in use
 * TODO - this shouldn't be a blocking queue, simply a list to hold our objects
 */
private BlockingQueue<PooledConnection> busy;

/**
 * Contains all the idle connections
 */
private BlockingQueue<PooledConnection> idle;

接下来我们直接进入一些核心方法的实现。

borrowConnection

private PooledConnection borrowConnection(int wait, String username, String password) throws SQLException {

  if (isClosed()) {
    throw new SQLException("Connection pool closed.");
  } //end if

  //get the current time stamp
  long now = System.currentTimeMillis();
  //see if there is one available immediately
  // 从空闲集合中获取连接
  PooledConnection con = idle.poll();

  while (true) {
    if (con!=null) {
      // 初始化和校验连接testOnBorrow
      //configure the connection and return it
      PooledConnection result = borrowConnection(now, con, username, password);
      borrowedCount.incrementAndGet();
      if (result!=null) return result;
    }

    //if we get here, see if we need to create one
    //this is not 100% accurate since it doesn't use a shared
    //atomic variable - a connection can become idle while we are creating
    //a new connection
    // 上限判断
    if (size.get() < getPoolProperties().getMaxActive()) {
      //atomic duplicate check
      if (size.addAndGet(1) > getPoolProperties().getMaxActive()) {
        //if we got here, two threads passed through the first if
        size.decrementAndGet();
      } else {
        //create a connection, we're below the limit
        // 创建新连接
        return createConnection(now, con, username, password);
      }
    } //end if

    //calculate wait time for this iteration
    long maxWait = wait;
    //if the passed in wait time is -1, means we should use the pool property value
    if (wait==-1) {
      maxWait = (getPoolProperties().getMaxWait()<=0)?Long.MAX_VALUE:getPoolProperties().getMaxWait();
    }

    // 池子已满时最大等待时间
    long timetowait = Math.max(0, maxWait - (System.currentTimeMillis() - now));
    waitcount.incrementAndGet();
    try {
      //retrieve an existing connection
      con = idle.poll(timetowait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      if (getPoolProperties().getPropagateInterruptState()) {
        Thread.currentThread().interrupt();
      }
      SQLException sx = new SQLException("Pool wait interrupted.");
      sx.initCause(ex);
      throw sx;
    } finally {
      waitcount.decrementAndGet();
    }
    // 略
  } //while
}

这段代码表明了借取对象时如何与配置参数配合的实现,首先从空闲队列获取连接,获取到就进行初始化工作,没有获取到则判断池子是否已满,满了就阻塞等待,否则创建新连接,几点说明如下:

  1. borrowConnection(now, con, username, password)实现了初始化和校验功能
  2. createConnection实现是调用了上文说的PooledConnection.connect()方法

getConnection和setupConnection

ConnectionPool除了实现池子功能以外,还实现了池中对象PooledConnection到Connection的转化,具体方法在getConnection中实现,DataSourceProxy正是调用了这方法获取连接。

public Connection getConnection() throws SQLException {
  //check out a connection
  PooledConnection con = borrowConnection(-1,null,null);
  return setupConnection(con);
}

核心代码在setupConnection中:

protected Connection setupConnection(PooledConnection con) throws SQLException {
  //fetch previously cached interceptor proxy - one per connection
  JdbcInterceptor handler = con.getHandler();
  // 初始化拦截器
  if (handler==null) {
    if (jmxPool != null) con.createMBean();
    //build the proxy handler
    // 默认都会有ProxyConnection拦截器,实现具体连接方法的调用
    handler = new ProxyConnection(this,con,getPoolProperties().isUseEquals());
    //set up the interceptor chain
    // 获取拦截器配置
    PoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray();
    for (int i=proxies.length-1; i>=0; i--) {
      try {
        //create a new instance
        JdbcInterceptor interceptor = proxies[i].getInterceptorClass().getConstructor().newInstance();
        //configure properties
        interceptor.setProperties(proxies[i].getProperties());
        //setup the chain
        interceptor.setNext(handler);
        //call reset
        interceptor.reset(this, con);
        //configure the last one to be held by the connection
        handler = interceptor;
      }catch(Exception x) {
        SQLException sx = new SQLException("Unable to instantiate interceptor chain.");
        sx.initCause(x);
        throw sx;
      }
    }
    //cache handler for the next iteration
    con.setHandler(handler);
  } else {
    JdbcInterceptor next = handler;
    //we have a cached handler, reset it
    while (next!=null) {
      next.reset(this, con);
      next = next.getNext();
    }
  }
  // setup statement proxy
  if (getPoolProperties().getUseStatementFacade()) {
    handler = new StatementFacade(handler);
  }
  try {
    // 生成动态代理Class
    getProxyConstructor(con.getXAConnection() != null);
    //create the proxy
    //TODO possible optimization, keep track if this connection was returned properly, and don't generate a new facade
    Connection connection = null;
    // 设置拦截器
    if (getPoolProperties().getUseDisposableConnectionFacade() ) {
      connection = (Connection)proxyClassConstructor.newInstance(new Object[] { new DisposableConnectionFacade(handler) });
    } else {
      connection = (Connection)proxyClassConstructor.newInstance(new Object[] {handler});
    }
    //return the connection
    return connection;
  }catch (Exception x) {
    SQLException s = new SQLException();
    s.initCause(x);
    throw s;
  }
}

这是一段非常重要的代码,实现了拦截器机制,默认拦截器ProxyConnection也在这里体现了它的用法,它还实现了javax.sql.PooledConnection和javax.sql.XAConnection接口的代理,还我们进入getProxyConstructor方法验证下究竟是不是如此:

public Constructor<?> getProxyConstructor(boolean xa) throws NoSuchMethodException {
  //cache the constructor
  if (proxyClassConstructor == null ) {
    Class<?> proxyClass = xa ?
      Proxy.getProxyClass(ConnectionPool.class.getClassLoader(), new Class[] {java.sql.Connection.class,javax.sql.PooledConnection.class, javax.sql.XAConnection.class}) :
      Proxy.getProxyClass(ConnectionPool.class.getClassLoader(), new Class[] {java.sql.Connection.class,javax.sql.PooledConnection.class});
    proxyClassConstructor = proxyClass.getConstructor(new Class[] { InvocationHandler.class });
  }
  return proxyClassConstructor;
}

所以setupConnection方法实现了PooledConnection到代理类的转换,而这个代理类通过拦截器ProxyConnection实现了具体方法的调用,我们还可以定义和配置更多的拦截器。

核心组件实现:拦截器JdbcInterceptor

ProxyConnection继承了抽象拦截器JdbcInterceptor,实现了接口InvocationHandler

public abstract class JdbcInterceptor implements InvocationHandler {
  /**
   * {@link java.sql.Connection#close()} method name
   */
  public static final String CLOSE_VAL = "close";
  /**
   * {@link Object#toString()} method name
   */
  public static final String TOSTRING_VAL = "toString";
  /**
   * {@link java.sql.Connection#isClosed()} method name
   */
  public static final String ISCLOSED_VAL = "isClosed";
  /**
   * {@link javax.sql.PooledConnection#getConnection()} method name
   */
  public static final String GETCONNECTION_VAL = "getConnection";
  /**
   * {@link java.sql.Wrapper#unwrap(Class)} method name
   */
  public static final String UNWRAP_VAL = "unwrap";
  /**
   * {@link java.sql.Wrapper#isWrapperFor(Class)} method name
   */
  public static final String ISWRAPPERFOR_VAL = "isWrapperFor";

  /**
   * {@link java.sql.Connection#isValid(int)} method name
   */
  public static final String ISVALID_VAL = "isValid";

  /**
   * {@link java.lang.Object#equals(Object)}
   */
  public static final String EQUALS_VAL = "equals";

  /**
   * {@link java.lang.Object#hashCode()}
   */
  public static final String HASHCODE_VAL = "hashCode";

  /**
   * Properties for this interceptor.
   */
  protected Map<String,InterceptorProperty> properties = null;

  /**
   * The next interceptor in the chain
   */
  private volatile JdbcInterceptor next = null;
  /**
   * Property that decides how we do string comparison, default is to use
   * {@link String#equals(Object)}. If set to <code>false</code> then the
   * equality operator (==) is used.
   */
  private boolean useEquals = true;

  public JdbcInterceptor() {
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (getNext()!=null) return getNext().invoke(proxy,method,args);
    else throw new NullPointerException();
  }

  public JdbcInterceptor getNext() {
    return next;
  }
  public void setNext(JdbcInterceptor next) {
    this.next = next;
  }
  public boolean compare(String name1, String name2) {
    if (isUseEquals()) {
      return name1.equals(name2);
    } else {
      return name1==name2;
    }
  }
  public boolean compare(String methodName, Method method) {
    return compare(methodName, method.getName());
  }

  /**
   * Gets called each time the connection is borrowed from the pool
   * This means that if an interceptor holds a reference to the connection
   * the interceptor can be reused for another connection.
   * <br>
   * This method may be called with null as both arguments when we are closing down the connection.
   * @param parent - the connection pool owning the connection
   * @param con - the pooled connection
   */
  public abstract void reset(ConnectionPool parent, PooledConnection con);

  public void disconnected(ConnectionPool parent, PooledConnection con, boolean finalizing) {
  }

  public Map<String,InterceptorProperty> getProperties() {
    return properties;
  }
  public void setProperties(Map<String,InterceptorProperty> properties) {
    this.properties = properties;
    final String useEquals = "useEquals";
    InterceptorProperty p = properties.get(useEquals);
    if (p!=null) {
      setUseEquals(Boolean.parseBoolean(p.getValue()));
    }
  }
  public boolean isUseEquals() {
    return useEquals;
  }
  public void setUseEquals(boolean useEquals) {
    this.useEquals = useEquals;
  }
  public void poolClosed(ConnectionPool pool) {
    // NOOP
  }
  public void poolStarted(ConnectionPool pool) {
    // NOOP
  }

}

JdbcInterceptor定义了一些方法名的常量,其中有个很重要的方法reset(ConnectionPool parent, PooledConnection con)用来重置拦截器的状态,当从池子中借取对象执行setupConnection方法时都会被调用。

tomcat-jdbc提供了一些默认拦截器实现,具体用法参见Java docs:

  • org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer
  • org.apache.tomcat.jdbc.pool.interceptor.QueryTimeoutInterceptor
  • org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReport
  • org.apache.tomcat.jdbc.pool.interceptor.ConnectionState

总结和其它:Druid、HikariCP

我们通过设计和源码比较了commons-dbcp2和tomcat-jdbc在池化机制技术上的实现,tomcat-jdbc通过自己实现池将核心类减少到一定数量。

对于连接池来说,连接池本身的性能消耗在整个调用链路中通常占比不大,现在更加流行的一些连接池(Druid、HikariCP)在性能和监控方面作了很多考量。本篇主要介绍池化机制,关于这些连接池的原理会在未来专题描述。

Druid连接池介绍 HikariCP: Down the Rabbit Hole

Sayi avatar Dec 29 '18 04:12 Sayi