bsv
maximum performance data processing
why
it should be simple and easy to process data at the speed of sequential io.
what
a simple and efficient data format for easily manipulating chunks of rows of columns while minimizing allocations and copies.
minimal cli tools for rapidly composing performant data flow pipelines.
how
column: 0-65536 bytes.
row: 0-65536 columns.
chunk: up to 5MB containing 1 or more complete rows.
note: row data cannot exceed chunk size.
layout
chunk:
| i32:size | u8[]:row | ... |
row:
| u16:max | u16:size | ... | u8[]:column | ... |
note: column bytes are always followed by a single null byte.
note: max is the maximum zero based index into the row.
install
>> curl https://raw.githubusercontent.com/nathants/bsv/master/scripts/install_archlinux.sh | bash
>> git clone https://github.com/nathants/bsv
>> cd bsv
>> make -j
>> sudo mv -fv bin/* /usr/local/bin
note: for best pipeline performance, increase the maximum pipe size
>> sudo sysctl fs.pipe-max-size=5242880
test
>> tox
>> docker build -t bsv:debian -f Dockerfile.debian .
>> docker run -v $(pwd):/code --rm -it bsv:debian bash -c 'cd /code && py.test -vvx --tb native -n auto test/'
>> docker build -t bsv:alpine -f Dockerfile.alpine .
>> docker run -v $(pwd):/code --rm -it bsv:alpine bash -c 'cd /code && py.test -vvx --tb native -n auto test/'
increase the number of generated test cases with environment variable: TEST_FACTOR=5
example
add bsumall.c to bsv/src/:
#include "util.h"
#include "load.h"
#include "dump.h"
#define DESCRIPTION "sum columns of u16 as i64\n\n"
#define USAGE "... | bsumall \n\n"
#define EXAMPLE ">> echo '\n1,2\n3,4\n' | bsv | bschema a:u16,a:u16 | bsumall i64 | bschema i64:a,i64:a | csv\n4,6\n"
int main(int argc, char **argv) {
    // setup state
    SETUP();
    readbuf_t rbuf = rbuf_init((FILE*[]){stdin}, 1, false);
    writebuf_t wbuf = wbuf_init((FILE*[]){stdout}, 1, false);
    i64 sums[MAX_COLUMNS] = {0};
    row_t row;
    // process input row by row
    while (1) {
        load_next(&rbuf, &row, 0);
        if (row.stop)
            break;
        for (i32 i = 0; i <= row.max; i++) {
            ASSERT(sizeof(u16) == row.sizes[i], "fatal: bad data\n");
            sums[i] += *(u16*)row.columns[i];
        }
    }
    // generate output row
    row.max = -1;
    for (i32 i = 0; i < MAX_COLUMNS; i++) {
        if (!sums[i])
            break;
        row.sizes[i] = sizeof(i64);
        row.columns[i] = &sums[i];
        row.max++;
    }
    // dump output
    if (row.max >= 0)
        dump(&wbuf, &row, 0);
    dump_flush(&wbuf, 0);
}
build and run:
>> ./scripts/makefile.sh
>> make bsumall
>> bsumall -h
sum columns of u16 as i64
usage: ... | bsumall
>> echo '
1,2
3,4
' | bsv | bschema a:u16,a:u16 | bsumall i64 | bschema i64:a,i64:a | csv
4,6
non goals
support of hardware other than little endian.
types and schemas as a part of the data format.
testing methodology
quickcheck style testing with python implementations to verify correct behavior for arbitrary inputs and varying buffer sizes.
experiments
performance experiments and alternate implementations.
related projects
s4 - a storage cluster that is cheap and fast, with data local compute and efficient shuffle.
related posts
optimizing a bsv data processing pipeline
performant batch processing with bsv, s4, and presto
discovering a baseline for data processing performance
refactoring common distributed data patterns into s4
scaling python data processing horizontally
scaling python data processing vertically
more examples
structured analysis of nyc taxi data with bsv and hive
tools
| name | description |
|---|---|
| bcat | cat some bsv files to csv |
| bcombine | prepend a new column by combining values from existing columns |
| bcounteach | count as i64 each contiguous identical row by the first column |
| bcounteach-hash | count as i64 by hash of the first column |
| bcountrows | count rows as i64 |
| bcut | select some columns |
| bdedupe | dedupe identical contiguous rows by the first column, keeping the first |
| bdedupe-hash | dedupe rows by hash of the first column, keeping the first |
| bdropuntil | for sorted input, drop until the first column is >= VALUE |
| bhead | keep the first n rows |
| blz4 | compress bsv data |
| blz4d | decompress bsv data |
| bmerge | merge sorted files from stdin |
| bpartition | split into multiple files by consistent hash of the first column value |
| bquantile-merge | merge ddsketches and output quantile value pairs as f64 |
| bquantile-sketch | collapse the first column into a single row ddsketch |
| bschema | validate and convert row data with a schema of columns |
| bsort | timsort rows by the first column |
| bsplit | split a stream into multiple files |
| bsum | sum the first column |
| bsumeach | sum the second column of each contiguous identical row by the first column |
| bsumeach-hash | sum as i64 the second column by hash of the first column |
| bsv | convert csv to bsv |
| btake | take while the first column is VALUE |
| btakeuntil | for sorted input, take until the first column is >= VALUE |
| btopn | accumulate the top n rows in a heap by first column value |
| bunzip | split a multi column input into single column outputs |
| bzip | combine single column inputs into a multi column output |
| csv | convert bsv to csv |
| xxh3 | xxh3_64 hash stdin |
bcat
cat some bsv files to csv
usage: bcat [-l|--lz4] [-p|--prefix] [-h N|--head N] FILE1 ... FILEN
>> for char in a a b b c c; do
echo $char | bsv >> /tmp/$char
done
>> bcat --head 1 --prefix /tmp/{a,b,c}
/tmp/a:a
/tmp/b:b
/tmp/c:c
bcombine
prepend a new column by combining values from existing columns
usage: ... | bcombine COL1,...,COLN
>> echo a,b,c | bsv | bcombine 3,2 | csv
c:b,a,b,c
bcounteach
count as i64 each contiguous identical row by the first column
usage: ... | bcounteach
>> echo '
a
a
b
b
b
a
' | bsv | bcounteach | bschema *,i64:a | csv
a,2
b,3
a,1
bcounteach-hash
count as i64 by hash of the first column
usage: ... | bcounteach-hash
>> echo '
a
a
b
b
b
a
' | bsv | bcounteach-hash | bschema *,i64:a | bsort | csv
a,3
b,3
bcountrows
count rows as i64
usage: ... | bcountrows
>> echo '
1
2
3
4
' | bsv | bcountrows | csv
4
bcut
select some columns
usage: ... | bcut COL1,...,COLN
>> echo a,b,c | bsv | bcut 3,3,3,2,2,1 | csv
c,c,c,b,b,a
bdedupe
dedupe identical contiguous rows by the first column, keeping the first
usage: ... | bdedupe
>> echo '
a
a
b
b
a
a
' | bsv | bdedupe | csv
a
b
a
bdedupe-hash
dedupe rows by hash of the first column, keeping the first
usage: ... | bdedupe-hash
>> echo '
a
a
b
b
a
a
' | bsv | bdedupe-hash | csv
a
b
bdropuntil
for sorted input, drop until the first column is >= VALUE
usage: ... | bdropuntil VALUE [TYPE]
>> echo '
a
b
c
d
' | bsv | bdropuntil c | csv
c
d
bhead
keep the first n rows
usage: ... | bhead N
>> echo '
a
b
c
' | bsv | bhead 2 | csv
a
b
blz4
compress bsv data
usage: ... | blz4
>> echo a,b,c | bsv | blz4 | blz4d | csv
a,b,c
blz4d
decompress bsv data
usage: ... | blz4d
>> echo a,b,c | bsv | blz4 | blz4d | csv
a,b,c
bmerge
merge sorted files from stdin
usage: echo FILE1 ... FILEN | bmerge [TYPE] [-r|--reversed] [-l|--lz4]
>> echo -e 'a
c
e
' | bsv > a.bsv
>> echo -e 'b
d
f
' | bsv > b.bsv
>> echo a.bsv b.bsv | bmerge
a
b
c
d
e
f
bpartition
split into multiple files by consistent hash of the first column value
usage: ... | bpartition NUM_BUCKETS [PREFIX] [-l|--lz4]
>> echo '
a
b
c
' | bsv | bpartition 10 prefix
prefix03
prefix06
bquantile-merge
merge ddsketches and output quantile value pairs as f64
usage: ... | bquantile-merge QUANTILES
>> seq 1 100 | bsv | bschema a:i64 | bquantile-sketch i64 | bquantile-merge .2,.5,.7 | bschema f64:a,f64:a | csv
0.2,19.88667024086646
0.5,49.90296094906742
0.7,70.11183939140405
bquantile-sketch
collapse the first column into a single row ddsketch
usage: ... | bquantile-sketch TYPE [-a|--alpha] [-b|--max-bins] [-m|--min-value]
>> seq 1 100 | bsv | bschema a:i64 | bquantile-sketch i64 | bquantile-merge .2,.5,.7 | bschema f64:a,f64:a | csv
0.2,19.88667024086646
0.5,49.90296094906742
0.7,70.11183939140405
bschema
validate and convert row data with a schema of columns
usage: ... | bschema SCHEMA [--filter]
--filter remove bad rows instead of erroring
example schemas:
*,*,* = 3 columns of any size
8,* = a column with 8 bytes followed by a column of any size
8,*,... = same as above, but ignore any trailing columns
a:u16,a:i32,a:f64 = convert ascii to numerics
u16:a,i32:a,f64:a = convert numerics to ascii
4*,*4 = keep the first 4 bytes of column 1 and the last 4 of column 2
>> echo aa,bbb,cccc | bsv | bschema 2,3,4 | csv
aa,bbb,cccc
bsort
timsort rows by the first column
usage: ... | bsort [-r|--reversed] [TYPE]
>> echo '
3
2
1
' | bsv | bschema a:i64 | bsort i64 | bschema i64:a | csv
1
2
3
bsplit
split a stream into multiple files
usage: ... | bsplit PREFIX [chunks_per_file=1]
>> echo -n a,b,c | bsv | bsplit prefix
prefix_0000000000
bsum
sum the first column
usage: ... | bsum TYPE
>> echo '
1
2
3
4
' | bsv | bschema a:i64 | bsum i64 | bschema i64:a | csv
10
bsumeach
sum the second column of each contiguous identical row by the first column
usage: ... | bsumeach TYPE
>> echo '
a,1
a,2
b,3
b,4
b,5
a,6
' | bsv | bschema *,a:i64 | bsumeach i64 | bschema *,i64:a | csv
a,3
b,12
a,6
bsumeach-hash
sum as i64 the second column by hash of the first column
usage: ... | bsumeach-hash i64
>> echo '
a,1
a,2
b,3
b,4
b,5
a,6
' | bsv | bschema *,a:i64 | bsumeach-hash i64 | bschema *,i64:a | csv
a,3
b,12
a,6
bsv
convert csv to bsv
usage: ... | bsv
>> echo a,b,c | bsv | bcut 3,2,1 | csv
c,b,a
btake
take while the first column is VALUE
usage: ... | btake VALUE
>> echo '
a
b
c
d
' | bsv | bdropuntil c | btake c | csv
c
btakeuntil
for sorted input, take until the first column is >= VALUE
usage: ... | btakeuntil VALUE [TYPE]
>> echo '
a
b
c
d
' | bsv | btakeuntil c | csv
a
b
btopn
accumulate the top n rows in a heap by first column value
usage: ... | btopn N [TYPE] [-r|--reversed]
>> echo '
1
3
2
' | bsv | bschema a:i64 | btopn 2 i64 | bschema i64:a | csv
3
2
bunzip
split a multi column input into single column outputs
usage: ... | bunzip PREFIX [-l|--lz4]
>> echo '
a,b,c
1,2,3
' | bsv | bunzip col && echo col_1 col_3 | bzip | csv
a,c
1,3
bzip
combine single column inputs into a multi column output
usage: ls column_* | bzip [COL1,...COLN] [-l|--lz4]
>> echo '
a,b,c
1,2,3
' | bsv | bunzip column && ls column_* | bzip 1,3 | csv
a,c
1,3
csv
convert bsv to csv
usage: ... | csv
>> echo a,b,c | bsv | csv
a,b,c
xxh3
xxh3_64 hash stdin
usage: ... | xxh3 [--stream|--int]
--stream pass stdin through to stdout with hash on stderr
--int output the hash as a decimal integer instead of hex
>> echo abc | xxh3
079364cbfdf9f4cb