
[Bug] Functions get wrong output type for async functions

Open jiangpengcheng opened this issue 1 year ago • 0 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

220a3d601602d67f5f44516c5d9895dfaa270380

Minimal reproduce step

  1. Write an async function like below:
package org.apache.pulsar.functions.api.examples;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;

public class AvroAsyncFunction implements Function<AvroTestObject, CompletableFuture<AvroTestObject>> {
    @Override
    public CompletableFuture<AvroTestObject> process(AvroTestObject input, Context context) throws Exception {
        CompletableFuture<AvroTestObject> future = new CompletableFuture<>();

        Executors.newCachedThreadPool().submit(() -> {
            try {
                Thread.sleep(500);
                input.setBaseValue(input.getBaseValue() + 10);
                future.complete(input);
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });

        return future;
    }
}

  2. Create a function from the above code:
./bin/pulsar-admin functions create --jar /pulsar/examples.jar --name avro-java-fn --className org.apache.pulsar.functions.api.examples.AvroAsyncFunction --inputs persistent://public/default/avro-java-fn-input --output persistent://public/default/avro-java-fn-output --schema-type avro --custom-schema-inputs '{"persistent://public/default/avro-java-fn-input": "{\"schemaType\": \"avro\"}"}'
  3. Get the schema of the output topic:
./bin/pulsar-admin schemas get persistent://public/default/avro-java-fn-output-partition-0

What did you expect to see?

{
  "version": 0,
  "schemaInfo": {
    "name": "avro-java-fn-output-partition-0",
    "schema": {
      "type": "record",
      "name": "AvroTestObject",
      "namespace": "org.apache.pulsar.functions.api.examples.pojo",
      "fields": [
        {
          "name": "baseValue",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "objectValue",
          "type": [
            "null",
            "string"
          ]
        }
      ]
    },
    "type": "AVRO",
    "timestamp": 1708246141330,
    "properties": {
      "__alwaysAllowNull": "true",
      "__jsr310ConversionEnabled": "false"
    }
  }
}

What did you see instead?

{
  "version": 0,
  "schemaInfo": {
    "name": "avro-java-fn-output-partition-0",
    "schema": {
      "type": "record",
      "name": "CompletableFuture",
      "namespace": "java.util.concurrent",
      "fields": []
    },
    "type": "AVRO",
    "timestamp": 1708246140630,
    "properties": {
      "__alwaysAllowNull": "true",
      "__jsr310ConversionEnabled": "false"
    }
  }
}

Anything else?

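The cause is that when the function's type arguments are inferred, the output type is not unwrapped from CompletableFuture, so the output schema is derived from CompletableFuture itself instead of AvroTestObject.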
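
To illustrate the kind of unwrapping that is missing, here is a minimal sketch using plain Java reflection. It is not Pulsar's actual type-inference code: `OutputTypeUnwrap`, `unwrapFuture`, `outputTypeOf`, `AsyncFn`, and `SyncFn` are hypothetical names, and `java.util.function.Function` stands in for `org.apache.pulsar.functions.api.Function` so that the example compiles on its own.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class OutputTypeUnwrap {

    // Hypothetical helper: if the declared type is CompletableFuture<T>, return T;
    // otherwise return the declared type unchanged.
    static Type unwrapFuture(Type declared) {
        if (declared instanceof ParameterizedType) {
            ParameterizedType p = (ParameterizedType) declared;
            if (p.getRawType() == CompletableFuture.class) {
                return p.getActualTypeArguments()[0];
            }
        }
        return declared;
    }

    // Hypothetical helper: read the second type argument (the output type) of the
    // Function interface implemented directly by fnClass, then unwrap it.
    // Assumption: java.util.function.Function stands in for Pulsar's Function API.
    static Type outputTypeOf(Class<?> fnClass) {
        for (Type iface : fnClass.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) iface;
                if (p.getRawType() == Function.class) {
                    return unwrapFuture(p.getActualTypeArguments()[1]);
                }
            }
        }
        throw new IllegalArgumentException(fnClass + " does not directly implement Function");
    }

    // Async function: the declared output type is CompletableFuture<Integer>.
    static class AsyncFn implements Function<String, CompletableFuture<Integer>> {
        @Override
        public CompletableFuture<Integer> apply(String s) {
            return CompletableFuture.completedFuture(s.length());
        }
    }

    // Sync function: the declared output type is Integer.
    static class SyncFn implements Function<String, Integer> {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    }

    public static void main(String[] args) {
        // Both calls should resolve to Integer once the future is unwrapped.
        System.out.println(outputTypeOf(AsyncFn.class));
        System.out.println(outputTypeOf(SyncFn.class));
    }
}
```

With this kind of unwrapping, the async and the sync variants resolve to the same output type, which is what the expected schema above reflects. The sketch only handles a class that implements the interface directly; a real fix would presumably need to cover inherited or indirectly resolved type arguments as well.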


Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

jiangpengcheng, Feb 18 '24 09:02