pulsar-flink
pulsar-flink copied to clipboard
[Documentation] Documentation for the PulsarDeserializationSchema
Hi,
Previously with pulsar-flink version 2.4.29 we could do something like this :
FlinkPulsarSource<LiveRawData> fps = new FlinkPulsarSource<>(serviceUrl, adminUrl, JsonDeser.of(LiveRawData.class), props);
to deserialize custom object using jsondeser.
How do we do that now with version 2.7.1?
I have tried :
PulsarDeserializationSchema<LiveRawData> pulsarDeserialization = new PulsarDeserializationSchemaBuilder<LiveRawData>() .setRecordClass(LiveRawData.class) .build();
FlinkPulsarSource<LiveRawData> fps = new FlinkPulsarSource<>(serviceUrl, adminUrl, pulsarDeserialization, props);
However I got this error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Logical type 'RAW('org.apache.flink.types.Row', ?)' does not support a conversion from or to class 'com.meta.pulsar_core.models.LiveRawData'. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1058) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Please advise. Thanks.
I am sorry that the documentation for this part is not good enough. You can refer to the following code to solve the problem.
PulsarDeserializationSchema<LiveRawData> pulsarDeserialization = PulsarDeserializationSchema.valueOnly(JsonDeser.of(LiveRawData.class));
FlinkPulsarSource<LiveRawData> fps = new FlinkPulsarSource<>(serviceUrl, adminUrl, pulsarDeserialization, props);
JsonDeser
is the implementation of DeserializationSchema
, the parameter of PulsarDeserializationSchema#valueOnly
, you can replace it with any implementation of DeserializationSchema
, as long as it matches your data.
@jianyun8023 Have we finished the documentation on how to correctly use json or other deserialization method? If so, I think we can close this issue.
@syhily We can update the documentation to explain how to use json, but JsonDeser
is something that needs to be deprecated.