
Connections still not closing (branch-0.11)

Open seddonm1 opened this issue 8 years ago • 9 comments

Hi, I have just built the branch-0.11 with the fix https://github.com/Stratio/Spark-MongoDB/pull/118 applied.

It still appears to be holding connections open. Here is my connection string: MongodbConfigBuilder(Map(Host -> List("servername:27017"), Database -> "myDB", Collection -> "myCollection", ConnectionsTime -> "120000", ConnectionsPerHost -> "10")).build

executed using: sqlContext.fromMongoDB

If I watch the open files from a Spark Streaming app, I can see more and more connections being opened: lsof -p 14169 | grep servername:27017

Is there something else that needs to be configured to allow the scheduler to release these connections?

seddonm1 avatar May 07 '16 03:05 seddonm1

I haven't used it with Spark Streaming yet. Could you try it in a normal Spark app and see whether it closes the connections? #118 fixes the problem where the ActorSystem gets stuck on exit of the application. Based on your description, maybe you can have a look at this file. It reuses old connections if the old connection is not busy.

wuciawe avatar May 07 '16 03:05 wuciawe

Hi, I can test without streaming but streaming is the main use case.

After just over 3 hours of 1 minute spark streaming batches (186 batches) I can see 1092 connections to mongo (~5 per batch). Eventually it hits the open file limit for Ubuntu and stops being able to open connections.

Interestingly, it has not caused problems when WRITING to Mongo using saveToMongodb, which ran at the same interval for many days: MongodbConfigBuilder(Map(Host -> List("servername:27017"), Database -> "myDB", Collection -> "myCollection", SamplingRatio -> 1.0, WriteConcern -> "normal", SplitSize -> 8, SplitKey -> "_id", IdAsObjectId -> "false")).build

Perhaps the problem can be diagnosed by comparing how connections are handled.

seddonm1 avatar May 07 '16 04:05 seddonm1

Have you checked that file yet? I think it could be caused by some connections being kept in the BUSY state, which may in turn be caused by not calling the freeConnection() method on every generated writer. After a quick search, I found a suspicious place in com.stratio.datasource.mongodb.MongodbRelation:

def insert(data: DataFrame, overwrite: Boolean): Unit = {
  if (overwrite) {
    new MongodbSimpleWriter(config).dropCollection
  }

  data.saveToMongodb(config)
}

The newly constructed MongodbSimpleWriter never calls freeConnection(), so this may be a leak and possibly a bug. In your case, the leak may be in some other place.
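If that is indeed the cause, a minimal sketch of a guarded version might look like the following (this assumes MongodbSimpleWriter exposes a freeConnection() method, per the discussion above; it is not a verified patch):

```scala
def insert(data: DataFrame, overwrite: Boolean): Unit = {
  if (overwrite) {
    val writer = new MongodbSimpleWriter(config)
    try writer.dropCollection
    finally writer.freeConnection() // return the client to the pool even if the drop fails
  }

  data.saveToMongodb(config)
}
```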

@pfcoperez is this a leak of connection?

wuciawe avatar May 07 '16 05:05 wuciawe

thanks @wuciawe

I think I will have to make do with some hacking with mongoexport and mongoimport and avoid the spark-mongodb package until this type of issue is resolved.

seddonm1 avatar May 07 '16 05:05 seddonm1

@seddonm1 @wuciawe The currently implemented connection pool has a problem related to the fact that there are no destructors in Java or Scala. If there were, we could guarantee that a provided connection, extracted from the pool, would be closed by using an RAII-like pattern.

The main problem is that the connection pool provides Client instances which should be freed explicitly by calling one of the MongoClientFactory#setFreeConnection... methods.

Some Spark hooks are used to automatically call these methods after Spark tasks have finished. However, there are other use cases, as described in your conversation, for which the client is not being freed.
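The hook mechanism described above can be sketched roughly as follows. This is illustrative only: the acquire/free signatures are hypothetical, not the actual Spark-MongoDB API; only TaskContext.addTaskCompletionListener is a real Spark call.

```scala
import org.apache.spark.TaskContext

// Illustrative sketch: tie the lifetime of a pooled client to a Spark task.
def taskScopedClient[C](acquire: () => C, free: C => Unit): C = {
  val client = acquire()
  // Inside a Spark task, register a hook that frees the client once the task
  // finishes; outside a task, TaskContext.get() returns null and nothing frees it.
  Option(TaskContext.get()).foreach { ctx =>
    ctx.addTaskCompletionListener { (_: TaskContext) => free(client) }
  }
  client
}
```

The gap is visible in the sketch itself: any code path that obtains a client outside a Spark task (or through a writer that never registers the hook) leaks the connection.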

The current approach would work just right if MongoClients were closed after every possible use. But that is quite a hazardous assumption, given that there are far too many points where explicit resource deallocation is needed.

We've decided to follow a new approach: to imitate the way other well-known JVM resource pools work, that is, by passing the pool a task to perform and letting the pool assign it to a connection. Hence the pool, rather than its client code, is responsible for resource deallocation. That concentrates resource deallocation into a single point of responsibility.
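The loan-style pool described above can be sketched in plain Scala. The names (SimplePool, withResource) are hypothetical illustrations of the pattern, not the planned Spark-MongoDB API:

```scala
import scala.collection.mutable

// Loan pattern: callers hand the pool a task; the pool acquires a resource,
// runs the task, and always returns the resource itself, so callers cannot leak it.
class SimplePool[A](create: () => A, close: A => Unit) {
  private val idle = mutable.Queue.empty[A]

  def withResource[B](task: A => B): B = {
    val resource = synchronized {
      if (idle.nonEmpty) idle.dequeue() else create()
    }
    try task(resource)
    finally synchronized { idle.enqueue(resource) } // returned even if the task throws
  }

  def shutdown(): Unit = synchronized {
    idle.foreach(close)
    idle.clear()
  }
}
```

Because acquisition and release live in one place (withResource), freeing a connection is no longer the client code's responsibility.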

We've already started the task of changing the pool implementation, but it will take a while. In the meantime, we'll remove the connection pool, thus removing all the connection-leak issues you are finding.

I hope this helps.

pfcoperez avatar May 08 '16 17:05 pfcoperez

thanks @pfcoperez. I'm glad you acknowledge the issue and that you have a plan for resolution.

good luck.

seddonm1 avatar May 10 '16 11:05 seddonm1

Hi, the new approach @pfcoperez is referring to will be implemented for 0.12.X ASAP. In the meantime, we have added a PR (#133) to solve the issue.

darroyocazorla avatar May 10 '16 13:05 darroyocazorla

Hi, I am using branch 0.11 through the Python API, but when I use it like the following:

df = sqlContext.read.format("com.stratio.datasource.mongodb") \
    .options(host="serverhost:27017", database="db_name", collection="collection_name") \
    .load()

it still does not close its connections even after my Spark jobs are done. How can I force the connections to close through the Python API?

jaminglam avatar May 12 '16 07:05 jaminglam

Hi @jaminglam #133 has already been merged.

darroyocazorla avatar May 12 '16 13:05 darroyocazorla