node-sqlite

For await syntax

Open woubuc opened this issue 3 years ago • 10 comments

Given that the for await syntax is a thing now, I think it would be very useful if the .each() method returned an async iterator instead of using a classical callback.

woubuc avatar Sep 28 '20 08:09 woubuc

Not sure how it would work - the existing implementation is the way it is because sqlite3 has two separate callbacks: one for when a row is fetched and another for when the entire operation is done, which gives back the number of rows returned.

Altering each to return an iterator is tricky as a result, and also breaks the existing API.

I would be open to a new API method (like asyncEach) that returns an iterator if you want to try your hand at it.
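For readers unfamiliar with the shape being described, here is a minimal sketch of the two-callback pattern and of how a promise can absorb the completion callback. `fakeDriverEach` and `eachAsPromise` are hypothetical names used only for illustration; this is not the library's code and it runs without a database.

```javascript
// Hypothetical stand-in for the driver-level API: one callback per row,
// plus a second callback once the whole statement has finished.
function fakeDriverEach(rows, onRow, onComplete) {
  setImmediate(() => {
    rows.forEach(row => onRow(null, row));
    onComplete(null, rows.length);
  });
}

// Fold the completion callback into a promise that resolves with the row
// count; the per-row callback is passed through unchanged.
function eachAsPromise(rows, onRow) {
  return new Promise((resolve, reject) => {
    fakeDriverEach(rows, onRow, (err, count) => (err ? reject(err) : resolve(count)));
  });
}
```

The row callback is still pushed to the caller, which is why turning this into a pull-based iterator needs some kind of buffer in between.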

theogravity avatar Sep 28 '20 08:09 theogravity

This is difficult because the row callback for each is synchronous, but for await would require an async callback (even if you don't actually use any async methods in the for body). To achieve this result you'd have to keep each row in a queue and wait for the row callback to finish before emitting the next result.

Something like this should do it:

function resultQueue(){
  let returnValue;
  const q = [];
  return {
    getNext(){
      if(q.length){
        return q.shift();
      }
      return new Promise(resolve => {
        returnValue = v => {
          returnValue = null;
          resolve(v);
        };
      });
    },

    setNext(v){
      if(returnValue) {
        returnValue(v);
      } else {
        q.push(v);
      }
    }
  }
}

async function* asyncEach(statement) {
  const { getNext, setNext } = resultQueue();
  db.each(statement, setNext).then(() => setNext(null));
  let row;
  // Assuming there are no falsy values passed to `setNext` until the final callback
  // i.e. null return value indicates no more records
  while(row = await getNext()) {
    yield row;
  }
}

(async function(){
  for await(const row of asyncEach('SELECT * FROM table;')) {
    await doSomethingWithRow(row);
  }
}());

PaulKiddle avatar Oct 09 '20 14:10 PaulKiddle

I also expected this library to offer a non-callback API.

Maybe I have missed something, but this simple code works for me:

for await (const row of getRows(db, sql)) {
    console.log(JSON.stringify(row));
}

function getRows(db, sql) {
    let resolve;
    let promise = new Promise(res => {
        resolve = res;
    })
    db.each(sql, (err, row) => {
        resolve(row);
        promise = new Promise(res => {
            resolve = res;
        });
    }).then(rowCount => {
        resolve();
    });
    return (async function * generator() {
        while (true) {
            const row = await promise;
            if (row === undefined) {
                break;
            }
            yield row;
        }
    })();
}

AlttiRi avatar Mar 09 '21 19:03 AlttiRi

@AlttiRi Looks interesting. Do you know what the perf / memory impact would be? Biggest concern is possible memory leak due to garbage collection issues.

theogravity avatar Mar 09 '21 22:03 theogravity

const db = await open({
    filename: "history.sqlite", // use Chromium DB
    driver: sqlite3.Database
});

const sql = `SELECT * FROM urls`;
let now, total, loopTime1, loopTime2;

loopTime1 = 0;
for (let i = 0; i < 10; i++) {
    total = 0;
    now = new Date();
    for await (const row of getRows(db, sql)) {
        total += row.url.length; // "useful work"
    }
    const time = new Date() - now;
    loopTime1 += time;
    // console.log(total);
    console.log("time 1", time);
}

loopTime2 = 0;
for (let i = 0; i < 10; i++) {
    total = 0;
    now = new Date();
    await db.each(sql, (err, row) => {
        total += row.url.length;
    });
    const time = new Date() - now;
    loopTime2 += time;
    // console.log(total);
    console.log("time 2", time);
}

console.log(loopTime1); // ~11 secs
console.log(loopTime2); // ~10 secs

Of course, for await is a bit slower, but it should be of the same order. Memory consumption is about the same for both loops.

BTW, PaulKiddle's code does not work at all.

The strange thing is that awaiting inside the for await loop breaks it.

const sql2 = `SELECT * FROM urls LIMIT 5;`;
for await (const row of getRows(db, sql2)) {
    console.log(row.url); // printed once
    await new Promise(resolve => setTimeout(resolve, 0)); // here
    // await Promise.resolve(); // with it the code works
    console.log(row.url.length); // printed once
}

It should only add a pause (about 4 ms) between iterations, but it does not work correctly: there is only 1 iteration instead of 5.

UPD. Well, it is probably a problem with my code. It looks like a queue is needed to store the pending rows when the consumer (the user's code) is slower than the producer (the library).
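A minimal sketch of why a single promise slot loses rows while a queue does not. It assumes a hypothetical `fakeEach` that delivers all rows in one burst, which is a simplified stand-in for a producer running ahead of the consumer and not the real driver's timing; `singleSlotRows`, `queuedRows` and `collect` are illustrative names as well.

```javascript
// Hypothetical stand-in for db.each(): delivers every row in one burst,
// then resolves with the row count.
function fakeEach(rows, onRow) {
  return new Promise(resolve => {
    setImmediate(() => {
      for (const row of rows) onRow(null, row);
      resolve(rows.length);
    });
  });
}

// Single slot: each new row replaces the promise the consumer has not read yet.
function singleSlotRows(rows) {
  let resolve;
  let promise = new Promise(res => { resolve = res; });
  fakeEach(rows, (err, row) => {
    resolve(row);
    promise = new Promise(res => { resolve = res; });
  }).then(() => resolve());
  return (async function* () {
    while (true) {
      const row = await promise;
      if (row === undefined) break;
      yield row;
    }
  })();
}

// Queue: rows wait in a buffer until the consumer asks for them.
function queuedRows(rows) {
  const queue = [];
  let finished = false;
  let wake = () => {};
  fakeEach(rows, (err, row) => { queue.push(row); wake(); })
    .then(() => { finished = true; wake(); });
  return (async function* () {
    while (true) {
      if (queue.length) { yield queue.shift(); continue; }
      if (finished) return;
      // Nothing buffered yet: sleep until the producer calls wake().
      await new Promise(res => { wake = res; });
    }
  })();
}

// Helper: drain an async iterable into an array.
async function collect(iterable) {
  const out = [];
  for await (const row of iterable) out.push(row);
  return out;
}
```

With three rows delivered in a burst, `collect(singleSlotRows(rows))` ends up with only the first row, while `collect(queuedRows(rows))` returns all three.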

AlttiRi avatar Mar 10 '21 00:03 AlttiRi

@AlttiRi yeah, it would definitely need a queue - for each row, add to the queue, then on yield, pop off. Don't think you need to wrap promises around the row results either.

One thing I'm concerned about is memory usage if a queue is kept - there could be a situation where the queue piles up faster than the underlying code that processes the row, but we could mention that in the readme on the downsides of using this approach vs the current one
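To make the memory concern concrete, here is a rough sketch that records the peak queue depth when the consumer is slower than the producer. `produceBurst` and `measureBacklog` are hypothetical helpers, and the burst producer is an assumed worst case rather than a model of how the driver actually schedules rows.

```javascript
// Hypothetical producer: pushes `n` rows in one burst, then signals completion.
function produceBurst(n, onRow, onDone) {
  setImmediate(() => {
    for (let i = 0; i < n; i++) onRow({ id: i });
    onDone();
  });
}

// Consume the rows one per event-loop turn and report how deep the queue got.
async function measureBacklog(n) {
  const queue = [];
  let peak = 0;
  let finished = false;
  let wake = () => {};

  produceBurst(n, row => {
    queue.push(row);
    peak = Math.max(peak, queue.length);
    wake();
  }, () => { finished = true; wake(); });

  let consumed = 0;
  while (true) {
    if (queue.length) {
      queue.shift();
      consumed++;
      // Slow consumer: yield to the event loop after every row.
      await new Promise(resolve => setImmediate(resolve));
      continue;
    }
    if (finished) break;
    // Nothing buffered yet: sleep until the producer calls wake().
    await new Promise(resolve => { wake = resolve; });
  }
  return { consumed, peak };
}
```

In this worst case the peak equals the total number of rows, so the whole result set sits in memory at once; the callback form never holds more than the row it is currently handling.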

theogravity avatar Mar 15 '21 18:03 theogravity

Here is a more correct implementation (I assume so; I have not tested it much, it is just a quick implementation):

function getRows(db, sql) {
    const queue = new Queue();

    let finished = false;
    let readyResolve;
    let readyPromise = new Promise(res => {
        readyResolve = res;
    });

    db.each(sql, (err, row) => {
        queue.push(row);
        readyResolve();
    }).then(rowCount => {
        finished = true;
        readyResolve();
    });

    return (async function * generator() {
        while (true) {
            await readyPromise;
            const row = queue.shift();
            if (!queue.length && !finished) {
                readyPromise = new Promise(res => {
                    readyResolve = res;
                });
            }
            if (!row) {
                break;
            }
            yield row;
        }
    })();
}

As you can see it uses Queue.

When you consume the rows within one task (a task with chained microtasks), this is not so important, and a plain Array ([]) is enough. (In this case my first variant of getRows works correctly.)

for await (const row of getRows(db, sql)) {
    console.log(row);
    await Promise.resolve();
}

But if you handle the rows in different tasks:

for await (const row of getRows(db, sql)) {
    console.log(row);
    await new Promise(resolve => setImmediate(resolve));
}

the rows have to be stored in a queue. Pushing and shifting values is significantly slower with an "ArrayList" (Array), so a "LinkedList" is needed (see my Queue).

My implementation:

class Queue {
    length = 0;
    push(value) {
        const newLast = {
            value,
            next: null
        };
        if (!this._last) {
            if (!this._first) {
                this._first = newLast;
            } else {
                this._first.next = newLast;
                this._last = newLast;
            }
        } else {
            this._last.next = newLast;
            this._last = newLast;
        }
        this.length++;
    }
    shift() {
        if (!this.length) {
            return;
        }
        const first = this._first?.value;
        this._first = this._first?.next;
        this.length--;
        if (!this.length) {
            this._last = null;
        }
        return first;
    }
    *[Symbol.iterator]() {
        while (this.length) {
            yield this.shift();
        }
    }
}

The bench code:

const sql = `SELECT * FROM urls`; // 100k+ rows
let loopTime1 = 0, loopTime2 = 0;
for (let i = 0; i < 5; i++) {
    const now = new Date();
    let total = 0;
    for await (const row of getRows(db, sql)) {
        total += 1;
        //await new Promise(resolve => setImmediate(resolve));
    }
    const time = new Date() - now;
    loopTime1 += time;
    console.log(total);
    console.log("time 1", time);
}
for (let i = 0; i < 5; i++) {
    const now = new Date();
    let total = 0;
    await db.each(sql, (err, row) => {
        total += 1;
    });
    const time = new Date() - now;
    loopTime2 += time;
    console.log(total);
    console.log("time 2", time);
}
console.log(loopTime1);
console.log(loopTime2);
console.log(Math.abs(100 - (loopTime1 / loopTime2 * 100)).toFixed(2), "%");

The for await version is ~15 % slower than the callback approach.

You can also uncomment await new Promise(resolve => setImmediate(resolve)); and compare the performance with Queue and Array. With Queue it is slower by 50 %, with Array by 850 %.
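The Array versus linked-list comparison can also be tried in isolation, without a database. The sketch below is an assumed micro-benchmark, not the code used for the numbers above; `LinkedQueue`, `fillThenDrain` and `timeIt` are illustrative names, and the timings will depend on the engine and on how many items are buffered.

```javascript
// Minimal singly linked FIFO with the same push/shift/length surface as Array.
class LinkedQueue {
  constructor() {
    this.head = null;
    this.tail = null;
    this.length = 0;
  }
  push(value) {
    const node = { value, next: null };
    if (this.tail) this.tail.next = node;
    else this.head = node;
    this.tail = node;
    this.length++;
  }
  shift() {
    if (!this.head) return undefined;
    const { value } = this.head;
    this.head = this.head.next;
    if (!this.head) this.tail = null;
    this.length--;
    return value;
  }
}

// Buffer `n` numbers, then drain them from the front, as a slow consumer would.
function fillThenDrain(queue, n) {
  for (let i = 0; i < n; i++) queue.push(i);
  let sum = 0;
  while (queue.length) sum += queue.shift();
  return sum;
}

// Time one fill-and-drain run for a given queue factory.
function timeIt(label, makeQueue, n) {
  const start = Date.now();
  const sum = fillThenDrain(makeQueue(), n);
  return { label, ms: Date.now() - start, sum };
}

// Example:
// console.log(timeIt("array", () => [], 200000));
// console.log(timeIt("linked", () => new LinkedQueue(), 200000));
```

Both runs must report the same `sum`; only the `ms` field is expected to differ.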

AlttiRi avatar Mar 30 '21 03:03 AlttiRi

Thanks for looking into this - I'll check it out over the weekend

theogravity avatar Mar 31 '21 20:03 theogravity

@AlttiRi The code looks good. I had to play around with it myself to understand what it's doing as I never write generators. I'm still trying to figure out if the queue should be pluggable, as in the end-user specifies what kind of queue they want to use for this as different queues may have different performance characteristics.

I'll try to bake this in for next weekend as I need to spend time writing tests / testing it out. Thanks!
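One possible shape for a pluggable queue, as a sketch only: the caller passes a factory, and anything with `push`, `shift` and `length` will do. The `createQueue` option and the `each` parameter (a function that takes the row callback and returns the completion promise) are assumptions made for illustration, not an existing API of this library.

```javascript
// `each` is any function that accepts a (err, row) callback and returns a
// promise that settles when all rows have been delivered.
function getRows(each, { createQueue = () => [] } = {}) {
  const queue = createQueue();
  let finished = false;
  let failure = null;
  let wake = () => {};

  each((err, row) => {
    if (err) failure = err;
    else queue.push(row);
    wake();
  }).then(
    () => { finished = true; wake(); },
    err => { failure = err; finished = true; wake(); }
  );

  return (async function* () {
    while (true) {
      // An error is surfaced as soon as it is seen, before draining the queue.
      if (failure) throw failure;
      if (queue.length) { yield queue.shift(); continue; }
      if (finished) return;
      // Nothing buffered yet: sleep until the producer calls wake().
      await new Promise(resolve => { wake = resolve; });
    }
  })();
}

// Example, assuming `db`, `sql` and a Queue class like the one in this thread:
// for await (const row of getRows(cb => db.each(sql, cb), { createQueue: () => new Queue() })) {
//   console.log(row);
// }
```

Defaulting to a plain array keeps the simple case simple, while callers with slow consumers can swap in a linked list or a measuring wrapper.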

theogravity avatar Apr 04 '21 19:04 theogravity

Why is there not a completed callback? An async queue is easy to do myself but I need to know when to end.

Edit: Sorry, just found the db property

brerneto avatar Jul 18 '22 18:07 brerneto