Enable reading and writing to a dataset with NodeJS streams
We have two main endpoints for the dataset item collection that allow manipulating the items stored in a dataset:
- [`dataset.listItems()`](https://github.com/apify/apify-client-js/blob/master/src/resource_clients/dataset.js#L58)
- [`dataset.pushItems()`](https://github.com/apify/apify-client-js/blob/master/src/resource_clients/dataset.js#L140)
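For reference, the current non-streaming workflow has to hold a whole page of items in memory at once. A rough sketch (the dataset IDs and field names are just the placeholders used in the streaming example below):

```js
const { ApifyClient } = require('apify-client');

const client = new ApifyClient({ token: 'MY-APIFY-TOKEN' });

async function copyOnePage() {
    // Download one page of items into memory...
    const { items } = await client
        .dataset('SOME_DATASET_ID_1')
        .listItems({ offset: 0, limit: 1000 });

    // ...transform it in memory...
    const transformed = items.map(({ someField1, someField2, ...rest }) => rest);

    // ...and push the whole batch to the target dataset in one request.
    await client.dataset('SOME_DATASET_ID_2').pushItems(transformed);
}
```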
To make transformations more efficient, we could use NodeJS streams in the following manner:
```js
const { ApifyClient } = require('apify-client');
const { Transform } = require('stream');

const client = new ApifyClient({
    token: 'MY-APIFY-TOKEN',
});

const sourceDatasetClient = client.dataset(SOME_DATASET_ID_1);
const targetDatasetClient = client.dataset(SOME_DATASET_ID_2);

// Strips unwanted fields from every item passing through
const myTransform = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
        delete obj.someField1;
        delete obj.someField2;
        callback(null, obj);
    },
});

const itemsReadStream = await sourceDatasetClient.getListItemsStream();
const itemsWriteStream = await targetDatasetClient.getPushItemsStream();

itemsReadStream
    .pipe(myTransform)
    .pipe(itemsWriteStream);
```
`getListItemsStream()` should support the same parameters as the non-stream `listItems()` method, but instead of returning an object it will return a stream. Since the response may contain GBs of data, the parsing must also happen in a streaming manner. To make this easier you can use `format=jsonl`, where each line is one item serialized as JSON.

`getPushItemsStream()` will have no parameters and will simply push data to the dataset, also in a streaming manner. The only tricky part here is our maximum payload size of 9 MB, so before reaching this threshold you need to close the request and initiate a new one.
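To make this a bit more concrete, here are two untested sketches. The helper names (`createJsonlParser`, `createPushItemsStream`) and the exact byte margin are made up for illustration; the real logic would live inside `DatasetClient`.

Read side, turning a `format=jsonl` response body into parsed items:

```js
const { Transform } = require('stream');

// Splits a raw jsonl response body into parsed items, one object per line.
function createJsonlParser() {
    let tail = '';
    return new Transform({
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                const lines = (tail + chunk.toString('utf8')).split('\n');
                tail = lines.pop(); // keep the possibly incomplete last line for the next chunk
                for (const line of lines) {
                    if (line.trim()) this.push(JSON.parse(line));
                }
                callback();
            } catch (err) {
                callback(err);
            }
        },
        flush(callback) {
            try {
                if (tail.trim()) this.push(JSON.parse(tail));
                callback();
            } catch (err) {
                callback(err);
            }
        },
    });
}
```

Push side, buffering serialized items and flushing them through the existing `pushItems()` before the 9 MB limit is reached:

```js
const { Writable } = require('stream');

// Keep a safety margin below the 9 MB payload limit; the exact margin is a guess.
const MAX_PAYLOAD_BYTES = 9 * 1024 * 1024 - 64 * 1024;

function createPushItemsStream(datasetClient) {
    let buffer = [];
    let bufferedBytes = 0;

    const flush = async () => {
        if (buffer.length === 0) return;
        // Send the accumulated items as one JSON array in a single request.
        await datasetClient.pushItems(`[${buffer.join(',')}]`);
        buffer = [];
        bufferedBytes = 0;
    };

    return new Writable({
        objectMode: true,
        async write(item, encoding, callback) {
            try {
                const json = JSON.stringify(item);
                const itemBytes = Buffer.byteLength(json) + 1; // +1 for the separating comma
                // Flush before this item would push the payload over the limit.
                if (bufferedBytes + itemBytes > MAX_PAYLOAD_BYTES) await flush();
                buffer.push(json);
                bufferedBytes += itemBytes;
                callback();
            } catch (err) {
                callback(err);
            }
        },
        async final(callback) {
            try {
                await flush();
                callback();
            } catch (err) {
                callback(err);
            }
        },
    });
}
```

Because `write()` only calls back after a pending flush completes, backpressure naturally pauses the upstream read stream while an upload is in flight.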
- Info on Apify Dataset - https://docs.apify.com/storage/dataset
- Dataset API - https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items
- API client dataset docs - https://docs.apify.com/apify-client-js#datasetclient
Wouldn't an iterator that works in batches be preferable to streams?
Two reasons: First, processing each item as an event is not efficient and there's a lot of overhead. It might be much faster to download e.g. 1,000 items and then process them in a synchronous loop (see the sketch below).
Second, the push stream would not really be a stream, but a batch push anyway.
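For concreteness, the batched alternative could be an async generator over the existing `listItems()` pagination, reusing the clients from the example above (the helper name and batch size are just illustrative):

```js
// Yields items in pages of `batchSize` using the existing listItems() pagination.
async function* iterateItemsInBatches(datasetClient, batchSize = 1000) {
    let offset = 0;
    while (true) {
        const { items } = await datasetClient.listItems({ offset, limit: batchSize });
        if (items.length === 0) break;
        yield items;
        offset += items.length;
    }
}

// Usage: process each batch in a plain synchronous loop, then push it.
// (The 9 MB payload limit still applies to every pushItems() call.)
for await (const batch of iterateItemsInBatches(sourceDatasetClient)) {
    const transformed = batch.map(({ someField1, someField2, ...rest }) => rest);
    await targetDatasetClient.pushItems(transformed);
}
```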
What do you think?
- Yes, the processing might be CPU-intensive due to the large number of callbacks.
- But the push might actually be done in a streaming way: you open a POST request and start pushing data to it as the items are being read and transformed. The only catch is that after every 9 MB of data you need to close and re-initiate the request, so some data may get buffered.
- There is one advantage to this: if the transformation or upload is slow, backpressure (if streaming is implemented correctly) will slow down or pause the request read stream (see the `pipeline` sketch below).
- So with backpressure, it may effectively use 100% of the available reading and writing speed, as long as there is enough CPU to cover it.
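Regarding backpressure: wiring the three streams with `stream.pipeline` instead of bare `.pipe()` calls would also propagate errors and tear the whole chain down on failure. A minimal sketch reusing the streams from the example above (the promisified `stream/promises` module is available in Node 15+):

```js
const { pipeline } = require('stream/promises');

// Backpressure flows from the slowest stage back to the HTTP read stream,
// and an error in any stage destroys all of them.
await pipeline(itemsReadStream, myTransform, itemsWriteStream);
```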
I am not sure how much more efficient this might be, but it's worth an experiment.