pulsar
pulsar copied to clipboard
[Bug] Functions get wrong output type for async functions
Search before asking
- [X] I searched in the issues and found nothing similar.
Version
220a3d601602d67f5f44516c5d9895dfaa270380
Minimal reproduce step
- Write an async function like below:
package org.apache.pulsar.functions.api.examples;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
public class AvroAsyncFunction implements Function<AvroTestObject, CompletableFuture<AvroTestObject>> {
@Override
public CompletableFuture<AvroTestObject> process(AvroTestObject input, Context context) throws Exception {
CompletableFuture<AvroTestObject> future = new CompletableFuture();
Executors.newCachedThreadPool().submit(() -> {
try {
Thread.sleep(500);
input.setBaseValue(input.getBaseValue() + 10);
future.complete(input);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
}
- Create function with the above code:
./bin/pulsar-admin functions create --jar /pulsar/examples.jar --name avro-java-fn --className org.apache.pulsar.functions.api.examples.AvroAsyncFunction --inputs persistent://public/default/avro-java-fn-input --output persistent://public/default/avro-java-fn-output --schema-type avro --custom-schema-inputs '{"persistent://public/default/avro-java-fn-input": "{\"schemaType\": \"avro\"}"}'
- Get the schema of the output topic:
./bin/pulsar-admin schemas get persistent://public/default/avro-java-fn-output-partition-0
What did you expect to see?
{
"version": 0,
"schemaInfo": {
"name": "avro-java-fn-output-partition-0",
"schema": {
"type": "record",
"name": "AvroTestObject",
"namespace": "org.apache.pulsar.functions.api.examples.pojo",
"fields": [
{
"name": "baseValue",
"type": [
"null",
"int"
]
},
{
"name": "objectValue",
"type": [
"null",
"string"
]
}
]
},
"type": "AVRO",
"timestamp": 1708246141330,
"properties": {
"__alwaysAllowNull": "true",
"__jsr310ConversionEnabled": "false"
}
}
}
What did you see instead?
{
"version": 0,
"schemaInfo": {
"name": "avro-java-fn-output-partition-0",
"schema": {
"type": "record",
"name": "CompletableFuture",
"namespace": "java.util.concurrent",
"fields": []
},
"type": "AVRO",
"timestamp": 1708246140630,
"properties": {
"__alwaysAllowNull": "true",
"__jsr310ConversionEnabled": "false"
}
}
}
Anything else?
The reason is that when infer function type args, it won't unwrap the output type for CompletableFuture
No response
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!