mongoengine
Pymongo UserWarning in multiprocessing
I have this error in my uwsgi logs about multiprocessing
UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing>
Mongoengine relies on pymongo under the hood, so you should not call connect before forking, because that function itself creates a pymongo.MongoClient connection (have a look at https://github.com/MongoEngine/mongoengine/blob/master/mongoengine/connection.py to better understand how the connection is made).
@touilleMan Thanks for your reply, I didn't get your meaning about forking. Can you please explain this part more ?
What I understand from the pymongo FAQ is that you should not create a connection before forking into subprocesses; otherwise you'll end up with multiple MongoClient instances (one per subprocess) sharing the same open sockets and thread pool.
To avoid this, you need to start the subprocesses first, then create the MongoClient. The same goes for mongoengine's connect, given that it creates a MongoClient internally.
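A minimal sketch of the "create the client after forking" rule, not taken from pymongo or mongoengine: a small holder that builds its client lazily and rebuilds it when the process ID changes. The `PerProcessClient` name and the `factory` argument are hypothetical; in real code the factory would be something like `lambda: pymongo.MongoClient(...)` or a function that calls `mongoengine.connect(...)`.

```python
import os


class PerProcessClient:
    """Hypothetical helper: build a client lazily, once per process."""

    def __init__(self, factory):
        self._factory = factory  # zero-argument callable returning a client
        self._client = None
        self._pid = None

    def get(self):
        pid = os.getpid()
        # After a fork the child has a different PID, so it builds its own
        # client instead of reusing the one inherited from the parent.
        if self._client is None or self._pid != pid:
            self._client = self._factory()
            self._pid = pid
        return self._client
```

Defining the holder at module level before the fork should be harmless, since nothing is opened until `get()` is first called. If the parent itself calls `get()` before forking, the parent still owns an open client, so this only helps when the parent stays idle.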
If you would like to learn more about this issue, here is an upstream bug report:
https://jira.mongodb.org/browse/PYTHON-961
You'll find links that explain this in great detail (take a look at the links to Python code and the Blog post!) One thing that is worth knowing is that the error is only known to happen on BSD-style operating systems. Linux and Windows do not seem to have a problem. pymongo decided it was best to warn on all OSes anyway because developers on Linux should be aware that their code won't work on BSD. I thought that was a reasonable decision.
Thanks a lot for your comment. I run my flask app with uwsgi. Actually uwsgi creates the other forks from the main process of my app. Do you have any idea how I can avoid this problem with uwsgi or other application servers?
Thanks again.
With gunicorn you can specify a function to run that will return the wsgi app (e.g. gunicorn "my_app.main:bootstrap_app()"); I guess uWSGI provides a similar mechanism.
The solution is then to call the mongoengine.connect() inside this function.
from flask import Flask
from flask_mongoengine import MongoEngine

app = Flask(__name__)

def bootstrap_app():
    # Connect when the server calls this factory, not at import time.
    app.db = MongoEngine()
    app.db.init_app(app)
    return app
@itmard, satisfied with this answer? Should we close this issue?
switch_db is just a way to change the default db alias to use (a db alias
is a name leading to a pymongo.MongoClient). So given you create a new
connection in the children, you are creating a new pymongo.MongoClient
(and you should explicitly specify a new alias name not to mix with the
parent connection).
Hence to use the new pymongo.MongoClient, you need to do a switch_db to
change the alias to use (even if under the hood you will access the same
db).
2016-04-11 5:27 GMT+02:00 Ethan Luo [email protected]:
Hi, I am running into similar problems when using MongoEngine together with Luigi, which uses the multiprocessing library for handling multi worker processing.
I think it is related to the question posed in https://jira.mongodb.org/browse/PYTHON-1016. Somehow, even if I disconnect the connection in my main process and attempt to instantiate new connections in my subprocesses, it is not working as expected. The URL mentions a workaround, but I find it ambiguous: isn't this case about creating a new connection rather than a switch_db?
@touilleMan Is it possible that I use mongoengine's disconnect in the main process to achieve the same result?
mongoengine disconnect does two things:
- calling close on the pymongo.MongoClient connection
- del the closed connection object to release its associated memory
Based on this, I think you should be able to do the disconnect in the main process before forking instead of using switch_db... but the only way to be sure of that is to try ! ;-)
Tried the solution, but I think it didn't work out. I used luigi which I think will spawn subprocesses for the workers to run the tasks. The code of interest looks something like this.
class AllGameRecords(luigi.WrapperTask):
    """This is the wrapper task to do all the gamelog bootstrapping"""
    def requires(self):
        seasons = [make_season(s) for s in range(2007, 2008)]
        for s in seasons:
            yield MongoGamelogTask(season=s)
        connect('statsnba')
        game_ids = Gamelog.objects().distinct('GAME_ID')
        disconnect()
        for game_id in game_ids:
            yield MongoGameTask(game_id=game_id)
Basically what I am doing was to do some querying in the main process and use the result to do subsequent work. The error log looks like this.
INFO: [pid 1809] Worker Worker(salt=762776263, workers=4, host=Ethans-MacBook-Retina.local, username=Ethan, pid=1800) running MongoGameTask(game_id=0020700653)
INFO: [pid 1810] Worker Worker(salt=762776263, workers=4, host=Ethans-MacBook-Retina.local, username=Ethan, pid=1800) running MongoGameTask(game_id=0020700772)
INFO: [pid 1811] Worker Worker(salt=762776263, workers=4, host=Ethans-MacBook-Retina.local, username=Ethan, pid=1800) running MongoGameTask(game_id=0020700793)
INFO: [pid 1812] Worker Worker(salt=762776263, workers=4, host=Ethans-MacBook-Retina.local, username=Ethan, pid=1800) running MongoGameTask(game_id=0020700160)
/Users/Ethan/anaconda/envs/scraping/lib/python2.7/site-packages/pymongo/topology.py:75: UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing>
"MongoClient opened before fork. Create MongoClient "
/Users/Ethan/anaconda/envs/scraping/lib/python2.7/site-packages/pymongo/topology.py:75: UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing>
"MongoClient opened before fork. Create MongoClient "
INFO: [pid 1812] Worker Worker(salt=762776263, workers=4, host=Ethans-MacBook-Retina.local, username=Ethan, pid=1800) done MongoGameTask(game_id=0020700160)
INFO: Informed scheduler that task MongoGameTask(game_id=0020700160) has status DONE
I am not so familiar with multiprocessing but IMHO I believe that I did the disconnect correctly in my setup. Can you tell if I may have made some mistakes in the code?
First, you should not pass your database name like this (i.e. connect('statsnba')); this is the old, pre-pymongo 2.0 way of doing it and it doesn't work anymore (see 83e3c5c7d8545d5ced30ce679fef800aeecbd60c).
So basically connect('statsnba') is the same as connect(db='statsnba'), which amounts to connect(), given that the argument is never passed to pymongo.MongoClient. This is confusing and I think we should open an issue about it...
So you should instead use connect(host='mongodb://localhost:27017/statsnba'), assuming your database runs on localhost.
Now for your real trouble: can you try adding this code at the beginning of your MongoGameTask (just before you do the new connect)?
from mongoengine.connection import _connections
print(_connections)
_connections is a global variable where all the connections are stored (one per alias). Given that we called disconnect before, it should be empty. But if for any reason the forks share this variable, you'll see it get populated once the first fork connects; after that the other forks will use the same connection (given they all use the default alias), which leads to this warning.
Interestingly, with some print debugging I find that disconnect indeed performs as expected, but the warning keeps showing up.
The log now looks something like this:
DEBUG: Checking if AllGameRecords() is complete
In AllGameRecords Before connect, _connections: {}
In AllGameRecords after connect, _connections: {'default': MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, read_preference=Primary())}
In AllGameRecords After disconnect, _connections: {}
In AllGameRecords Before connect, _connections: {}
In AllGameRecords after connect, _connections: {'default': MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, read_preference=Primary())}
In AllGameRecords After disconnect, _connections: {}
...
INFO: Running Worker with 2 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1213
DEBUG: Asking scheduler for work...
INFO: [pid 35234] Worker Worker(salt=644210890, workers=2, host=Ethans-MacBook-Retina.local, username=Ethan, pid=35222) running MongoGameTask(game_id=0020701099)
In MongoGameTask Before connect, _connections: {}
In MongoGameTask After connect, _connections: {'default': MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, read_preference=Primary())}
DEBUG: Pending tasks: 1212
DEBUG: 2 running tasks, waiting for next task to finish
INFO: [pid 35235] Worker Worker(salt=644210890, workers=2, host=Ethans-MacBook-Retina.local, username=Ethan, pid=35222) running MongoGameTask(game_id=0020700965)
In MongoGameTask Before connect, _connections: {}
In MongoGameTask After connect, _connections: {'default': MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, read_preference=Primary())}
DEBUG: 2 running tasks, waiting for next task to finish
DEBUG: 2 running tasks, waiting for next task to finish
DEBUG: 2 running tasks, waiting for next task to finish
/Users/Ethan/anaconda/envs/scraping/lib/python2.7/site-packages/pymongo/topology.py:75: UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing>
"MongoClient opened before fork. Create MongoClient "
INFO: [pid 35234] Worker Worker(salt=644210890, workers=2, host=Ethans-MacBook-Retina.local, username=Ethan, pid=35222) done MongoGameTask(game_id=0020701099)
INFO: Informed scheduler that task MongoGameTask(game_id=0020701099) has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1211
...
I have my connect/disconnect wrapped up with print functions:
class MongoGameTask(luigi.Task):
    game_id = luigi.Parameter()
    _con = None

    def run(self):
        print("In MongoGameTask Before connect, _connections: {}".format(_connections))
        connect(host='mongodb://localhost:27017/statsnba')
        print("In MongoGameTask After connect, _connections: {}".format(_connections))
        Game.create_game(self.game_id)
        with self.output().open('w') as out_file:
            out_file.write(self.task_id)

    def output(self):
        return luigi.LocalTarget('data/tokens/%s' % self.task_id)


class AllGameRecords(luigi.WrapperTask):
    """This is the wrapper task to do all the gamelog bootstrapping"""
    logger = logging.getLogger('AllGameRecords')

    def requires(self):
        seasons = [make_season(s) for s in range(2007, 2008)]
        for s in seasons:
            yield MongoGamelogTask(season=s)
        print("In AllGameRecords Before connect, _connections: {}".format(_connections))
        connect(host='mongodb://localhost:27017/statsnba')
        print("In AllGameRecords after connect, _connections: {}".format(_connections))
        game_ids = Gamelog.objects().distinct('GAME_ID')
        disconnect()
        print("In AllGameRecords After disconnect, _connections: {}".format(_connections))
        for game_id in game_ids:
            yield MongoGameTask(game_id=game_id)
The function where I called the save on the document is Game.create_game, and it is the following:
class Game(Document):
    game_id = StringField(required=True, unique=True)
    _playbyplay = DictField()  # fetched from stats.nba.com
    _boxscore = DictField()  # fetched from stats.nba.com
    home_gamelog = ReferenceField(Gamelog, required=True)
    away_gamelog = ReferenceField(Gamelog, required=True)

    @classmethod
    def create_game(cls, game_id):
        boxscore = StatsNBABoxscore.fetch_resource({'GameID': game_id})
        pbps = StatsNBAPlayByPlay.fetch_resource({'GameID': game_id})
        # These two queries run against the Gamelog collection.
        home_gamelog = Gamelog.objects(GAME_ID=game_id, MATCHUP__contains='vs.').get()
        away_gamelog = Gamelog.objects(GAME_ID=game_id, MATCHUP__contains='@').get()
        game = Game(game_id=game_id, _playbyplay=pbps, _boxscore=boxscore, home_gamelog=home_gamelog, away_gamelog=away_gamelog)
        game.save()
        return game
Given that the _connections got released correctly by disconnect, I am not sure why this is still occurring.
UPDATE
Another problem that I found is that, even if I commented out connect in my MongoGameTask, the create_game function still proceeds without any error saying that connection does not exist (this is usually the case if I want to do some CRUD when I have not called connect before). However, printing _connections gives me an empty dict.
Can you replace connect and disconnect with db = MongoClient().statsnba and db.close(); del db (and of course comment out your Document.save calls)?
Basically, use pymongo instead of mongoengine to see if the error is still present. If so, I guess the trouble is in the way pymongo handles its connections...
@touilleMan I think when you say calling db.close() you are actually referring to closing the MongoClient?
Also I tried to do some mocking by invoking pymongo insert directly in the subprocess with explicit close and creating MongoClient in the subprocesses. So in my MongoGameTask I did something like
from pymongo import MongoClient
client = MongoClient()  # created inside the subprocess
db = client.statsnba
db.gamelog.insert_one({'name':'helloword'})
This runs without error...
During my debugging, I noticed that the warning comes from trying to do some querying of another collection in order to create a reference field.
class Game(Document):
    game_id = StringField(required=True, unique=True)
    _playbyplay = DictField()  # fetched from stats.nba.com
    _boxscore = DictField()  # fetched from stats.nba.com
    home_gamelog = ReferenceField(Gamelog, required=True)
    away_gamelog = ReferenceField(Gamelog, required=True)

    @classmethod
    def create_game(cls, game_id):
        boxscore = StatsNBABoxscore.fetch_resource({'GameID': game_id})
        pbps = StatsNBAPlayByPlay.fetch_resource({'GameID': game_id})
        # These two queries run against the Gamelog collection.
        home_gamelog = Gamelog.objects(GAME_ID=game_id, MATCHUP__contains='vs.').get()
        away_gamelog = Gamelog.objects(GAME_ID=game_id, MATCHUP__contains='@').get()
        game = Game(game_id=game_id, _playbyplay=pbps, _boxscore=boxscore, home_gamelog=home_gamelog, away_gamelog=away_gamelog)
        game.save()
        return game
It will issue the warning if I use Gamelog.objects() to find home_gamelog or away_gamelog in order to pass them to Game as reference fields. Perhaps this is due to some caching at work?
When used with luigi, it sometimes behaves rather randomly. I think it would be best if I also figured out how luigi does its multiprocessing. In the meantime, if I let the warning continue to be issued on my Mac, will there be any side effects such as data loss? If not, I will just use it with the warnings turned on for now.
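One way to pin down which call triggers the warning (such as the Gamelog.objects() query mentioned above) is to turn that specific warning into an exception, so the traceback points at the offending line. This is a minimal sketch using only the standard warnings module; `escalate_fork_warning` is a hypothetical helper name, and the message prefix is copied from the log output above.

```python
import warnings


def escalate_fork_warning():
    # `message` is a regex matched against the start of the warning text.
    warnings.filterwarnings(
        "error",
        message="MongoClient opened before fork",
        category=UserWarning,
    )
```

Calling this at the start of the task's run() should make the first offending query raise UserWarning instead of printing it. It is meant for debugging only and says nothing about whether ignoring the warning is safe.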
Hi, I ran into the same problem.
Just using the lazy-apps option when launching uwsgi made that work for me!
@touilleMan two questions:
- you mentioned that one shouldn't use connect('statsnba') but it seems the register_connection function assumes a local database if a uri style connection is not provided: https://github.com/MongoEngine/mongoengine/blob/master/mongoengine/connection.py#L49 The docs still show this as a valid way of connecting too: http://docs.mongoengine.org/guide/connecting.html Am I missing something?
- instead of creating the MongoClient in the child process with an alias and then using switch_db OR running a disconnect to close the MongoClient on the main process/thread before forking, what if you simply passed the connect=False parameter to the connect method as recommended here: https://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing Would this not work within mongoengine? It seems like conn_settings is passed right through to the MongoClient: https://github.com/MongoEngine/mongoengine/blob/master/mongoengine/connection.py#L154
@AlJohri
- I think the documentation is outdated about connect. The connect signature takes db then alias as params. Then db is passed to register_connection and stored as conn_settings.name. However, this conn_settings.name is itself dropped before creating the pymongo.MongoClient in get_connection. So in a nutshell this db param is never used; that's why you should use the host param instead.
- I guess connect=False will make the pymongo.MongoClient socket connection lazy. But given that mongoengine is used in the parent process before forking, the socket is going to be initialized anyway. However, I didn't test this, so you should definitely try it to find out :+1:
I think I've solved it - see comment in: https://github.com/MongoEngine/mongoengine/issues/1599
connect=False did not help
https://stackoverflow.com/questions/51139089/mongoclient-opened-before-fork-create-mongoclient-only-flask/51217223#51217223
I wonder whether there is a performance penalty if we create client object after forking. That means we are creating MongoClient a thousand times if we create a thousand sub-processes?
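On the cost question, a rough sketch under the assumption that the sub-processes come from a fixed-size worker pool: the client is then created once per worker, not once per task. The code below uses only the standard library, and `make_client` is a stand-in I made up for `pymongo.MongoClient(...)` or `mongoengine.connect(...)`. Frameworks that fork a fresh process per task would not benefit from this.

```python
import multiprocessing as mp
import os

_client = None  # one per worker process, set by init_worker


def make_client():
    # Stand-in for a real client; records which process created it.
    return {"created_in_pid": os.getpid()}


def init_worker():
    # Runs once in each worker, after the fork.
    global _client
    _client = make_client()


def handle(task_id):
    # Every task handled by this worker reuses the same client.
    return _client["created_in_pid"]


def run_tasks(n_tasks, n_workers):
    ctx = mp.get_context("fork")  # the fork start method is POSIX-only
    with ctx.Pool(processes=n_workers, initializer=init_worker) as pool:
        return pool.map(handle, range(n_tasks))
```

With `run_tasks(1000, 4)`, `make_client` runs in at most four worker processes, however many tasks are mapped.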
from uwsgidecorators import postfork
import mongoengine

@postfork
def connect_db():
    mongoengine.connect("OnFinance")