Ming icon indicating copy to clipboard operation
Ming copied to clipboard

Creating Process-wise ThreadLocalODMSession and MappedClass

Open KshitizGIT opened this issue 7 years ago • 6 comments

class WikiPage(MappedClass): class mongometa: session = session name = 'wiki_page'

_id = FieldProperty(schema.ObjectId)
title = FieldProperty(schema.String(required=True))
text = FieldProperty(schema.String(if_missing=''))

I am developing a multiprocess app . For each process, I would like to create a new ThreadLocalODMSession.

Given the above declarative method, how would I bind the MappedClass to different session?

I have already tried numerous ways but none seem to work.

KshitizGIT avatar Sep 15 '18 02:09 KshitizGIT

I don't think I've tried this exactly, but a few ideas: when you declare the session variable, can you check there which process it is already and set it to a different connection? Otherwise if you need to change it dynamically later, I believe you can set session.bind to a new datastore connection at any time and it will use that then. Hope that helps.

brondsem avatar Sep 21 '18 00:09 brondsem

from ming import create_datastore
from ming.odm import ThreadLocalODMSession
from ming import schema
from ming.odm import MappedClass
from ming.odm import FieldProperty, ForeignIdProperty


session = ThreadLocalODMSession(
        bind=create_datastore('mongodb://localhost:27017/test_1')
        )


class WikiPage(MappedClass):
    class __mongometa__:
        session = session
        name = 'wiki_page'

    _id = FieldProperty(schema.ObjectId)
    title = FieldProperty(schema.String(required=True))
    text = FieldProperty(schema.String(if_missing=''))

# Insert into db 'test_1' 
wp = WikiPage(title='This is first title', text='This is the first text')
session.flush()
# Insert into db 'test_2'
session.bind = create_datastore('mongodb://localhost:27017/test_2')
wp = WikiPage(title='This is second title', text='This is the second text')
session.flush()

I expected the above code snippet to insert into test_2 db after I changed the bind to new datastore connection. But this didn't happen. Am i missing anything here?

KshitizGIT avatar Sep 29 '18 14:09 KshitizGIT

@KshitizGIT session it's just a thread local wrapper to the real session. So I think that what you are looking for is session._get().bind = ...

amol- avatar Jan 09 '19 19:01 amol-

Btw my suggestion would be to use the ContextualODMSession instead of ThreadLocalODMSession and have each process use a different identifier. Most obvious usage would probably be ContextualODMSession(os.getpid) but I'm not aware of the overhead of getpid given it's probably implemented over the getpid system call and thus involves a whole kernel space call.

amol- avatar Jan 09 '19 19:01 amol-

@KshitizGIT I ran into this issue too. Changing the session property on __mongometa__ nested class didn't do anything. also session._get().bind = ... raises AttributeError because it's a readonly property and even after changing it externally through

def fset(self, v): self.impl.bind = v
def fget(self): return self.impl.bind
type(s._get()).bind = property(fget=fget, fset=fset)
s._get().bind = Session(create_datastore('mongodb://127.0.0.1:27017/s'))

I didn't see any difference (I tested only through query tough).

Then I figured out the solution:

from ming.odm import Mapper
session2 = ThreadLocalODMSession(bind=create_datastore('mongodb://127.0.0.1:27017/test'))
for _mapper in Mapper.all_mappers():
    _mapper.session = session2
    _mapper.mapped_class.query.session = session2
    _mapper._compiled = False
    _mapper.compile()
    _mapper.session.ensure_indexes(_mapper.collection)

I double checked this is working, the id() of the proxied session is different on every process

I also had some trouble trying what's described here https://ming.readthedocs.io/en/latest/baselevel.html#other-sessions (another issue should be created for that)

Anyway there's something else weird:


Here's a minimal example that shows the issue

Python 3.10.2 (main, Jan 15 2022, 19:56:27) [GCC 11.1.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.1.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from ming import schema, create_datastore
   ...: from ming.odm import MappedClass, Mapper, ThreadLocalODMSession
   ...: from ming.odm import FieldProperty, ForeignIdProperty

In [2]: from os import getpid

In [3]: import multiprocessing

In [4]: from bson import ObjectId

In [5]: s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))

In [6]: def adapt_to_session(s):
   ...:     for _mapper in Mapper.all_mappers():
   ...:         _mapper.session = s
   ...:         _mapper.mapped_class.query.session = s
   ...:         _mapper._compiled = False
   ...:         _mapper.compile()
   ...:         _mapper.session.ensure_indexes(_mapper.collection)
   ...: 

In [7]: class TestCol(MappedClass):
   ...:     class __mongometa__:
   ...:         session = s
   ...:         name = 'testcol'
   ...: 
   ...:     _id = FieldProperty(schema.ObjectId)
   ...:     a = FieldProperty(schema.Int)
   ...:     b = FieldProperty(schema.Int)
   ...: 

