
[FLINK-37724] Adds AsyncTableFunction as a fully supported UDF type

AlanConfluent opened this pull request.

What is the purpose of the change

This aims to implement FLIP-498 https://cwiki.apache.org/confluence/display/FLINK/FLIP-498%3A+AsyncTableFunction+for+async+table+function+support.

In particular, AsyncTableFunctions can now be defined as follows:

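```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.functions.AsyncTableFunction;

public class TestTableFunction extends AsyncTableFunction<String> {
    public void eval(CompletableFuture<Collection<String>> result, Integer i) {
        result.complete(Arrays.asList("Row1 " + i, "Row2 " + i)); // emits two rows
    }
}
```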
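Because `eval` completes a future instead of returning rows, its body can hand the work to a non-blocking client and return immediately. The following is a plain-Java sketch of that contract with no Flink dependency. `AsyncEvalSketch` and `invoke` are hypothetical names. The common pool stands in for a real client's I/O threads. In Flink it is the runtime, not a helper like `invoke`, that creates the future and consumes its result.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Hypothetical plain-Java sketch (no Flink dependency) of the eval contract:
// the caller hands eval a future, and eval completes it later, possibly on
// another thread, instead of returning rows directly.
class AsyncEvalSketch {

    static void eval(CompletableFuture<Collection<String>> result, Integer i) {
        // Stand-in for a non-blocking client call; runs on the common pool.
        CompletableFuture
                .supplyAsync(() -> Arrays.asList("Row1 " + i, "Row2 " + i))
                .whenComplete((rows, err) -> {
                    if (err != null) {
                        // Propagate failures so the caller can fail or retry.
                        result.completeExceptionally(err);
                    } else {
                        result.complete(rows);
                    }
                });
        // eval returns immediately; the rows arrive when the future completes.
    }

    // Mimics the caller side: invoke eval, then block until the rows arrive.
    static Collection<String> invoke(Integer i) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        eval(future, i);
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(invoke(7)); // prints [Row1 7, Row2 7]
    }
}
```

Blocking with `join()` is only for the demonstration. The point of the async design is that the operator keeps processing other records while futures are outstanding.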

They can be registered like any other UDF, for example:

```java
tEnv.createTemporarySystemFunction("func", new TestTableFunction());
```

There are a few new configuration options:

| Name (prefix `table.exec.async-table.`) | Meaning |
|---|---|
| `buffer-capacity` | The maximum number of outstanding requests the operator allows at once. |
| `timeout` | The total time that may pass before the invocation (including retries) is considered timed out, at which point the task fails. |
| `retry-strategy` | The retry strategy to use. `FIXED_DELAY` retries after a fixed amount of time. |
| `retry-delay` | The time to wait between retries for the `FIXED_DELAY` strategy. It could also serve as the base delay for a (not yet proposed) exponential backoff. |
| `max-attempts` | The maximum number of attempts while retrying. |
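The following is a simplified, synchronous plain-Java sketch of how `retry-delay`, `max-attempts`, and `timeout` might interact. It assumes that an attempt is retried only when it throws, that `retry-delay` is a fixed pause between attempts, and that `timeout` bounds the total time across all attempts. `FixedDelayRetrySketch` is hypothetical. The real operator runs asynchronously, does not model `buffer-capacity` this way, and may differ in detail. For example, the later discussion mentions that empty results also trigger retries.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch of FIXED_DELAY retry semantics; not Flink's implementation.
class FixedDelayRetrySketch {

    static <T> T call(Supplier<T> attempt, Duration retryDelay, int maxAttempts, Duration timeout) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        // One deadline for the whole invocation, including all retries.
        long deadline = System.nanoTime() + timeout.toNanos();
        RuntimeException last = null;
        for (int n = 1; n <= maxAttempts; n++) {
            // Checked before each attempt; an in-flight attempt is not interrupted.
            if (System.nanoTime() - deadline > 0) {
                throw new RuntimeException(new TimeoutException(
                        "timed out after " + (n - 1) + " attempt(s)"));
            }
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                last = e;
                if (n < maxAttempts) {
                    sleep(retryDelay); // fixed pause before the next attempt
                }
            }
        }
        // max-attempts exhausted: surface the last failure.
        throw last;
    }

    private static void sleep(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

A single deadline shared by all attempts means a long `retry-delay` combined with a high `max-attempts` can still end in a timeout before the attempts run out.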

Brief change log

  • Adds AsyncTableFunction support to the type system, code generation, and runtime

Verifying this change

This change added tests and can be verified as follows:

  • Tested UDF validation (UserDefinedFunctionHelperTest)
  • Tested type inference (TypeInferenceExtractorTest)
  • Tested code generation for AsyncTableFunction
  • Tested the new split rules for the new async correlates
  • Added ITCases that use the new async correlate

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • It should only affect the new code paths and should not have detrimental effects on existing AsyncTableFunction use in lookup joins
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    • More documentation, beyond the config options, will follow

AlanConfluent, May 15 '25

CI report:

  • 86761e1e9bf7207b43275b66e8ca6dff1e573509 Azure: SUCCESS

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot, May 15 '25

Thanks for the patch, @AlanConfluent. I made a pass and left some comments.

Overall the patch looks good. One question is about batch mode support: ideally, the same function call should also work in batch. Is there any plan to do that?

@becketqin I appreciate the review. I have responded to all of your comments and pushed a new commit. I also rebased onto some other recent changes.

There is a plan to add batch support soon-ish, though I wasn't planning it as an immediate follow-up to this PR. The same is true for AsyncScalarFunction, which is also currently streaming-only. Is that acceptable for a new feature like this?

https://issues.apache.org/jira/browse/FLINK-37927

AlanConfluent, Jun 09 '25

@AlanConfluent Can we push this forward so it merges before the feature freeze?

lsyldliu, Jun 18 '25

@lsyldliu I'll take a final pass on the patch and merge it.

becketqin, Jun 18 '25

@AlanConfluent There is a conflict in the patch now. Can you resolve it? Also, can you clean up the commit history a bit so it is ready for merge? We would like to keep fixes unrelated to this FLIP in separate commits. I think this patch is close to being merged. Let's try to get it into release 2.1. @lsyldliu plans to cut the release branch by next Wednesday. I'll finish the final pass before Monday.

becketqin, Jun 20 '25

@becketqin @AlanConfluent If we change the config option table.exec.async-table.buffer-capacity to table.exec.async-table.max-concurrent-operations, do we also need to change the hint buffer-capacity? It's better to use 'max-concurrent-operations' rather than 'buffer-capacity' as the hint name.

fsk119, Jun 21 '25


Yes, we need to make sure the hints match the config names.

becketqin, Jun 21 '25

> @AlanConfluent Can we push this forward to merge before feature freeze time?

Sounds good. I will try to finish this ASAP. I don't think there are any big outstanding things.

AlanConfluent, Jun 24 '25

> @AlanConfluent Thanks for updating the patch. I made another pass. The patch is very close to being merged. I left a couple of minor comments. We can merge it after the code is rebased to resolve the conflicts.

Sounds good. Rebasing.

> There might be a few follow-up patches to complete the whole work:
>
>   1. Add the join type check.
>   2. Add batch support.
>   3. Add condition support. (This is supported in TableFunction.)

I have a follow-up PR that checks the join type and handles left joins. I didn't tack it onto this PR, which is already large, but I will make a ticket for it, and for the other items as well.

> Thanks again for the patch. Honestly, it is a little surprising to me how much work it is to enable this feature.

No problem! It is a little surprising for a type that is already partially supported in Flink.

AlanConfluent, Jun 24 '25

https://issues.apache.org/jira/browse/FLINK-37927 https://issues.apache.org/jira/browse/FLINK-37988 https://issues.apache.org/jira/browse/FLINK-37989

AlanConfluent, Jun 24 '25

@AlanConfluent Hi, if you want to merge this PR into release-2.1, please first apply for it in the thread https://lists.apache.org/thread/hj009zm44js1v4tgpps3d1jz9v31opz6.

cc @becketqin: as planned, I'll cut the release-2.1 branch on June 26th at 10:00 am.

lsyldliu, Jun 25 '25

@flinkbot run azure

AlanConfluent, Jun 25 '25

@becketqin, when you have a moment, can you take a last look at this PR? I believe all of the outstanding comments have been addressed. Please let me know if there is anything that requires additional attention.

AlanConfluent, Jul 07 '25

@AlanConfluent Thanks for updating the patch. Sorry for the late reply; I was on vacation in China. I'll take a final pass this week.

becketqin, Jul 14 '25

I just rebased again and changed the target Flink version to 2.2, since I missed 2.1.

AlanConfluent, Jul 14 '25

@AlanConfluent Given that retry-on-empty-result is an existing behavior, maybe it is OK to check in the code as is. We can do a quick follow up FLIP to improve the behavior. Please let me know if you have time for this. I can find someone from LinkedIn to help if you don't have time. Thanks.

becketqin, Jul 16 '25


I have some time budget to follow up on this. I think your suggestions are good, and it would be nice to make this more flexible. I'm happy to write a follow-up FLIP covering this, and it might make sense to cover async scalar functions as well. In the scalar case there is always exactly one result, but it could be null. I think the current code actually allows null, so I'm not sure the empty-response check is ever triggered for async scalar functions, though maybe it should be. Or maybe it makes less sense for the scalar case?

Tell me what you think.

Either way, I'm happy to merge the code in this state and immediately follow up with a FLIP for the empty-result case.

AlanConfluent, Jul 18 '25
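The trade-off discussed above, retrying on an empty result versus accepting it, can be sketched as a small predicate. Everything here is hypothetical. `EmptyResultPolicySketch`, `OnEmpty`, and both methods are illustrative names, not Flink APIs, and the follow-up FLIP may define the behavior differently. The sketch only reflects the distinction raised in the thread. A table function can return an empty collection. A scalar function always returns exactly one value, so the only "empty" signal it has is null.

```java
import java.util.Collection;

// Hypothetical sketch of a configurable empty-result policy; none of these
// names exist in Flink.
class EmptyResultPolicySketch {

    enum OnEmpty { RETRY, ACCEPT }

    // Table function case: an empty (or missing) collection means "no rows".
    static boolean shouldRetryTable(Collection<?> rows, OnEmpty policy) {
        boolean empty = rows == null || rows.isEmpty();
        return empty && policy == OnEmpty.RETRY;
    }

    // Scalar function case: there is always one value, so a null value is
    // the only signal that could be treated as "empty".
    static boolean shouldRetryScalar(Object value, OnEmpty policy) {
        return value == null && policy == OnEmpty.RETRY;
    }
}
```

With `ACCEPT`, an empty result would simply produce no joined rows for that input. With `RETRY`, it would be handled like a failed attempt and go through the configured retry strategy.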

@AlanConfluent Thanks for the patch. I just merged it into master. Looking forward to the follow-up patches!

becketqin, Jul 20 '25