'bulk_write' behavior does not match pymongo
The 'bulk_write' behavior in 'mongomock' is not working as expected.
When I execute the following test using pymongo...
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pymongo import UpdateOne
from pymongo.errors import BulkWriteError, PyMongoError
from pymongo.mongo_client import MongoClient
from pymongo import DESCENDING
from typing import Any, Dict, List, Mapping, Optional
def filter_fn(msg: dict) -> Mapping[str, Any]:
    """Build the upsert filter for one incoming message.

    The filter matches the stored document for the same ``meter_id`` only
    when its ``source_timestamp_utc`` is strictly older than the message's,
    so an equal or newer stored reading is never selected for update.

    Args:
        msg: Message containing at least ``meter_id`` and
            ``source_timestamp_utc``.

    Returns:
        A query document suitable for the ``filter`` of an update operation.

    Raises:
        KeyError: If ``msg`` lacks ``meter_id`` or ``source_timestamp_utc``.
    """
    return {
        "meter_id": msg['meter_id'],
        'source_timestamp_utc': {'$lt': msg['source_timestamp_utc']},
    }
class UpsertStatusType(Enum):
    """Outcome recorded for a single message of the bulk upsert."""

    # The upsert created a new document (reported in the 'upserted' details).
    INSERTED = 1
    # Default outcome: assumed until the bulk result says otherwise.
    UPDATED = 2
    # The write failed with duplicate-key error code 11000.
    REJECTED = 3
    # The write failed with any other error code.
    ERROR = 4
@dataclass
class UpsertResult:
    """Per-message outcome of the bulk upsert."""

    # Position of the message in the submitted request list.
    idx: int
    # The message that was sent as the ``$set`` payload.
    message: Dict[str, Any]
    # Classification of what happened to this message.
    status: UpsertStatusType
    # Raw write-error entry from the bulk error details, when one was recorded.
    error: Optional[Dict] = None
class UpsertError(Exception):
    """Raised when the bulk upsert fails in a way that cannot be attributed to individual messages."""
# Three consecutive days used as "older", "current" and "newer" readings.
source_timestamp_1, source_timestamp_2, source_timestamp_3 = (
    datetime(2023, 2, day) for day in (1, 2, 3)
)

DATABASE_NAME = 'eu_b2b_ci'
COLL_CONSUMPTION = 'consumption'

# The database name is part of the URI, so it becomes the client's default
# database and get_database() needs no explicit name.
mongo_client = MongoClient(f'mongodb://localhost:27017/{DATABASE_NAME}')
mongo_db = mongo_client.get_database()
# Initial setup: recreate the database from scratch, enforce one document per
# meter_id through a unique index, then seed six existing readings.
mongo_client.drop_database(DATABASE_NAME)
seed_db = mongo_client.get_default_database()
seed_db.create_collection(COLL_CONSUMPTION)
consumption_coll = seed_db.get_collection(COLL_CONSUMPTION)
consumption_coll.create_index([('meter_id', DESCENDING)], unique=True)

for seed_meter_id, seed_timestamp in (
    ('AAA', source_timestamp_2),
    ('BBB', source_timestamp_3),
    ('CCC', source_timestamp_3),
    ('DDD', source_timestamp_1),
    ('EEE', source_timestamp_1),
    ('FFF', source_timestamp_1),
):
    consumption_coll.insert_one(
        {'meter_id': seed_meter_id, 'source_timestamp_utc': seed_timestamp, 'value_kwh': 11}
    )
# Every incoming message carries the same timestamp and value; only the meter
# differs, and the expected outcome depends on what is already stored for it.
messages = [
    {'meter_id': meter_id, 'source_timestamp_utc': source_timestamp_2, 'value_kwh': 22}
    for meter_id in (
        # should be updated
        'DDD', 'EEE', 'FFF',
        # should not be updated
        'AAA', 'BBB', 'CCC',
        # should be inserted
        'GGG', 'HHH', 'III',
    )
]
# Build one upsert per message, plus a parallel result record per message.
# Every record starts as UPDATED and is refined from the bulk-write outcome.
upsert_requests: List[UpdateOne] = [
    UpdateOne(filter=filter_fn(message), update={"$set": message}, upsert=True)
    for message in messages
]
bulk_upsert_result: List[UpsertResult] = [
    UpsertResult(idx, message, UpsertStatusType.UPDATED)
    for idx, message in enumerate(messages)
]

