Transform CSV headers using Highland?
I'm trying to use Highland to transform a CSV file's headers for easier parsing (the field names are capitalized and contain spaces), but I'm not sure what the "Highland way" of doing this would be, or whether it's even possible with Highland.
Here's my code:
var _ = require('highland');
var rawData = _([argv.file]).flatMap(_.compose(_, fs.createReadStream)).split();
var underscored = require('underscore.string').underscored;
var headers = rawData
.fork()
.take(1)
.splitBy(',')
.map(underscored)
.collect()
.invoke('join', [',']);
var data = rawData
.fork()
.drop(1);
_([headers, data]).merge().pipe(process.stdout);
I thought I would create one stream to read the file, fork it once to read and transform the headers, fork it again to read the rest of the data, and then merge the two streams back together. However, it looks like the data stream is also affected by the take(1) and ends up empty after the drop(1).
Should I read from the file twice instead?
I got it to work by reading from the file twice:
var rawData = function () {
return _([argv.file]).flatMap(_.compose(_, fs.createReadStream)).split();
};
var headers = rawData()
.take(1)
.splitBy(',')
.map(underscored)
.collect()
.invoke('join', [',']);
var data = rawData().drop(1);
_([headers, data])
.merge()
.intersperse('\n')
.pipe(process.stdout);
Is this the most efficient way of doing this? I'd like to avoid reading from the file twice if possible.
Your original way isn't wrong. Unfortunately, take is broken when combined with fork, and it can't be fixed in 2.x for back-compat reasons. Basically, take still exerts backpressure even after it has taken every requested element. If you run your original code against the 3.0.0 branch, it should work.
You could use observe instead of fork in your headers stream. This leaks some memory, though, since the observe stream will buffer all of the data that comes after the first line.
However, since all you're doing is transforming the first line, it's easier (and more efficient) to just use map.
function fixupHeader(stream) {
var first = true;
return stream.map(function (line) {
if (first) {
first = false;
return line.split(',')
.map(underscored)
.join(',');
} else {
return line;
}
});
}
_(fs.createReadStream(argv.file))
.split()
.through(fixupHeader)
.intersperse('\n')
.pipe(process.stdout);