Write streaming data to a parquet file
What?
Hi, we're currently using @dsnp/parquetjs to write parquet files in Node, but it's a fork of an old package and doesn't look actively maintained.
So I came across this repo, which looks very active, but it's not clear to me whether we can do with parquet-wasm what we're doing now. Maybe you can help me understand.
What do we want to do?
We want to iterate over a huge PostgreSQL table with a cursor, so we get batches of rows that we then want to write to a parquet file.
So I was wondering if that's possible with parquet-wasm: handle the data as a stream and, at the end, save the file to disk.
This is how we do it with @dsnp/parquetjs:
import { ParquetWriter } from '@dsnp/parquetjs'

const BATCH_SIZE = 4096
const ROW_GROUP_SIZE = 4096
const SQL_QUERY = 'SELECT * FROM users'

// `size` is the expected total row count for the query
async function writeParquet(size: number): Promise<string> {
  return new Promise<string>((resolve) => {
    const url = '/path/to/file.parquet'
    let writer: ParquetWriter | undefined
    // The source doesn't really matter: `batchQuery` iterates the table with a
    // pg cursor and hands us N rows per batch through the `onBatch` callback.
    OUR_POSTGRES_DB.batchQuery(SQL_QUERY, {
      batchSize: BATCH_SIZE,
      onBatch: async (batch) => {
        if (!writer) {
          // our helper that builds a ParquetSchema from the pg field metadata
          const schema = buildParquetSchema(batch.fields)
          writer = await ParquetWriter.openFile(schema, url, {
            rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,
          })
        }
        for (const row of batch.rows) {
          // appendRow doesn't write to the parquet file right away; it
          // accumulates rows until `rowGroupSize` is reached
          await writer.appendRow(row)
        }
        if (batch.lastBatch) {
          await writer.close()
          resolve(url)
        }
      },
    })
  })
}
Thanks for the help!
Right now we support streaming reads but not yet streaming writes. That's pending https://github.com/kylebarron/parquet-wasm/pull/305
Thanks! Looking forward to it. Do you know how much work is left on that PR?
I haven't looked at that PR in a while. It looks like it needs a little work to be brought up to date with the latest main branch, but aside from that it might work with only a few changes. You can ask @H-Plus-Time if he's interested in working on that PR more.
I see that the work on #305 has been completed. I looked at the code examples, but none of them shows how to create a parquet file from scratch with a specific schema and write data to the file as a stream, something like here or here. Is there a minimal working example that demonstrates this? Thanks.
I don't have a full example myself; let's check with @H-Plus-Time in case he has a minimal example of using this.
The crux of this is: you need something that generates Arrow IPC data first.
The below is reasonable for a materialized table:
import * as arrow from 'apache-arrow';
import * as parquet from 'parquet-wasm';
// construct your table (and therefore schema) via apache-arrow (or similar)
const arrowTableInstance = arrow.tableFromArrays({"sample": [1,2,3]});
const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTableInstance)).recordBatches();
// this is what you need (easy in this materialized case)
const streamOfRecordBatches = ReadableStream.from(recordBatches);
const byteStream = await parquet.transformParquetStream(streamOfRecordBatches);
In NodeJS, the IO parts:
import { open } from 'fs/promises';
import { Writable } from 'stream';
const handle = await open("file.parquet", "w"); // open for writing (the default flag is read-only)
const destinationStream = Writable.toWeb(handle.createWriteStream());
await byteStream.pipeTo(destinationStream);
Assuming you had something like an async generator that yielded batches of JSON objects, you'd do something like:
const streamOfRecordBatches = new ReadableStream({
async start(controller) {
for await (const chunk of yourGeneratorFunction()) {
const arrowTable = arrow.tableFromJSON(chunk);
const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTable)).recordBatches();
for(const recordBatch of recordBatches) {
controller.enqueue(recordBatch);
}
}
controller.close();
},
});
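For clarity, yourGeneratorFunction can be any async generator that yields arrays of plain objects; a trivial hypothetical stand-in (just for illustration, not anything from parquet-wasm) would be:
async function* yourGeneratorFunction() {
  // each yielded array becomes one or more Arrow record batches above;
  // in practice this would be e.g. a DB cursor handing back batches of rows
  yield [{ sample: 1 }, { sample: 2 }];
  yield [{ sample: 3 }, { sample: 4 }];
}
One caveat: arrow.tableFromJSON infers the schema from each chunk independently, so you'll want every batch to have the same fields and types (or construct the schema explicitly) to keep the record batches consistent.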
The interface at the start of this thread (this: https://github.com/kylebarron/parquet-wasm/issues/542#issue-2315584308) would take a bit of adapting, since it's (basically) callback-based. If it happens to be exactly your use case, you'd want this: https://github.com/porsager/postgres?tab=readme-ov-file#await-sqlcursorrows--1-fn as your source async iterable.
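If you want to see those pieces end to end, here's a rough sketch (untested; the connection options, query, batch size and output path are placeholders) that uses the porsager cursor as the source async iterable and streams the encoded parquet straight to disk:
import postgres from 'postgres';
import * as arrow from 'apache-arrow';
import * as parquet from 'parquet-wasm';
import { open } from 'fs/promises';
import { Writable } from 'stream';

const sql = postgres({ /* connection options */ });

// Wrap the cursor in a ReadableStream of Arrow record batches,
// the same pattern as the generator-based example above.
const streamOfRecordBatches = new ReadableStream({
  async start(controller) {
    // .cursor(4096) yields up to 4096 rows (as plain objects) per iteration
    for await (const rows of sql`SELECT * FROM users`.cursor(4096)) {
      const arrowTable = arrow.tableFromJSON(rows);
      const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTable)).recordBatches();
      for (const recordBatch of recordBatches) {
        controller.enqueue(recordBatch);
      }
    }
    controller.close();
  },
});

// Encode the record batches as a parquet byte stream and pipe it to disk.
const byteStream = await parquet.transformParquetStream(streamOfRecordBatches);
const handle = await open('/path/to/file.parquet', 'w');
await byteStream.pipeTo(Writable.toWeb(handle.createWriteStream()));
await sql.end();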