flink-connector-opensearch

[FLINK-30488] OpenSearch implementation of Async Sink

Open: reta opened this pull request 3 years ago • 11 comments

OpenSearch implementation of the Async Sink (https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink). A few TODO items:

  • [x] More test cases
  • [x] More sensible defaults

reta, Dec 30 '22

@zentol @MartijnVisser I would appreciate a review if you have time. This adds AsyncSink support for OpenSearch, as initially discussed in [1].

[1] https://github.com/apache/flink/pull/18541#issuecomment-1026931087

reta, Jan 03 '23

@zentol @MartijnVisser doing my once-a-month ping diligence :-), please

reta, Feb 06 '23

> @zentol @MartijnVisser doing my once-a-month ping diligence :-), please

I'm currently a bit over capacity. I don't know if the same applies to @zentol, to be honest.

@dannycranmer Could you potentially help out? You also have experience with the Async Sink API. Or perhaps @hlteoh37?

MartijnVisser, Feb 07 '23

Sure, I can take a look

hlteoh37, Feb 07 '23

Thanks a lot for the review @hlteoh37. I believe I have addressed and/or answered all your comments; please let me know if I missed something.

reta, Feb 08 '23

Thanks @dannycranmer, I think I have gone through all your comments. Thanks a lot, I really appreciate it.

reta, Feb 10 '23

@reta The PR looks good to me minus the Mockito comment. However, I have questions about the approach here. We are adding a new sink alongside the existing one, so we will have both OpensearchSink and OpensearchAsyncSink. How do users know which one to pick? Why not replace the existing sink with the new implementation? The Jira mentions docs; however, there is no update here. Will you create a follow-up PR for that?

If this has already been discussed on the mailing lists and I missed it, please give me a link :D

dannycranmer, Feb 23 '23

Thanks a lot for the review @dannycranmer

> @reta The PR looks good to me minus the Mockito comment. However, I have questions about the approach here. We are adding a new sink alongside the existing one, so we will have both OpensearchSink and OpensearchAsyncSink. How do users know which one to pick? Why not replace the existing sink with the new implementation?

This is indeed a good question. I think the main difference between them lies in the internal APIs each implementation is based upon:

  • OpensearchAsyncSink uses RestHighLevelClient::bulkAsync directly to dispatch the bulk requests
  • OpensearchSink uses BulkProcessor and offers more flexibility with respect to failure handling and backoff policies (which have no direct equivalent in RestHighLevelClient)

I have covered this part in the docs, thank you.

> The Jira mentions docs; however, there is no update here. Will you create a follow-up PR for that?

I have updated the documentation, thank you.

> If this has already been discussed on the mailing lists and I missed it, please give me a link :D

You mean the AsyncSink implementation for OpenSearch? No, it was not discussed on the mailing list, but it was mentioned on the initial pull request: https://github.com/apache/flink/pull/18541#issuecomment-1026931087

reta, Mar 03 '23

@dannycranmer I would appreciate it if you could take a look, thank you.

reta, Mar 28 '23

@reta I am reluctant to introduce a new Sink API based on the internal implementation unless there is a really good/semantic reason. I would prefer to encapsulate the internals via a single Flink layer that can support either RestHighLevelClient or BulkProcessor based on configuration. How will this look for SQL? We usually use a simple identifier like "opensearch"; I fear that "opensearch-async" adds no semantic value for the user.

We should keep the Sink API as simple as possible with sensible defaults, and allow advanced users to configure it as they wish. For instance, a user should not need to decide whether to use OpensearchSink or OpensearchAsyncSink; they should just use OpensearchSink and configure it as needed.
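To make the single-sink proposal concrete, here is a minimal Java sketch of "one public sink, internal dispatch path chosen by configuration". It assumes entirely hypothetical names (`OpensearchSinkSketch`, `DispatchMode`, `setDispatchMode`, `BulkDispatcher`) that do not exist in flink-connector-opensearch, and its dispatchers are stubs that only stand in for the BulkProcessor and `RestHighLevelClient::bulkAsync` code paths; no OpenSearch client or Flink API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: none of these types exist in flink-connector-opensearch.
public class SingleSinkSketch {

    /** Which internal client API dispatches bulk requests (hypothetical option). */
    enum DispatchMode {
        BULK_PROCESSOR, // stands in for the BulkProcessor-based path
        BULK_ASYNC      // stands in for the RestHighLevelClient::bulkAsync-based path
    }

    /** Internal strategy; users never pick an implementation class directly. */
    interface BulkDispatcher {
        String describe();
    }

    static final class BulkProcessorDispatcher implements BulkDispatcher {
        public String describe() { return "BulkProcessor"; }
    }

    static final class BulkAsyncDispatcher implements BulkDispatcher {
        public String describe() { return "bulkAsync"; }
    }

    /** The one public sink type; the dispatch path is only a configuration option. */
    static final class OpensearchSinkSketch {
        private final List<String> hosts;
        private final DispatchMode mode;

        private OpensearchSinkSketch(List<String> hosts, DispatchMode mode) {
            this.hosts = List.copyOf(hosts);
            this.mode = mode;
        }

        static Builder builder() { return new Builder(); }

        List<String> hosts() { return hosts; }

        DispatchMode mode() { return mode; }

        /** Selects the internal dispatcher from configuration. */
        BulkDispatcher createDispatcher() {
            switch (mode) {
                case BULK_ASYNC:
                    return new BulkAsyncDispatcher();
                case BULK_PROCESSOR:
                default:
                    return new BulkProcessorDispatcher();
            }
        }

        static final class Builder {
            private final List<String> hosts = new ArrayList<>();
            // Sensible default: keep the existing BulkProcessor behaviour unless the user opts in.
            private DispatchMode mode = DispatchMode.BULK_PROCESSOR;

            Builder setHosts(String... hs) { hosts.addAll(List.of(hs)); return this; }

            Builder setDispatchMode(DispatchMode m) { this.mode = Objects.requireNonNull(m); return this; }

            OpensearchSinkSketch build() {
                if (hosts.isEmpty()) {
                    throw new IllegalStateException("at least one host is required");
                }
                return new OpensearchSinkSketch(hosts, mode);
            }
        }
    }

    public static void main(String[] args) {
        OpensearchSinkSketch defaults = OpensearchSinkSketch.builder()
                .setHosts("http://localhost:9200")
                .build();
        OpensearchSinkSketch advanced = OpensearchSinkSketch.builder()
                .setHosts("http://localhost:9200")
                .setDispatchMode(DispatchMode.BULK_ASYNC)
                .build();
        System.out.println(defaults.createDispatcher().describe()); // prints "BulkProcessor"
        System.out.println(advanced.createDispatcher().describe()); // prints "bulkAsync"
    }
}
```

The design point is that the choice between dispatch paths becomes a builder option with a default, not a second public class, so most users never see it. In SQL, the same choice would presumably surface as a table option on the single "opensearch" identifier rather than as a separate "opensearch-async" identifier.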

There could be reasons to have two sinks if they support fundamentally different features/APIs, but I would expect the naming to reflect this, for example OpensearchRestHighLevelClientSink/OpensearchBulkProcessorSink.

Apologies for raising these concerns late in the process, but I cannot see that this has been considered before. @MartijnVisser what are your thoughts?

dannycranmer, Mar 29 '23

> I am reluctant to introduce a new Sink API based on the internal implementation unless there is a really good/semantic reason.

Thanks @dannycranmer, I understand your concerns. I will move this pull request to draft for now so we can get back to it at some point in the future, when migrating off the RestHighLevelClient to opensearch-java. Thanks again for the review and your thoughts.

reta, Mar 29 '23