try:
    # ordered=False: a failing operation does not stop the remaining ones.
    bulk_write_result = mongo_db[COLL_CONSUMPTION].bulk_write(upsert_requests, ordered=False)
except BulkWriteError as bwe:
    # A write-concern failure is not tied to individual messages: abort.
    if bwe.details.get('writeConcernErrors'):
        raise UpsertError(bwe) from bwe

    for write_error in bwe.details.get('writeErrors') or []:
        idx = write_error.get('index')
        if write_error.get('code') == 11000:
            # Duplicate key: the filter did not match, so the upsert tried to
            # insert a second document for an existing meter_id.
            bulk_upsert_result[idx].status = UpsertStatusType.REJECTED
        else:
            bulk_upsert_result[idx].status = UpsertStatusType.ERROR
            # NOTE(review): indentation was lost in the pasted script; error
            # details are assumed to be kept only for non-duplicate failures.
            bulk_upsert_result[idx].error = write_error

    # Operations that succeeded as inserts are still reported on failure.
    for upsert in bwe.details.get('upserted') or []:
        bulk_upsert_result[upsert['index']].status = UpsertStatusType.INSERTED
except PyMongoError as pme:
    raise UpsertError(pme) from pme
else:
    # Keys of upserted_ids are used as positions in the request list.
    for request_idx in bulk_write_result.upserted_ids or {}:
        bulk_upsert_result[request_idx].status = UpsertStatusType.INSERTED

for result in bulk_upsert_result:
    print(result)
I get the following results:
'meter_id': 'DDD', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=UPDATED
'meter_id': 'EEE', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=UPDATED
'meter_id': 'FFF', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=UPDATED
'meter_id': 'AAA', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=REJECTED
'meter_id': 'BBB', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=REJECTED
'meter_id': 'CCC', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=REJECTED
'meter_id': 'GGG', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=INSERTED
'meter_id': 'HHH', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=INSERTED
'meter_id': 'III', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value_kwh': 22, status=INSERTED

If I execute the same test using mongomock, the behavior is not the same. The records for meter_ids DDD, EEE and FFF are not updated because of a duplicate key error; it appears that the filter is not being applied correctly. Also, the three inserted records do not have an ObjectId as their _id, although this is less important.
{'meter_id': 'AAA', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value': 11, '_id': ObjectId('63f8d5a15a6f6c4ce9b51ef6')}
{'meter_id': 'BBB', 'source_timestamp_utc': datetime(2023, 2, 3, 0, 0), 'value': 11, '_id': ObjectId('63f8d5a15a6f6c4ce9b51ef7')}
{'meter_id': 'CCC', 'source_timestamp_utc': datetime(2023, 2, 3, 0, 0), 'value': 11, '_id': ObjectId('63f8d5a15a6f6c4ce9b51ef8')}
{'meter_id': 'DDD', 'source_timestamp_utc': datetime(2023, 2, 1, 0, 0), 'value': 11, '_id': ObjectId('63f8d5a15a6f6c4ce9b51ef9')}
{'meter_id': 'EEE', 'source_timestamp_utc': datetime(2023, 2, 1, 0, 0), 'value': 11, '_id': ObjectId('63f8d5a15a6f6c4ce9b51efa')}
{'meter_id': 'FFF', 'source_timestamp_utc': datetime(2023, 2, 1, 0, 0), 'value': 11, '_id': ObjectId('63f8d5a15a6f6c4ce9b51efb')}
{'_id': 'G', 'meter_id': 'GGG', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value': 22}
{'_id': 'H', 'meter_id': 'HHH', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value': 22}
{'_id': 'I', 'meter_id': 'III', 'source_timestamp_utc': datetime(2023, 2, 2, 0, 0), 'value': 22}
For the inserted records, the 'upserted' details of the BulkWriteError are also inconsistent with pymongo: the reported indexes are 0, 1 and 2 instead of 6, 7 and 8, which are the positions of those operations in the request list.
