sayi.github.com icon indicating copy to clipboard operation
sayi.github.com copied to clipboard

对象池化机制(一)池设计和commons-pool原理

Open Sayi opened this issue 7 years ago • 0 comments

本系列文章探讨对象池化技术。

池化机制的好处

说到池我们可以联想到很多概念,比如线程池、数据库连接池、常量池等等。在多线程设计中,可以通过池化机制复用对象以达到提高性能的目的,池的核心优势是对 对象的复用,这样就免去了“对象重复创建”的时间,尤其当创建对象本身是一个耗时的事情,比如IO操作。

拿我们最常见的线程池为例,线程这个对象是可以被复用的,程序要执行的是任务,这些任务可以交给复用的线程来处理,而线程的创建恰恰又是一个比较耗时的操作,我们通过线程对象的池化技术达到复用线程的目的。

与多线程+池的设计还有一种类似的设计:单线程+IO多路复用,主要用来解决IO密集型的问题。

实现一个池要考虑哪些因素?

设计一个池的核心要素无非分成两个方面:池子+对象。

池子 要考虑大小,池子过大可能会占用过多的系统性能,池子过小可能由于无法获取对象而阻塞线程。当我们很确定自己需要多大的池来执行,可以使用fixed大小的池子,我们也可以设计一个对象个数动态变化的池子:池子有一个最大值maxTotal和最小值minIdle,最大值是对象个数的上限,当池子一段时间没有使用后,就去回收超过最小值个数的对象,这样在系统繁忙时,就可以充分复用对象,在系统空闲时,又可以释放不必要的对象。

对象 必须是可以被复用的,且创建对象应该是耗时的才值得被复用。可以被复用是一个特别重要的点,当你的对象做了一些不可复用的操作后,必须在放回池中的时候重置这些设置,或者说从池子中取出对象时,都要重新进行初始化。

池的配置

本节我们来看看池子的一些通用配置,可能命名和你见过的一些池子的配置名称不一样,但是它们的含义是一致的。

maxTotal、maxIdle、minIdle

我们先来看看关于池大小的几个配置。

配置 作用
maxTotal int:池子空间最大值,负数表示不设上限,别名:maxActive
maxIdle int:最大的空闲对象个数,超过此值的对象放回池子的时候都会被释放
minIdle int:最小的空闲对象个数,无论对象如何被释放,保证池子里面最少的对象个数

maxIdle参数实际意义并不是很大,通常设置为maxTotal值,minIdle和许多池配置参数initialSize初始化大小在实现含义中有着异曲同工之用。

blockWhenExhausted、maxWaitMillis

当池子已满,通过参数来定义从池子获取对象的行为。

配置 作用
blockWhenExhausted boolean:池子已满,从池子获取对象时是否阻塞
maxWaitMillis long:池子已满,从池子获取对象最大等待时间毫秒数

如果在最大等待时间未能获取到对象,可以抛出throw new NoSuchElementException("Timeout waiting for idle object");异常。

testOnCreate、testOnBorrow、testOnReturn、testWhileIdle

这几个参数定义了在某个节点校验对象的有效性

配置 作用
testOnCreate boolean:是否在创建对象后进行校验
testOnBorrow boolean:是否从池子借取对象时校验
testOnReturn boolean:是否向池子归还对象时校验
testWhileIdle boolean:这个概念需要结合下文驱赶线程来理解,当对象不被驱赶时是否要进行校验

关于如何校验,池设计都会把校验规则开放出去让用户自己定义,下文提到commons-pool通过PooledObjectFactory.validateObject方法定义了校验方式。

minEvictableIdleTimeMillis、timeBetweenEvictionRunsMillis

对空闲对象进行校验是为了将一定时间未使用的对象释放,减少资源占用,做到这一点通常是通过一种叫做『空闲对象驱赶线程』来实现的。

配置 作用
minEvictableIdleTimeMillis long:允许空闲对象存活(或者说不被驱赶)的最小时间
timeBetweenEvictionRunsMillis long:驱赶线程池扫描池子空闲对象的时间间隔

removeAbandonedOnMaintenance、removeAbandonedOnBorrow、removeAbandonedTimeout、logAbandoned

如果一个对象被借取之后再也没有归还回来,这样就造成了池子的泄漏,需要定义一系列参数来解决当池子泄漏后丢弃这些对象的方式。

配置 作用
removeAbandonedOnMaintenance boolean:当空闲对象驱赶线程执行任务时,也去扫描可以被丢弃的对象
removeAbandonedOnBorrow boolean:当每次从池子中借取对象时,去扫描可以被丢弃的对象
removeAbandonedTimeout int:对象借出去最后一次使用后,多长时间未使用和归还就认为是泄漏,单位秒
logAbandoned boolean:开启丢弃对象日志

