elasticsearch-river-mongodb

Map Reduce triggering drop

Open ecoutu opened this issue 9 years ago • 0 comments

I have been experiencing an issue:

  • Occasionally, the index that the river was streaming data to would lose all its documents, going from ~700k to 0 documents.
  • If I wipe the index and the river and recreate them, the initial sync works as expected, but once the number of documents in MongoDB and ElasticSearch become equal, the index loses all its documents again.
  • After losing all the documents following an initial sync, the river continues to stream new documents to ES, getting up to ~100, sometimes a few thousand, and then after a period of time resets to 0 again, repeating this process.

MongoDB setup:

  • 3-shard cluster, each shard a replica set of 3, each node on its own VM
  • MongoDB 3.0.2, WiredTiger storage engine
  • River is pointed at mongos, with proper permissions

ElasticSearch:

  • Basic Ubuntu installation
  • ES 1.4.2
  • mapper-attachments 2.4.3
  • river-mongodb 2.0.9

River config:

      "crawldbRiver": {
        "type": "mongodb",
        "mongodb": {
          "servers": [
            { "host": "xxx", "port": 27017 }
          ],
          "options": {
            "include_fields": ["......."],
            "drop_collection": true
          },
          "credentials": [
            {"db": "admin", "user": "xxx", "password": "xxx"}
          ],
          "db": "xxx",
          "collection": "profiles"
        },
        "index": {
          "name": "profiles",
          "type": "profile"
        }
      },

One additional note: I have two duplicate deployments, staging and production, with one difference: the staging environment has sharding enabled on the profiles collection that is being streamed by the river. It is the sharded staging environment that is experiencing this issue.

I did some digging into this, and it seems to be caused by map reduce jobs. I was able to reproduce it easily: check the number of documents in ES, hit our front-end view that triggers a map reduce, then check the number of documents in ES again; it always dropped back down to 0. The river doesn't seem to be able to differentiate between a drop of the collection being monitored and a drop of other collections in the same database.
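For reference, here is the kind of check I would expect (a hypothetical Python sketch, not the river's actual Java code): an oplog "drop" command names the dropped collection inside the command document (`o.drop`), so the slurper could compare that name against the monitored collection instead of treating every drop in the database as a drop of the monitored one.

```python
# Hypothetical sketch (NOT the river's actual code): decide whether an
# oplog "drop" command really targets the monitored collection.
def is_drop_of_monitored_collection(oplog_entry, monitored_collection):
    # Command entries use op "c" with a namespace like "<db>.$cmd";
    # the dropped collection's name lives inside the command document.
    # (A full check would also compare the db prefix of "ns".)
    if oplog_entry.get("op") != "c":
        return False
    dropped = oplog_entry.get("o", {}).get("drop")
    return dropped == monitored_collection

# The entry from the trace below: a map/reduce temp collection being dropped.
entry = {
    "op": "c",
    "ns": "crawldb.$cmd",
    "o": {"drop": "tmp.mrs.reviews_1435859379_2817"},
}
# A correct check does NOT treat this as a drop of "profiles"...
assert not is_drop_of_monitored_collection(entry, "profiles")

# ...while a genuine drop of the monitored collection still matches.
real_drop = {"op": "c", "ns": "crawldb.$cmd", "o": {"drop": "profiles"}}
assert is_drop_of_monitored_collection(real_drop, "profiles")
```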

As a temporary workaround, I have set the drop_collection option to false; this has resolved the issue.
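That is, the same river config as above with only the drop_collection option changed:

          "options": {
            "include_fields": ["......."],
            "drop_collection": false
          },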

You can see in the following ES logs a map reduce query being run on the unrelated reviews collection. A temporary collection is created, renamed, and then dropped, and that drop is misattributed as a DROP_COLLECTION operation on the profiles collection.

[2015-07-02 17:49:39,600][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] namespace: crawldb.$cmd - operation: COMMAND
[2015-07-02 17:49:39,600][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] MongoDB object deserialized: { "create" : "tmp.mr.reviews_2147" , "temp" : true}
[2015-07-02 17:49:39,600][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] collection: profiles
[2015-07-02 17:49:39,600][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] oplog entry - namespace [crawldb.$cmd], operation [COMMAND]
[2015-07-02 17:49:39,600][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] oplog processing item { "ts" : { "$ts" : 1435859379 , "$inc" : 1} , "h" : 6055431814338317965 , "v" : 2 , "op" : "c" , "ns" : "crawldb.$cmd" , "o" : { "create" : "tmp.mr.reviews_2147" , "temp" : true}}
[2015-07-02 17:49:39,600][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] addToStream - operation [COMMAND], currentTimestamp [Timestamp.BSON(ts={ "$ts" : 1435859379 , "$inc" : 1})], data [{ }], collection [profiles]
[2015-07-02 17:49:39,601][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] namespace: admin.$cmd - operation: COMMAND
[2015-07-02 17:49:39,601][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] processAdminCommandOplogEntry - [{ "ts" : { "$ts" : 1435859379 , "$inc" : 2} , "h" : -4965257526687656775 , "v" : 2 , "op" : "c" , "ns" : "admin.$cmd" , "o" : { "renameCollection" : "crawldb.tmp.mr.reviews_2147" , "to" : "crawldb.tmp.mrs.reviews_1435859379_2817" , "stayTemp" : true}}]
[2015-07-02 17:49:39,601][TRACE][org.elasticsearch.river.mongodb.Indexer] Operation: COMMAND - index: profiles - type: profile - routing: null - parent: null
[2015-07-02 17:49:39,608][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] namespace: crawldb.$cmd - operation: DROP_COLLECTION
[2015-07-02 17:49:39,608][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] MongoDB object deserialized: { "drop" : "tmp.mrs.reviews_1435859379_2817"}
[2015-07-02 17:49:39,608][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] collection: profiles
[2015-07-02 17:49:39,608][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] oplog entry - namespace [crawldb.$cmd], operation [DROP_COLLECTION]
[2015-07-02 17:49:39,608][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] oplog processing item { "ts" : { "$ts" : 1435859379 , "$inc" : 3} , "h" : 4575942115879798158 , "v" : 2 , "op" : "c" , "ns" : "crawldb.$cmd" , "o" : { "drop" : "tmp.mrs.reviews_1435859379_2817"}}
[2015-07-02 17:49:39,608][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] addToStream - operation [DROP_COLLECTION], currentTimestamp [Timestamp.BSON(ts={ "$ts" : 1435859379 , "$inc" : 3})], data [{ }], collection [profiles]
[2015-07-02 17:49:39,608][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] namespace: crawldb.profiles - operation: UPDATE
[2015-07-02 17:49:39,609][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] MongoDB object deserialized is 675 characters long
[2015-07-02 17:49:39,609][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] collection: profiles
[2015-07-02 17:49:39,609][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] oplog entry - namespace [crawldb.profiles], operation [UPDATE]
[2015-07-02 17:49:39,609][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] Updated item: { "_id" : { "$oid" : "5567676308731842ca887513"}}
[2015-07-02 17:49:39,609][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] addQueryToStream - operation [UPDATE], currentTimestamp [Timestamp.BSON(ts={ "$ts" : 1435859379 , "$inc" : 4})], update [{ "_id" : { "$oid" : "5567676308731842ca887513"}}]
[2015-07-02 17:49:39,609][TRACE][org.elasticsearch.river.mongodb.Indexer] updateBulkRequest for id: [], operation: [DROP_COLLECTION]
[2015-07-02 17:49:39,609][TRACE][org.elasticsearch.river.mongodb.Indexer] Operation: DROP_COLLECTION - index: profiles - type: profile - routing: null - parent: null
[2015-07-02 17:49:39,616][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] addToStream - operation [UPDATE], currentTimestamp [Timestamp.BSON(ts={ "$ts" : 1435859379 , "$inc" : 4})], data [{ "_id" : { "$oid" : "5567676308731842ca887513"} ,xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx}], collection [profiles]
[2015-07-02 17:49:39,617][TRACE][org.elasticsearch.river.mongodb.Indexer] updateBulkRequest for id: [5567676308731842ca887513], operation: [UPDATE]
[2015-07-02 17:49:39,617][TRACE][org.elasticsearch.river.mongodb.Indexer] Operation: UPDATE - index: profiles - type: profile - routing: null - parent: null
[2015-07-02 17:49:39,617][TRACE][org.elasticsearch.river.mongodb.Indexer] Update operation - id: 5567676308731842ca887513 - contains attachment: false
[2015-07-02 17:49:39,617][TRACE][org.elasticsearch.river.mongodb.Indexer] bulkDeleteRequest - objectId: 5567676308731842ca887513 - index: profiles - type: profile - routing: null - parent: null
[2015-07-02 17:49:39,617][TRACE][org.elasticsearch.river.mongodb.MongoDBRiverBulkProcessor] deleteBulkRequest - id: 5567676308731842ca887513 - index: profiles - type: profile - routing: null - parent: null
[2015-07-02 17:49:39,620][TRACE][org.elasticsearch.river.mongodb.MongoDBRiverBulkProcessor] bulkQueueSize [50] - queue [0] - availability [1]
[2015-07-02 17:49:39,620][TRACE][org.elasticsearch.river.mongodb.MongoDBRiverBulkProcessor] beforeBulk - new bulk [34119] of items [1]
[2015-07-02 17:49:39,620][TRACE][org.elasticsearch.river.mongodb.MongoDBRiverBulkProcessor] About to flush bulk request index[profiles] - type[profile]
[2015-07-02 17:49:39,620][TRACE][org.elasticsearch.river.mongodb.MongoDBRiverBulkProcessor] dropRecreateMapping index[profiles] - type[profile]
[2015-07-02 17:49:39,620][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] namespace: crawldb.profiles - operation: UPDATE
[2015-07-02 17:49:39,621][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] MongoDB object deserialized is 649 characters long
[2015-07-02 17:49:39,621][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] collection: profiles
[2015-07-02 17:49:39,621][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] oplog entry - namespace [crawldb.profiles], operation [UPDATE]
[2015-07-02 17:49:39,621][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] Updated item: { "_id" : { "$oid" : "556fb17a1df9fe6258775c75"}}
[2015-07-02 17:49:39,621][TRACE][org.elasticsearch.river.mongodb.OplogSlurper] addQueryToStream - operation [UPDATE], currentTimestamp [Timestamp.BSON(ts={ "$ts" : 1435859379 , "$inc" : 1})], update [{ "_id" : { "$oid" : "556fb17a1df9fe6258775c75"}}]
[2015-07-02 17:49:39,621][TRACE][org.elasticsearch.river.mongodb.MongoDBRiverBulkProcessor] mappings contains type profile: true

ecoutu, Jul 02 '15 18:07