
Pause/Resume

missinglink opened this issue 10 years ago • 12 comments

Hey Markus,

As we discussed briefly, it would be advantageous for me if we could introduce a pause/resume feature to osm-read.

The use case for this is, for instance, when a consuming service (such as a database) is being flooded by requests: you can either buffer those requests in memory or ask the parser to slow down or stop for a short while.

Buffering in memory can be problematic when the dataset is very large (e.g. the planet file), and flood-control mechanisms are very important for streaming interfaces.

The way I see it, this can be achieved in a couple of different ways:

deferred recursion

Since visitNextBlock is called recursively:

  • when pause() is called, recursion is stopped
  • when resume() is called, recursion is started again
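
A minimal sketch of the idea (the helper names below are simplified stand-ins, not the actual osm-read internals):

// a minimal sketch of deferred recursion; hasMoreBlocks() and
// processCurrentBlock() are hypothetical stand-ins for parser internals
var paused = false;
var deferred = null;

function visitNextBlock(){
    if( !hasMoreBlocks() ) return;  // end of input
    processCurrentBlock();

    if( paused ){
        deferred = visitNextBlock;  // remember where to pick up
    } else {
        visitNextBlock();           // recurse immediately
    }
}

function pause(){
    paused = true;
}

function resume(){
    paused = false;
    if( deferred ){
        var fn = deferred;
        deferred = null;
        fn();                       // restart the recursion
    }
}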

explicit next()

The consuming service must call next(), otherwise the iterator will not advance.

node: function(node,next){
    console.log('node: ' + JSON.stringify(node));
    next(); // this triggers the next recursion
}
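
On the parser side this could look something like the following sketch, where decodeNextNode() and visitors are hypothetical stand-ins:

// sketch of the parser side: recursion only continues once the consumer
// invokes next(); decodeNextNode() and visitors are hypothetical names
function visitNextNode(){
    var node = decodeNextNode();
    if( !node ) return;             // end of input

    visitors.node( node, function next(){
        visitNextNode();            // advance only when the consumer is ready
    });
}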

Either way we will need to add pause() and resume() methods to the public API.

I'll leave this issue open so we can discuss it further.

missinglink • Jun 20 '14

@substack has recently pushed a new osm module, https://github.com/substack/osm-pbf-parser, which exhibits the second behaviour I mentioned with a nice streaming interface. However, that library is not as mature as this one and [currently] doesn't output as many nodes/ways as osm-read.

missinglink • Jun 27 '14

This is my hacky workaround implementing the first behaviour (deferred recursion) I mentioned:

diff --git a/lib/main.js b/lib/main.js
index a3b254a..6f7c85a 100644
--- a/lib/main.js
+++ b/lib/main.js
@@ -40,6 +40,8 @@ module.exports = {

     parsePbf: xmlParser.parse,

+    pbfParser: pbfParser,
+
     createPbfParser: pbfParser.createParser

 };
diff --git a/lib/pbfParser.js b/lib/pbfParser.js
index 19f9604..5cc5719 100644
--- a/lib/pbfParser.js
+++ b/lib/pbfParser.js
@@ -552,6 +552,18 @@ function visitBlock(fileBlock, block, opts){
     BLOCK_VISITORS_BY_TYPE[fileBlock.blobHeader.type](block, opts);
 }

+var paused = false;
+var pauseFunction = function(){};
+
+var pause = function(){
+    paused = true;
+}
+
+var resume = function(){
+    paused = false;
+    pauseFunction();
+}
+
 function parse(opts){
     return createPathParser({
         filePath: opts.filePath,
@@ -595,7 +607,8 @@ function parse(opts){

                     nextFileBlockIndex += 1;

-                    visitNextBlock();
+                    pauseFunction = visitNextBlock;
+                    if( !paused ) pauseFunction();
                 });
             }

@@ -607,5 +620,8 @@ function parse(opts){
 module.exports = {
     parse: parse,

+    pause: pause,
+    resume: resume,
+
     createParser: createPathParser
 };
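
With that patch applied, usage could look something like this (saveToDb is a hypothetical async consumer):

var osmread = require('osm-read');

osmread.parse({
    filePath: 'planet.osm.pbf',
    node: function(node){
        osmread.pbfParser.pause();      // stop recursing after the current block
        saveToDb(node, function(){      // saveToDb is a hypothetical async call
            osmread.pbfParser.resume(); // pick the recursion back up
        });
    },
    endDocument: function(){},
    error: function(msg){ console.error(msg); }
});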

missinglink • Jun 27 '14

This is the solution I found best for my use case: https://github.com/missinglink/osm-read/compare/marook:master...streams

Note: this only works for the pbfParser at this stage, so I didn't open a PR for it.

To add streaming read support with flood control, you can now do something like this:

var osmread = require('osm-read');
var PassThrough = require('stream').PassThrough;

function createStream( filepath ){

  var stream = new PassThrough({ objectMode: true });
  console.log( 'parse', filepath );

  osmread.parse({
    filePath: filepath,
    endDocument: function(){
      stream.end(); // signal downstream that the file is fully parsed
    },
    bounds: function( item, next ){
      item.type = 'bounds';
      stream.write( item, 'utf8', next ); // next() fires once the write is flushed
    },
    node: function( item, next ){
      item.type = 'node';
      stream.write( item, 'utf8', next );
    },
    way: function( item, next ){
      item.type = 'way';
      stream.write( item, 'utf8', next );
    },
    relation: function( item, next ){
      item.type = 'relation';
      stream.write( item, 'utf8', next );
    },
    error: function( msg ){
      console.error( msg );
    }
  });

  return stream;
}

// filepath and someOtherStream are supplied by the caller
var stream = createStream( filepath );
stream.pipe( someOtherStream );

This approach keeps memory usage very low when dealing with large pbf files, even when doing time-consuming computation downstream.
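
For example, a deliberately slow downstream consumer only needs to delay its callback and the parser waits (a sketch; the timeout just simulates expensive work):

// sketch of a slow consumer; delaying the callback applies back-pressure
// all the way up to the parser, so memory stays flat
var Writable = require('stream').Writable;

var slowSink = new Writable({ objectMode: true });
slowSink._write = function( item, enc, done ){
    setTimeout(function(){
        console.log( item.type );
        done();     // the parser won't advance until this fires
    }, 100);        // simulated time-consuming computation
};

createStream( 'planet.osm.pbf' ).pipe( slowSink );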

missinglink • Jun 30 '14

heya, I shipped my streaming osm module today, which is based on osm-read, so thank you for everything. https://github.com/geopipes/openstreetmap-stream

At the moment it's pointing to my fork because I need the changes from https://github.com/missinglink/osm-read/compare/marook:master...streams . If you're happy to merge the backwards-compatible changes, that'd make my life easier and I can delete my fork.

What are your thoughts on all this?

missinglink • Jul 23 '14

OK... I think it should be possible to implement the pause/resume behavior for the XML and the PBF parser. The node-xml API also supports a pause/resume function (https://github.com/robrighter/node-xml#parserpause).

I'm implementing that soon :)

marook • Aug 06 '14

Sounds good.

IMHO the streaming model (advancing the iterator only by calling next()) is much easier to deal with than a pause/resume approach.

This was one of the major design changes in nodejs between streams1 and streams2, and it makes handling back-pressure in data pipelines much easier.

If you can figure out the pause/resume, I can fairly easily wrap those methods to achieve the iterator-style API.
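
Something like this wrapper, for instance (a sketch that assumes the parser exposes the pause()/resume() methods discussed above; doAsyncWork is hypothetical):

// sketch: wrapping pause/resume into a next()-style callback
function withNext( handler ){
    return function( item ){
        parser.pause();             // stop before the next item is emitted
        handler( item, function next(){
            parser.resume();        // the consumer decides when to advance
        });
    };
}

var parser = osmread.parse({
    filePath: 'planet.osm.pbf',
    node: withNext(function( node, next ){
        doAsyncWork( node, next );  // call next() once the work is done
    })
});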

missinglink • Aug 06 '14

OK... I've implemented a pause/resume feature because it seemed easy to implement.

@missinglink Can you test it with your application? If it works fine with your app, I'm going to release a new npm module version.

marook • Sep 29 '14

I tried the new pause implementation, but it's not working as expected. I just wanted to output the first node and exit the app.

var osmread, parser;
osmread = require('osm-read');
parser = osmread.parse({
  filePath: 'germany-latest.osm.pbf',
  node: function(node) {
    parser.pause();
    return console.log('node', JSON.stringify(node, null, 2));
  }
});

As output I get multiple nodes. Am I using it wrong?

ghost • Nov 12 '14

You're doing it right. The reason you get multiple nodes is a combination of the current osm-read implementation and the OSM pbf file format. OSM pbf files store their elements (nodes, ways, relations, ...) in blocks, and osm-read currently pauses only after it has processed the current block. So you might get multiple nodes before the parser is actually paused.

Feel free to send a pull request with a fix for that :)
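
In the meantime, a caller-side guard can skip the extra callbacks that arrive before the pause takes effect (a sketch):

// sketch of a caller-side guard: ignore the extra nodes that are still
// delivered from the current block after pause() is called
var seen = false;
var parser = osmread.parse({
    filePath: 'germany-latest.osm.pbf',
    node: function(node){
        if( seen ) return;          // skip the rest of the current block
        seen = true;
        parser.pause();
        console.log('node', JSON.stringify(node, null, 2));
    }
});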

marook • Nov 12 '14

Thanks, good to know. Now I have a problem with resume. As you mentioned, pause is only triggered after a block is read, so I can't call resume the way I currently do: lots of memory is used, because the resume callback is fired multiple times.

var MongoClient = require('mongodb').MongoClient;
var osmread = require('osm-read');
var url = 'mongodb://localhost:27017/osm'; // hypothetical connection string

// store all nodes that are places
MongoClient.connect(url, function(err, db) {
  var parser;
  return parser = osmread.parse({
    filePath: 'germany-latest.osm.pbf',
    node: function(node) {
      var _ref;
      // check if node is a place
      if ((_ref = node.tags) != null ? _ref.place : void 0) {
        parser.pause(); // pause while the insert is in flight
        return db.collection('nodes').insert(node, function(err, result) {
          return parser.resume();
        });
      }
    }
  });
});

To fix this I added if(!paused){ return; } as the first statement of the resume function, and the memory consumption looks fine again. But now I don't know if I've altered the behaviour of the lib in an unexpected way. I know I'm asking a lot here, but I'm new to the lib and don't know yet if I'm using it right.
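
For reference, the guarded resume described above looks like this (shown against the shape of the resume function from the earlier diff; the actual implementation in the lib may differ):

// redundant resume calls become no-ops instead of re-firing the callback
var resume = function(){
    if( !paused ){ return; }        // already running: do nothing
    paused = false;
    pauseFunction();
};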

ghost • Nov 12 '14

hey @bekite, you can also try this lib, https://github.com/geopipes/openstreetmap-stream, which is based on osm-read. If you don't call next() in the consuming stream, you'll only get 1 record.
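
A rough sketch of what that looks like (this assumes the module exposes an object-mode read stream; the createReadStream name here is a guess, so check the module's README):

// hedged sketch; the consumer uses through2, and withholding next()
// stalls the pipeline after the first record
var osm = require('openstreetmap-stream');
var through = require('through2');

osm.createReadStream( 'planet.osm.pbf' )
   .pipe( through.obj(function( item, enc, next ){
       console.log( item );
       next();  // omit this call and you'll only get 1 record
   }));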

We also provide a bunch of other streaming libraries to make consuming geo data in node a bit easier: https://github.com/geopipes

missinglink • Nov 12 '14

The new async iteration system I have recently developed would be able to enable this feature. It may require quite a substantial restructuring of the existing code to use it in a way that enables this but doesn't change anything else. Not sure how easy it would be to integrate.

metabench • Nov 27 '22