
Enable reading and writing to dataset with NodeJS streams

Open · mtrunkat opened this issue 4 years ago · 2 comments

We have two main methods on the dataset item collection for manipulating the items stored in a dataset:

  • dataset.listItems() https://github.com/apify/apify-client-js/blob/master/src/resource_clients/dataset.js#L58
  • dataset.pushItems() https://github.com/apify/apify-client-js/blob/master/src/resource_clients/dataset.js#L140
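
For reference, a minimal sketch of how a copy-and-transform job looks today with these non-streaming methods, where every page of items is buffered in memory (the offset/limit values below are just illustrative):

const { ApifyClient } = require('apify-client');

const client = new ApifyClient({ token: 'MY-APIFY-TOKEN' });
const datasetClient = client.dataset('SOME_DATASET_ID');

// Load one page of items fully into memory.
const { items } = await datasetClient.listItems({ offset: 0, limit: 1000 });

// Transform in memory, then upload the whole array in a single call.
const transformed = items.map(({ someField1, someField2, ...rest }) => rest);
await datasetClient.pushItems(transformed);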

To make transformations more efficient, we could use Node.js streams in the following manner:

const { ApifyClient } = require('apify-client');
const { Transform } = require('stream');

const client = new ApifyClient({
    token: 'MY-APIFY-TOKEN',
});

const sourceDatasetClient = client.dataset(SOME_DATASET_ID_1);
const targetDatasetClient = client.dataset(SOME_DATASET_ID_2);

// Drop fields we don't want to copy into the target dataset.
const myTransform = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
        delete obj.someField1;
        delete obj.someField2;

        callback(null, obj);
    }
});

// Proposed new streaming methods.
const itemsReadStream = await sourceDatasetClient.getListItemsStream();
const itemsWriteStream = await targetDatasetClient.getPushItemsStream();

itemsReadStream
    .pipe(myTransform)
    .pipe(itemsWriteStream);
  • getListItemsStream should support the same parameters as the non-stream listItems method, but instead of returning an object it will return a stream. As the response may contain GBs of data, the parsing must also happen in a streaming manner. To make this easier, you can use format=jsonl, where each line is one item serialized as JSON (a parsing sketch follows this list).
  • getPushItemsStream will have no parameters and will simply push data to the dataset, also in a streaming manner. The only tricky part here is our maximum payload size of 9 MB, so before reaching this threshold you need to close the request and initiate a new one.
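
A minimal sketch, assuming format=jsonl, of how the read side could parse the response in a streaming manner; the JsonlParse transform below is purely illustrative and not part of the client (a real implementation would also need to handle multi-byte characters split across chunk boundaries):

const { Transform } = require('stream');

// Illustrative transform: turns a raw JSONL byte stream into a stream of objects.
class JsonlParse extends Transform {
    constructor() {
        super({ readableObjectMode: true });
        this.tail = '';
    }

    _transform(chunk, encoding, callback) {
        const lines = (this.tail + chunk.toString('utf8')).split('\n');
        this.tail = lines.pop(); // keep the last, possibly incomplete, line
        for (const line of lines) {
            if (line.trim()) this.push(JSON.parse(line));
        }
        callback();
    }

    _flush(callback) {
        if (this.tail.trim()) this.push(JSON.parse(this.tail));
        callback();
    }
}

getListItemsStream could then roughly be the raw HTTP response stream piped through new JsonlParse(), yielding one parsed item object per line.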

  • Info on Apify Dataset - https://docs.apify.com/storage/dataset
  • Dataset API - https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items
  • API client dataset docs - https://docs.apify.com/apify-client-js#datasetclient

mtrunkat · Jul 15 '21 13:07

Wouldn't an iterator that works in batches be preferable to streams?

Two reasons: First, processing each item as an event is not efficient and there's a lot of overhead. It might be much faster to download e.g. 1000 items and then process them in a synchronous loop.

Second is that the push stream would not really be a stream, but a batch push anyway.
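
A minimal sketch of what such a batched iterator could look like, built on top of the existing listItems() pagination and reusing the source/target clients from the example above; the helper name iterateItemBatches and its batchSize default are hypothetical, not part of the client:

// Hypothetical helper: yields arrays of items, one page at a time.
async function* iterateItemBatches(datasetClient, batchSize = 1000) {
    let offset = 0;
    while (true) {
        const { items } = await datasetClient.listItems({ offset, limit: batchSize });
        if (items.length === 0) break;
        yield items;
        offset += items.length;
    }
}

// Usage: each batch is processed in a plain synchronous loop and pushed in one call.
for await (const batch of iterateItemBatches(sourceDatasetClient)) {
    const transformed = batch.map(({ someField1, someField2, ...rest }) => rest);
    await targetDatasetClient.pushItems(transformed);
}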

What do you think?

mnmkng · Jul 16 '21 11:07

  • Yes, the processing might be CPU intensive due to the large number of callbacks.
  • But the push might actually be done in a streaming way: you open a POST request and start pushing data to it as the items are being read and transformed. The only thing is that after every 9 MB of data you need to close and reinitiate the request, so some data may get buffered.
  • There is one advantage to this: if the transformation or upload is slow, backpressure (if streaming gets implemented correctly) will slow down/pause the request's read stream.
  • So with backpressure, it may really use 100% of the available reading and writing speed, as long as there is enough CPU to cover it (see the sketch below).
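
For illustration, a rough sketch of a push Writable that buffers serialized items and flushes a batch before crossing the 9 MB threshold. It reuses pushItems() instead of keeping a raw HTTP request open, so it only approximates the described behaviour, but because each write callback fires only after any needed flush completes, backpressure does propagate to the upstream read stream; the helper name createPushItemsStream is hypothetical:

const { Writable } = require('stream');

const MAX_PAYLOAD_BYTES = 9 * 1024 * 1024;

// Illustrative writable: accumulates items and pushes them in batches under 9 MB.
function createPushItemsStream(datasetClient) {
    let buffered = [];
    let bufferedBytes = 0;

    const flush = async () => {
        if (buffered.length === 0) return;
        await datasetClient.pushItems(buffered);
        buffered = [];
        bufferedBytes = 0;
    };

    return new Writable({
        objectMode: true,
        async write(obj, encoding, callback) {
            try {
                const size = Buffer.byteLength(JSON.stringify(obj));
                // "Close the request and initiate a new one" before hitting the limit.
                if (bufferedBytes + size > MAX_PAYLOAD_BYTES) await flush();
                buffered.push(obj);
                bufferedBytes += size;
                callback();
            } catch (err) {
                callback(err);
            }
        },
        async final(callback) {
            try {
                await flush();
                callback();
            } catch (err) {
                callback(err);
            }
        },
    });
}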

I am not sure how much more efficient this might be, but it's worth an experiment.

mtrunkat · Jul 16 '21 11:07