highland
processing highland stream chunks using async
I'm using highland.js to process a file using a stream to read the contents between two delimiters. I'm also using async.js to run a series of HTTP requests in sequence.
Ideally, I would like to pass each chunk x emitted by Highland into the async series (chain), so that the HTTP requests get executed for every chunk extracted from the stream.
Is this possible? If so, how can this be achieved?
var fs = require('fs');
var async = require('async');
var _ = require('highland');

_(fs.createReadStream(files[0], { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .each(function (x) {
        // x is one extracted chunk; this is where the HTTP requests should run
    }).done(function () {
        async.series([
            function (callback) {
                setTimeout(function () {
                    console.log('Task 1');
                    callback(null, 1);
                }, 300);
            },
            function (callback) {
                setTimeout(function () {
                    console.log('Task 2');
                    callback(null, 2);
                }, 200);
            },
        ], function (error, results) {
            console.log(results);
        });
    });
Yes, you can use flatMap and wrapCallback to get what you want. For example:
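const fs = require('fs');
const async = require('async');
const _ = require('highland');

// Convert the node-style async.series(tasks, callback) into a function that returns a Highland stream.
const asyncSeriesStream = _.wrapCallback(async.series);

_(fs.createReadStream(files[0], { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => asyncSeriesStream([
        callback => {
            // Task 1: use chunk, then call callback(err, result) when finished
        },
        callback => {
            // Task 2: likewise, must call callback(err, result)
        }
    ]))
    .each(result => console.log(result))
    .done(() => console.log("All results received."));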
What's going on here:
- flatMap takes each chunk, calls the provided mapper, waits for the resulting stream to finish, and emits its values. Only after that does it move on to the next chunk.
- flatMap expects its mapper to return a stream, so we use wrapCallback to convert async.series from node callback style to Highland stream style. asyncSeriesStream calls async.series with the tasks you provided and an appropriate callback argument that it creates, then returns a Highland stream that emits either an error or an array of results, depending on the outcome of the execution.
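To make the wrapCallback step more concrete, here is a rough plain-Node sketch of the same conversion without Highland. wrapCallbackAsStream is a hypothetical helper (not Highland's implementation) that turns a node-style function fn(...args, callback) into one that returns a single-value object-mode stream, and fakeSeries is a simplified, hypothetical stand-in for async.series. In real code you would use Highland's own wrapCallback.

```javascript
const { Readable } = require('stream');

// Hypothetical helper (NOT Highland's implementation): wrap a node-style
// function fn(...args, callback) so that calling it returns a stream that
// emits the callback's result once and then ends, or fails with the error.
function wrapCallbackAsStream(fn) {
  return (...args) => {
    const stream = new Readable({ objectMode: true, read() {} });
    fn(...args, (err, result) => {
      if (err) {
        stream.destroy(err);
      } else {
        stream.push(result);
        stream.push(null); // end the stream after the single value
      }
    });
    return stream;
  };
}

// Hypothetical, simplified stand-in for async.series: run tasks one after
// another and call back with the array of their results (or the first error).
function fakeSeries(tasks, callback) {
  const results = [];
  const runTask = (i) => {
    if (i === tasks.length) return callback(null, results);
    tasks[i]((err, value) => {
      if (err) return callback(err);
      results.push(value);
      runTask(i + 1);
    });
  };
  runTask(0);
}

const seriesStream = wrapCallbackAsStream(fakeSeries);

// Consume the stream: it yields exactly one value, the array of task results.
async function demo() {
  const emitted = [];
  const tasks = [
    (cb) => setTimeout(() => cb(null, 1), 30),
    (cb) => setTimeout(() => cb(null, 2), 10),
  ];
  for await (const value of seriesStream(tasks)) {
    emitted.push(value);
  }
  return emitted;
}

demo().then((emitted) => console.log(emitted));
```

Even though the second task is faster, the results come back in task order because each task only starts after the previous one has called back.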
Using flatMap here (equivalent to map(...).series()) means that you'll have at most one HTTP request in flight at a time. You can also use map(...).parallel(n), map(...).merge(), or map(...).mergeWithLimit(n) to get more parallelism.
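The difference between one request in flight and a bounded number in flight can be sketched in plain Node with promises. This is an analogy, not Highland code: mapWithLimit and makeFakeRequest are hypothetical helpers. mapWithLimit keeps results in input order, like parallel(n); merge() and mergeWithLimit(n) do not guarantee order.

```javascript
// Hypothetical helper: apply an async worker to every item with at most
// `limit` calls in flight. limit = 1 behaves like the flatMap version above;
// a larger limit is analogous to map(...).parallel(limit).
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length);
  let nextIndex = 0;

  // Each runner repeatedly claims the next unprocessed index.
  async function runner() {
    while (nextIndex < items.length) {
      const i = nextIndex++;
      results[i] = await worker(items[i]);
    }
  }

  const runnerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results; // same order as `items`
}

// Hypothetical fake "request" that records how many calls overlap.
function makeFakeRequest() {
  const stats = { inFlight: 0, maxInFlight: 0 };
  const fakeRequest = async (chunk) => {
    stats.inFlight++;
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight);
    await new Promise((resolve) => setTimeout(resolve, 10));
    stats.inFlight--;
    return `uploaded ${chunk}`;
  };
  return { fakeRequest, stats };
}

const { fakeRequest, stats } = makeFakeRequest();
mapWithLimit(['a', 'b', 'c', 'd'], 2, fakeRequest).then((results) => {
  console.log(results, 'max in flight:', stats.maxInFlight);
});
```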
BTW, we also have streamifyAll, which you can use to essentially wrapCallback all of the functions on the async object at once if you are using other functions from async (the wrapped versions get a Stream suffix, e.g. seriesStream).
Thanks very much. So my code now looks like this:
const asyncSeriesStream = _.wrapCallback(async.series);

_(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => asyncSeriesStream([
        callback => {
            request('http://www.google.com', function (error, response, body) {
                if (!error && response.statusCode == 200) {
                    console.log(body) // Show the HTML for the Google homepage.
                }
            })
        },
        callback => {
            request('http://www.apple.com', function (error, response, body) {
                if (!error && response.statusCode == 200) {
                    console.log(body) // Show the HTML for the Apple homepage.
                }
            })
        }
    ]).each(result => console.log(result))
      .done(() => console.log("All results received.")));
However, it never seems to run; it just exits with exit code 0.
This version, however, does run.
I think I must be missing something obvious here, but I can't see what, since there are no errors.
_(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .each(function (x) {
        console.log(x)
    }).done(function () {
    })
I've attached a sample of the file I'm trying to process; the real file is much, much larger. Basically, my objective is to extract the chunks and then call request to upload them to a REST API. The Google and Apple URIs are just there to test the flow.
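A likely explanation, which the thread does not spell out: Highland streams are lazy, so no data flows through the pipeline until something at the end of it consumes the stream. In the first version, the closing parentheses put .each(...) and .done(...) on the inner stream returned inside flatMap, so the outer pipeline has no consumer and Node exits with nothing left to do. Separately, neither task ever calls callback, so async.series would stall after the first task anyway. The laziness part can be illustrated with plain JavaScript generators, used here purely as a stand-in for Highland (readChunks and flatMapLazy are hypothetical names):

```javascript
// Plain-generator stand-in for a lazy stream pipeline (no Highland involved).
function* readChunks(chunks, log) {
  for (const chunk of chunks) {
    log.push(`read ${chunk}`); // records when a chunk is actually pulled
    yield chunk;
  }
}

// Lazy flatMap: fully drain the inner sequence for one item before pulling the next.
function* flatMapLazy(source, mapper) {
  for (const item of source) {
    yield* mapper(item);
  }
}

const log = [];
const pipeline = flatMapLazy(
  readChunks(['a', 'b'], log),
  (chunk) => [`${chunk}-task1`, `${chunk}-task2`]
);

// Building the pipeline has not pulled anything yet.
const readsBeforeConsuming = log.length;

// Consuming it (the job .each/.done do at the end of a Highland chain) makes data flow.
const output = [...pipeline];
console.log(readsBeforeConsuming, output);
```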
What is async.series doing? request returns a Node stream, so could you not just wrap those with the Highland constructor and chain them using flatMap?
Hmm, I'm not sure. I'm starting to get a little out of my depth now. Any guidance would be much appreciated.
Essentially, I'm just trying to process a large file and have the chunks sent to a series of HTTP requests without creating silly numbers of requests. As @vqvu mentioned, "you'll have at most one HTTP request in-flight at a time", which is highly desirable.
No probs. So from what I can gather, you want to make 2 http requests per chunk of data. The way I would do this is like so:
const fs = require('fs');
const hl = require('highland');
const request = require('request');

hl(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => {
        // I presume you'll want to do something with chunk in both requests
        // so you'll want to keep both flatMaps in the same function body.
        return hl(request({url: 'http://www.google.com', json: true}))
            .flatMap(body => hl(request({url: 'http://www.apple.com', json: true})));
    })
    .each(result => console.log(result))
    .done(() => console.log("All results received."));
Thank you very much. I'll give this a try in a bit. Your help is much appreciated.
Let us know how you get on. Any more questions don't hesitate to ask.
@svozza This is great. Again, thanks for the guidance. I'm a Python guy trying to get my head around the asynchronous world. :)
Haha. Been there! It takes a little while to click, but it becomes second nature eventually.
So this seems to work perfectly. This is my code now:
_(fs.createReadStream("small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => {
        // I presume you'll want to do something with chunk in both requests
        // so you'll want to keep both flatMaps in the same function body.
        return hl(request(Helpers.Authenticate, function (error, response, body) {
            if (!error && response.statusCode == 200) {
                // console.log(body)
            }
        }))
            // apikey: how do I get this from the first response? (see question below)
            .flatMap(body => hl(request(Helpers.Retrieve(apikey, chunk), function (error, response, body) {
                if (!error && response.statusCode == 200) {
                    // console.log(body)
                }
            })));
    })
    .each(
        result =>
            console.log(result.toString('utf8')))
    .done(() => console.log("All results received."));
A couple of questions.
The result was a Buffer, so I needed to convert it to a string. Is this correct? Also, the body of the first response contains body.APIKey, so how do I pass the API key and the chunk into the Helpers.Retrieve() function for the second HTTP call?
You don't want to use the (error, response, body) callback if you're using request as a stream; anything you do in there won't get passed down the stream. Assuming that your REST API returns JSON, you have a couple of ways of dealing with the Buffer. The easiest, imo, is to use the JSONStream library:
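const hl = require('highland');
const request = require('request');
const JSONStream = require('JSONStream');

// Inside .flatMap(chunk => { ... }), so chunk is in scope here:
hl(request(Helpers.Authenticate))
    .through(JSONStream.parse())
    .flatMap(body => {
        return hl(request(Helpers.Retrieve(body.APIKey, chunk)))
            .through(JSONStream.parse());
    })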
Another way is:
hl(request(Helpers.Authenticate))
    .reduce1((a, b) => a + b.toString())
    .map(JSON.parse);
// ...
You need the reduce1 there because, depending on its size, the response might be broken into more than one part as it comes down the Node stream.
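The same collect-then-parse idea, sketched in plain Node without Highland or request (the response is simulated with stream.Readable.from, and readJson is a hypothetical helper). One deliberate difference from the reduce1 version: it joins the raw Buffers with Buffer.concat before decoding, because calling toString() on each chunk separately can corrupt a multi-byte UTF-8 character that happens to be split across two chunks.

```javascript
const { Readable } = require('stream');

// Collect every chunk a response stream emits, then decode and parse once.
async function readJson(stream) {
  const parts = [];
  for await (const part of stream) {
    parts.push(Buffer.isBuffer(part) ? part : Buffer.from(part));
  }
  return JSON.parse(Buffer.concat(parts).toString('utf8'));
}

// Simulated response body arriving in three chunks; the last split falls
// inside the two-byte UTF-8 encoding of "é".
const body = Buffer.from(JSON.stringify({ APIKey: 'abc', name: 'café' }));
const chunks = [
  body.subarray(0, 10),
  body.subarray(10, body.length - 3),
  body.subarray(body.length - 3),
];

readJson(Readable.from(chunks)).then((parsed) => console.log(parsed.name));
```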
You don't need to do anything special to get access to chunk in the function passed to the second flatMap: because it's a closure, it has access to all the variables in the function body that holds the chained flatMaps.
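A plain-promise sketch of the closure point, assuming hypothetical authenticate and retrieve stand-ins for the request(Helpers.Authenticate) and request(Helpers.Retrieve(...)) calls: the second step uses both the API key from the first response and the chunk from the enclosing function without either being passed down explicitly.

```javascript
// Hypothetical stand-ins for the two HTTP calls; they resolve after a short delay.
const delay = (ms, value) => new Promise((resolve) => setTimeout(() => resolve(value), ms));
const authenticate = () => delay(5, { APIKey: 'key-123' });
const retrieve = (apiKey, chunk) => delay(5, `${apiKey}:${chunk}`);

// `chunk` is a parameter of the outer function, so the callback passed to the
// second step closes over it, alongside `auth` from the first step.
function processChunk(chunk) {
  return authenticate().then((auth) => retrieve(auth.APIKey, chunk));
}

// Process chunks one at a time, like the flatMap pipeline.
async function processAll(chunks) {
  const results = [];
  for (const chunk of chunks) {
    results.push(await processChunk(chunk));
  }
  return results;
}

processAll(['chunk-1', 'chunk-2']).then((results) => console.log(results));
```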
You should probably put an .errors call before each to handle any problems from the request calls that may have occurred:
.errors(err => {
    // do something with error
})
.each(result => { /* ... */ })
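In Highland, errors that the .errors handler does not push back onto the stream are removed from it, and later values keep flowing. A rough plain-async analogue (processEach and fakeUpload are hypothetical, not Highland API) is to catch each chunk's failure individually so that one bad request does not stop the rest:

```javascript
// Hypothetical helper: run `worker` on each chunk in turn; a failure is passed
// to `onError` and the loop moves on instead of aborting everything.
async function processEach(chunks, worker, onError) {
  const results = [];
  for (const chunk of chunks) {
    try {
      results.push(await worker(chunk));
    } catch (err) {
      onError(err, chunk);
    }
  }
  return results;
}

// Hypothetical fake upload that fails for one particular chunk.
const fakeUpload = async (chunk) => {
  if (chunk === 'bad') throw new Error(`upload failed for ${chunk}`);
  return `ok ${chunk}`;
};

const failures = [];
processEach(['a', 'bad', 'b'], fakeUpload, (err) => failures.push(err.message))
  .then((results) => console.log(results, failures));
```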
I'm almost there. I just need to figure out how to do two more things:
- Add more requests to the chain of HTTP calls and pass the data from previous calls down the chain.
- Insert some events that get fired before or after the HTTP requests, e.g. event.sender.send('selected-directory', subject)
I tried chaining on another flatMap, but that doesn't seem to see the response data from the previous step.
hl(fs.createReadStream("/Users/cbourne/Downloads/TPP_cert_pump/cert_small.txt", { encoding: 'utf8' }))
    .splitBy('-----BEGIN-----\n')
    .splitBy('\n-----END-----\n')
    .filter(chunk => chunk !== '')
    .flatMap(chunk => {
        // I presume you'll want to do something with chunk in both requests
        // so you'll want to keep both flatMaps in the same function body.
        const id = chunk.getSerialNumber()
        const subject = chunk.getSubject()
        const name = chunk.getName()
        return hl(request(Helpers.Authenticate))
            .through(JSONStream.parse())
            .flatMap(body => hl(request(Helpers.CreateObject(name, subject, body.APIKey))).through(JSONStream.parse()))
            .through(JSONStream.parse())
            .flatMap(body => hl(request(Helpers.StoreData(id, chunk, body.APIKey))).through(JSONStream.parse()));
    }
Sorry, only seeing your posts now. So I guess if you need to pass things through from the authentication request, you need to do all subsequent requests in the same flatMap, e.g.,
// ...
return hl(request(Helpers.Authenticate))
    .through(JSONStream.parse())
    .flatMap(body => {
        return hl(request(Helpers.CreateObject(name, subject, body.APIKey)))
            .through(JSONStream.parse())
            .flatMap(body2 => hl(request(Helpers.StoreData(id, chunk, body.APIKey)))
                .through(JSONStream.parse()));
    });
Not really sure about the events, though. Are they coming from an event emitter? Can you wrap that emitter in the Highland constructor and then join that data using another flatMap?
Arh, thanks. I'll try this. I'm running this in Electron, so yes, the events are fired via an event emitter.
If that can be wired in, I should be good to go.
This is great and works well. However, the results aggregation currently only shows the results from the last request. Can all of the results be bubbled down?
        // ... (end of the flatMap body above)
    })
    .errors(err => {
        // do something with error
        console.log("Something wrong here\n" + err)
    })
    .each(
        result =>
            console.log(result))
    .done(() => console.log("All results received."));
The simplest way, I guess, would be to use a context object that collects the various parts of the results and return that at the end of your flatMap. I don't normally like this sort of mutable state and side effects, but it will work for your use case. As you can see, though, it's not pretty:
return hl(request(Helpers.Authenticate))
    .through(JSONStream.parse())
    .flatMap(body => {
        let ctx = {};
        return hl(request(Helpers.CreateObject(name, subject, body.APIKey)))
            .through(JSONStream.parse())
            .tap(res => ctx.req1 = res)
            .flatMap(body2 => hl(request(Helpers.StoreData(id, chunk, body.APIKey)))
                .through(JSONStream.parse()))
            .tap(res => ctx.req2 = res)
            // ...
            .flatMap(() => hl.of(ctx));
    });
A cleaner way to do it is to change the return type of your flatMaps so that they return a similar context object rather than just the body of the previous request; you can then build that object up as the requests accumulate.
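A sketch of that cleaner approach in plain async JavaScript, assuming a hypothetical fakeCall stand-in for the Helpers.* requests: every step receives the context built so far and returns a new, larger context, so all intermediate results reach the end of the chain without a shared mutable object.

```javascript
// Hypothetical stand-in for an HTTP request that resolves to a parsed body.
const fakeCall = (label, value) =>
  new Promise((resolve) => setTimeout(() => resolve({ label, value }), 5));

// Each step takes the context so far and returns a new one with its result added.
const steps = [
  async (ctx) => ({ ...ctx, auth: await fakeCall('authenticate', 'key-123') }),
  async (ctx) => ({ ...ctx, created: await fakeCall('createObject', `${ctx.chunk}@${ctx.auth.value}`) }),
  async (ctx) => ({ ...ctx, stored: await fakeCall('storeData', ctx.chunk) }),
];

// Run the steps in order for one chunk and return the fully built context.
async function runSteps(chunk) {
  let ctx = { chunk };
  for (const step of steps) {
    ctx = await step(ctx);
  }
  return ctx;
}

runSteps('chunk-1').then((ctx) => console.log(ctx));
```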
Is there any reason why you need all results to be collected at the end?
Good question. It's really more for logging/auditing and writing out the high-level results to a log than anything else. However, if it's possible to call an additional function just after each request, that should enable me to achieve the same thing.
Could you log using tap as the responses occur?
hl(request(Helpers.CreateObject(name, subject, body.APIKey)))
    .through(JSONStream.parse())
    .tap(res => logger.debug('Create object response: ', res))
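tap runs a side effect and passes the value through unchanged, which is why it suits logging. The same shape in a plain promise chain, using a small hypothetical tap helper and an array standing in for logger:

```javascript
// Hypothetical helper: call sideEffect(value) and hand the same value on.
const tap = (sideEffect) => (value) => {
  sideEffect(value);
  return value;
};

const logLines = []; // stands in for a real logger
const logResponse = tap((res) => logLines.push(`Create object response: ${JSON.stringify(res)}`));

// The logging step does not alter what the next step receives.
const idPromise = Promise.resolve({ id: 42 })
  .then(logResponse)
  .then((res) => res.id);

idPromise.then((id) => console.log(id, logLines));
```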
Very nice. That does the trick.
Cool. Did you get the event stuff working? I was quite vague because I've never actually used Highland to stream events like that (I don't do any FE work).
Yes. If I run within an Electron context, I can now do this:
.tap(res => event.sender.send('selected-directory', JSON.stringify(res)))
This enables me to provide an event stream for stuff that's happening at the back end.
Ah yes of course, it's just another side effect.