[FLINK-30488] OpenSearch implementation of Async Sink
OpenSearch implementation of Async Sink (https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), a few TODO items:
- [x] More test cases
- [x] More sensible defaults
@zentol @MartijnVisser I would appreciate it if you have time for a review. This adds AsyncSink support for OpenSearch, as initially discussed in [1]
[1] https://github.com/apache/flink/pull/18541#issuecomment-1026931087
@zentol @MartijnVisser doing my one per month ping diligence :-), please
I'm currently a bit over capacity. Don't know if the same applies for @zentol tbh
@dannycranmer Could you potentially help out? You also have the experience with the Async API, or perhaps @hlteoh37 ?
Sure, I can take a look
Thanks a lot for the review @hlteoh37 , I believe I addressed and/or answered all your comments, please let me know if I missed something
Thanks @dannycranmer , I think I went through all your comments, thanks a lot, really appreciate it.
@reta The PR looks good to me minus the Mockito comment. However I have questions over the approach here. We are adding a new sink alongside the existing sink, we will have OpensearchSink and OpensearchAsyncSink. How do the users know which one to pick? Why not replace the existing sink with the new implementation? The Jira mentions docs, however there is no update here. Will you create a followup PR for that?
If this has already been discussed on mailing lists I missed that, please give me a link :D
Thanks a lot for review @dannycranmer
@reta The PR looks good to me minus the Mockito comment. However I have questions over the approach here. We are adding a new sink alongside the existing sink, we will have OpensearchSink and OpensearchAsyncSink. How do the users know which one to pick? Why not replace the existing sink with the new implementation?
This is indeed a good question. I think the main difference between the two lies in the internal APIs each implementation is based upon:
- OpensearchAsyncSink uses RestHighLevelClient::bulkAsync directly to dispatch the bulk requests
- OpensearchSink uses BulkProcessor and offers more flexibility with respect to failure handling and backoff policies (no straight equivalent in RestHighLevelClient)
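To illustrate the distinction, here is a minimal sketch of the two dispatch styles. It uses only JDK types as stand-ins and does not call the OpenSearch client; `DispatchStylesSketch`, `BulkEndpoint`, `dispatchDirect` and `dispatchWithBackoff` are hypothetical names made up for this example, and the retry logic is an assumption about what a processor-style helper does in general, not a copy of BulkProcessor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Illustrative only: JDK stand-ins, not the OpenSearch client API. */
final class DispatchStylesSketch {

    /** Hypothetical bulk endpoint: returns true when the whole batch is accepted. */
    interface BulkEndpoint extends Function<List<String>, Boolean> {}

    /** Direct style: one asynchronous call per batch; retrying is left to the caller. */
    static CompletableFuture<Boolean> dispatchDirect(BulkEndpoint endpoint, List<String> batch) {
        return CompletableFuture.supplyAsync(() -> endpoint.apply(batch));
    }

    /**
     * Processor style: the helper owns the retry loop and sleeps for the next
     * entry of backoffMillis between attempts. Returns the number of attempts
     * used, negated if it gave up.
     */
    static int dispatchWithBackoff(BulkEndpoint endpoint, List<String> batch, long[] backoffMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (endpoint.apply(batch)) {
                return attempts;
            }
            if (attempts > backoffMillis.length) {
                return -attempts; // backoff schedule exhausted
            }
            try {
                Thread.sleep(backoffMillis[attempts - 1]);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return -attempts;
            }
        }
    }
}
```

In the direct style the caller receives a future and has to decide what a failed batch means, while in the processor style the retry and backoff policy is a parameter of the helper itself.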
I have covered this part in the docs, thank you.
The Jira mentions docs, however there is no update here. Will you create a followup PR for that?
Updated the documentation, thank you
If this has already been discussed on mailing lists I missed that, please give me a link :D
You mean the AsyncSink implementation for OpenSearch? No, it was not discussed on the mailing list, but it was mentioned on the initial pull request https://github.com/apache/flink/pull/18541#issuecomment-1026931087
@dannycranmer would appreciate if you could take a look, thank you
@reta I am reluctant to introduce a new Sink API based on the internal implementation unless there is a really good/semantic reason. I would prefer to encapsulate the internals via a single Flink layer that can support either RestHighLevelClient/BulkProcessor based on configuration. How will this look for SQL? We usually use a simple identifier like "opensearch", I fear that "opensearch-async" adds no semantic value to the user.
We should keep the Sink API as simple as possible with sensible defaults, and allow advanced users to configure as they wish. For instance, a user should not need to decide between OpensearchSink and OpensearchAsyncSink; they should just use OpensearchSink and configure it as needed.
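To make the idea concrete, a rough sketch of a single user-facing sink whose internal dispatch mechanism is picked by configuration might look like the following. Everything here is hypothetical and uses only JDK types: `UnifiedSinkSketch`, `DispatchMode`, `Dispatcher` and the builder options are names invented for illustration, not existing Flink or OpenSearch connector APIs, and the dispatchers only record what they would send.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch: one sink type, dispatch mechanism chosen by configuration. */
final class UnifiedSinkSketch {

    /** Hypothetical option; not an existing connector setting. */
    enum DispatchMode { BULK_PROCESSOR, ASYNC_CLIENT }

    /** Internal strategy, hidden from users of the sink. */
    interface Dispatcher { String send(List<String> batch); }

    private final Dispatcher dispatcher;
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> dispatchLog = new ArrayList<>();

    private UnifiedSinkSketch(Dispatcher dispatcher, int maxBatchSize) {
        this.dispatcher = dispatcher;
        this.maxBatchSize = maxBatchSize;
    }

    static Builder builder() { return new Builder(); }

    /** Buffers one element and flushes once the batch is full. */
    void write(String element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered through the configured dispatcher. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        dispatchLog.add(dispatcher.send(new ArrayList<>(buffer)));
        buffer.clear();
    }

    List<String> dispatchLog() { return Collections.unmodifiableList(dispatchLog); }

    static final class Builder {
        private DispatchMode mode = DispatchMode.BULK_PROCESSOR; // assumed default
        private int maxBatchSize = 2;

        Builder dispatchMode(DispatchMode mode) {
            this.mode = Objects.requireNonNull(mode);
            return this;
        }

        Builder maxBatchSize(int maxBatchSize) {
            if (maxBatchSize < 1) {
                throw new IllegalArgumentException("maxBatchSize must be >= 1");
            }
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        UnifiedSinkSketch build() {
            Dispatcher dispatcher;
            if (mode == DispatchMode.ASYNC_CLIENT) {
                dispatcher = batch -> "async-client:" + batch.size();
            } else {
                dispatcher = batch -> "bulk-processor:" + batch.size();
            }
            return new UnifiedSinkSketch(dispatcher, maxBatchSize);
        }
    }
}
```

With this shape, a user who never sets `dispatchMode` gets the default path, and only advanced users need to know that two mechanisms exist.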
There could be reasons to have 2x Sinks if they support fundamentally different features/APIs but I would expect the naming to reflect this, for example OpensearchRestHighLevelClientSink/OpensearchBulkProcessorSink.
Apologies for raising these concerns late in the process, but I cannot see that this has been considered before. @MartijnVisser what are your thoughts?
I am reluctant to introduce a new Sink API based on the internal implementation unless there is a really good/semantic reason.
Thanks @dannycranmer , I understand your concerns. I will move this pull request to draft (for now) so we can get back to it at some point in the future, when migrating off the RestHighLevelClient to opensearch-java. Thanks again for the review and your thoughts.