我们重点理解下怎么能认为是泄漏:对象借出去或者最后一次使用,通常我们使用对象借出去的时间作为开始时间点,更好的办法我们可以使用最后一次使用时间,要想获取最后一次使用时间就需要记录对象每个方法的调用时间,这里可以使用代理模式。

commons-pool示例

commons-pool是一个通用的池设计,其它框架可以基于commons-pool增加池的功能,它提供了池API和一些默认实现,我们先来看看怎么使用,示例以StringBuffer作为复用的对象,虽然这没有任何实际意义:

@Test
public void test() throws Exception {
    GenericObjectPool<StringBuffer> pool = new GenericObjectPool<StringBuffer>(new BasePooledObjectFactory<StringBuffer>() {

        @Override
        public StringBuffer create() throws Exception {
            return new StringBuffer();
        }

        @Override
        public PooledObject<StringBuffer> wrap(StringBuffer buffer) {
             return new DefaultPooledObject<StringBuffer>(buffer);
        }
        
        @Override
        public void passivateObject(PooledObject<StringBuffer> pooledObject) {
            pooledObject.getObject().setLength(0);
        }

    });
    pool.setMaxTotal(8);
    pool.setMaxIdle(4);
    pool.setMinIdle(2);
    
    StringBuffer[] buffer = new StringBuffer[8];
    for (int i = 0; i < 8; i++) {
        buffer[i] = pool.borrowObject();
        buffer[i].append("sayi" + i);
    }
    pool.returnObject(buffer[7]);
    pool.returnObject(buffer[6]);
    pool.returnObject(buffer[5]);
    pool.returnObject(buffer[4]);
    pool.returnObject(buffer[3]);
}

代码很简单,有几点说明:

  • GenericObjectPool是一个默认池子的实现,通过borrowObject借取对象,returnObject归还对象
  • 对象的创建、销毁、初始化、校验则是由继承了BasePooledObjectFactory的匿名类负责

应用要做的所有事情就是去定义如何生成对象等操作,即实现BasePooledObjectFactory,这是一个抽象实现,更通用的接口是PooledObjectFactory<T>

commons-pool架构设计

image

上文中说过池机制的设计要素是池子+对象,比对上图可以看到commons-pool的设计,池子的抽象API是ObjectPool,对象的工厂就是PooledObjectFactory。

ObjectPool

池子的抽象接口,提供了最基础的API。

public interface ObjectPool<T> extends Closeable {

  T borrowObject() throws Exception, NoSuchElementException,
      IllegalStateException;

  void returnObject(T obj) throws Exception;

  void invalidateObject(T obj) throws Exception;

  void addObject() throws Exception, IllegalStateException,
      UnsupportedOperationException;

  int getNumIdle();

  int getNumActive();

  void clear() throws Exception, UnsupportedOperationException;

  @Override
  void close();
}

核心方式是borrowObject和returnObject,commons-pool还提供了一个映射的池子KeyedObjectPool,通过Key值映射一个池子,这里不展开了。ObjectPool有一些默认实现,提供了丰富的池子参数配置,比如GenericObjectPool,还提供了一个对象代理的池子ProxiedObjectPool,这个池子可以记录这些对象的最后一次使用时间用用来驱赶空闲对象使用。

PooledObject

池子中存储的对象不是你需要复用的对象,而是做了一层封装,抽象接口是PooledObject,为了便于构造这个封装,提供了默认包装类DefaultPooledObject。

public interface PooledObject<T> extends Comparable<PooledObject<T>> {
  T getObject();

  long getCreateTime();

  long getActiveTimeMillis();

  long getIdleTimeMillis();

  long getLastBorrowTime();

  long getLastReturnTime();

  long getLastUsedTime();

  @Override
  int compareTo(PooledObject<T> other);

  @Override
  boolean equals(Object obj);

  @Override
  int hashCode();

  @Override
  String toString();

  boolean startEvictionTest();

  boolean endEvictionTest(Deque<PooledObject<T>> idleQueue);

  boolean allocate();

  boolean deallocate();

  void invalidate();

  void setLogAbandoned(boolean logAbandoned);

  void use();

  PooledObjectState getState();

  void markAbandoned();

  void markReturning();
}

PooledObjectFactory

PooledObjectFactory抽象了对象在池子生命周期中每个节点的方法。

package org.apache.commons.pool2;

public interface PooledObjectFactory<T> {

  PooledObject<T> makeObject() throws Exception;

