azure-sdk-for-java

[BUG] Closing the CosmosClient causes massive amounts of stacktrace

Open fivetran-kevinzhao opened this issue 1 year ago • 7 comments

Describe the bug I have a singleton CosmosClient initialized in the following manner:

 CosmosClient client = new CosmosClientBuilder()
                    .endpoint(credentials.uri)
                    .key(credentials.accountKey)
                    .consistencyLevel(ConsistencyLevel.SESSION)
                    .contentResponseOnWriteEnabled(true)
                    .buildClient();

In separate threads, I run a ChangeFeedPull similar to the examples in here: https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeedpull/SampleChangeFeedPullModel.java

When I'm done with the ChangeFeedPull logic, I try to close the CosmosClient like below.

        if (client != null) {
            client.close();
            client = null;
        }

Immediately after I call client.close(), I see a massive number of stack traces like the one below, repeated over and over:

SEVERE	cosmos-rntbd-nio-2-1	2022-09-15T21:42:51.282Z	StoreReader#createStoreResult	Unexpected exception RntbdTransportClient({"id":1,"isClosed":true,"configuration":{"wireLogLevel":null,"userAgent":{"suffix":"","userAgent":"azsdk-java-cosmos/4.33.0 Linux/5.10.68+ JRE/11.0.10"},"connectionEndpointRediscoveryEnabled":true,"bufferPageSize":8192,"maxBufferCapacity":8388608,"maxChannelsPerEndpoint":130,"maxRequestsPerChannel":30,"tcpKeepIntvl":1,"tcpKeepIdle":30,"preferTcpNative":true,"tcpNetworkRequestTimeoutInNanos":5000000000,"requestTimerResolutionInNanos":100000000,"shutdownTimeoutInNanos":15000000000,"connectionAcquisitionTimeoutInNanos":5000000000,"connectTimeoutInMillis":5000,"idleConnectionTimeoutInNanos":0,"idleConnectionTimerResolutionInNanos":100000000,"idleEndpointTimeoutInNanos":3600000000000,"receiveHangDetectionTimeInNanos":65000000000,"sendHangDetectionTimeInNanos":10000000000,"maxConcurrentRequestsPerEndpoint":10000,"channelAcquisitionContextEnabled":false},"serviceEndpoints":{"count":6,"items":[{"id":15,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14332","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14332","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":14,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14001","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14001","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"tran
sportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":7,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14063","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14063","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":8,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14360","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14360","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":3,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd6.documents.azure.com:11003","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd6.documents.azure.com:11003","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":4,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd63.documents.azure.com:14394","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd63.documents.azure.com:14394","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"wr
iteDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}}]}}) is closed received while reading from store.
class com.azure.cosmos.implementation.directconnectivity.TransportException: RntbdTransportClient({"id":1,"isClosed":true,"configuration":{"wireLogLevel":null,"userAgent":{"suffix":"","userAgent":"azsdk-java-cosmos/4.33.0 Linux/5.10.68+ JRE/11.0.10"},"connectionEndpointRediscoveryEnabled":true,"bufferPageSize":8192,"maxBufferCapacity":8388608,"maxChannelsPerEndpoint":130,"maxRequestsPerChannel":30,"tcpKeepIntvl":1,"tcpKeepIdle":30,"preferTcpNative":true,"tcpNetworkRequestTimeoutInNanos":5000000000,"requestTimerResolutionInNanos":100000000,"shutdownTimeoutInNanos":15000000000,"connectionAcquisitionTimeoutInNanos":5000000000,"connectTimeoutInMillis":5000,"idleConnectionTimeoutInNanos":0,"idleConnectionTimerResolutionInNanos":100000000,"idleEndpointTimeoutInNanos":3600000000000,"receiveHangDetectionTimeInNanos":65000000000,"sendHangDetectionTimeInNanos":10000000000,"maxConcurrentRequestsPerEndpoint":10000,"channelAcquisitionContextEnabled":false},"serviceEndpoints":{"count":6,"items":[{"id":15,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14332","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14332","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":14,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14001","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd83.documents.azure.com:14001","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"close
d":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":7,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14063","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14063","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":8,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14360","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd54.documents.azure.com:14360","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":3,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd6.documents.azure.com:11003","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd6.documents.azure.com:11003","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}},{"id":4,"closed":false,"concurrentRequests":0,"remoteAddress":"cdb-ms-prod-westus1-fd63.documents.azure.com:14394","channelPool":{"remoteAddress":"cdb-ms-prod-westus1-fd63.documents.azure.com:14394","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000}
,"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":true,"endpointCount":6,"endpointEvictionCount":10}}]}}) is closed

This is the final exception that was thrown:

Exception or Stack Trace

	com.azure.cosmos.implementation.directconnectivity.ConsistencyReader::lambda$readSessionAsync$3(ConsistencyReader.java::333)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::125)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoInnerProducerBase::complete(Operators.java::2664)
	com.fivetran.shaded.reactor.core.publisher.MonoSingle$SingleSubscriber::onComplete(MonoSingle.java::180)
	com.fivetran.shaded.reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber::onComplete(FluxPeekFuseable.java::277)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1817)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::151)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoInnerProducerBase::complete(Operators.java::2664)
	com.fivetran.shaded.reactor.core.publisher.MonoSingle$SingleSubscriber::onComplete(MonoSingle.java::180)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::checkTerminated(FluxFlatMap.java::846)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drainLoop(FluxFlatMap.java::608)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drain(FluxFlatMap.java::588)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::onComplete(FluxFlatMap.java::465)
	com.fivetran.shaded.reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber::onComplete(FluxMapFuseable.java::152)
	com.fivetran.shaded.reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber::onComplete(FluxMapFuseable.java::152)
	com.fivetran.shaded.reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber::onComplete(FluxMapFuseable.java::152)
	com.fivetran.shaded.reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber::onComplete(FluxMapFuseable.java::152)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1817)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::151)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1816)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapInner::onNext(MonoFlatMap.java::249)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1816)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::151)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1816)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::151)
	com.fivetran.shaded.reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber::onNext(FluxOnErrorResume.java::79)
	com.fivetran.shaded.reactor.core.publisher.FluxMap$MapSubscriber::onNext(FluxMap.java::122)
	com.fivetran.shaded.reactor.core.publisher.FluxMap$MapSubscriber::onNext(FluxMap.java::122)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoInnerProducerBase::complete(Operators.java::2664)
	com.fivetran.shaded.reactor.core.publisher.MonoSingle$SingleSubscriber::onComplete(MonoSingle.java::180)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::checkTerminated(FluxFlatMap.java::846)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drainLoop(FluxFlatMap.java::608)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drain(FluxFlatMap.java::588)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::onComplete(FluxFlatMap.java::465)
	com.fivetran.shaded.reactor.core.publisher.DrainUtils::postCompleteDrain(DrainUtils.java::132)
	com.fivetran.shaded.reactor.core.publisher.DrainUtils::postComplete(DrainUtils.java::187)
	com.fivetran.shaded.reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber::onComplete(FluxMapSignal.java::226)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1817)
	com.fivetran.shaded.reactor.core.publisher.MonoCacheTime::subscribeOrReturn(MonoCacheTime.java::151)
	com.fivetran.shaded.reactor.core.publisher.InternalMonoOperator::subscribe(InternalMonoOperator.java::57)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::157)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1816)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::151)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1816)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapInner::onNext(MonoFlatMap.java::249)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1816)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::151)
	com.fivetran.shaded.reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber::onNext(FluxOnErrorResume.java::79)
	com.fivetran.shaded.reactor.core.publisher.FluxMap$MapSubscriber::onNext(FluxMap.java::122)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoInnerProducerBase::complete(Operators.java::2664)
	com.fivetran.shaded.reactor.core.publisher.MonoSingle$SingleSubscriber::onComplete(MonoSingle.java::180)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::checkTerminated(FluxFlatMap.java::846)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drainLoop(FluxFlatMap.java::608)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drain(FluxFlatMap.java::588)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::onComplete(FluxFlatMap.java::465)
	com.fivetran.shaded.reactor.core.publisher.DrainUtils::postCompleteDrain(DrainUtils.java::132)
	com.fivetran.shaded.reactor.core.publisher.DrainUtils::postComplete(DrainUtils.java::187)
	com.fivetran.shaded.reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber::onComplete(FluxMapSignal.java::226)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoSubscriber::complete(Operators.java::1817)
	com.fivetran.shaded.reactor.core.publisher.MonoCacheTime::subscribeOrReturn(MonoCacheTime.java::151)
	com.fivetran.shaded.reactor.core.publisher.InternalMonoOperator::subscribe(InternalMonoOperator.java::57)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap$FlatMapMain::onNext(MonoFlatMap.java::157)
	com.fivetran.shaded.reactor.core.publisher.FluxHide$SuppressFuseableSubscriber::onNext(FluxHide.java::137)
	com.fivetran.shaded.reactor.core.publisher.FluxMap$MapSubscriber::onNext(FluxMap.java::122)
	com.fivetran.shaded.reactor.core.publisher.Operators$MonoInnerProducerBase::complete(Operators.java::2664)
	com.fivetran.shaded.reactor.core.publisher.MonoSingle$SingleSubscriber::onComplete(MonoSingle.java::180)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::checkTerminated(FluxFlatMap.java::846)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drainLoop(FluxFlatMap.java::608)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::drain(FluxFlatMap.java::588)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::onComplete(FluxFlatMap.java::465)
	com.fivetran.shaded.reactor.core.publisher.DrainUtils::postCompleteDrain(DrainUtils.java::132)
	com.fivetran.shaded.reactor.core.publisher.DrainUtils::postComplete(DrainUtils.java::187)
	com.fivetran.shaded.reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber::onComplete(FluxMapSignal.java::226)
	com.fivetran.shaded.reactor.core.publisher.Operators$ScalarSubscription::request(Operators.java::2400)
	com.fivetran.shaded.reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber::request(FluxMapSignal.java::238)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::onSubscribe(FluxFlatMap.java::371)
	com.fivetran.shaded.reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber::onSubscribe(FluxMapSignal.java::122)
	com.fivetran.shaded.reactor.core.publisher.FluxJust::subscribe(FluxJust.java::68)
	com.fivetran.shaded.reactor.core.publisher.Mono::subscribe(Mono.java::4397)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap::trySubscribeScalarMap(FluxFlatMap.java::203)
	com.fivetran.shaded.reactor.core.publisher.MonoFlatMap::subscribeOrReturn(MonoFlatMap.java::53)
	com.fivetran.shaded.reactor.core.publisher.InternalMonoOperator::subscribe(InternalMonoOperator.java::57)
	com.fivetran.shaded.reactor.core.publisher.MonoDefer::subscribe(MonoDefer.java::52)
	com.fivetran.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber::resubscribe(FluxRetryWhen.java::216)
	com.fivetran.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber::onNext(FluxRetryWhen.java::269)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapMain::tryEmit(FluxFlatMap.java::543)
	com.fivetran.shaded.reactor.core.publisher.FluxFlatMap$FlatMapInner::onNext(FluxFlatMap.java::984)
	com.fivetran.shaded.reactor.core.publisher.MonoDelay$MonoDelayRunnable::propagateDelay(MonoDelay.java::271)
	com.fivetran.shaded.reactor.core.publisher.MonoDelay$MonoDelayRunnable::run(MonoDelay.java::286)
	com.fivetran.shaded.reactor.core.scheduler.SchedulerTask::call(SchedulerTask.java::68)
	com.fivetran.shaded.reactor.core.scheduler.SchedulerTask::call(SchedulerTask.java::28)
	java.util.concurrent.FutureTask::run(FutureTask.java::264)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask::run(ScheduledThreadPoolExecutor.java::304)
	java.util.concurrent.ThreadPoolExecutor::runWorker(ThreadPoolExecutor.java::1128)
	java.util.concurrent.ThreadPoolExecutor$Worker::run(ThreadPoolExecutor.java::628)
	java.lang.Thread::run(Thread.java::834)

To Reproduce

  1. Create CosmosClient
  2. Make a ChangeFeedPull query using the code snippet below.
  3. Observe stacktrace after client.close() is called.

Code Snippet

try {
    CosmosContainer container = cosmosClientUtils.getCosmosContainer(containerId);
    CosmosChangeFeedRequestOptions options =
            CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(
                    state.getChangeFeedContinuationToken().get());
    Iterator<FeedResponse<JsonNode>> responseIterator =
            container.queryChangeFeed(options, JsonNode.class).iterableByPage().iterator();
    do {
        FeedResponse<JsonNode> response = responseIterator.next();
        List<JsonNode> results = response.getResults();
        if (!results.isEmpty()) {
            // process results
        }
        state.setChangeFeedContinuationToken(response.getContinuationToken());
    } while (responseIterator.hasNext());
} catch (Exception ex) {
    // handle exceptions
} finally {
    client.close();   // Close the CosmosClient
}

Expected behavior CosmosClient closes successfully and we don't see these stacktraces

Screenshots N/A

Setup (please complete the following information):

  • OS: macOS Monterey 12.4
  • IDE: IntelliJ
  • Library/Libraries: com.azure:azure-cosmos:4.33.0, com.azure:azure-core:1.30.0
  • Java version: Java 11
  • App Server/Environment: N/A
  • Frameworks: N/A

If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out Troubleshoot dependency version conflict article first. If it doesn't provide solution for the problem, please provide:

  • verbose dependency tree (mvn dependency:tree -Dverbose)
  • exception message, full stack trace, and any available logs

Information Checklist Kindly make sure that you have added all the following information above and check off the required fields; otherwise we will treat the issue as an incomplete report

  • [x] Bug Description Added
  • [x] Repro Steps Added
  • [x] Setup information Added

fivetran-kevinzhao avatar Sep 15 '22 22:09 fivetran-kevinzhao

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kushagraThapar, @TheovanKraay

ghost avatar Sep 16 '22 20:09 ghost

@kushagraThapar for azure-core and REST based SDKs we're working to improve this scenario with the "Sync Stack" initiative https://github.com/orgs/Azure/projects/119/views/11. It would be good to talk about how Cosmos could potentially benefit from this pattern / work.

/cc @alzimmermsft

joshfree avatar Sep 16 '22 20:09 joshfree

@joshfree thanks for the pointer, we did talk about this in the past, will re-connect again. @xinlian12 can you please take a look at this issue?

kushagraThapar avatar Sep 19 '22 23:09 kushagraThapar

Thank you all for taking a look at this. From my search, this looks to be a recurring issue: https://github.com/microsoft/kafka-connect-cosmosdb/issues/290 https://github.com/microsoft/kafka-connect-cosmosdb/issues/253

Those issues above were printing the same stack traces and both relate to closing the CosmosClient. Both issues were also eventually closed due to a lack of reproducibility and the solution seemed to be just to update the Java SDK. However, my SDK version is one of the latest from this summer.

fivetran-kevinzhao avatar Sep 20 '22 18:09 fivetran-kevinzhao

This can happen when the client is closed but there are still operations running in the background on the SDK (i.e. subscribe calls). The sample you are using makes use of async client, and the shutdown part in the sample has the risk of throwing these errors. We'll adjust the sample.
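
A rough sketch of the ordering described above, assuming the in-flight work runs on threads the application owns. FakeClient, request() and runAndClose() are hypothetical stand-ins for illustration, not Cosmos SDK types, and this does not cover requests the SDK itself issues in the background:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseAfterWorkers {

    /** Hypothetical stand-in for a shared client; not the Cosmos SDK. */
    static final class FakeClient implements AutoCloseable {
        private final AtomicInteger inFlight = new AtomicInteger();
        volatile int inFlightAtClose = -1;

        void request() throws InterruptedException {
            inFlight.incrementAndGet();
            try {
                Thread.sleep(50); // simulate a network round trip
            } finally {
                inFlight.decrementAndGet();
            }
        }

        @Override
        public void close() {
            // Record how many requests were still running when close() was called.
            inFlightAtClose = inFlight.get();
        }
    }

    /** Runs one request per worker, waits for the workers, then closes the client. */
    static FakeClient runAndClose(int workers) {
        FakeClient client = new FakeClient();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                // Callable form, so the checked InterruptedException is allowed.
                pool.submit(() -> {
                    client.request();
                    return null;
                });
            }
            pool.shutdown(); // accept no new tasks
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // best effort: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            client.close(); // close only after the workers have been given time to finish
        }
        return client;
    }

    public static void main(String[] args) {
        FakeClient client = runAndClose(4);
        System.out.println("in flight at close: " + client.inFlightAtClose);
    }
}
```

The point is only the order of operations: stop submitting work, wait for what is running, and call close() last.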

TheovanKraay avatar Sep 20 '22 18:09 TheovanKraay

@fivetran-kevinzhao while you are closing the client, are there any other requests happening on a different thread? The stack traces hint that there are still some requests in progress when the client is closed (they could be reads, queries, etc.).

As for the "stacktrace like below but over and over again" part, we think some improvements can be made to the retry behavior when the client is closed. Here is the tracking item: https://github.com/Azure/azure-sdk-for-java/issues/31062

xinlian12 avatar Sep 21 '22 00:09 xinlian12

@xinlian12 Thanks for getting back to me. While I'm not explicitly making the calls, I suspect the change feed pull queries I'm making are still active in other threads.

This would be the only call I'm making:

try {
    CosmosContainer container = cosmosClientUtils.getCosmosContainer(containerId);
    CosmosChangeFeedRequestOptions options =
            CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(
                    state.getChangeFeedContinuationToken().get());
    Iterator<FeedResponse<JsonNode>> responseIterator =
            container.queryChangeFeed(options, JsonNode.class).iterableByPage().iterator();
    do {
        FeedResponse<JsonNode> response = responseIterator.next();
        List<JsonNode> results = response.getResults();
        if (!results.isEmpty()) {
            // process results
        }
        state.setChangeFeedContinuationToken(response.getContinuationToken());
    } while (responseIterator.hasNext());
} catch (Exception ex) {
    // handle exceptions
}

Is there a way to "close" the process started by container.queryChangeFeed()? I suspect it keeps streaming even after the code exits the while loop.

Like @TheovanKraay mentioned, the above code was inspired by Cosmos's official github sample https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeedpull/SampleChangeFeedPullModel.java

fivetran-kevinzhao avatar Sep 21 '22 18:09 fivetran-kevinzhao

@xinlian12 Just wanted to know when we could expect a fix for this issue.

In the meantime, do you think I can get away with not invoking client.close() after I'm done with Cosmos operations?

fivetran-kevinzhao avatar Sep 28 '22 18:09 fivetran-kevinzhao

Hi @fivetran-kevinzhao, I tried the change feed pull locally, but I am not able to repro the issue: no exception is thrown if I call client.close() after I exit the loop. If possible, can you provide a repro? It will help me investigate the issue faster.

And regarding not invoking client.close(): it could mean some resources, like TCP connections, will be held for a long time.

xinlian12 avatar Oct 01 '22 04:10 xinlian12

@xinlian12 That's good to know. Thank you for investigating.

If you're not able to reproduce using the code snippet I provided above then there must be some query somewhere preventing me from closing the client silently. I thought it was the change feed query that stayed open after the while loop but maybe there's something else in my threads. I'll have to investigate myself.

If you know a way to let me see any active connections or queries that are being made by the CosmosClient please let me know.

fivetran-kevinzhao avatar Oct 05 '22 15:10 fivetran-kevinzhao

Upon further investigation, it does look like I have multiple threads from the Cosmos client running, despite exiting out of the while loop.

For the screenshot below, I have 4 threads reading the change feed from 4 separate containers that are still running by the time I invoke client.close(). At this point, I have read to the end of the change feed and have exited out of the while loop, but I guess the TCP connection is still open for each of the containers.

[Screenshot: Screen Shot 2022-10-10 at 1 15 55 PM]

How does one gracefully close the cosmos-rntbd? It is described here: https://github.com/Azure/azure-cosmosdb-java/blob/master/direct-impl/specifications/RntbdTransportClient/RntbdTransportClient.md but it is not immediately clear how it could be manipulated by the SDK user.

@xinlian12 Those cosmos-rntbd and cosmos-parallel threads are still active by the time client.close() gets invoked, resulting in the stack traces being thrown inside those threads. I'm unable to reproduce this stack trace in the main method, but I get it consistently when running our actual process.

fivetran-kevinzhao avatar Oct 10 '22 17:10 fivetran-kevinzhao

Okay, I finally realized why those cosmos-rntbd threads stayed open despite exiting out of the change feed loop.

Turns out the issue is not related to the change feed or closing the client, but to a query that I was making earlier on in my process.

Earlier in my code, I have a query SELECT * from c. The thing is I only really needed to get the first page of the results. I erroneously thought that the TCP connection to cosmos would terminate if I returned from that function after the first iteration or if the container object was GCed. But it now turns out that the TCP connection was kept open in the backend and it was still trying to fetch everything from that query, despite the fact that I got everything I needed and was ready to close the client. Yikes.

My simple fix was to replace the select all query with SELECT * from c offset 0 limit 100, since I just need the first page of the results anyways. This ensures that the TCP connection will have no more data to fetch after I read the first page with the 100 results. Later on, when I close the client, the client will close gracefully and not complain about the sudden closure.
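
For illustration, a minimal sketch of how the bounded query string could be built. The BoundedQuery class and firstPage helper are hypothetical names, not part of the SDK; only the OFFSET 0 LIMIT 100 clause comes from the fix described above:

```java
public class BoundedQuery {

    /** Appends an OFFSET 0 LIMIT n clause so the query has a fixed upper bound on results. */
    static String firstPage(String baseQuery, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        return baseQuery.trim() + " OFFSET 0 LIMIT " + limit;
    }

    public static void main(String[] args) {
        String query = firstPage("SELECT * FROM c", 100);
        System.out.println(query); // prints "SELECT * FROM c OFFSET 0 LIMIT 100"
        // Assumption: with the Cosmos SDK on the classpath, this string would be
        // passed to the container's query call in place of the unbounded query.
    }
}
```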

Thank you @xinlian12 for taking a look initially and sorry for the trouble. Marking this as closed.

fivetran-kevinzhao avatar Oct 10 '22 21:10 fivetran-kevinzhao