
Write streaming data to a parquet file


What?

Hi, we're currently using @dsnp/parquetjs to write parquet files in Node. But it's a fork of an old package, and it doesn't look very well maintained.

So I came across this repo, which looks very active, but it's not clear to me whether we can do what we're doing now with parquet-wasm. Maybe you can help me understand.

What do we want to do?

We want to iterate over a huge PostgreSQL table with a cursor, so we get batches of rows that we then want to write out to a parquet file.

So I was wondering if that's possible with parquet-wasm: handle the data as a stream and, at the end, save the file to disk.

This is how we do it with @dsnp/parquetjs:

import { ParquetWriter } from '@dsnp/parquetjs'

const BATCH_SIZE = 4096
const ROW_GROUP_SIZE = 4096
const FILE_PATH = '/path/to/file.parquet'
const SQL_QUERY = 'SELECT * FROM users'

async function writeParquet(): Promise<string> {
  return new Promise<string>((resolve) => {
    let writer: ParquetWriter | undefined
    // The source doesn't matter here: batchQuery does a cursor-based pg iteration
    // and we receive N rows for each batch in the `onBatch` callback
    OUR_POSTGREST_DB.batchQuery(SQL_QUERY, {
      batchSize: BATCH_SIZE,
      onBatch: async (batch) => {
        if (!writer) {
          // Build a ParquetSchema from the pg field metadata of the first batch
          const schema = buildParquetSchema(batch.fields)
          writer = await ParquetWriter.openFile(schema, FILE_PATH, {
            rowGroupSize: Math.max(BATCH_SIZE, ROW_GROUP_SIZE),
          })
        }

        for (const row of batch.rows) {
          // This doesn't write to the parquet file immediately; it accumulates
          // rows until `rowGroupSize` is reached
          await writer.appendRow(row)
        }

        if (batch.lastBatch) {
          await writer.close()
          resolve(FILE_PATH)
        }
      },
    })
  })
}

Thanks for the help!

andresgutgon · May 24 '24 14:05

Right now we support streaming reads but not yet streaming writes. That's pending https://github.com/kylebarron/parquet-wasm/pull/305

kylebarron · May 27 '24 10:05

Thanks! Looking forward to it. Do you know how much work is left on that PR?

andresgutgon · May 27 '24 10:05

I haven't looked at that PR in a while. It looks like it needs a little work to be updated with the latest main branch. But aside from that it might work with few changes. You can ask @H-Plus-Time if he's interested in working on that PR more.

kylebarron · May 27 '24 10:05

I see that the work on #305 has been completed. I looked at the code examples, but none of them show how to create a parquet file from scratch with a specific schema and write data to the file as a stream, something like here or here. Is there a minimal working example that demonstrates this? Thanks.

AlekseyMelikov · Feb 12 '25 04:02

I don't have a full example myself; let's check with @H-Plus-Time in case he has a minimal example of using this.

kylebarron · Feb 12 '25 15:02

The crux of this is: you need something that generates Arrow IPC data first.

The below is reasonable for a materialized table:

import * as arrow from 'apache-arrow';
import * as parquet from 'parquet-wasm';

// construct your table (and therefore schema) via apache-arrow (or similar)
const arrowTableInstance = arrow.tableFromArrays({"sample": [1,2,3]});

const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTableInstance)).recordBatches();

// this is what you need (easy in this materialized case)
const streamOfRecordBatches = ReadableStream.from(recordBatches);

const byteStream = await parquet.transformParquetStream(streamOfRecordBatches);

In NodeJS, the IO parts:

import { open } from 'fs/promises';
import { Writable } from 'stream';

const handle = await open("file.parquet", "w"); // open for writing (the default flag is 'r')
const destinationStream = Writable.toWeb(handle.createWriteStream());
await byteStream.pipeTo(destinationStream);
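(If you're in a browser, or just want the whole file in memory rather than on disk, you can collect the byte stream with the standard Response API instead. A minimal sketch, nothing parquet-wasm-specific, continuing from the byteStream above:)

// Collect the ReadableStream<Uint8Array> produced by transformParquetStream
// into a single in-memory buffer.
const parquetBytes = new Uint8Array(await new Response(byteStream).arrayBuffer());
// In a browser you could then, for example, hand it to a Blob for download:
// const url = URL.createObjectURL(new Blob([parquetBytes]));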

Assuming you had something like an async generator that yielded batches of JSON objects, you'd do something like:

const streamOfRecordBatches = new ReadableStream({
  async start(controller) {
    for await (const chunk of yourGeneratorFunction()) {
      const arrowTable = arrow.tableFromJSON(chunk);
      const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTable)).recordBatches();
      for (const recordBatch of recordBatches) {
        controller.enqueue(recordBatch);
      }
    }
    controller.close();
  },
});

H-Plus-Time · Feb 17 '25 08:02

The interface at the start of this thread (this: https://github.com/kylebarron/parquet-wasm/issues/542#issue-2315584308 ) would take a bit of adapting, since it's (basically) callback-based. If it happens to be exactly your use case, you'd want this: https://github.com/porsager/postgres?tab=readme-ov-file#await-sqlcursorrows--1-fn as your source async iterable.
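For illustration, a rough, untested sketch of wiring that together. It assumes the async-iterable `.cursor(batchSize)` form of porsager/postgres linked above plus the `transformParquetStream` usage shown earlier; the connection setup, query, and file path are placeholders:

import * as arrow from 'apache-arrow';
import * as parquet from 'parquet-wasm';
import postgres from 'postgres';
import { open } from 'fs/promises';
import { Writable } from 'stream';

const sql = postgres(); // connection options / env vars omitted (placeholder)

const streamOfRecordBatches = new ReadableStream({
  async start(controller) {
    // .cursor(4096) yields arrays of up to 4096 rows per iteration
    for await (const rows of sql`SELECT * FROM users`.cursor(4096)) {
      const arrowTable = arrow.tableFromJSON(rows);
      const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTable)).recordBatches();
      for (const recordBatch of recordBatches) {
        controller.enqueue(recordBatch);
      }
    }
    controller.close();
  },
});

const byteStream = await parquet.transformParquetStream(streamOfRecordBatches);
const handle = await open('/path/to/file.parquet', 'w');
await byteStream.pipeTo(Writable.toWeb(handle.createWriteStream()));
await sql.end();

One caveat: arrow.tableFromJSON infers the schema per chunk, so for a guaranteed-stable schema across record batches you'd want to construct the Arrow schema explicitly up front rather than relying on inference.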

H-Plus-Time · Feb 17 '25 09:02