FEATURE: Way to shut down a connection pool
Hello, can you please provide a way to shut down a connection pool that will close all connections and empty the pool? I am running into an issue where the pool leaves hanging connections once I am done using it, with no way to clean up the connections before it gets GC'd.
Thanks, Trevor
Hmm. Yeah, I can see how this would be useful. I just let the connections expire because they only have a 15 second timeout on them by default.
I've tinkered with it a bit and the way the pool is written right now really doesn't lend itself to being shut down. Between its super async implementation and the lack of bookkeeping (it has no idea what connections are acquired) it's looking real tricky to figure out a way to shut it down.
Best I've come up with so far is this:
    @Override
    public synchronized void close() {
        for (int i = 0; i < connectionLimit; i++) {
            acquire().thenAccept(this::remove);
        }
    }
which has the unfortunate effect of opening as many connections as needed up to the limit before closing them, if they don't already exist.
How about using a concurrent hashset to track the open connections?
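A minimal sketch of that idea, assuming the pool can call a hook whenever it opens or closes a connection. `ConnectionTracker`, its method names, and the `closer` function are hypothetical and not part of the library; the point is only that a concurrent set gives the pool the bookkeeping it needs to close exactly the connections that exist, without opening new ones first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper (not library code): remembers which connections are
// currently open so that a shutdown can close only those.
final class ConnectionTracker<C> {

    // Concurrent set backed by ConcurrentHashMap; safe to modify while iterating.
    private final Set<C> open = ConcurrentHashMap.newKeySet();

    // Assumed to send the close request for one connection.
    private final Function<C, CompletableFuture<?>> closer;

    ConnectionTracker(Function<C, CompletableFuture<?>> closer) {
        this.closer = closer;
    }

    // Call after a connection has been opened successfully.
    void opened(C connection) {
        open.add(connection);
    }

    // Call after a connection has been closed or evicted by other means.
    void closed(C connection) {
        open.remove(connection);
    }

    int openCount() {
        return open.size();
    }

    // Closes every tracked connection. The returned future completes when all
    // close requests have completed (exceptionally if any of them failed).
    CompletableFuture<Void> closeAll() {
        List<CompletableFuture<?>> pending = new ArrayList<>();
        for (C connection : open) {
            // remove() returns true for only one caller, so a connection is
            // never closed twice even if closeAll() races with itself.
            if (open.remove(connection)) {
                pending.add(closer.apply(connection));
            }
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
    }
}
```

In the pool, `closer` would presumably be something like `connectionFactory::close`, and `close()` would call `closeAll()` instead of acquiring up to `connectionLimit` connections.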
So I am experimenting with using the DefaultConnectionFactory with a wrapper to manage the forward open without using the connection pool.
I am finding that the DefaultConnectionFactory does not properly close the connection.
java.util.concurrent.ExecutionException: com.digitalpetri.enip.cip.CipResponseException: status=0x01 [connection failure] , additional=[0x0107]
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at io.tlf.cip.tool.cip.ConnectionForwarder.close(ConnectionForwarder.java:30)
at io.tlf.cip.tool.cip.ConnectionManager.send(ConnectionManager.java:193)
at io.tlf.cip.tool.cip.ConnectionManager.send(ConnectionManager.java:117)
at io.tlf.cip.tool.cip.ConnectionManager.send(ConnectionManager.java:82)
at io.tlf.cip.tool.cip.discovery.DeviceScanner.probe(DeviceScanner.java:25)
at io.tlf.cip.tool.cip.discovery.DeviceScanner.probe(DeviceScanner.java:74)
at io.tlf.cip.tool.cip.discovery.DeviceScanner.scan(DeviceScanner.java:94)
at io.tlf.cip.tool.cip.discovery.DeviceDiscoveryManager.discover(DeviceDiscoveryManager.java:106)
at io.tlf.cip.tool.ui.MainWindow.lambda$connectionWidget1Discover$0(MainWindow.java:115)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.digitalpetri.enip.cip.CipResponseException: status=0x01 [connection failure] , additional=[0x0107]
at com.digitalpetri.enip.cip.services.ForwardCloseService.decodeResponse(ForwardCloseService.java:50)
at com.digitalpetri.enip.cip.CipConnectionPool$DefaultConnectionFactory.lambda$close$2(CipConnectionPool.java:350)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at com.digitalpetri.enip.cip.CipClient.lambda$sendUnconnectedData$8(CipClient.java:222)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at com.digitalpetri.enip.EtherNetIpClient$PendingRequest.lambda$new$0(EtherNetIpClient.java:412)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at com.digitalpetri.enip.EtherNetIpClient.onChannelRead(EtherNetIpClient.java:220)
at com.digitalpetri.enip.EtherNetIpClient.access$900(EtherNetIpClient.java:50)
at com.digitalpetri.enip.EtherNetIpClient$EtherNetIpClientHandler.lambda$channelRead0$0(EtherNetIpClient.java:346)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
This is my wrapper:
    package io.tlf.cip.tool.cip;

    import com.digitalpetri.enip.cip.CipClient;
    import com.digitalpetri.enip.cip.CipConnectionPool;
    import com.digitalpetri.enip.cip.epath.EPath;
    import com.digitalpetri.enip.cip.structs.ForwardCloseResponse;

    import java.util.concurrent.ExecutionException;

    public class ConnectionForwarder {

        private final CipConnectionPool.CipConnectionFactory connectionFactory;

        public ConnectionForwarder(CipClient client, EPath.PaddedEPath connectionPath, int connectionSize) {
            this(
                new CipConnectionPool.DefaultConnectionFactory(client, connectionPath, connectionSize)
            );
        }

        public ConnectionForwarder(
            CipConnectionPool.CipConnectionFactory connectionFactory
        ) {
            this.connectionFactory = connectionFactory;
        }

        public CipConnectionPool.CipConnection open() throws ExecutionException, InterruptedException {
            return connectionFactory.open().get();
        }

        public ForwardCloseResponse close(CipConnectionPool.CipConnection connection) throws ExecutionException, InterruptedException {
            return connectionFactory.close(connection).get();
        }
    }
Here is my usage of it:
    ConnectionForwarder forwarder = new ConnectionForwarder(client, connectionPath, connectionSize);
    CipConnectionPool.CipConnection connection = null;
    try {
        connection = forwarder.open();
        recvBuff = client.invokeConnected(connection.getO2tConnectionId(), genericService).get();
    } catch (Exception ex) {
        var response = exceptionResponse(ex);
        entry.setResponse(response);
        entry.setException(ex);
        return response;
    } finally {
        try {
            if (connection != null) {
                forwarder.close(connection);
            }
        } catch (Exception ex) {
            System.err.println("Failure to close connection");
            ex.printStackTrace();
        }
    }
I'm not sure exactly what is causing the failure in closing the connection. It is almost as if the close message was sent with an invalid connection ID (I have not verified this).
The actual message sent over the forward open using invokeConnected does work. It is just the close that does not.
I have confirmed that the connections are not closed, and I was able to exhaust the maximum number of connections on the Ethernet card.
I was not getting this close error with the connection pool, but I am not sure that it was actually trying to close the connection. I was definitely exhausting the connections on the Ethernet card with it as well.
Thoughts?
Thanks for the help, Trevor
Hmm, this error does mean the connection is not found...
For a Forward_Close service request received by a target node to be successful, it is sufficient that the Connection Triad matches an existing connection’s parameters. If there is no connection match, no connection is released and the target shall return an error with a General Status code of 0x01 and an Extended Status code of 0x0107 (Target Connection Not Found), unless the device detected an improper Connection Path.
I'll have to see if I get the same error with one of our devices. The "triad" should be the connection serial number, originator vendor id, and originator serial number.
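To make the matching rule concrete, here is a rough model of what the target side is described as doing, not code from the library or from any device. `ConnectionTriad`, `TargetConnectionTable`, the field widths, and the `{generalStatus, extendedStatus}` return shape are all assumptions for illustration; only the 0x01 / 0x0107 codes come from the spec text quoted above.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative value type: the three fields a Forward_Close is matched on.
final class ConnectionTriad {
    final int connectionSerialNumber;
    final int originatorVendorId;
    final long originatorSerialNumber;

    ConnectionTriad(int connectionSerialNumber, int originatorVendorId, long originatorSerialNumber) {
        this.connectionSerialNumber = connectionSerialNumber;
        this.originatorVendorId = originatorVendorId;
        this.originatorSerialNumber = originatorSerialNumber;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof ConnectionTriad)) {
            return false;
        }
        ConnectionTriad t = (ConnectionTriad) other;
        return connectionSerialNumber == t.connectionSerialNumber
            && originatorVendorId == t.originatorVendorId
            && originatorSerialNumber == t.originatorSerialNumber;
    }

    @Override
    public int hashCode() {
        return Objects.hash(connectionSerialNumber, originatorVendorId, originatorSerialNumber);
    }
}

// Hypothetical model of a target's connection table.
final class TargetConnectionTable {
    static final int STATUS_SUCCESS = 0x00;
    static final int STATUS_CONNECTION_FAILURE = 0x01;
    static final int EXT_TARGET_CONNECTION_NOT_FOUND = 0x0107;

    // Maps each open connection's triad to its connection ID.
    private final Map<ConnectionTriad, Integer> connections = new ConcurrentHashMap<>();

    void forwardOpen(ConnectionTriad triad, int connectionId) {
        connections.put(triad, connectionId);
    }

    // Returns {generalStatus, extendedStatus}; 0 stands in for "no extended status".
    int[] forwardClose(ConnectionTriad triad) {
        if (connections.remove(triad) != null) {
            return new int[] {STATUS_SUCCESS, 0};
        }
        // No triad match: nothing is released and the error seen in the trace is returned.
        return new int[] {STATUS_CONNECTION_FAILURE, EXT_TARGET_CONNECTION_NOT_FOUND};
    }
}
```

Under this model, a close that carries a triad differing in any one field from the one used at open time would produce exactly the status=0x01, additional=[0x0107] response reported above.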
Thanks for looking into it. I am currently testing with a 1756-EWEB. I have an old 1756-ENET I will do testing with as well.
OK, I can confirm that I do not have the issue when testing with the 1756-ENET. This may be an issue with the EWEB module.