
Add scan support

ChenShuai1981 opened this issue 2 years ago · 7 comments

Could you provide an example of an HTTP periodic scan source (not a lookup source)? Does it support renewing the access token after it expires?

ChenShuai1981 · Dec 17 '22

Hi @ChenShuai1981, a scan source is currently not supported by this connector, hence no example available :) For now we only have the lookup source. This would be a great feature though, would you like to contribute? :)

The proper Flink interfaces would have to be implemented.

It would be a nice feature, but it would also be very "client specific".

kristoffSC · Dec 18 '22

@ChenShuai1981 the lookup support that exists currently ends up issuing GETs, PUTs or POSTs on single records. For a scan to work, I suspect we would need to issue searches and get involved with paging the results. This could really impact the performance of a scan, as we could end up effectively doing table scans, unless we could do predicate pushdown.
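
For illustration only, a scan against a paged REST endpoint boils down to something like the loop below; the endpoint, the page query parameter and the empty-body stop condition are all made up for this sketch and are not connector code:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class PagedScanExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        int page = 0;
        while (true) {
            // Each "scan" iteration is a full search request for the next page.
            HttpRequest request = HttpRequest.newBuilder(
                            URI.create("https://example.com/api/items?page=" + page))
                    .GET()
                    .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            String body = response.body();
            // Stop once the endpoint reports an empty page (assumed convention).
            if (body.isEmpty() || body.equals("[]")) {
                break;
            }
            System.out.println("page " + page + ": " + body);
            page++;
        }
    }
}
```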

davidradl · Jun 10 '24

> @ChenShuai1981 the lookup support that exists currently ends up issuing GETs, PUTs or POSTs on single records. For a scan to work, I suspect we would need to issue searches and get involved with paging the results. This could really impact the performance of a scan, as we could end up effectively doing table scans, unless we could do predicate pushdown.

Yes, you are right. Since the content provider updates information irregularly, we have to periodically send GET/POST requests to fetch it and sync it into our database. Scenarios include web crawlers and system integration. Generally speaking, if the result set is too large, the provider will return a streaming response.

ChenShuai1981 · Jun 11 '24

I am prototyping adding scan support. The reasons I think this is useful are:

  • this will make this connector more comprehensive, so it could realistically be contributed to Flink. I hope to do that in a FLIP once the scan support is there.
  • we have had issues with the lookup connector where sometimes the predicates come through as filters rather than lookup keys, e.g. issue 143.
  • I feel that issue HTTP-99 and its PR #149 lay down a more comprehensive way to drive the http connector without needing custom Java.

The design I am thinking of is:

  • keep the polling factory and the like as-is
  • keep the query creators as-is. The only difference between what we need for a scan query and a lookup query is that the lookup query needs the lookup keys.
  • introduce a new getScanRuntimeProvider, based off the Flink example
  • the Flink socket source example drives its client in a streaming way. In order to reuse the existing polling factory and associated customization, I am looking to call the polling client's pull() in the pollNext of the SourceReader (see the sketch after this list).
  • implement an equivalent of the fix I did for JDBC https://issues.apache.org/jira/browse/FLINK-33365 so the lookup code gets access to the filters
  • As per the socket example, there will be no parallelism, split support or watermark support in the source. I assume we will get standard watermark behaviour implemented after the source, as with other non-Kafka sources.
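
Here is a rough sketch of what I mean by calling the polling client inside pollNext, assuming a bounded, single-split reader. HttpScanSourceReader and the no-argument pull() variant are hypothetical names for this discussion, not existing connector classes:

```java
import java.util.Collection;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.table.data.RowData;

// Hypothetical reader; the real one would implement SourceReader<RowData, ...>
// and also provide start(), snapshotState(), addSplits(), isAvailable() and close().
class HttpScanSourceReader {

    // Assumed no-argument scan variant of the existing lookup polling client.
    interface ScanPollingClient {
        Collection<RowData> pull();
    }

    private final ScanPollingClient pollingClient;
    private boolean finished;

    HttpScanSourceReader(ScanPollingClient pollingClient) {
        this.pollingClient = pollingClient;
    }

    public InputStatus pollNext(ReaderOutput<RowData> output) {
        if (finished) {
            return InputStatus.END_OF_INPUT;
        }
        // One HTTP request per poll; emit whatever rows come back.
        pollingClient.pull().forEach(output::collect);
        // Bounded mode: a single successful pull ends the input.
        finished = true;
        return InputStatus.END_OF_INPUT;
    }
}
```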

Below is a picture of the sort of architecture I am prototyping. Please let me know if this sounds reasonable. New bits are in green. It is not totally formal UML, but hopefully you get the idea.

[Image: proposed scan source architecture diagram]

Let me know your thoughts @ChenShuai1981 @kristoffSC @grzegorz8. As this is likely to involve a major rewrite (changing package names and the like), I suggest we consider upping the version to 2.0.

davidradl · Apr 16 '25

@davidradl Hey! Thank you for addressing this feature - it would be great to have it in the connector. Overall, all the points you mentioned look good. However, I have a few comments:

  • Scan abilities.
    • Filter pushdown, limit pushdown - this is fine.
    • Projection pushdown - I assume REST endpoints rarely accept the list of fields to return, so this is not a priority.
  • Parallelism. I agree, let’s stick with parallelism=1. While parallel execution might be possible, I don't think it's worth the added complexity at this point.
  • Result pagination.
    • REST APIs typically return results in pages. How do we want to handle this? This is related to: https://github.com/getindata/flink-http-connector/issues/118
    • We’ll need to consider both bounded and unbounded data scenarios. E.g. for the unbounded scenario we might want to implement some backoff delay strategy if there are no further results available at the moment. In my opinion, handling the bounded scenario is a must-have, while the unbounded one can be addressed later.
  • Checkpointing. If the scan source is going to be used in streaming execution mode, we need to support checkpointing. What we store in the checkpoint largely depends on the pagination strategy implemented in the source REST API (a rough sketch of such state follows below). To ensure a reliable scan capability, the REST API pagination strategy has to be stable and idempotent.
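
For illustration, a rough sketch of what such checkpoint state could hold under an offset- or token-based pagination strategy; the class and field names are invented for this discussion:

```java
import java.io.Serializable;

// Invented type for this discussion; what actually gets stored depends on the
// REST API's pagination strategy being stable and idempotent.
final class HttpScanCheckpointState implements Serializable {

    // Offset-based APIs: how many records (or pages) have been consumed so far.
    private final long nextOffset;

    // Token-based APIs: the opaque cursor returned by the last page, null otherwise.
    private final String nextPageToken;

    HttpScanCheckpointState(long nextOffset, String nextPageToken) {
        this.nextOffset = nextOffset;
        this.nextPageToken = nextPageToken;
    }

    long nextOffset() {
        return nextOffset;
    }

    String nextPageToken() {
        return nextPageToken;
    }
}
```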

grzegorz8 · May 02 '25

@grzegorz8 thanks for your support on this. On Result pagination:

  • I think it depends on the use cases we want to go after.
  • Use case 1) I think there are use cases that would not need pagination. I am thinking of issuing calls to AI, where we may not have a convenient lookup key to join on.
  • Use case 2) the query is some sort of search.
    • a) the general case would be a table scan: select * from tableA
    • b) a more targeted case would be a filtered table scan: select * from tableA where ...

With volatile data, it is not possible to have a REST API pagination strategy that is stable and idempotent.

One way to offer pagination would be to have substitution placeholders in the URL that determine the limit and offset to use. When we poll for more results, we would need to do a complete paginated search again.
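
A minimal sketch of that idea, assuming made-up {offset} and {limit} placeholders in the configured URL:

```java
// Sketch only: fills invented {offset}/{limit} placeholders in a configured URL.
final class PagedUrlBuilder {

    private final String template;

    PagedUrlBuilder(String template) {
        // e.g. "https://example.com/api/items?offset={offset}&limit={limit}"
        this.template = template;
    }

    String buildPageUrl(long offset, long limit) {
        return template
                .replace("{offset}", Long.toString(offset))
                .replace("{limit}", Long.toString(limit));
    }
}
```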

Maybe this sort of rollout:

  • no pagination, only one record in the response - supports use case 1
  • no pagination, multiple records in the response - supports use case 1 and some of 2b. Maybe include a limit in the SQL to help with use case 2a).
  • pagination, with warnings in the docs about performance - supports all use cases.

On checkpointing: I am not sure what we need to store. I suggest we only store how much we have got from the API; if we know the data is not going to change under us, then we could ask for more.

I agree, bounded seems a sensible way to start on this. WDYT?

davidradl · May 16 '25