
Consume Eleventy `toNDJSON` stream via `for await`

Open monochromer opened this issue 3 years ago • 3 comments

I'm trying to use the programmatic API for Eleventy and its `toNDJSON` method.

When I use the `.on('data', () => {})` interface, it works fine: the data flows chunk by chunk.

const Eleventy = require('@11ty/eleventy')

;(async () => {
  const eleventy = new Eleventy()
  const stream = await eleventy.toNDJSON()

  stream.on('data', (entry) => {
     console.log(entry + '')
  })
})()

But when I use `for await`, all the data arrives in one chunk:

for await (const entry of stream) {
    console.log(entry + '')
}

Maybe this relates more to Node.js stream internals than to Eleventy.

monochromer avatar Jun 30 '22 17:06 monochromer

The behavior you see is a little complex (if I understand it correctly). It has to do with how each `push` on a readable emits a separate `'data'` event, while an async iterator pulls whatever is currently buffered directly in a microtask. So basically this is Node stream internals and can't really be changed by Eleventy.

But nonetheless I do have a solution for you! Since all line breaks inside the data are encoded by Eleventy and entries are separated by line breaks, you can just split on `"\n"` and get your entries that way:

const Eleventy = require("@11ty/eleventy");

function consume(entry) {
  console.log(entry);
  console.log("------------------------------");
}

(async () => {
  const eleventy = new Eleventy();
  const stream = await eleventy.toNDJSON();

  for await (const entries of stream.iterator()) {
    // Split on line breaks and remove the last element if it is empty
    for (const entry of (entries + "").split("\n").filter(entry => entry)) {
      consume(entry);
    }
  }
})();

Snapstromegon avatar Jun 30 '22 20:06 Snapstromegon

I organized it into a pipeline:

const { pipeline, Transform } = require('node:stream')
const Eleventy = require('@11ty/eleventy')

;(async () => {
  const eleventy = new Eleventy()
  const eleventyDataStream = await eleventy.toNDJSON()

  pipeline(
    eleventyDataStream,
    new Transform({
      objectMode: true,
      transform(chunk, enc, next) {
        try {
          const item = JSON.parse(chunk + '')
          next(null, item)
        } catch (error) {
          next(error)
        }
      }
    }),
    async function* (stream) {
      for await (const entry of stream) {
        if (!entry?.url) {
          continue
        }

        console.log(entry)
      }
    },
    (error) => {
      if (error) {
        console.error(error)
      }
    }
  )
})()

monochromer avatar Jul 01 '22 08:07 monochromer

As I understood it, async generators were made for streams. Is there really no way to get this to work?

Zearin avatar Jul 31 '22 17:07 Zearin