
CachingSessionFactory - using different Pool strategies?

Open alturkovic opened this issue 6 years ago • 8 comments

The current CSF implementation is tightly coupled to SimplePool. It would be nice to be able to use different Pool implementations for the CSF.

The problem lies in two methods that CSF exposes, both of which are coupled to SimplePool: public void setPoolSize(int poolSize) and public void setSessionWaitTimeout(long sessionWaitTimeout).

Would it be possible to change the SimplePool field to a generic Pool and add a new interface, PoolConfigurator, containing these specific methods, with both the coupled methods and the new PoolConfigurator marked as @Deprecated? This would maintain backwards compatibility for some time while allowing different Pools to be used.

A more generic method for customization could be used in the future: public void configurePool(Pool pool). To be able to configure the pool from XML, maybe something like: public void configurePool(Expression expression)?

WDYT?

alturkovic avatar Jan 11 '19 14:01 alturkovic

I'd rather say it would be better to allow injecting any Pool implementation instead, through either a setter or a new ctor. It might well be reasonable to deprecate the setters related to SimplePool and move that responsibility to the externally injected bean.
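
For illustration, a minimal sketch of the injection idea, assuming simplified stand-in types (SketchPool, UnboundedSketchPool and SketchCachingFactory are hypothetical names, not the Spring Integration classes). The factory depends only on the pool abstraction, so sizing and timeout settings would live on whatever pool bean gets injected:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical, simplified pool abstraction (not the real Pool interface).
interface SketchPool<T> {
    T getItem();
    void releaseItem(T item);
}

// One possible strategy: unbounded, reuses idle items, creates new ones on demand.
class UnboundedSketchPool<T> implements SketchPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> creator;

    UnboundedSketchPool(Supplier<T> creator) {
        this.creator = creator;
    }

    @Override
    public synchronized T getItem() {
        T item = idle.poll(); // null when no idle item is available
        return item != null ? item : creator.get();
    }

    @Override
    public synchronized void releaseItem(T item) {
        idle.push(item);
    }
}

// The factory knows nothing about pool size or wait timeouts;
// those are the responsibility of the injected pool.
class SketchCachingFactory<T> {
    private final SketchPool<T> pool;

    SketchCachingFactory(SketchPool<T> pool) {
        this.pool = pool;
    }

    T acquire() {
        return pool.getItem();
    }

    void release(T item) {
        pool.releaseItem(item);
    }
}
```

Swapping the strategy then means passing a different SketchPool implementation to the constructor, with no change to the factory itself.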

artembilan avatar Jan 11 '19 14:01 artembilan

Yes, that's exactly what I meant.

The last paragraph is only about allowing the Pool to be configured from within the CSF, but your suggestion to move that responsibility to an external bean makes even more sense...

alturkovic avatar Jan 11 '19 14:01 alturkovic

We will just need to document properly that the SimplePool options now have to be configured on the bean injected into the CSF. With that, we will have good coverage for XML configuration as well.

artembilan avatar Jan 11 '19 15:01 artembilan

Great :)

I was mainly worried about backwards-compatibility of such changes.

alturkovic avatar Jan 11 '19 15:01 alturkovic

It's not (yet) clear to me how an externally configured pool gets access to the internal SF to get new sessions (unless it is separately configured there too, which makes the one in the CSF redundant).

Perhaps we need another wrapper, e.g. SessionProvider which encapsulates the SF and pool?

The other issue is the epoch for the shared session (when used).

garyrussell avatar Jan 11 '19 15:01 garyrussell

I don't think we need to worry about backwards-compatibility; the new implementation can be a new class that sits alongside the current (possibly deprecated) implementation.

garyrussell avatar Jan 11 '19 15:01 garyrussell

> It's not (yet) clear to me how an externally configured pool gets access to the internal SF to get new sessions (unless it is separately configured there too, which makes the one in the CSF redundant).

  1. Why not pass the internal SF to the pool through ctor?
public class CustomDummyPool<T> implements Pool<T> {

	...
	
	private Supplier<T> supplier;

	// () -> sessionFactory.getSession()
	public CustomDummyPool(final Supplier<T> supplier) {
		this.supplier = supplier;
	}

	@Override
	public T getItem() {
		return supplier.get();
	}

	...
}

> Perhaps we need another wrapper, e.g. SessionProvider which encapsulates the SF and pool?
>
> The other issue is the epoch for the shared session (when used).

  1. Why not leave shared session and cache reset functionality in the new SF?
public class PooledSessionFactory<F> implements SessionFactory<F>, DisposableBean {

	private static final Log logger = LogFactory.getLog(PooledSessionFactory.class);

	private final SessionFactory<F> sessionFactory;

	private final Pool<Session<F>> pool;

	private final boolean isSharedSessionCapable;

	private volatile long sharedSessionEpoch;

	/**
	 * Create a PooledSessionFactory delegating to pool.
	 * <p>
	 * Do not cache a {@link DelegatingSessionFactory}, cache each delegate therein instead.
	 *
	 * @param sessionFactory The underlying session factory.
	 * @param pool           The pool of sessions.
	 */
	public PooledSessionFactory(SessionFactory<F> sessionFactory, Pool<Session<F>> pool) {
		Assert.isTrue(!(sessionFactory instanceof DelegatingSessionFactory),
				"'sessionFactory' cannot be a 'DelegatingSessionFactory'; cache each delegate instead");
		this.sessionFactory = sessionFactory;
		this.pool = pool;
		this.isSharedSessionCapable = sessionFactory instanceof SharedSessionCapable;
	}

	/**
	 * Get a session from the pool (or block if none available).
	 */
	@Override
	public Session<F> getSession() {
		return new CachedSession(this.pool.getItem(), this.sharedSessionEpoch);
	}

	/**
	 * Remove (close) any unused sessions in the pool.
	 */
	@Override
	public void destroy() {
		this.pool.removeAllIdleItems();
	}

	/**
	 * Clear the cache of sessions; also any in-use sessions will be closed when
	 * returned to the cache.
	 */
	public synchronized void resetCache() {
		if (logger.isDebugEnabled()) {
			logger.debug("Cache reset; idle sessions will be removed, in-use sessions will be closed when returned");
		}
		if (this.isSharedSessionCapable && ((SharedSessionCapable) this.sessionFactory).isSharedSession()) {
			((SharedSessionCapable) this.sessionFactory).resetSharedSession();
		}
		long epoch = System.nanoTime();
		/*
		 * Spin until we get a new value - nano precision but may be lower resolution.
		 * We reset the epoch AFTER resetting the shared session so there is no possibility
		 * of an "old" session being created in the new epoch. There is a slight possibility
		 * that a "new" session might appear in the old epoch and thus be closed when returned to
		 * the cache.
		 */
		while (epoch == this.sharedSessionEpoch) {
			epoch = System.nanoTime();
		}
		this.sharedSessionEpoch = epoch;
		this.pool.removeAllIdleItems();
	}

	public class CachedSession implements Session<F> { //NOSONAR (final)

		private final Session<F> targetSession;

		private boolean released;

		private boolean dirty;

		/**
		 * The epoch in which this session was created.
		 */
		private final long sharedSessionEpoch;

		private CachedSession(Session<F> targetSession, long sharedSessionEpoch) {
			this.targetSession = targetSession;
			this.sharedSessionEpoch = sharedSessionEpoch;
		}

		@Override
		public synchronized void close() {
			if (this.released) {
				if (logger.isDebugEnabled()) {
					logger.debug("Session " + this.targetSession + " already released.");
				}
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Releasing Session " + this.targetSession + " back to the pool.");
				}
				if (this.sharedSessionEpoch != PooledSessionFactory.this.sharedSessionEpoch) {
					if (logger.isDebugEnabled()) {
						logger.debug("Closing session " + this.targetSession + " after reset.");
					}
					this.targetSession.close();
				}
				else if (this.dirty) {
					this.targetSession.close();
				}
				if (this.targetSession.isOpen()) {
					try {
						this.targetSession.finalizeRaw();
					}
					catch (IOException e) {
						//No-op in this context
					}
				}
				PooledSessionFactory.this.pool.releaseItem(this.targetSession);
				this.released = true;
			}
		}

		@Override
		public boolean remove(String path) throws IOException {
			return this.targetSession.remove(path);
		}

		@Override
		public F[] list(String path) throws IOException {
			return this.targetSession.list(path);
		}

		@Override
		public void read(String source, OutputStream os) throws IOException {
			this.targetSession.read(source, os);
		}

		@Override
		public void write(InputStream inputStream, String destination) throws IOException {
			this.targetSession.write(inputStream, destination);
		}

		@Override
		public void append(InputStream inputStream, String destination) throws IOException {
			this.targetSession.append(inputStream, destination);
		}

		@Override
		public boolean isOpen() {
			return this.targetSession.isOpen();
		}

		@Override
		public void rename(String pathFrom, String pathTo) throws IOException {
			this.targetSession.rename(pathFrom, pathTo);
		}

		@Override
		public boolean mkdir(String directory) throws IOException {
			return this.targetSession.mkdir(directory);
		}

		@Override
		public boolean rmdir(String directory) throws IOException {
			return this.targetSession.rmdir(directory);
		}

		@Override
		public boolean exists(String path) throws IOException {
			return this.targetSession.exists(path);
		}

		@Override
		public String[] listNames(String path) throws IOException {
			return this.targetSession.listNames(path);
		}

		@Override
		public InputStream readRaw(String source) throws IOException {
			return this.targetSession.readRaw(source);
		}

		@Override
		public boolean finalizeRaw() throws IOException {
			return this.targetSession.finalizeRaw();
		}

		@Override
		public void dirty() {
			this.dirty = true;
		}

		@Override
		public Object getClientInstance() {
			return this.targetSession.getClientInstance();
		}

	}

}
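
The epoch check in close() above can be isolated in a small sketch. This is a simplified illustration with hypothetical names (EpochDemoFactory, Handle); it swaps in a plain counter for System.nanoTime() and only records whether a handle would be closed rather than returned to the pool:

```java
import java.util.concurrent.atomic.AtomicLong;

class EpochDemoFactory {

    // Assumption: a monotonically increasing counter stands in for the
    // nanoTime-based epoch used in the class above.
    private final AtomicLong epoch = new AtomicLong();

    // Each handle remembers the epoch in which it was handed out.
    Handle open() {
        return new Handle(epoch.get());
    }

    // Starting a new epoch invalidates every handle handed out before it.
    void reset() {
        epoch.incrementAndGet();
    }

    class Handle {
        private final long createdInEpoch;
        private boolean closed;

        Handle(long createdInEpoch) {
            this.createdInEpoch = createdInEpoch;
        }

        // On release, a handle from an older epoch is closed instead of pooled.
        void release() {
            closed = createdInEpoch != epoch.get();
        }

        boolean wasClosed() {
            return closed;
        }
    }
}
```

A handle opened before reset() reports wasClosed() as true after release(), while one opened afterwards does not, which mirrors the "in-use sessions will be closed when returned" behaviour described in the resetCache() Javadoc.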

I am sure I am missing something, would you mind shedding more light on the problem? Thanks!

alturkovic avatar Jan 11 '19 16:01 alturkovic

My concern is that the user might configure a different SF in the pool vs. the CSF. That is unlikely, but possible, and difficult to debug.

Perhaps this can be solved with a sub-interface of Pool, something like...

public NewCachingSF(SessionFactoryAwarePool<F> pool) {
    this.pool = pool;
    this.sessionFactory = pool.getSessionFactory();
}
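
A rough sketch of how such a sub-interface could look, assuming simplified stand-in types (every Demo* name here is hypothetical and none of this is existing Spring Integration API). Because the pool is the only place the session factory is configured, the two cannot drift apart:

```java
// Hypothetical stand-in for a session factory.
interface DemoSessionFactory<S> {
    S getSession();
}

// Hypothetical stand-in for the pool abstraction.
interface DemoPool<T> {
    T getItem();
    void releaseItem(T item);
}

// The proposed sub-interface: a pool that exposes the factory it draws from.
interface DemoSessionFactoryAwarePool<S> extends DemoPool<S> {
    DemoSessionFactory<S> getSessionFactory();
}

// Trivial implementation that never retains sessions; it exists only to show the wiring.
class DemoNonCachingPool<S> implements DemoSessionFactoryAwarePool<S> {
    private final DemoSessionFactory<S> sessionFactory;

    DemoNonCachingPool(DemoSessionFactory<S> sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    @Override
    public S getItem() {
        return sessionFactory.getSession();
    }

    @Override
    public void releaseItem(S item) {
        // Nothing is retained in this sketch.
    }

    @Override
    public DemoSessionFactory<S> getSessionFactory() {
        return sessionFactory;
    }
}

// The caching factory takes only the pool and derives the session factory from it.
class DemoCachingFactory<S> {
    private final DemoSessionFactoryAwarePool<S> pool;
    private final DemoSessionFactory<S> sessionFactory;

    DemoCachingFactory(DemoSessionFactoryAwarePool<S> pool) {
        this.pool = pool;
        this.sessionFactory = pool.getSessionFactory();
    }

    S getSession() {
        return pool.getItem();
    }

    DemoSessionFactory<S> getSessionFactory() {
        return sessionFactory;
    }
}
```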

garyrussell avatar Jan 11 '19 16:01 garyrussell