
Reimplement array upserts in threadsafe manner to prevent duplicate entries

Open Zalgo2462 opened this issue 3 years ago • 1 comments

In many RITA modules, we perform an array upsert by first querying whether a matching record already exists and then issuing a $push or a $set based on the result. Unfortunately, this check-then-write sequence is not atomic, so it breaks when multiple threads upsert into the same array in MongoDB. Two or more threads may each see that no matching record exists before any of them has written. Each thread then issues a $push, and we end up with duplicate records in the array.
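To make the race concrete, here is a minimal sketch that replays the interleaving against a plain in-memory object instead of MongoDB. The names `doc`, `hasEntry`, and `write` are made up for illustration and are not RITA code.

```javascript
// In-memory stand-in for a host document with a "dat" array (no MongoDB involved).
const doc = { dat: [] };

// Step 1 of the unsafe upsert: query for a matching array element.
function hasEntry(cid) {
  return doc.dat.some((e) => e.cid === cid);
}

// Step 2: "$set" or "$push" depending on the (possibly stale) query result.
function write(found, cid, score) {
  if (found) {
    doc.dat.find((e) => e.cid === cid).score = score; // stands in for $set
  } else {
    doc.dat.push({ cid, score }); // stands in for $push
  }
}

// Two workers both run their query before either one writes.
const seenByA = hasEntry(1); // false
const seenByB = hasEntry(1); // false, because A has not written yet
write(seenByA, 1, 0.8);
write(seenByB, 1, 0.9);

const duplicateCount = doc.dat.filter((e) => e.cid === 1).length;
console.log(duplicateCount); // 2
```

Both workers take the $push branch because neither query can see the other's pending write.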

One way to address this is to replace the array upserts with the following approach. First, submit a $push update whose selector only matches when the array does not already contain the element being $push'd. If the new element is not in the array, the selector matches and the element is $push'd. If the new element already exists, the selector does not match and nothing is written. Because MongoDB applies an update to a single document atomically, two threads cannot both pass the check. Example: http://jthomerson.github.io/mongodb-safe-upserts-array-subdocuments/#/5/1

To handle the update half of the upsert, check whether the $push selector matched anything. If it did not, the element already exists, so run the corresponding $set operation. Example: http://jthomerson.github.io/mongodb-safe-upserts-array-subdocuments/#/5/2
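A rough sketch of how the two updates could be generated for any array field follows. `buildArrayUpsert` and its parameters are hypothetical names, not existing RITA code; the output uses the `updateOne` operation shape accepted by `bulkWrite`, and the $set targets the matched element through the positional `$` operator.

```javascript
// Hypothetical helper: builds the two updates for a thread-safe array upsert.
//   base:   selector identifying the parent document
//   field:  name of the array field
//   key:    fields that identify the array element (matched via $elemMatch)
//   values: fields to store on that element
function buildArrayUpsert(base, field, key, values) {
  // 1. $push only when no element matches the key. The selector and the
  //    write are applied to the document as a single atomic update.
  const push = {
    updateOne: {
      filter: { ...base, [field]: { $not: { $elemMatch: key } } },
      update: { $push: { [field]: { ...key, ...values } } },
    },
  };

  // 2. $set the values on the element matched by $elemMatch, using the
  //    positional $ operator ("dat.$.some_field").
  const setFields = {};
  for (const [name, value] of Object.entries(values)) {
    setFields[`${field}.$.${name}`] = value;
  }
  const set = {
    updateOne: {
      filter: { ...base, [field]: { $elemMatch: key } },
      update: { $set: setFields },
    },
  };

  return [push, set];
}

const ops = buildArrayUpsert(
  { ip: "10.55.100.110" },
  "dat",
  { cid: 1 },
  { max_beacon_fqdn_score: 0.833 }
);
console.log(JSON.stringify(ops, null, 2));
```

The two operations could be sent in order, for instance through an ordered `bulkWrite`, or the second could be issued only when the first reports no match. In this simplified form the $set writes the same values the $push would have written, so running it unconditionally is redundant but harmless.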

These changes mainly apply to the host collection updates found in the various RITA packages. In addition to preventing duplicate records, these fixes will likely speed up the analysis modules as well. For example, in the beaconfqdn package, the host collection updates take up over half of the analysis time. These updates would reduce the number of queries from 4 to 1 or 2 depending on how the fix is implemented.

Zalgo2462 avatar Aug 10 '21 03:08 Zalgo2462

For example, we might run the following commands when updating a host in FQDN beaconing:

// push a new entry in if an entry doesn't exist for the current CID
db["beaconFQDN"].update(
{
    "ip" : "10.55.100.110",
    "network_uuid" : UUID("ffffffff-ffff-ffff-ffff-fffffffffffe"),
    "dat": {"$not": {"$elemMatch": {
        "cid": 1, // current CID
        "max_beacon_fqdn_score": {"$exists": true}, 
    }}},
},
{ 
    "$push": {
        ... (new max_beacon entry)
    }
}
)

Then we could run the following command if the first update didn't match anything (implying a record already exists). Alternatively, we could fire off the next update unconditionally without hurting anything.

// update an existing entry with the current CID and a lower score or an entry with a matching FQDN
db["beaconFQDN"].update(
{
    "ip" : "10.55.100.110",
    "network_uuid" : UUID("ffffffff-ffff-ffff-ffff-fffffffffffe"),
    "dat": {"$elemMatch": {
        "$or": [
            { // updating the max beacon for the current CID
                "cid": 1,
                "max_beacon_fqdn_score": {"$lt": 0.833},
            },
            { // update the score for a matching beacon from a previous CID
                "mbfqdn": "e7070.g.akamaiedge.net"
            }
         ]
    }},
},
{ 
    "$set": {
        ... (new max_beacon entry details)
    }
}
)

Zalgo2462 avatar Aug 13 '21 01:08 Zalgo2462