
[Feature Request] Pull through streaming API to prevent back-pressure

Open jasperblues opened this issue 5 years ago • 20 comments

I have written a Neo4j client library that uses this driver under the hood. (https://drivine.org/).

One of the use-cases is streaming without back-pressure. Back-pressure is where the source produces data faster than the sink can process it, for example writing too quickly into a file-stream, say if we were producing a report that presents COVID-19 cases for a region.

The library includes an API as follows:

openCursor<T>(spec: QuerySpecification<T>): Promise<Cursor<T>>;

. . whereby you can open a Cursor<T> representing a set of results. This cursor is AsyncIterable (supports `for await (const item of cursor)`) and, more importantly, can turn itself into a Node.js Readable stream.

When these streams are piped together, it is the sink stream that pulls from the source, at the rate the sink can handle. So there are no back-pressure problems, no excessive memory usage, and no going over a high-water mark. Good for many use-cases. We can easily create an Observable from a stream too.

How it is currently implemented:

The way this is implemented is to fetch data from Neo4j in batches using SKIP and LIMIT.
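The SKIP/LIMIT batching can be sketched as an async generator. `fetchPage` here is a hypothetical stand-in for running the Cypher query with `SKIP $skip LIMIT $limit` through the driver; it is stubbed with an in-memory array so the sketch runs on its own.

```javascript
// Pretend query result; in reality each page would come from Neo4j.
const ALL_ROWS = [...Array(25).keys()]

// Stand-in for: session.run(`${query} SKIP $skip LIMIT $limit`, { skip, limit })
async function fetchPage (skip, limit) {
  return ALL_ROWS.slice(skip, skip + limit)
}

// Async generator: a page is fetched only when the consumer has pulled
// through the previous one, so memory is bounded by the batch size.
async function * openCursor (batchSize = 10) {
  let skip = 0
  while (true) {
    const page = await fetchPage(skip, batchSize)
    if (page.length === 0) return
    yield * page
    skip += batchSize
  }
}
```

The downside noted later in the thread applies: each page re-evaluates the query, which is what makes this approach inefficient compared with driver-level streaming.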

I wanted to see if I could replace this using the driver's streaming capabilities, however from what I understand RxJS has no capability to handle back-pressure. It is a push-through library. Right?

For pull-through we should use the companion IxJS (https://github.com/ReactiveX/IxJS) instead. Pulling through will push the onus of handling back-pressure onto the source, which would need to hold the entire result set and emit items when requested. This should not be a problem, as it needs to hold the results in memory for a period of time in any case.

So how about supporting pull-through streaming at the driver level? Either as Readable streams or with IxJS. (Or apparently there is a way to do it in RxJS: https://itnext.io/lossless-backpressure-in-rxjs-b6de30a1b6d4 <--- I'm still digesting this article).

jasperblues avatar Mar 15 '20 05:03 jasperblues

Hi @jasperblues,

We implemented back pressure with a buffer inside this driver with RxJS.

The idea is described in our upgrade guide. The implementation of this buffer is in stream-observers.

zhenlineo avatar Mar 16 '20 08:03 zhenlineo

Hi @zhenlineo ,

From what I can see, back-pressure is handled correctly inside the driver using the buffering approach; however, for the consumer/user I don't see a facility for the memory-constrained use-case. Back-pressure can be handled as follows:

  • Sampling (this is lossy, so not suitable for our case, but I mention it as it is a strategy).
  • Buffering. <--- Requires memory.
  • Pausing, until ready for more. <-- This case appears not covered for user/consumer of the driver?

Pausing:

On pausing:

If the upstream value is already calculated, then this shifts the onus of buffering upstream.

If it is computed then we can defer computation. In our case it is pre-calculated, but held in memory by Neo4j after query execution anyway. So it is "free".

Arguably we should let the consumer digest it at the speed they're able to. But from what I can see, with RxJS, we have some future value, and it will be pushed upon the user (onNext <-- deal with it). And this is why there is a companion library, IxJS (https://github.com/ReactiveX/IxJS), which uses a pull-through model.

[x] Use-case: The query will return a large result set; I would like to subscribe as soon as the first result is available. If the results come hard and fast, I'm OK to buffer.
^-- We're good.
[ ] Use-case: The query will return a large result set. I have limited memory. I would like to process iteratively, requesting more results as I'm able. This way I can process large data on limited resources over time.
^-- Not yet covered, as far as I know?

Perhaps I have misunderstood though. 

jasperblues avatar Mar 16 '20 09:03 jasperblues

Apologies if I have misunderstood. If that is the case, it would be good to contribute to the documentation to describe:

  • How to consume results incrementally given limited resources. (onNext seems to push more results, ready or not.)
  • Consequently, how to be confident the driver is not using too much memory.

_above message typed while eating dinner 😊 _

jasperblues avatar Mar 16 '20 09:03 jasperblues

Hi @jasperblues,

When the local buffer is full, the driver pauses auto-pulling new batches from the server. That's the design of this driver. Please let us know if the implementation differs from what we've designed.

We currently fix the local buffer size to be the same as the fetch size of each batch. This means that if a query returns 10000 records, by default the driver pulls 1000 records each time. If a user cannot consume them at this moment, then we will not pull more until the user has consumed 700 records. So we use both buffering and pausing to achieve back pressure.

The user of this driver can set the fetch size (a.k.a. buffer size) to "control" the driver's back pressure.
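A configuration sketch of the fetch-size control described above. The URL, credentials, and values are placeholders; to my understanding, in 4.x+ drivers `fetchSize` can be set in the driver config and overridden per session.

```javascript
const neo4j = require('neo4j-driver')

// Driver-wide default: pull at most 500 records per batch from the server.
const driver = neo4j.driver(
  'neo4j://localhost:7687',
  neo4j.auth.basic('neo4j', 'password'),
  { fetchSize: 500 }
)

// Per-session override for a query with a tighter memory budget.
const session = driver.session({ fetchSize: 100 })
```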

zhenlineo avatar Mar 16 '20 10:03 zhenlineo

Hello @zhenlineo This is clear now! Thank you very much for the explanation.

(If you like I can send a PR to clarify what you have explained to me in the README)

jasperblues avatar Mar 16 '20 10:03 jasperblues

If a user can not consume them at this moment, then we will not pull more until the user has consumed 700 records.

Actually, sorry, it is still not clear. What is the mechanism by which the user signifies it is unable to consume more? The API provides onNext and the user must accept it. How can the user control the speed at which records are pulled through?

If this option to pause on the user side does not exist, the only remaining option is buffering.

jasperblues avatar Mar 16 '20 17:03 jasperblues

@jasperblues ,

You have a point. We also have a reactive API provided via RxResult; would that make more sense to you? We might not actually make proper use of pause with the current API in the javascript driver.

zhenlineo avatar Mar 17 '20 09:03 zhenlineo

Hello @zhenlineo , sorry for the slow reply - juggling some research and billable work.

Using RxResult would work if it was augmented with something like the following:

declare interface RxResult {
  keys(): Observable<string[]>

  records(): Observable<Record>

  summary(): Observable<ResultSummary>
  
  pause(): void; 
  
  resume(): void; 
  
}

If the pause() and resume() methods were added it would be fully possible to do each of the following:

  • Implement AsyncIterable, allowing the use of `for await (const item of cursor)` where results are pulled in batches.
  • Convert the RxResult to a Node.js stream, including memory-efficient back-pressure.
  • Control the RxJS subscription rate so that there's no memory pressure.

Without some way to control the flow rate, the only option for the end-user is to buffer. This is not always feasible.

jasperblues avatar Mar 19 '20 09:03 jasperblues

Hi @jasperblues,

Thanks for the feedback to the js driver API.

When implementing reactive support, we found there are two levels of API that a library can provide:

Level 1: a raw API such as reactive-streams, where you control the flow yourself (pause/resume by controlling the pace of calls to Subscription#request).

Level 2: a high-level API such as ReactiveX. With this high-level API, the buffering and the management of pausing and restarting are all done by the library.

I feel what you are looking for is the Level 1 API, where you want to control the flow. Currently we only have Java driver which exposes this API:

public interface RxResult {
  Publisher<List<String>> keys();
  Publisher<Record> records();
  Publisher<ResultSummary> consume();
}

The full API doc.

For most end users of reactive APIs, we do not expect them to consume this low-level API directly. When using the Java driver API, one mostly should choose either project-reactor or RxJava, as it is not easy to get pause and resume correct.

The js driver decided to expose a Level 2 API, RxJS, directly, with the promise that we will buffer, pause, and resume properly for the end user. We would very much like to fix any bug if we have failed to provide smart buffering and fetching in this driver. But we will not give back control of explicit pause and resume with this API.

We could of course look into exposing the Level 1 API directly. However, most users may have problems consuming such a raw API directly to implement their own flow control and back-pressure.

Feel free to have a call directly if you still have questions :)

zhenlineo avatar Mar 19 '20 10:03 zhenlineo

When using Java driver API, one mostly shall choose either project-reactor or RxJava, as it is not easy to get pause and resume correct.

@zhenlineo It can be very simple for the end-user in Node.js ! Here's how it is in Drivine:

Cursor Implements AsyncIterable

While looping over the result set, new results will be pulled as needed, until the upstream is depleted. Consumption controls the flow.

const cursor = await repo.asyncRoutesBetween('Cavite Island', 'NYC');
for await (const item of cursor) {
  // process each item; further results are pulled only as this loop consumes them
}

Cursor can pose as a Node.js Stream

Streams can be piped together without fear of back-pressure because the sink stream controls the flow. (Don't push grass into the cow. Milk the cow. It will eat when it gets hungry).

cursor.asStream().pipe(fileStream);
await StreamUtils.untilClosed(fileStream);

Neither of the above will result in memory pressure, and the only advice needed for the end user is that the second approach should be used if the results will be fed into another sink that has a limitation on flow-rate (like a file stream).

Most Users Scenarios

The js driver decided to expose a Level 2 API RxJs directly, with the promise that we will buffer, pause, and resume for end user properly.

I'm not sure that it is possible for a driver to know the correct speed at which to push onto the consumer, because the usage scenarios will vary.

The inspiration for the above features in Drivine was based on production usage. One example is a report that was resulting in an out-of-memory error. When writing those results into a file-stream, it is necessary to ensure we do not go over the high-water mark.

The driver might pause or buffer internally; however, it pushes the results onto the subscriber, which then has no alternative except to buffer. Out-of-memory errors will happen. We should be able to demonstrate that with a test case.

With Drivine, there is a fully-fledged implementation for AgensGraph. The Cursor<T> is both AsyncIterable and a Readable stream, thus it can also be an Rx Observable.

I have provided the same for Neo4j, but only by using SKIP and LIMIT, as I can't see an alternative way to do it using this driver. This means that the query needs to be evaluated again, which is not very efficient.

I agree that we can easily turn an AsyncIterable or Node.js Readable into an RxJS Observable, but going the other way is complicated, because RxJS pushes.

However, once again, I'm puzzled, and don't think the driver can correctly buffer and pause, because it will not know the correct speed for end-user scenarios. Even though it might be efficient internally, if it pushes too quickly onto the consumer, the consumer is now saddled with the same problem. So it is not a solution.

So for me personally, I'd really like to see those Level 1 features. They don't have to be pretty, because I can provide a beautiful level of abstraction for business users, as outlined above.

jasperblues avatar Mar 19 '20 10:03 jasperblues

We have added this to our future roadmap for later discussion.

technige avatar Mar 23 '20 10:03 technige

I'm also in the same boat. I'm puzzled how the driver can know when I'm ready to consume more records.

Every database I've worked with has the pull-based approach where I can ask for more records when I'm ready. Push-based doesn't make any sense to me, it's almost the same as just grabbing the entire array and then iterating through each record from that array.

Looking at the Java docs, it looks like that driver has a pull-based approach as well, where StatementResult implements an Iterator with a next() function to fetch the next record, etc.

AsyncIterator is the lowest-level interface to do this in JavaScript. AsyncIterable is sugar on top of it that makes it easier to consume.
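The relationship between the two can be shown in a few lines: the explicit `next()` protocol and the `for await` sugar perform exactly the same pulls.

```javascript
// A small pull-based source of results.
async function * numbers () { yield 1; yield 2; yield 3 }

// Low level: drive the AsyncIterator protocol by hand; each next() is one pull.
async function manual () {
  const it = numbers()[Symbol.asyncIterator]()
  const out = []
  let step = await it.next()
  while (!step.done) {
    out.push(step.value)
    step = await it.next()
  }
  return out
}

// High level: for await...of desugars to the same next() calls.
async function sugared () {
  const out = []
  for await (const n of numbers()) out.push(n)
  return out
}
```

In both cases the consumer decides when the next value is requested, which is the pull-based behaviour being asked for.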

CarsonF avatar Feb 24 '21 21:02 CarsonF

This functionality is important to me as well. I'm trying to write the results of a large query to a file without consuming massive memory. Currently neo4j reads the data faster than the file can be written.

I agree that adding pause and resume to RxResult and Result seems like the simplest solution.

benstevens48 avatar Mar 17 '21 17:03 benstevens48

We are planning to provide an AsyncIterator for consuming results in the JavaScript driver. We don't have the final shape of the solution, but I will publish it here as soon as I have it.

bigmontz avatar Jul 01 '21 10:07 bigmontz

[Screenshot: Drivine cursor documentation (https://drivine.org)]

Hi @bigmontz !!

In Drivine I provide a Cursor abstraction, where cursor can be:

  • An AsyncIterator
  • A readable stream (pure NodeJS stream, which can easily be converted to RxJS or IxJS)

The reason for the latter is that, while AsyncIterator is very handy, if you were to iterate and feed results into another stream you could overload that stream (back-pressure), resulting in messy fiddling with the high-water mark or pausing.

Whereas with a stream and pipes, the results will be pulled through at the correct speed. And you can of course have whatever number of transformations you require between the source and the sink.


tl;dr: I suggest providing both AsyncIterator and Stream abstractions (at the pure JS level) and instructions (or a helper) for how to compose an RxJS Observable from the raw materials.
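Composing a push-style Observable from the pull-based raw materials is indeed the easy direction. RxJS's `from()` accepts async iterables in recent versions; the dependency-free sketch below hand-rolls an Observable-like shape (an invented `subscribe` contract, not the real RxJS class) so the idea is visible on its own.

```javascript
// Adapt a pull-based AsyncIterable into a push-based, Observable-like source.
function fromAsyncIterable (iterable) {
  return {
    subscribe ({ next, complete, error }) {
      let cancelled = false
      ;(async () => {
        try {
          for await (const value of iterable) {
            if (cancelled) return
            next(value) // push each pulled value to the observer
          }
          if (!cancelled) complete()
        } catch (err) {
          if (!cancelled) error(err)
        }
      })()
      return { unsubscribe () { cancelled = true } }
    }
  }
}
```

Going the other way, from push to pull, is the hard direction, which is why the pull-based primitive should be the one the driver exposes.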

I don't like to mention other graph DBs, but when using Drivine with AgensGraph the streaming works flawlessly, whereas with Neo4j the only way to control consumption speed from the client side is to fake it with SKIP and LIMIT.

jasperblues avatar Jul 01 '21 12:07 jasperblues

Hi @jasperblues,

It could be a pretty good idea to have the readable stream as part of the API too. I will discuss internally to see at which point of the timeline it will be addressed.

Thanks.

bigmontz avatar Jul 13 '21 11:07 bigmontz

Welcome @bigmontz , glad the feedback was useful. Feel free to take a look at Drivine code for inspiration.

My 2c: I think plain NodeJS stream abstraction is the most flexible starting point. From this we can compose (perhaps with helper method) RxJS (push) or IxJS (pull) streams.

jasperblues avatar Jul 13 '21 23:07 jasperblues

Since we have to support older LTS versions of NodeJS and the browser, we decided to go with a more standard language interface using async iterators.

Usage example:

      const session = driver.session()
      try {
        const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
        const xs = []
        for await (const record of result) {
          xs.push(record.get('x').toInt())
        }
        expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
      } finally {
        await session.close()
      }

This API correctly implements watermarks using the fetch size, and pauses (does not request more records) when iteration is suspended.

The next step will be to integrate it into the existing RxJS interface, to fix its current errant behaviour.

bigmontz avatar Jan 12 '22 11:01 bigmontz

The changes related to the back-pressure were in the 5.0.0-alpha01.

⚠️ This is an experimental release. It may completely change in the future. It does not contain any connectivity to Neo4j 5.0. In other words, this is equivalent to the JavaScript 4.4.x driver with a few changes detailed in the changelog: https://github.com/neo4j/neo4j-javascript-driver/wiki/5.0-changelog#500-alpha01

bigmontz avatar Mar 03 '22 15:03 bigmontz

W00t! Way to go @bigmontz !!

jasperblues avatar Mar 03 '22 22:03 jasperblues

Neo4j Driver 5.0 has been released, so I will close this issue. Any feedback is welcome.

bigmontz avatar Sep 15 '22 13:09 bigmontz

Great work again @bigmontz 🙏🙏🙏🙏🙏 This was a complex feature!!!!

jasperblues avatar Sep 15 '22 19:09 jasperblues