[FLINK-37724] Adds AsyncTableFunction as a fully supported UDF type
What is the purpose of the change
This change implements FLIP-498: https://cwiki.apache.org/confluence/display/FLINK/FLIP-498%3A+AsyncTableFunction+for+async+table+function+support
In particular AsyncTableFunctions can now be defined as:
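import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.functions.AsyncTableFunction;
public class TestTableFunction extends AsyncTableFunction<String> {
    // The future is the first parameter; completing it emits the rows for this call.
    public void eval(CompletableFuture<Collection<String>> result, Integer i) {
        result.complete(Arrays.asList("Row1 " + i, "Row2 " + i));
    }
}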
They can be registered in the catalog like any other UDF, for example:
tEnv.createTemporarySystemFunction("func", new TestTableFunction());
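
The example above completes the future inline, but the same eval shape also allows the result to be completed later from another thread, which is the usual reason to write an async function. Below is a minimal sketch of that pattern, assuming a hypothetical single-thread executor (`clientPool`) that stands in for an async client. The Flink base class is left out so the snippet compiles with only the JDK; a real function would extend AsyncTableFunction<String> as in the description. The class name `NonBlockingTableFunctionSketch` is made up for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-in for a class extending AsyncTableFunction<String>; the Flink base class
// is omitted on purpose so this compiles without Flink on the classpath.
class NonBlockingTableFunctionSketch {
    // Hypothetical executor standing in for an async client's callback thread.
    private final ExecutorService clientPool = Executors.newSingleThreadExecutor();

    // Same shape as eval in the description: the future first, then the arguments.
    // The method returns immediately; the rows are supplied later.
    public void eval(CompletableFuture<Collection<String>> result, Integer i) {
        clientPool.execute(() -> {
            try {
                result.complete(Arrays.asList("Row1 " + i, "Row2 " + i));
            } catch (RuntimeException e) {
                // Report failures through the future instead of losing them.
                result.completeExceptionally(e);
            }
        });
    }

    public void close() {
        clientPool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        NonBlockingTableFunctionSketch fn = new NonBlockingTableFunctionSketch();
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        fn.eval(future, 7);
        // Wait for the asynchronous completion and print the rows.
        System.out.println(future.get(5, TimeUnit.SECONDS)); // prints [Row1 7, Row2 7]
        fn.close();
    }
}
```

Completing the future exceptionally, rather than throwing from the callback thread, keeps the failure visible to whoever is waiting on the result.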
There are a few new configs:
| Name (Prefix table.exec.async-table) | Meaning |
|---|---|
| buffer-capacity | The number of outstanding requests the operator allows at once |
| timeout | The total time that may pass before the invocation (including retries) is considered timed out and the task execution fails |
| retry-strategy | The retry strategy to use. FIXED_DELAY retries after a fixed amount of time |
| retry-delay | The time to wait between retries for the FIXED_DELAY strategy. It could also serve as the base delay for a (not yet proposed) exponential backoff strategy. |
| max-attempts | The maximum number of attempts while retrying. |
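
To make the interplay of timeout, retry-delay, and max-attempts concrete, here is a minimal JDK-only sketch of a FIXED_DELAY retry loop whose timeout covers all attempts. It is an illustration of the semantics described in the table, not Flink's operator code; `FixedDelayRetrySketch`, `invoke`, and `attempt` are hypothetical names, and retrying only on exceptions is an assumption made for brevity.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Illustration only: NOT Flink's implementation. All names are hypothetical.
class FixedDelayRetrySketch {

    // Runs `call` up to maxAttempts times with retryDelay between failed attempts.
    // `timeout` bounds the whole invocation, including retries.
    static <T> CompletableFuture<Collection<T>> invoke(
            Consumer<CompletableFuture<Collection<T>>> call,
            int maxAttempts,
            Duration retryDelay,
            Duration timeout,
            ScheduledExecutorService timer) {
        CompletableFuture<Collection<T>> overall = new CompletableFuture<>();
        timer.schedule(
                () -> { overall.completeExceptionally(new TimeoutException("async call timed out")); },
                timeout.toMillis(),
                TimeUnit.MILLISECONDS);
        attempt(call, 1, maxAttempts, retryDelay, timer, overall);
        return overall;
    }

    private static <T> void attempt(
            Consumer<CompletableFuture<Collection<T>>> call,
            int attemptNo,
            int maxAttempts,
            Duration retryDelay,
            ScheduledExecutorService timer,
            CompletableFuture<Collection<T>> overall) {
        if (overall.isDone()) {
            return; // already timed out, so stop retrying
        }
        CompletableFuture<Collection<T>> single = new CompletableFuture<>();
        single.whenComplete((rows, error) -> {
            if (error == null) {
                overall.complete(rows);
            } else if (attemptNo >= maxAttempts) {
                overall.completeExceptionally(error); // attempts exhausted
            } else {
                timer.schedule(
                        () -> attempt(call, attemptNo + 1, maxAttempts, retryDelay, timer, overall),
                        retryDelay.toMillis(),
                        TimeUnit.MILLISECONDS);
            }
        });
        try {
            call.accept(single);
        } catch (RuntimeException e) {
            single.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky call: fails twice, then returns two rows.
        Consumer<CompletableFuture<Collection<String>>> flaky = result -> {
            if (calls.incrementAndGet() < 3) {
                result.completeExceptionally(new IllegalStateException("transient failure"));
            } else {
                result.complete(Arrays.asList("Row1", "Row2"));
            }
        };
        Collection<String> rows =
                invoke(flaky, 3, Duration.ofMillis(10), Duration.ofSeconds(5), timer).join();
        System.out.println(rows + " after " + calls.get() + " attempts");
        timer.shutdownNow(); // also discards the pending timeout task
    }
}
```

With max-attempts set to 3, the third attempt is the last one allowed, so a call that fails twice and then succeeds still completes normally.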
Brief change log
- Adds AsyncTableFunction support to the type system, codegen, and runtime
Verifying this change
This change added tests and can be verified as follows:
- Tested UDF validation (UserDefinedFunctionHelperTest)
- Tested type inference (TypeInferenceExtractorTest.java)
- Tested code generation for AsyncTableFunction
- Tested new split rules for new async correlates
- Added ITCases covering the new async correlate use
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- It should only affect the new code and shouldn't have detrimental effects on existing AsyncTableFunction use with lookup joins
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- More documentation will follow as well beyond the configs
CI report:
- 86761e1e9bf7207b43275b66e8ca6dff1e573509 Azure: SUCCESS
Thanks for the patch, @AlanConfluent. I made a pass and left some comments.
Overall the patch looks good. One question is about support in batch mode. Ideally, we also want the same function call to work in batch. Is there any plan to do that?
@becketqin I appreciate the review. I have responded to all of your comments and pushed a new commit. Also rebased on some other changes as well.
There is a plan to do batch soon-ish, though I wasn't planning an immediate followup after this PR. The same is true for AsyncScalarFunction, which is also currently streaming only. Is this acceptable for new features like this?
https://issues.apache.org/jira/browse/FLINK-37927
@AlanConfluent Can we push this forward to merge before feature freeze time?
@lsyldliu I'll take a final pass on the patch and merge it.
@AlanConfluent There is a conflict in the patch now. Can you resolve it? BTW, can you also clean up the commit history a bit so it is ready for merge? We would like to keep the fixes irrelevant to this FLIP in separate commits. I think this patch is close to being merged. Let's try to get it into release 2.1. @lsyldliu plans to cut the release branch by next Wed. I'll finish the final pass before Monday.
@becketqin @AlanConfluent If we want to change the config option table.exec.async-table.buffer-capacity to table.exec.async-table.max-concurrent-operations, do we need to change the hint buffer-capacity? It's better to use 'max-concurrent-operations' rather than 'buffer-capacity' as the hint name.
Yes, we need to make sure the hints match the config names.
> @AlanConfluent Can we push this forward to merge before feature freeze time?
Sounds good. I will try to finish this ASAP. I don't think there are any big outstanding things.
@AlanConfluent Thanks for updating the patch. I made another pass. The patch is very close to being merged. I left a couple of minor comments. We can merge it after the code is rebased to resolve the conflicts.
Sounds good. Rebasing.
There might be a few follow-up patches to complete the whole work:
- Add the join type check.
- Add batch support.
- Add condition support. (This is supported in TableFunction)
I have a followup PR that checks the join type and handles left join. I skipped tacking it on to this PR which is already large, but will make a ticket. And will create a ticket for the other items as well.
Thanks again for the patch. Honestly, it is a little surprising to me how much work it is to enable this feature.
No problem! It is a little surprising for a type which is already partially supported in Flink.
https://issues.apache.org/jira/browse/FLINK-37927 https://issues.apache.org/jira/browse/FLINK-37988 https://issues.apache.org/jira/browse/FLINK-37989
@AlanConfluent Hi, if you want to merge this PR to release-2.1, please apply for it first in the thread https://lists.apache.org/thread/hj009zm44js1v4tgpps3d1jz9v31opz6.
cc @becketqin: as planned, I'll be cutting the release-2.1 branch on June 26th at 10:00 am.
@flinkbot run azure
@becketqin, when you have a moment, can you take a last look at this PR? I believe all of the outstanding comments have been addressed. Please tell me if there is anything that requires additional attention.
@AlanConfluent Thanks for updating the patch. Sorry for the late reply as I was on vacation in China. I'll take a final pass this week.
I just did another rebase and changed the Flink version to be 2.2, since I missed 2.1
@AlanConfluent Given that retry-on-empty-result is an existing behavior, maybe it is OK to check in the code as is. We can do a quick follow up FLIP to improve the behavior. Please let me know if you have time for this. I can find someone from LinkedIn to help if you don't have time. Thanks.
I have some time budget to follow up with this. I think your suggestions are good and it would be nice to make this more flexible. I'm happy to do a followup FLIP covering this and it might make sense to cover async scalar as well. In that case, there is always one result, but it could be null. I think the current code actually allows null, so I'm not sure that the empty response check is ever triggered for async scalar, but maybe it should be. Or maybe it makes less sense for the scalar case?
Tell me what you think.
Either way, happy to merge the code in this state and do an immediate follow up FLIP for empty case.
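
The table-versus-scalar question above can be illustrated with plain Java collections. The sketch below assumes, purely for illustration, that a scalar result is seen by the check as a one-element collection; that wrapping, the class `EmptyResultCheckSketch`, and both predicates are hypothetical and are not taken from Flink's code. Under that assumption, an "is the result empty" check never fires for a scalar call that returns null, whereas a stricter variant would.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Predicate;

// Illustration only: these predicates are hypothetical, not Flink's retry predicates.
class EmptyResultCheckSketch {
    // Treats "no rows at all" as an empty result, as a table function might produce.
    static final Predicate<Collection<?>> EMPTY_RESULT =
            rows -> rows == null || rows.isEmpty();

    // Stricter variant that also treats a single null value as empty,
    // which is the shape a null scalar result would have under the assumption above.
    static final Predicate<Collection<?>> EMPTY_OR_NULL_VALUE =
            EMPTY_RESULT.or(rows -> rows.size() == 1 && rows.iterator().next() == null);

    public static void main(String[] args) {
        Collection<String> noRows = Collections.emptyList();
        Collection<String> scalarNull = Collections.singletonList(null);

        System.out.println(EMPTY_RESULT.test(noRows));            // true
        System.out.println(EMPTY_RESULT.test(scalarNull));        // false
        System.out.println(EMPTY_OR_NULL_VALUE.test(scalarNull)); // true
    }
}
```

Whether a null scalar result should count as "empty" for retry purposes is exactly the open design question for the follow-up FLIP; the sketch only shows that the two definitions differ.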
@AlanConfluent Thanks for the patch. I just merged it to master. Looking forward to the follow-up patches!