  void destroyObject(PooledObject<T> p) throws Exception;

  boolean validateObject(PooledObject<T> p);
  
  void activateObject(PooledObject<T> p) throws Exception;

  void passivateObject(PooledObject<T> p) throws Exception;
}

我们来解释下每个方法:

  • makeObject定义如何生成对象
  • destroyObject定义如何摧毁对象,比如释放资源
  • validateObject定义校验对象的方式
  • activateObject初始化对象
  • passivateObject重置对象

这个接口是需要用户自己去实现的,下文我们把介绍重点放在池的内在机制实现上。

commons-pool实现原理

GenericObjectPool池的实现

池子的参数在GenericObjectPool中得以实现,我们基于GenericObjectPool介绍这些参数的实现原理。它的内部通过阻塞队列维护一个空闲对象集合,通过一个Map维护所有对象的映射。

private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects = new ConcurrentHashMap<>();
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;

池在内部都是某种集合的表示,我们也可以维护两个集合,分别表示繁忙对象集合和空闲对象集合。池子的核心方法是借取和归还,所以我们直接从这两个方面读源码。

borrowObject

public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
  assertOpen();

  // 根据removeAbandonedOnBorrow配置判断是否进入泄漏丢弃
  final AbandonedConfig ac = this.abandonedConfig;
  if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
      (getNumIdle() < 2) &&
      (getNumActive() > getMaxTotal() - 3) ) {
    removeAbandoned(ac);
  }

  PooledObject<T> p = null;

  // Get local copy of current config so it is consistent for entire
  // method execution
  // 获取是否池满时,获取对象阻塞
  final boolean blockWhenExhausted = getBlockWhenExhausted();

  boolean create;
  final long waitTime = System.currentTimeMillis();

  while (p == null) {
    create = false;
    p = idleObjects.pollFirst();
    // 空闲集合是空,尝试去创建新对象
    if (p == null) {
      p = create();
      if (p != null) {
        create = true;
      }
    }
    if (blockWhenExhausted) {
        // 未能创建新对象,池子已满,进入阻塞状态
      if (p == null) {
        if (borrowMaxWaitMillis < 0) {
          p = idleObjects.takeFirst();
        } else {
          p = idleObjects.pollFirst(borrowMaxWaitMillis,
              TimeUnit.MILLISECONDS);
        }
      }
      if (p == null) {
        throw new NoSuchElementException(
            "Timeout waiting for idle object");
      }
    } else {
      if (p == null) {
        throw new NoSuchElementException("Pool exhausted");
      }
    }
    if (!p.allocate()) {
      p = null;
    }

    if (p != null) {
      // 初始化对象
      try {
        factory.activateObject(p);
      } catch (final Exception e) {
        try {
          destroy(p);
        } catch (final Exception e1) {
          // Ignore - activation failure is more important
        }
        p = null;
        if (create) {
          final NoSuchElementException nsee = new NoSuchElementException(
              "Unable to activate object");
          nsee.initCause(e);
          throw nsee;
        }
      }
      // 校验对象
      if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
        boolean validate = false;
        Throwable validationThrowable = null;
        try {
          validate = factory.validateObject(p);
        } catch (final Throwable t) {
          PoolUtils.checkRethrow(t);
          validationThrowable = t;
        }
        if (!validate) {
          try {
            destroy(p);
            destroyedByBorrowValidationCount.incrementAndGet();
          } catch (final Exception e) {
            // Ignore - validation failure is more important
          }
          p = null;
          if (create) {
            final NoSuchElementException nsee = new NoSuchElementException(
                "Unable to validate object");
            nsee.initCause(validationThrowable);
            throw nsee;
          }
        }
      }
    }
  }

  updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

  return p.getObject();
}

借取的大体逻辑就是判断池子是否已满是否需要阻塞,如果有空闲对象,就会进行初始化和校验。代码中的中文注释都是一些核心参数的判断,可以看出PooledObjectFactory的作用,create()方法的作用可想而知:判断上限是否允许创建对象和创建对象。

returnObject

