Pause/Resume
Hey Markus,
As we discussed briefly, it would be advantageous for me if we could introduce a pause/resume feature to osm-read.
The use-case for this is, for instance, when a consuming service (such as a database) is being flooded by requests: you can either buffer those requests in memory or ask the parser to slow down or stop for a short while.
Buffering in memory can be problematic when the dataset is very large (i.e. the planet file), and flood-control mechanisms are very important for streaming interfaces.
The way I see it, this can be achieved in a couple of different ways:
deferred recursion
Since visitNextBlock is called recursively:
- when pause() is called, recursion is stopped
- when resume() is called, recursion is started again
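Roughly like this, as a sketch (readNextBlock and visitBlock are placeholders for the real internals; the diff further down implements the same idea properly):

var paused = false;
var deferred = null; // holds the next recursion step while paused

function visitNextBlock(){
    readNextBlock(function(block){     // placeholder async block reader
        visitBlock(block);             // placeholder visitor dispatch
        if( paused ){
            deferred = visitNextBlock; // stop recursing, remember where we were
        } else {
            visitNextBlock();          // continue as usual
        }
    });
}

function pause(){
    paused = true;
}

function resume(){
    paused = false;
    if( deferred ){
        var fn = deferred;
        deferred = null;
        fn(); // pick the recursion back up
    }
}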
explicit next()
The consuming service must call next(), otherwise the iterator will not advance.
node: function( node, next ){
    console.log('node: ' + JSON.stringify(node));
    next(); // this triggers the next recursion
}
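With this style, flood control falls out naturally: hold off on calling next() until the downstream consumer has finished (saveToDb below is a hypothetical async function, just for illustration):

node: function( node, next ){
    saveToDb( node, function( err ){ // hypothetical slow consumer
        if( err ) return console.error( err );
        next(); // only now does the parser advance
    });
}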
Either way we will need to add pause() and resume() methods to the public API.
I'll leave this issue open so we can discuss it further.
@substack has recently pushed a new osm module https://github.com/substack/osm-pbf-parser which exhibits the second behaviour I mentioned with a nice streaming interface. However that library is not as mature as this one and [currently] doesn't output as many nodes/ways as osm-read.
This is my hacky workaround for the first behaviour I mentioned:
diff --git a/lib/main.js b/lib/main.js
index a3b254a..6f7c85a 100644
--- a/lib/main.js
+++ b/lib/main.js
@@ -40,6 +40,8 @@ module.exports = {
parsePbf: xmlParser.parse,
+ pbfParser: pbfParser,
+
createPbfParser: pbfParser.createParser
};
diff --git a/lib/pbfParser.js b/lib/pbfParser.js
index 19f9604..5cc5719 100644
--- a/lib/pbfParser.js
+++ b/lib/pbfParser.js
@@ -552,6 +552,18 @@ function visitBlock(fileBlock, block, opts){
BLOCK_VISITORS_BY_TYPE[fileBlock.blobHeader.type](block, opts);
}
+var paused = false;
+var pauseFunction = function(){};
+
+var pause = function(){
+ paused = true;
+}
+
+var resume = function(){
+ paused = false;
+ pauseFunction();
+}
+
function parse(opts){
return createPathParser({
filePath: opts.filePath,
@@ -595,7 +607,8 @@ function parse(opts){
nextFileBlockIndex += 1;
- visitNextBlock();
+ pauseFunction = visitNextBlock;
+ if( !paused ) pauseFunction();
});
}
@@ -607,5 +620,8 @@ function parse(opts){
module.exports = {
parse: parse,
+ pause: pause,
+ resume: resume,
+
createParser: createPathParser
};
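With that patch applied, a consumer can throttle itself between blocks; something like the following, where writeToDb is a hypothetical slow consumer (note that pause() only takes effect once the current block has been processed):

var osmread = require('osm-read');

osmread.parse({
    filePath: 'some.osm.pbf',
    node: function( node ){
        osmread.pbfParser.pause();      // stop the recursion after the current block
        writeToDb( node, function(){    // hypothetical async consumer
            osmread.pbfParser.resume(); // pick the recursion back up
        });
    },
    error: function( msg ){
        console.error( msg );
    }
});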
This is the solution I found the best for my use case: https://github.com/missinglink/osm-read/compare/marook:master...streams
Note: this only works for the pbfParser at this stage, so I didn't open a PR for it.
To add streaming read support with flood control, you can now do something like this:
var osmread = require('osm-read');
var PassThrough = require('stream').PassThrough;
function createStream( filepath ){
    var stream = new PassThrough({ objectMode: true });
    console.log( 'parse', filepath );
    osmread.parse({
        filePath: filepath,
        endDocument: function(){},
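        // each visitor passes next() as the write() callback, so the parser
        // only advances once the stream has accepted the chunk (back-pressure)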
        bounds: function( item, next ){
            item.type = 'bounds';
            stream.write( item, 'utf8', next );
        },
        node: function( item, next ){
            item.type = 'node';
            stream.write( item, 'utf8', next );
        },
        way: function( item, next ){
            item.type = 'way';
            stream.write( item, 'utf8', next );
        },
        relation: function( item, next ){
            item.type = 'relation';
            stream.write( item, 'utf8', next );
        },
        error: function( msg ){
            console.error( msg );
        }
    });
    return stream;
}
var stream = createStream( filepath );
stream.pipe( someOtherStream );
This approach keeps memory usage very low when dealing with large pbf files, even when doing time-consuming computation downstream.
heya, I shipped my streaming osm module today, which is based on osm-read, so thank you for everything. https://github.com/geopipes/openstreetmap-stream
At the moment it's pointing to my fork because I need the stuff from https://github.com/missinglink/osm-read/compare/marook:master...streams . If you're happy to merge the backwards-compatible changes, that'd make my life easier and I can delete my fork.
What are your thoughts on all this?
OK... I think it should be possible to implement the pause/resume behavior for both the XML and the PBF parser. The node-xml API also supports a pause/resume function ( https://github.com/robrighter/node-xml#parserpause ).
I'm implementing that soon :)
Sounds good.
IMHO the streaming model (advancing the iterator only by calling next()) is much easier to deal with than a pause/resume approach.
This was one of the major design changes in nodejs between streams1 and streams2, and it makes handling back-pressure in data pipelines much easier.
If you can figure out the pause/resume, I can fairly easily wrap those methods to achieve the iterator style API.
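Something along these lines (just a sketch, assuming the parser handle exposes pause() and resume()):

// wrap a pause/resume parser handle so each visitor gets a next() callback
function iteratorStyle( parser, visitor ){
    return function( item ){
        parser.pause();                  // halt before handing the item over
        visitor( item, function next(){
            parser.resume();             // the consumer decides when to continue
        });
    };
}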
OK... I've implemented a pause/resume feature, because it seemed easy enough to implement.
@missinglink Can you test it with your application? If it works fine with your app I'm going to release a new npm module version.
I tried the new pause implementation, but it's not working as expected. I just wanted to output the first node and exit the app.
var osmread, parser;
osmread = require('osm-read');
parser = osmread.parse({
    filePath: 'germany-latest.osm.pbf',
    node: function(node) {
        parser.pause();
        return console.log('node', JSON.stringify(node, null, 2));
    }
});
As output I get multiple nodes. Am I using it wrong?
You're doing it right. The reason you get multiple nodes is a combination of the current osm-read implementation and the OSM pbf file format. OSM pbf files store their elements (nodes, ways, relations, ...) in blocks, and osm-read currently pauses only after it has processed the current block. So you might get multiple nodes before the parser is actually paused.
Feel free to send a pull request with a fix for that :)
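In the meantime you can work around this by ignoring the extra callbacks yourself, e.g. (a sketch based on your snippet above):

// only handle the first node; later nodes from the same block are ignored
var seen = false;
var parser = osmread.parse({
    filePath: 'germany-latest.osm.pbf',
    node: function(node) {
        if (seen) { return; } // pause() only takes effect after the block
        seen = true;
        parser.pause();
        console.log('node', JSON.stringify(node, null, 2));
    }
});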
Thanks, good to know. Now I have a problem with resume. As you mentioned, pause only takes effect after a block is read, so I can't call resume the way I currently do: lots of memory is used because the resumeCallback is fired multiple times.
// store all nodes that are places
MongoClient.connect(url, function(err, db) {
    var parser;
    return parser = osmread.parse({
        filePath: 'germany-latest.osm.pbf',
        node: function(node) {
            var _ref;
            // check if node is a place
            if ((_ref = node.tags) != null ? _ref.place : void 0) {
                parser.pause();
                return db.collection('nodes').insert(node, function(err, result) {
                    return parser.resume();
                });
            }
        }
    });
});
To fix this I added if(!paused){ return; } as the first statement of the resume function, and the memory consumption looks fine again. But now I don't know if I've altered the behavior of the lib in an unexpected way.
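Based on the shape of the resume function in the diff above (the actual implementation may differ), the guarded version looks roughly like this:

var resume = function(){
    if( !paused ){ return; } // ignore resume() calls when we aren't paused
    paused = false;
    pauseFunction();
};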
I know I'm asking a lot here, but I'm new to the lib and don't know yet if I'm using it right.
hey @bekite you can also try using this lib https://github.com/geopipes/openstreetmap-stream which is based on osm-read. If you don't call next() in the consuming stream you'll only get 1 record.
We also provide a bunch of other streaming libraries to make consuming geo data in node a bit easier. https://github.com/geopipes
The new async iteration system I have recently developed would be able to enable this feature. It may require quite a substantial restructuring of the existing code to use it in a way that enables this but doesn't change anything else. I'm not sure how easy it would be to integrate.