[Documentation] Documentation for the PulsarDeserializationSchema

Open rec7y33 opened this issue 3 years ago • 3 comments

Hi,

Previously, with pulsar-flink version 2.4.29, we could deserialize a custom object using JsonDeser like this:

FlinkPulsarSource<LiveRawData> fps = new FlinkPulsarSource<>(serviceUrl, adminUrl, JsonDeser.of(LiveRawData.class), props);

How do we do that now with version 2.7.1? I have tried:

PulsarDeserializationSchema<LiveRawData> pulsarDeserialization = new PulsarDeserializationSchemaBuilder<LiveRawData>()
    .setRecordClass(LiveRawData.class)
    .build();
FlinkPulsarSource<LiveRawData> fps = new FlinkPulsarSource<>(serviceUrl, adminUrl, pulsarDeserialization, props);

However, I got this error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Logical type 'RAW('org.apache.flink.types.Row', ?)' does not support a conversion from or to class 'com.meta.pulsar_core.models.LiveRawData'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1058)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)

Please advise. Thanks.

rec7y33 avatar Jan 22 '21 03:01 rec7y33

I am sorry that the documentation for this part is not good enough. You can refer to the following code to solve the problem.

PulsarDeserializationSchema<LiveRawData> pulsarDeserialization = PulsarDeserializationSchema.valueOnly(JsonDeser.of(LiveRawData.class));
FlinkPulsarSource<LiveRawData> fps = new FlinkPulsarSource<>(serviceUrl, adminUrl, pulsarDeserialization, props);

JsonDeser is an implementation of DeserializationSchema, which is the parameter type of PulsarDeserializationSchema#valueOnly. You can replace it with any other implementation of DeserializationSchema, as long as it matches your data.
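
To illustrate what "any implementation, as long as it matches your data" can look like, here is a minimal, dependency-free sketch. Everything in it is hypothetical: BytesDeserializer is a local stand-in that only mirrors the bytes-in, record-out shape of a deserialization schema, and Reading with its "sensor,value" payload format is invented for illustration, since the fields of LiveRawData are not shown in this thread. In a real job you would implement Flink's own DeserializationSchema instead and pass the instance to PulsarDeserializationSchema.valueOnly.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class ValueOnlySketch {

    /** Local stand-in (assumption): mirrors the bytes-in, record-out shape of a deserialization schema. */
    public interface BytesDeserializer<T> extends Serializable {
        T deserialize(byte[] message);
    }

    /** Hypothetical payload type, invented for this sketch. */
    public static final class Reading {
        public final String sensor;
        public final double value;

        public Reading(String sensor, double value) {
            this.sensor = sensor;
            this.value = value;
        }
    }

    /** Decodes UTF-8 payloads of the form "sensor,value" (an assumed wire format). */
    public static final class CsvReadingDeserializer implements BytesDeserializer<Reading> {
        private static final long serialVersionUID = 1L;

        @Override
        public Reading deserialize(byte[] message) {
            String text = new String(message, StandardCharsets.UTF_8).trim();
            String[] parts = text.split(",", -1);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected 'sensor,value' but got: " + text);
            }
            // Double.parseDouble throws NumberFormatException (an IllegalArgumentException) on bad input.
            return new Reading(parts[0].trim(), Double.parseDouble(parts[1].trim()));
        }
    }

    public static void main(String[] args) {
        byte[] payload = "temp,21.5".getBytes(StandardCharsets.UTF_8);
        Reading r = new CsvReadingDeserializer().deserialize(payload);
        System.out.println(r.sensor + " = " + r.value);
    }
}
```

The deserializer is kept serializable because Flink ships schema instances to the task managers. The decoding logic is the only part that depends on your data format, so a JSON, Avro, or custom binary payload would change just the body of deserialize.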

jianyun8023 avatar Jan 25 '21 00:01 jianyun8023

@jianyun8023 Have we finished the documentation on how to correctly use JSON or other deserialization methods? If so, I think we can close this issue.

syhily avatar Nov 04 '21 15:11 syhily

@syhily We can update the documentation to explain how to use JSON, but JsonDeser itself needs to be deprecated.

jianyun8023 avatar Nov 08 '21 08:11 jianyun8023