public void returnObject(final T obj) {
  final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));

  if (p == null) {
    if (!isAbandonedConfig()) {
      throw new IllegalStateException(
          "Returned object not currently part of this pool");
    }
    return; // Object was abandoned and removed
  }

  markReturningState(p);

  final long activeTime = p.getActiveTimeMillis();

  // 是否在归还的时候进行校验
  if (getTestOnReturn() && !factory.validateObject(p)) {
    try {
      destroy(p);
    } catch (final Exception e) {
      swallowException(e);
    }
    try {
      ensureIdle(1, false);
    } catch (final Exception e) {
      swallowException(e);
    }
    updateStatsReturn(activeTime);
    return;
  }

  // 重置对象
  try {
    factory.passivateObject(p);
  } catch (final Exception e1) {
    swallowException(e1);
    try {
      destroy(p);
    } catch (final Exception e) {
      swallowException(e);
    }
    try {
      ensureIdle(1, false);
    } catch (final Exception e) {
      swallowException(e);
    }
    updateStatsReturn(activeTime);
    return;
  }

  if (!p.deallocate()) {
    throw new IllegalStateException(
        "Object has already been returned to this pool or is invalid");
  }

  // 是否超过最大空闲值
  final int maxIdleSave = getMaxIdle();
  if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
    try {
      destroy(p);
    } catch (final Exception e) {
      swallowException(e);
    }
  } else {
    if (getLifo()) {
      idleObjects.addFirst(p);
    } else {
      idleObjects.addLast(p);
    }
    if (isClosed()) {
      // Pool closed while object was being added to idle objects.
      // Make sure the returned object is destroyed rather than left
      // in the idle object pool (which would effectively be a leak)
      clear();
    }
  }
  updateStatsReturn(activeTime);
}

归还的逻辑很简单,首先判断归还时是否进行校验,然后重置对象,当超过最大空闲值时就会销毁这个对象。

空闲对象驱赶线程:Evictor

在池子启动时,或者设置驱赶扫描间隔时间时都会启动空闲对线驱赶线程:

public final void setTimeBetweenEvictionRunsMillis(
    final long timeBetweenEvictionRunsMillis) {
  this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
  startEvictor(timeBetweenEvictionRunsMillis);
}

final void startEvictor(final long delay) {
  synchronized (evictionLock) {
    if (null != evictor) {
      EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
      evictor = null;
      evictionIterator = null;
    }
    if (delay > 0) {
      evictor = new Evictor();
      EvictionTimer.schedule(evictor, delay, delay);
    }
  }
}

startEvictor方法会判断是否已存在驱赶线程,如果有则替换掉,驱赶线程具体扫描机制是ScheduledThreadPoolExecutor控制的。

我们来看看evictor的任务run方法:

@Override
public void run() {
  final ClassLoader savedClassLoader =
      Thread.currentThread().getContextClassLoader();
  try {
    if (factoryClassLoader != null) {
      // Set the class loader for the factory
      final ClassLoader cl = factoryClassLoader.get();
      if (cl == null) {
        // The pool has been dereferenced and the class loader
        // GC'd. Cancel this timer so the pool can be GC'd as
        // well.
        cancel();
        return;
      }
      Thread.currentThread().setContextClassLoader(cl);
    }

    // Evict from the pool
    try {
      evict();
    } catch(final Exception e) {
      swallowException(e);
    } catch(final OutOfMemoryError oome) {
      // Log problem but give evictor thread a chance to continue
      // in case error is recoverable
      oome.printStackTrace(System.err);
    }
    // Re-create idle instances.
    try {
      ensureMinIdle();
    } catch (final Exception e) {
      swallowException(e);
    }
  } finally {
    // Restore the previous CCL
    Thread.currentThread().setContextClassLoader(savedClassLoader);
  }
}

主要调用了evict();方法,我们进入这个方法看看驱赶线程具体的实现:

