Ming
Creating Process-wise ThreadLocalODMSession and MappedClass
class WikiPage(MappedClass):
    class __mongometa__:
        session = session
        name = 'wiki_page'

    _id = FieldProperty(schema.ObjectId)
    title = FieldProperty(schema.String(required=True))
    text = FieldProperty(schema.String(if_missing=''))
I am developing a multiprocess app. For each process, I would like to create a new ThreadLocalODMSession.
Given the above declarative definition, how would I bind the MappedClass to a different session?
I have already tried numerous approaches, but none seem to work.
I don't think I've tried this exactly, but a few ideas: when you declare the session variable, you could check there which process you are in and bind it to a different connection. Otherwise, if you need to change it dynamically later, I believe you can set session.bind to a new datastore connection at any time and it will be used from then on. Hope that helps.
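A minimal sketch of the first idea, checking the process at declaration time (the per-pid database naming here is purely hypothetical):

import os
from ming import create_datastore
from ming.odm import ThreadLocalODMSession

# choose the datastore when the module is imported inside each worker,
# so every process declares its session against its own database
db_name = 'test_%d' % os.getpid()
session = ThreadLocalODMSession(
    bind=create_datastore('mongodb://localhost:27017/' + db_name)
)

The second idea, rebinding the session dynamically, is what the snippet below attempts.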
from ming import create_datastore
from ming.odm import ThreadLocalODMSession
from ming import schema
from ming.odm import MappedClass
from ming.odm import FieldProperty, ForeignIdProperty
session = ThreadLocalODMSession(
    bind=create_datastore('mongodb://localhost:27017/test_1')
)
class WikiPage(MappedClass):
    class __mongometa__:
        session = session
        name = 'wiki_page'

    _id = FieldProperty(schema.ObjectId)
    title = FieldProperty(schema.String(required=True))
    text = FieldProperty(schema.String(if_missing=''))
# Insert into db 'test_1'
wp = WikiPage(title='This is first title', text='This is the first text')
session.flush()
# Insert into db 'test_2'
session.bind = create_datastore('mongodb://localhost:27017/test_2')
wp = WikiPage(title='This is second title', text='This is the second text')
session.flush()
I expected the above code snippet to insert into the test_2 db after I changed the bind to the new datastore connection, but that didn't happen. Am I missing anything here?
@KshitizGIT session is just a thread-local wrapper around the real session, so I think what you are looking for is session._get().bind = ...
Btw, my suggestion would be to use ContextualODMSession instead of ThreadLocalODMSession and have each process use a different identifier.
The most obvious usage would probably be ContextualODMSession(os.getpid), though I'm not sure about the overhead of getpid, given that it's probably implemented on top of the getpid system call and thus involves a full kernel-space call.
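A minimal sketch of that suggestion, assuming ContextualODMSession is importable from ming.odm and forwards its remaining arguments (such as bind) to the underlying session the way ThreadLocalODMSession does:

import os
from ming import create_datastore
from ming.odm import ContextualODMSession

# the first argument is a callable returning the current context key;
# keying on the pid gives each process its own underlying session
session = ContextualODMSession(
    os.getpid,
    bind=create_datastore('mongodb://localhost:27017/test_1'),
)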
@KshitizGIT I ran into this issue too.
Changing the session property on the __mongometa__ nested class didn't do anything.
Also, session._get().bind = ... raises an AttributeError because bind is a read-only property, and even after making it writable externally through
def fset(self, v):
    self.impl.bind = v

def fget(self):
    return self.impl.bind

type(s._get()).bind = property(fget=fget, fset=fset)
s._get().bind = Session(create_datastore('mongodb://127.0.0.1:27017/s'))
I didn't see any difference (though I only tested through query).
Then I figured out the solution:
from ming.odm import Mapper

session2 = ThreadLocalODMSession(bind=create_datastore('mongodb://127.0.0.1:27017/test'))
for _mapper in Mapper.all_mappers():
    # point both the mapper and the class-level query accessor at the new session
    _mapper.session = session2
    _mapper.mapped_class.query.session = session2
    # force a recompile so the mapper picks up the new binding
    _mapper._compiled = False
    _mapper.compile()
    _mapper.session.ensure_indexes(_mapper.collection)
I double-checked that this works: the id() of the proxied session is different in every process.
I also had some trouble trying what's described here: https://ming.readthedocs.io/en/latest/baselevel.html#other-sessions (another issue should be created for that).
Anyway, there's something else weird. Here's a minimal example that shows the issue:
Python 3.10.2 (main, Jan 15 2022, 19:56:27) [GCC 11.1.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.1.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from ming import schema, create_datastore
...: from ming.odm import MappedClass, Mapper, ThreadLocalODMSession
...: from ming.odm import FieldProperty, ForeignIdProperty
In [2]: from os import getpid
In [3]: import multiprocessing
In [4]: from bson import ObjectId
In [5]: s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))
In [6]: def adapt_to_session(s):
...: for _mapper in Mapper.all_mappers():
...: _mapper.session = s
...: _mapper.mapped_class.query.session = s
...: _mapper._compiled = False
...: _mapper.compile()
...: _mapper.session.ensure_indexes(_mapper.collection)
...:
In [7]: class TestCol(MappedClass):
...: class __mongometa__:
...: session = s
...: name = 'testcol'
...:
...: _id = FieldProperty(schema.ObjectId)
...: a = FieldProperty(schema.Int)
...: b = FieldProperty(schema.Int)
...:
In [8]: def target(_id, _type):
...: s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))
...: adapt_to_session(s)
...: t = TestCol.query.get(_id)
...: if _type == 'A':
...: t.a = getpid()
...: else:
...: t.b = getpid()
...: s.flush_all()
...: s.close()
...:
In [9]: for i in range(100):
...: _id = TestCol(a=0, b=0)._id
...: s.flush_all()
...: proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
...: proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
...: proc1.start()
...: proc2.start()
...:
In [10]: for i in range(100):
...: _id = TestCol(a=0, b=0)._id
...: s.flush_all()
...: proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
...: proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
...: proc1.start()
...: proc2.start()
...:
And it seems to run fine, but if you inspect the collection in a mongo shell, you can find documents where either a or b is still 0:
> db.testcol.find({$or: [{a: 0}, {b: 0}]})
{ "_id" : ObjectId("6232017bb7957cba03f5f295"), "a" : 0, "b" : 404322 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a2"), "a" : 0, "b" : 404484 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a5"), "a" : 0, "b" : 404544 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2b4"), "a" : 404745, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2bf"), "a" : 0, "b" : 404896 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d6"), "a" : 405192, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d8"), "a" : 405217, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2df"), "a" : 0, "b" : 405317 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e7"), "a" : 405414, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e8"), "a" : 405427, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2ec"), "a" : 405477, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2f5"), "a" : 0, "b" : 405619 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f8"), "a" : 405694, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f9"), "a" : 0, "b" : 405709 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fc"), "a" : 405736, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fe"), "a" : 405772, "b" : 0 }
{ "_id" : ObjectId("6232019ab7957cba03f5f307"), "a" : 0, "b" : 405894 }
{ "_id" : ObjectId("6232019ab7957cba03f5f309"), "a" : 0, "b" : 405908 }
{ "_id" : ObjectId("6232019ab7957cba03f5f30d"), "a" : 0, "b" : 405964 }
{ "_id" : ObjectId("6232019ab7957cba03f5f311"), "a" : 406006, "b" : 0 }
Type "it" for more
I can't figure out why this is happening; I feel stuck.
I am using pymongo 3.11.4, ming 0.11.2, and mongo 5.0.2.
In [12]: from pymongo import MongoClient
    ...: db = MongoClient()

In [13]: def target(_id, _type):
...: db = MongoClient()
...: db.testdb.testcol.update_one({'_id': _id}, update={"$set": {_type: getpid()}})
...:
In [14]: for i in range(100):
...: _id = db.testdb.testcol.insert_one({'a': 0, 'b': 0}).inserted_id
...: proc1 = multiprocessing.Process(target=target, args=(_id, 'a'))
...: proc2 = multiprocessing.Process(target=target, args=(_id, 'b'))
...: proc1.start()
...: proc2.start()
With pymongo only, it works fine: every document ends up with both a and b != 0.
In my use case I cannot drop the ODM.
Making some assumptions based on the source code:
- __mongometa__.session is used only to create the mapper; the actual update query is done here
- so the update replaces all the properties of the object, not just the ones that changed
- so what is going on is that the other process updated the database between when the first process fetched the object and when it flushed its change (see the sketch below)
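Here is a minimal reproduction of that lost-update pattern with plain pymongo (server URL and collection names are hypothetical), showing how writing back a whole document clobbers a concurrent single-field update:

from pymongo import MongoClient

col = MongoClient('mongodb://localhost:27017').testdb.testcol
_id = col.insert_one({'a': 0, 'b': 0}).inserted_id

doc = col.find_one({'_id': _id})                   # process 1 reads the full document
col.update_one({'_id': _id}, {'$set': {'b': 42}})  # process 2 updates b in between

doc['a'] = 7                                       # process 1 changes only a...
col.replace_one({'_id': _id}, doc)                 # ...but writes back the whole doc

print(col.find_one({'_id': _id}))                  # b is 0 again: process 2's update is lost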
How can we overcome this issue, then?
- each object has its state (new, clean, dirty, deleted) under the __ming__ property
- the state has an original_document property (can validation make a difference? yes, e.g. datetime)
- the tracker just keeps track of which objects got dirty (soiled) through the instrumentation of the object; it doesn't care about which fields were changed
- here's the object being flushed within the mapper with its associated state
- we could update only the fields that actually changed; to detect what changed, I tried it this way:
def doc_to_set(doc):
    def to_hashable(v):
        if isinstance(v, list):
            return tuple(to_hashable(sv) for sv in v)
        elif isinstance(v, dict):
            return tuple((to_hashable(k), to_hashable(sv))
                         for k, sv in sorted(v.items()))
        elif hasattr(v, '__hash__'):
            return v
        else:
            return v
    return set((k, to_hashable(v)) for k, v in doc.copy().items())

# the symmetric difference of the two snapshots yields the changed fields
fields = tuple(set(k for k, v in
                   doc_to_set(state.original_document)
                   ^ doc_to_set(state.document)))
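For example, with two plain-dict snapshots (hypothetical values), the symmetric difference isolates the one field that differs:

old = {'a': 0, 'b': 0, 'tags': ['x']}
new = {'a': 0, 'b': 42, 'tags': ['x']}

changed = tuple(set(k for k, v in doc_to_set(old) ^ doc_to_set(new)))
print(changed)  # ('b',)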
ming's test suite now says OK, but it's not actually right...
It seems it doesn't detect changes around instrumented lists. Why? Because original_document is already mutated! Ouch, how did that happen?
This comment makes me feel bad.
I used remote_pdb to get the id of that instrumented list (which, under state.*document, isn't instrumented) and they're the same object!
Trying deepcopy then... it works.
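A minimal sketch of that fix, assuming the snapshot is assigned somewhere like this (state and document are stand-ins for ming's internals, not its exact code):

import copy

# deep-copy the snapshot so mutable containers (e.g. instrumented lists)
# are not shared between original_document and the live, tracked document
state.original_document = copy.deepcopy(document)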
When two processes change the same field, the latter wins.
This can seem obvious, but note, for example, that calling append on an instrumented list doesn't $push into the array when the session is flushed; the whole list is replaced instead.
So, before this change, atomic operations and the unit of work were always in conflict. Now there is a chance: an atomic update doesn't conflict with the unit of work as long as they operate on different fields. You must still remember to roll back your atomic operations manually if flushing the session fails, and vice versa.
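For instance, a flush is now assumed to $set only the fields that changed, so a concurrent atomic operation on a different field survives (collection and field names here are hypothetical):

from pymongo import MongoClient

col = MongoClient('mongodb://localhost:27017').testdb.testcol
_id = col.insert_one({'title': 'old', 'tags': []}).inserted_id

# the unit-of-work flush only $sets the changed field...
col.update_one({'_id': _id}, {'$set': {'title': 'new title'}})

# ...so a concurrent atomic $push on a different field is no longer clobbered
col.update_one({'_id': _id}, {'$push': {'tags': 'mongodb'}})

print(col.find_one({'_id': _id}))  # both changes survive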
I'm preparing a PR out of this.