In [8]: def target(_id, _type):
   ...:     s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))
   ...:     adapt_to_session(s)
   ...:     t = TestCol.query.get(_id)
   ...:     if _type == 'A':
   ...:         t.a = getpid()
   ...:     else:
   ...:         t.b = getpid()
   ...:     s.flush_all()
   ...:     s.close()
   ...: 

In [9]: for i in range(100):
   ...:     _id = TestCol(a=0, b=0)._id
   ...:     s.flush_all()
   ...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
   ...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
   ...:     proc1.start()
   ...:     proc2.start()
   ...: 

In [10]: for i in range(100):
    ...:     _id = TestCol(a=0, b=0)._id
    ...:     s.flush_all()
    ...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
    ...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
    ...:     proc1.start()
    ...:     proc2.start()
    ...: 

In [11]: 

And it seems to run fine. but if you inspect in a mongo shell, if can find out that either a or b is still 0

> db.testcol.find({$or: [{a: 0}, {b: 0}]})
{ "_id" : ObjectId("6232017bb7957cba03f5f295"), "a" : 0, "b" : 404322 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a2"), "a" : 0, "b" : 404484 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a5"), "a" : 0, "b" : 404544 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2b4"), "a" : 404745, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2bf"), "a" : 0, "b" : 404896 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d6"), "a" : 405192, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d8"), "a" : 405217, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2df"), "a" : 0, "b" : 405317 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e7"), "a" : 405414, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e8"), "a" : 405427, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2ec"), "a" : 405477, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2f5"), "a" : 0, "b" : 405619 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f8"), "a" : 405694, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f9"), "a" : 0, "b" : 405709 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fc"), "a" : 405736, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fe"), "a" : 405772, "b" : 0 }
{ "_id" : ObjectId("6232019ab7957cba03f5f307"), "a" : 0, "b" : 405894 }
{ "_id" : ObjectId("6232019ab7957cba03f5f309"), "a" : 0, "b" : 405908 }
{ "_id" : ObjectId("6232019ab7957cba03f5f30d"), "a" : 0, "b" : 405964 }
{ "_id" : ObjectId("6232019ab7957cba03f5f311"), "a" : 406006, "b" : 0 }
Type "it" for more

I can't figure out why this is happening, I feel stuck.

I am using versions: pymongo 3.11.4 ming 0.11.2 mongo 5.0.2

CastixGitHub avatar Mar 16 '22 15:03 CastixGitHub

In [13]: def target(_id, _type):
    ...:     db = MongoClient()
    ...:     db.testdb.testcol.update_one({'_id': _id}, update={"$set": {_type: getpid()}})
    ...: 

In [14]: for i in range(100):
    ...:     _id = db.testdb.testcol.insert_one({'a': 0, 'b': 0}).inserted_id
    ...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'a'))
    ...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'b'))
    ...:     proc1.start()
    ...:     proc2.start()

with pymongo only it works fine, every field have both a and b ne 0. In my use case I cannot drop the odm.


making assumptions based on the sourcecode:

How can we overcome this issue then?

  • each object has state (new, clean, dirty, deleted) under __ming__ property
  • the state have an original_document property (can validation make difference? yes, eg: datetime)
  • the tracker just keeps track of what objects get dirty (soil) through the instrumentation of the object, it doesn't care about which fields were changed
  • here's the object being flushed whithin the mapper with associated state
  • we could update only the fields that actually changed, to detect what changed, I tried in this way:
            def doc_to_set(doc):
                def to_hashable(v):
                    if isinstance(v, list):
                        return tuple((to_hashable(sv) for sv in v))
                    elif isinstance(v, dict):
                        return tuple(((to_hashable(k), to_hashable(sv))
                                      for k, sv in sorted(v.items())))
                    elif hasattr(v, '__hash__'):
                        return v
                    else:
                        return v
                return set((k, to_hashable(v)) for k, v in doc.copy().items())
            fields = tuple(set((k for k, v in
                                doc_to_set(state.original_document)
                                ^ doc_to_set(state.document))))

ming test suite is now saying OK, but it's not... it seems it doesn't detect changes around instrumented list, why? because original_document is already mutated! ouch, how did that happen? this comment makes me feel bad I used remote_pdb to get the id of that instrumented list (that under the state.*document isn't instrumented) and they're the same object! trying to deepcopy then... it works.


when two processes change the same field, the latter wins. this can seem obvious, but for example calling append on a Ilist doesn't $push into the array when the session is flushed, the whole list is replaced instead.

so, before this change atomic operations and uow were always in conflict. now there is a chance: an atomic update doesn't conflict with the unit of work, as long as they operate on different fields you must still remember to rollback your atomic operations manually if flushing the session failed and vice versa

I'm preparing a PR out of this

CastixGitHub avatar Mar 18 '22 12:03 CastixGitHub