public void evict() throws Exception {
  assertOpen();

  if (idleObjects.size() > 0) {

    PooledObject<T> underTest = null;
    final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();

    synchronized (evictionLock) {
      final EvictionConfig evictionConfig = new EvictionConfig(
          getMinEvictableIdleTimeMillis(),
          getSoftMinEvictableIdleTimeMillis(),
          getMinIdle());

      // 获取testWhileIdel值
      final boolean testWhileIdle = getTestWhileIdle();

      for (int i = 0, m = getNumTests(); i < m; i++) {
        if (evictionIterator == null || !evictionIterator.hasNext()) {
          evictionIterator = new EvictionIterator(idleObjects);
        }
        if (!evictionIterator.hasNext()) {
          // Pool exhausted, nothing to do here
          return;
        }

        try {
          underTest = evictionIterator.next();
        } catch (final NoSuchElementException nsee) {
          // Object was borrowed in another thread
          // Don't count this as an eviction test so reduce i;
          i--;
          evictionIterator = null;
          continue;
        }

        if (!underTest.startEvictionTest()) {
          // Object was borrowed in another thread
          // Don't count this as an eviction test so reduce i;
          i--;
          continue;
        }

        // User provided eviction policy could throw all sorts of
        // crazy exceptions. Protect against such an exception
        // killing the eviction thread.
        // 根据驱赶策略判断是否可以被销毁
        boolean evict;
        try {
          evict = evictionPolicy.evict(evictionConfig, underTest,
              idleObjects.size());
        } catch (final Throwable t) {
          // Slightly convoluted as SwallowedExceptionListener
          // uses Exception rather than Throwable
          PoolUtils.checkRethrow(t);
          swallowException(new Exception(t));
          // Don't evict on error conditions
          evict = false;
        }

        // 销毁和驱赶
        if (evict) {
          destroy(underTest);
          destroyedByEvictorCount.incrementAndGet();
        } else {
          // 如果不被驱赶,是否进行校验
          if (testWhileIdle) {
            boolean active = false;
            try {
              factory.activateObject(underTest);
              active = true;
            } catch (final Exception e) {
              destroy(underTest);
              destroyedByEvictorCount.incrementAndGet();
            }
            if (active) {
              if (!factory.validateObject(underTest)) {
                destroy(underTest);
                destroyedByEvictorCount.incrementAndGet();
              } else {
                try {
                  factory.passivateObject(underTest);
                } catch (final Exception e) {
                  destroy(underTest);
                  destroyedByEvictorCount.incrementAndGet();
                }
              }
            }
          }
          if (!underTest.endEvictionTest(idleObjects)) {
            // TODO - May need to add code here once additional
            // states are used
          }
        }
      }
    }
  }

  // 扫描泄漏的对象
  final AbandonedConfig ac = this.abandonedConfig;
  if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
    removeAbandoned(ac);
  }
}

驱赶的代码逻辑大体如下:

  1. 通过驱赶策略去决定是否驱赶对象,我们可以想象默认的驱赶策略应该是一个空闲时间的判断
  2. 如果可以驱赶,则销毁对象,否则判断是否需要进行空闲时校验进入校验流程
  3. 通过removeAbandonedOnMaintenance配置参数判断是否进入泄漏对象处理流程

驱赶策略可以由用户自己实现,默认提供了基于空闲时间的策略:当空闲对象大于最小空闲对象个数并且空闲时间大于最小空闲时间。

public class DefaultEvictionPolicy<T> implements EvictionPolicy<T> {

  @Override
  public boolean evict(final EvictionConfig config, final PooledObject<T> underTest,
      final int idleCount) {

    if ((config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis() &&
        config.getMinIdle() < idleCount) ||
        config.getIdleEvictTime() < underTest.getIdleTimeMillis()) {
      return true;
    }
    return false;
  }
}

泄漏对象超时丢弃:Abandoned

一个对象没有被returnObject就导致了泄漏,上文中已经看到在borrowObject和evict根据参数removeAbandonedOnBorrow和removeAbandonedOnMaintenance调用泄漏扫描的代码,我们深入泄漏的处理代码:

private void removeAbandoned(final AbandonedConfig ac) {
  // Generate a list of abandoned objects to remove
  final long now = System.currentTimeMillis();
  final long timeout =
      now - (ac.getRemoveAbandonedTimeout() * 1000L);
  final ArrayList<PooledObject<T>> remove = new ArrayList<>();
  final Iterator<PooledObject<T>> it = allObjects.values().iterator();
  // 扫描泄漏
  while (it.hasNext()) {
    final PooledObject<T> pooledObject = it.next();
    synchronized (pooledObject) {
      if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
          pooledObject.getLastUsedTime() <= timeout) {
        pooledObject.markAbandoned();
        remove.add(pooledObject);
      }
    }
  }

  // Now remove the abandoned objects
  // 处理泄漏
  final Iterator<PooledObject<T>> itr = remove.iterator();
  while (itr.hasNext()) {
    final PooledObject<T> pooledObject = itr.next();
    if (ac.getLogAbandoned()) {
      pooledObject.printStackTrace(ac.getLogWriter());
    }
    try {
      invalidateObject(pooledObject.getObject());
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}

整个流程就是扫描+处理泄漏对象,泄漏的判断条件如下:

pooledObject.getState() == PooledObjectState.ALLOCATED &&
          pooledObject.getLastUsedTime() <= timeout

lastUsedTime默认为对象借取的时间值,如果对象从借取到现在超过了丢弃超时时间则标记为丢弃,我们也可以通过代理实现真正的对象最后被调用的时间,参见ProxiedObjectPool代理对象池的实现。

总结

对象池化机制是一个广泛应用用的技术,关于线程池的实现可以参见以前的文章《并发(七)线程池》,系列接下来的文章将会探讨更多池的应用。

Sayi avatar Dec 27 '18 08:12 Sayi