
processing highland stream chunks using async

Open 3goats opened this issue 8 years ago • 22 comments

I'm using highland.js to process a file using a stream to read the contents between two delimiters. I'm also using async.js to run a series of HTTP requests in sequence.

Ideally I would like to pass each output x from Highland into the async.series chain as its first function, so that the HTTP requests get executed for each chunk extracted from the stream.

Is this possible? If so, how can this be achieved?

const fs = require('fs');
var async = require('async');
var _ = require('highland');

// files[0] is assumed to hold the input file path (defined elsewhere)
_(fs.createReadStream(files[0], { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .each(function (x) {
        // each extracted chunk arrives here as x
    })
    .done(function () {
        async.series([
            function (callback) {
                setTimeout(function () {
                    console.log('Task 1');
                    callback(null, 1);
                }, 300);
            },
            function (callback) {
                setTimeout(function () {
                    console.log('Task 2');
                    callback(null, 2);
                }, 200);
            },
        ], function (error, results) {
            console.log(results);
        });
    });

3goats, Sep 23 '16 19:09

Yes, you can use flatMap and wrapCallback to get what you want. For example,

const fs = require('fs');
const async = require('async');
const _ = require('highland');

const asyncSeriesStream = _.wrapCallback(async.series);

_(fs.createReadStream(files[0], { encoding: 'utf8' }))
        .splitBy('-----BEGIN-----\n')
        .splitBy('\n-----END-----\n')
        .filter(chunk => chunk !== '')
        .flatMap(chunk => asyncSeriesStream([
            callback => {
                // Task 1: must call callback(err, result) when finished
            },
            callback => {
                // Task 2: must call callback(err, result) when finished
            }
        ]))
        .each(result => console.log(result))
        .done(() => console.log("All results received."));

What's going on here:

  • flatMap will take each chunk, call the provided mapper, wait for the resulting stream to finish, and emit its values. After that, it moves on to the next chunk.
  • flatMap expects its mapper to return a stream, so we use wrapCallback to convert async.series from node callback style to Highland stream style. asyncSeriesStream will call async.series with the tasks you provided and a callback argument that it creates, then return a Highland stream that will emit either an error or the results array, depending on the outcome.
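To make the node-style callback contract concrete, here is a minimal, illustrative re-implementation of what async.series expects (a sketch, not the async library's code): each task receives a `callback(err, result)` and must call it exactly once, and the final callback fires only after the last task has. Node's built-in `util.promisify` plays a role similar to `wrapCallback`, except that it produces a Promise rather than a Highland stream.

```javascript
'use strict';
const util = require('util');

// Minimal sketch of the async.series contract (illustrative, not the library's code).
// Each task is called as task(callback) and must call callback(err, result) once.
function series(tasks, done) {
  const results = [];
  function runNext(i) {
    if (i === tasks.length) return done(null, results);
    tasks[i]((err, result) => {
      if (err) return done(err, results); // stop at the first error
      results.push(result);
      runNext(i + 1);
    });
  }
  runNext(0);
}

// Similar in spirit to _.wrapCallback(async.series), but yields a Promise.
const seriesAsync = util.promisify(series);

const tasks = [
  callback => setTimeout(() => callback(null, 1), 30),
  callback => setTimeout(() => callback(null, 2), 10),
];

seriesAsync(tasks).then(results => console.log(results)); // [ 1, 2 ]
```

A task that never calls `callback` leaves the whole series pending, so placeholder tasks like `// Task 1` need to call it before the wrapped stream can emit anything.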

Using flatMap here (equivalent to map(...).series()) means that you'll have at most one HTTP request in-flight at a time. You can also use map(...).parallel(n), map(...).merge(), or map(...).mergeWithLimit(n) to get more parallelism.
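For intuition only, the difference between one request in flight and a bounded number can be sketched with plain async/await. This is not how Highland implements series() or parallel(n); `makeFakeUpload` is a hypothetical stand-in for an HTTP call that records how many calls overlap. Like parallel(n), `mapWithLimit` keeps results in input order (merge() and mergeWithLimit(n) emit in completion order instead).

```javascript
'use strict';

// One item at a time: roughly what flatMap / map(...).series() gives you.
async function mapSeries(items, task) {
  const results = [];
  for (const item of items) {
    results.push(await task(item)); // the next item starts only after this one finishes
  }
  return results;
}

// At most `limit` items at a time, results kept in input order
// (roughly the behaviour of map(...).parallel(limit)).
async function mapWithLimit(items, limit, task) {
  const results = new Array(items.length);
  let next = 0;
  async function worker() {
    while (next < items.length) {
      const i = next++; // claim the next index synchronously
      results[i] = await task(items[i]);
    }
  }
  const workers = [];
  for (let w = 0; w < Math.min(limit, items.length); w++) workers.push(worker());
  await Promise.all(workers);
  return results;
}

// Hypothetical stand-in for an HTTP upload that records how many calls overlap.
function makeFakeUpload(delayMs = 10) {
  const stats = { inFlight: 0, maxInFlight: 0 };
  const upload = chunk => {
    stats.inFlight++;
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight);
    return new Promise(resolve => setTimeout(() => {
      stats.inFlight--;
      resolve(`uploaded:${chunk}`);
    }, delayMs));
  };
  return { upload, stats };
}

const { upload, stats } = makeFakeUpload();
mapSeries(['a', 'b', 'c'], upload).then(results => {
  console.log(results, stats.maxInFlight); // [ 'uploaded:a', 'uploaded:b', 'uploaded:c' ] 1
});
```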

BTW, we also have streamifyAll that you can use to basically wrapCallback all of the async object at once if you are using other functions from async.

vqvu, Sep 24 '16 07:09

Thanks very much. So my code now looks like this:

const asyncSeriesStream = _.wrapCallback(async.series);

_(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => asyncSeriesStream([
        callback => {
            request('http://www.google.com', function (error, response, body) {
                if (!error && response.statusCode == 200) {
                    console.log(body) // Show the HTML for the Google homepage.
                }
            })
        },

        callback => {
            request('http://www.apple.com', function (error, response, body) {
                if (!error && response.statusCode == 200) {
                    console.log(body) // Show the HTML for the Apple homepage.
                }
            })
        }
    ]).each(result => console.log(result))
        .done(() => console.log("All results received.")));

However, it never seems to run. It just exits with exit code 0, and there are no errors, so I think I must be missing something obvious here.

By contrast, this version does run:

_(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .each(function (x) {
        console.log(x)
    }).done(function () {
    })

I've attached a sample of the file I'm trying to process; the real file is much, much larger. Basically, my objective is to extract the chunks and then call request to upload each chunk to a REST API. The Google and Apple URLs are just there to test the flow.

small.txt

3goats, Sep 24 '16 10:09

What is async.series doing? request returns a Node stream; could you not just wrap those streams with the Highland constructor and chain them using flatMap?
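One likely reason the flatMap version exits with code 0 without doing anything: Highland streams are lazy and only start pulling data once the outer stream is consumed, for example by .each(...) or .done(...). In that snippet the closing parentheses attach .each(...).done(...) to the inner asyncSeriesStream(...) stream inside the flatMap mapper, so nothing ever consumes the outer pipeline; the tasks also never call callback, so async.series could not finish anyway. The pull-based behaviour can be illustrated with plain generators (an analogy only, not Highland's implementation):

```javascript
'use strict';

// Rough analogy only: generators are pull-based, like Highland streams.
// `log` records which chunks were actually read.
function* readChunks(chunks, log) {
  for (const chunk of chunks) {
    log.push(`read ${chunk}`);
    yield chunk;
  }
}

function* mapLazy(source, fn) {
  for (const x of source) yield fn(x);
}

const log = [];
// Building the pipeline reads nothing yet...
const pipeline = mapLazy(readChunks(['a', 'b'], log), c => `upload ${c}`);
console.log(log.length); // 0

// ...work only happens once something consumes the outer pipeline.
const results = [...pipeline];
console.log(results, log.length); // [ 'upload a', 'upload b' ] 2
```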

svozza, Sep 24 '16 10:09

Hmm - I'm not sure. I'm starting to get a little out of my depth now. Any guidance would be much appreciated.

Essentially, I'm just trying to process a large file and have the chunks sent to a series of HTTP requests without creating silly numbers of requests. As @vqvu mentioned, "you'll have at most one HTTP request in-flight at a time", which is highly desirable.

3goats, Sep 24 '16 10:09

No probs. So from what I can gather, you want to make two HTTP requests per chunk of data. The way I would do this is like so:

const fs = require('fs');
const hl = require('highland');
const request = require('request');

hl(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => {
        // I presume you'll want to do something with chunk in both requests
        // so you'll want to keep both flatMaps in the same function body.
        // Note: a raw request stream can emit several Buffer chunks, and the inner
        // flatMap runs once per emitted chunk, so in practice parse the body first.
        return hl(request({url: 'http://www.google.com', json: true}))
            .flatMap(body => hl(request({url: 'http://www.apple.com', json: true})));
    })
    .each(result => console.log(result))
    .done(() => console.log("All results received."));

svozza, Sep 24 '16 11:09

Thank you very much. I'll give this a try in a bit. Your help is much appreciated.

3goats, Sep 24 '16 11:09

Let us know how you get on. Any more questions don't hesitate to ask.

svozza, Sep 24 '16 11:09

@svozza - This is great. Again, thanks for the guidance. I'm a Python guy trying to get my head around the asynchronous world. :)

3goats, Sep 24 '16 11:09

Haha. Been there! Takes a little while to click but becomes second nature after a while.

svozza, Sep 24 '16 11:09

So this seems to work perfectly. This is my code now:

_(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => {
        // I presume you'll want to do something with chunk in both requests
        // so you'll want to keep both flatMaps in the same function body.
        return hl(request(Helpers.Authenticate, function (error, response, body) {
            if (!error && response.statusCode == 200) {
            //    console.log(body) 

            }
        }))
            .flatMap(body => hl(request(Helpers.Retrieve(apikey, chunk), function (error, response, body) {
                if (!error && response.statusCode == 200) {
              //      console.log(body) // Show the HTML for the Google homepage.
                }
            })));
    })
    .each(
        result =>
        console.log(result.toString('utf8')))
    .done(() => console.log("All results received."));

Couple of questions.

So the result was a Buffer, and I needed to convert it to a string. Is this correct? The body of the first response contains body.APIKey, so how do I pass the API key and the chunk into the Helpers.Retrieve() function for the second HTTP call?

3goats, Sep 24 '16 12:09

You don't want to do the (error, response, body) callback if you're using it as a stream. Anything you do in there won't get passed down. Presuming that your REST API returns JSON, you've a couple of ways of dealing with the buffer. The easiest imo is to use the JSONStream library:

const JSONStream = require('JSONStream');

hl(request(Helpers.Authenticate))
    .through(JSONStream.parse())
    .flatMap(body => {
        return hl(request(Helpers.Retrieve(body.APIKey, chunk)))
            .through(JSONStream.parse())
    })

Another way is:

hl(request(Helpers.Authenticate))
    .reduce1((a, b) => a + b.toString())
    .map(JSON.parse);
    // ...

You need the reduce there because, depending on its size, the response may arrive from the Node stream as more than one chunk.
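One caveat with decoding chunk by chunk: if a multi-byte UTF-8 character is split across two Buffer chunks, calling toString() on each chunk separately corrupts it. Joining the raw bytes first avoids that. A minimal sketch with plain Node Buffers; in Highland terms this would roughly be `.collect().map(bufs => JSON.parse(Buffer.concat(bufs).toString('utf8')))`, assuming the request stream emits Buffers.

```javascript
'use strict';

// Decode each chunk separately (what a per-chunk toString() effectively does).
function decodePerChunk(chunks) {
  return chunks.map(c => c.toString('utf8')).join('');
}

// Join the raw bytes first, then decode once.
function decodeWhole(chunks) {
  return Buffer.concat(chunks).toString('utf8');
}

// '{"price":"5€"}' with the 3-byte '€' (e2 82 ac) split across two chunks.
const bytes = Buffer.from('{"price":"5€"}', 'utf8');
const splitAt = bytes.indexOf(0xe2) + 1;
const chunks = [bytes.subarray(0, splitAt), bytes.subarray(splitAt)];

console.log(JSON.parse(decodeWhole(chunks)).price); // 5€
console.log(decodePerChunk(chunks) === bytes.toString('utf8')); // false
```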

You don't need to do anything special to access chunk in the function passed to the second flatMap: it's a closure, so it has access to all the variables in the function body that holds the chained flatMaps.
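The same closure shape can be sketched with plain Promises. `fakeAuthenticate` and `fakeRetrieve` are hypothetical stand-ins for Helpers.Authenticate and Helpers.Retrieve, not real APIs. The inner callback reads `body.APIKey` from the previous step and `chunk` from the enclosing function.

```javascript
'use strict';

// Hypothetical stand-ins for the two HTTP calls (not real APIs).
const fakeAuthenticate = () => Promise.resolve({ APIKey: 'key-123' });
const fakeRetrieve = (apiKey, chunk) =>
  Promise.resolve({ stored: chunk, authorizedWith: apiKey });

function processChunk(chunk) {
  return fakeAuthenticate().then(body =>
    // `chunk` comes from processChunk's scope; `body` from the previous step.
    fakeRetrieve(body.APIKey, chunk)
  );
}

processChunk('CERT-A').then(result => console.log(result));
// { stored: 'CERT-A', authorizedWith: 'key-123' }
```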

You should probably put an .errors call before .each to handle any problems from the request calls that may have occurred:

.errors(err => {
    // do something with error
})
.each(result => /**...**/)
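As a rough plain-JavaScript analogue (not Highland's implementation): handling the error per chunk lets the remaining chunks continue, which is similar to what .errors does when the handler does not push the error back into the stream. `flakyUpload` is a hypothetical upload that fails for one chunk.

```javascript
'use strict';

// Hypothetical upload that fails for one particular chunk.
function flakyUpload(chunk) {
  return chunk === 'bad'
    ? Promise.reject(new Error(`upload failed for ${chunk}`))
    : Promise.resolve(`uploaded:${chunk}`);
}

async function uploadAll(chunks, onError) {
  const results = [];
  for (const chunk of chunks) {
    try {
      results.push(await flakyUpload(chunk));
    } catch (err) {
      onError(err); // log and skip, like an .errors handler that doesn't push
    }
  }
  return results;
}

uploadAll(['a', 'bad', 'c'], err => console.log('Something wrong here\n' + err.message))
  .then(results => console.log(results)); // [ 'uploaded:a', 'uploaded:c' ]
```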

svozza, Sep 24 '16 12:09

I'm almost there. I just need to figure out how to do two more things.

  1. Add more requests to the chain of HTTP calls and pass the data from previous calls down the chain.
  2. Insert some events that get fired before or after the HTTP requests, e.g.
event.sender.send('selected-directory', subject) 

I tried chaining on another flatMap, but that doesn't seem to see the response data from the previous step.

hl(fs.createReadStream("/Users/cbourne/Downloads/TPP_cert_pump/cert_small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => {

        // I presume you'll want to do something with chunk in both requests
        // so you'll want to keep both flatMaps in the same function body.

       const id = chunk.getSerialNumber()
       const subject = chunk.getSubject()
       const name = chunk.getName()

        return hl(request(Helpers.Authenticate))
            .through(JSONStream.parse())
            .flatMap(body => hl(request(Helpers.CreateObject(name, subject, body.APIKey))).through(JSONStream.parse()))
            .through(JSONStream.parse())
            .flatMap(body => hl(request(Helpers.StoreData(id, chunk, body.APIKey))).through(JSONStream.parse()));
    }

3goats, Sep 25 '16 18:09

Sorry, only seeing your posts now. So I guess if you need to pass things through from the authentication request you need to do all subsequent requests in the same flatMap, e.g.,

// ...
return hl(request(Helpers.Authenticate))
    .through(JSONStream.parse())
    .flatMap(body => {
        return hl(request(Helpers.CreateObject(name, subject, body.APIKey)))
            .through(JSONStream.parse())
            .flatMap(body2 => hl(request(Helpers.StoreData(id, chunk, body.APIKey)))
                .through(JSONStream.parse()))
    });

Not really sure about the events though. Are they coming from an event emitter? Can you wrap that emitter in the Highland constructor and then join that data using another flatMap?
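As a dependency-free sketch of turning emitter events into something you can iterate over, Node's built-in `events.on()` (Node 12.16+ / 13.6+) exposes an EventEmitter as an async iterable. The `'progress'` event name and the `'done'` sentinel are hypothetical; this illustrates the idea rather than the Highland constructor itself.

```javascript
'use strict';
const { EventEmitter, on } = require('events');

// Collect 'progress' events until a hypothetical 'done' payload arrives.
async function collectProgress(emitter) {
  const seen = [];
  for await (const [payload] of on(emitter, 'progress')) {
    if (payload === 'done') break; // breaking out removes the listener
    seen.push(payload);
  }
  return seen;
}

const emitter = new EventEmitter();
const collected = collectProgress(emitter); // starts listening immediately
emitter.emit('progress', 'authenticated');
emitter.emit('progress', 'stored');
emitter.emit('progress', 'done');
collected.then(seen => console.log(seen)); // [ 'authenticated', 'stored' ]
```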

svozza, Sep 25 '16 19:09

Ah, thanks. I'll try this. I'm running this in Electron, so yes, the events are fired via an event emitter.

If that can be wired in I should be good to go.

3goats, Sep 25 '16 20:09

This is great and works well. However, the aggregated results currently only show the result of the last request. Can all of the results be passed down?

        // ... end of the flatMap callback from the previous snippet
    })
    .errors(err => {
        // do something with error
        console.log("Something wrong here\n" + err)
    })
    .each(result => console.log(result))
    .done(() => console.log("All results received."));

3goats, Sep 26 '16 09:09

The simplest way I guess would be to use a context object that collects the various parts of the results and return that at the end of your flatMap. I don't really like this sort of mutable state and side effects normally but it will work for your use case. As you can see though it's not pretty:

return hl(request(Helpers.Authenticate))
    .through(JSONStream.parse())
    .flatMap(body => {
        let ctx = {};
        return hl(request(Helpers.CreateObject(name, subject, body.APIKey)))
            .through(JSONStream.parse())
            .tap(res => ctx.req1 = res)
            .flatMap(body2 => hl(request(Helpers.StoreData(id, chunk, body.APIKey)))
                .through(JSONStream.parse()))
            .tap(res => ctx.req2 = res)
            // ...
            .flatMap(() => hl.of(ctx));
    });

A cleaner way to do it is to change the return type of your flatMaps so that they return a similar context object rather than just the body of the previous request; you can then build that object up as the requests accumulate.
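A sketch of that approach with plain Promises: each step returns a new context object that extends the previous one, so the final value holds every response without mutating shared state. `authenticate`, `createObject` and `storeData` are hypothetical stand-ins for the three requests.

```javascript
'use strict';

// Hypothetical stand-ins for the three HTTP calls.
const authenticate = () => Promise.resolve({ APIKey: 'key-123' });
const createObject = (name, apiKey) => Promise.resolve({ objectId: `${name}-1`, apiKey });
const storeData = (id, chunk, apiKey) => Promise.resolve({ stored: id, bytes: chunk.length });

// Each step adds one field to a fresh context object instead of mutating a shared one.
function processChunk(chunk, id, name) {
  return authenticate()
    .then(auth => ({ chunk, auth }))
    .then(ctx => createObject(name, ctx.auth.APIKey)
      .then(created => ({ ...ctx, created })))
    .then(ctx => storeData(id, ctx.chunk, ctx.auth.APIKey)
      .then(stored => ({ ...ctx, stored })));
}

processChunk('CERT-A', 'serial-1', 'device').then(ctx => console.log(ctx));
```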

Is there any reason why you need all results to be collected at the end?

svozza, Sep 26 '16 09:09

Is there any reason why you need all results to be collected at the end?

Good question. It's really more for logging/auditing and writing the high-level results out to a log than anything else. However, if it's possible to call an additional function just after each request, that should let me achieve the same thing.

3goats, Sep 26 '16 09:09

Could you log using tap as the responses occur?

hl(request(Helpers.CreateObject(name, subject, body.APIKey)))
            .through(JSONStream.parse())
            .tap(res => logger.debug('Create object response: ', res))
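The idea behind tap is to run a side effect and pass each value through unchanged. A tiny plain-function sketch of that idea for a Promise chain (`tap`, `logAs` and the two-step flow are hypothetical helpers for illustration, not Highland's API):

```javascript
'use strict';

// tap(sideEffect) returns a function that calls sideEffect(value) and then
// passes value through unchanged.
const tap = sideEffect => value => {
  sideEffect(value);
  return value;
};

// Hypothetical logger that just records lines in memory.
const logLines = [];
const logAs = label => res => logLines.push(`${label}: ${JSON.stringify(res)}`);

// Hypothetical two-step flow standing in for CreateObject and StoreData.
function run() {
  return Promise.resolve({ objectId: 'dev-1' })
    .then(tap(logAs('Create object response')))
    .then(created => ({ stored: created.objectId }))
    .then(tap(logAs('Store data response')));
}

run().then(final => console.log(final, logLines));
```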

svozza, Sep 26 '16 09:09

Could you log using tap as the responses occur?

Very nice. That does the trick.

3goats, Sep 26 '16 10:09

Cool. Did you get the event stuff working? I was quite vague because I've never actually used Highland to stream events like that (I don't do any front-end work).

svozza, Sep 26 '16 10:09

Yes, if I run within an Electron context, I can now do this:

.tap(res => event.sender.send('selected-directory', JSON.stringify(res)))

This enables me to provide an event stream for what's happening at the back end.

3goats, Sep 26 '16 10:09

Ah yes of course, it's just another side effect.

svozza, Sep 26 '16 10:09