node-stream-to-mongo-db
node-stream-to-mongo-db copied to clipboard
Use DuplexStream/Tranform to allow reading the committed record.
It would be beneficial if you used stream.Transform
to allow enable your the stream to be re-usable
E.g. the following code would be extremely useful for message ingesting from a kafka/messaging topic.
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const mongoUrl = 'mongodb://localhost:27017/schema';
const mongoStream = streamToMongoDB({ dbURL: mongoUrl, collection: 'messages' });
topicConsumer
.pipe(messageTransform) // Normalize a consumed message
.pipe(mongoStream) // publish to mongodb
.pipe(topicProducer); // publish resulting mongo record (including ObjectId) to some other topic
Based on this stackoverflow discussion I would guess that Transform
makes sense here as you'd want a 1:1 read/write in the scenario I've provided.
However, perhaps there are scenarios I'm not familiar with where duplex would be a more appropriate option.
Why not, in which case it should be an option in the constructor. Because in many of my cases I needed at the end of the pipe, so it must be a Writable.
const mongoStream = streamToMongoDB({
dbURL: mongoUrl,
collection: 'messages',
transformStream: true, // default is false
});
What do you think @sparksis ?
Looking at the code, this would require the remove of your batch logic in order to accommodate the 1:1 nature of the Transform, perhaps using a Duplex without Transform would be beneficial as it would allow the batching to continue in its own configuration.
I'm somewhat freshly learning node.js from but from what I can tell, if we convert the Writable
to Duplex
with the option allowHalfOpen
it would accomplish both of our goals fairly closely. It should also keep the code complexity down.
I started a quick prototype and will likely throw something into a PR tomorrow.
Are the tests known to be temperamental? When I run npm test
in rapid succession they intermittently fail; even on master.
I don't know what means temperamental. At some point I had the same behaviour than you have and I thank I fixed it in this commit: https://github.com/AbdullahAli/node-stream-to-mongo-db/commit/6cc6cfddfef6d458a6c58201ca9e5b947b56e37f
Maybe I miss something, or this is a regression?