pulsar-flink
Documentation Regarding the PulsarSerializationSchema in
While migrating from a now-deprecated Pulsar-Flink connector, I am trying to use this project, but I have hit an issue that I have not been able to work around, and I have not found any examples to follow. I would normally raise this on a mailing list, but I did not see one specific to this codebase.
When I create FlinkPulsarSource and FlinkPulsarSink instances, their constructors take a PulsarDeserializationSchema and a PulsarSerializationSchema, respectively. I believe I understand the deserialization side, but the API of the serialize() method is unclear to me.
public class PojoStreamingSerializer
        implements PulsarDeserializationSchema<Pojo>, PulsarSerializationSchema<Pojo>
{
    // Project-specific byte serializer (not shown).
    SerializerDeserializer serializerDeserializer;
    public Schema<Pojo> getSchema()
    {
        // What are we supposed to return here?
        return null;
    }
    public Pojo deserialize( Message<Pojo> message ) throws IOException
    {
        Pojo pojo = Pojo.getInstance();
        serializerDeserializer.setBuffer( message.getData() );
        pojo.deserialize( serializerDeserializer );
        return pojo;
    }
    public void serialize( Pojo pojo, TypedMessageBuilder<Pojo> typedMessageBuilder )
    {
        serializerDeserializer.getOutput().clear();
        pojo.serialize( serializerDeserializer );
        byte[] byteList = serializerDeserializer.getByte();
        // How do I pass the bytes out?
    }
}
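The SerializerDeserializer helper in the snippet above is project-specific and is not shown. For readers who want something concrete, here is a minimal sketch of the same POJO-to-byte-array round trip using only java.io. SamplePojo, its id and name fields, and SamplePojoBytes are hypothetical names chosen for illustration; they are not part of pulsar-flink or Pulsar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical two-field POJO standing in for the asker's Pojo type.
final class SamplePojo {
    final int id;
    final String name;

    SamplePojo(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical helper playing the role of SerializerDeserializer:
// fields are written in a fixed order and read back in the same order.
final class SamplePojoBytes {
    static byte[] toBytes(SamplePojo pojo) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(pojo.id);    // 4 bytes, big-endian
            out.writeUTF(pojo.name);  // 2-byte length prefix + modified UTF-8
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static SamplePojo fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int id = in.readInt();
            String name = in.readUTF();
            return new SamplePojo(id, name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whatever the real helper looks like, a pair of methods like toBytes() and fromBytes() is what a Pulsar Schema's encode() and decode() would wrap.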
I have included some sample code, with comments marking the places where I have not been able to determine how the code should work. My questions are:
- What is the purpose of the getSchema() method, and what should it return if all I want to do is convert between a byte array and a Pojo?
- Am I correct in assuming that Message.getData() returns the serialized byte array of the Pojo instance?
- How is the serialize() method intended to be used? The incoming Pojo instance can easily be converted to a byte array, but how is the TypedMessageBuilder meant to be used to pass the serialized bytes out? The 'value' method expects a Pojo-typed value, and the 'keyBytes' method takes a byte array, but the serialized object is not a key.
It would be helpful if the documentation included an example of this simple use case, but for now I am just trying to understand how it works.
Hello, I'm sorry it took so long to get back to you.
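This pulsar-flink connector delegates encoding and decoding of the data to Pulsar: the conversion between your POJO and bytes is done by the Pulsar Schema returned from getSchema(). So you need to implement getSchema() to return a Schema that converts between the POJO and bytes. serialize() then only needs to pass the POJO to TypedMessageBuilder.value(), and deserialize() can simply return Message.getValue().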
Roughly, it looks like this:
public class PojoStreamingSerializer
        implements PulsarDeserializationSchema<Pojo>, PulsarSerializationSchema<Pojo>
{
    @Override
    public Schema<Pojo> getSchema()
    {
        // Pulsar uses this schema to convert between Pojo and bytes.
        return new PojoSchema();
    }
    @Override
    public Pojo deserialize( Message<Pojo> message ) throws IOException
    {
        // The payload has already been decoded by PojoSchema.decode().
        return message.getValue();
    }
    @Override
    public void serialize( Pojo pojo, TypedMessageBuilder<Pojo> typedMessageBuilder )
    {
        // PojoSchema.encode() turns the value into the message payload.
        typedMessageBuilder.value( pojo );
    }
    public static class PojoSchema implements Schema<Pojo>
    {
        // Your own byte-level serializer; it must be initialized before use.
        SerializerDeserializer serializerDeserializer;
        @Override
        public byte[] encode( Pojo pojo )
        {
            serializerDeserializer.getOutput().clear();
            pojo.serialize( serializerDeserializer );
            return serializerDeserializer.getByte();
        }
        @Override
        public Pojo decode( byte[] bytes )
        {
            Pojo pojo = Pojo.getInstance();
            serializerDeserializer.setBuffer( bytes );
            pojo.deserialize( serializerDeserializer );
            return pojo;
        }
        @Override
        public SchemaInfo getSchemaInfo()
        {
            // Store the payload in Pulsar as plain bytes.
            return Schema.BYTES.getSchemaInfo();
        }
        @Override
        public Schema<Pojo> clone()
        {
            // Returning this is only safe if sharing serializerDeserializer is safe.
            return this;
        }
    }
}
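To see why serialize() only calls value() and deserialize() only calls getValue(), the following sketch simulates the flow without a Pulsar dependency. MiniSchema, PojoMiniSchema, MiniMessageBuilder, MiniMessage, PojoStreamingSerializerSketch, and the one-field Pojo are hypothetical stand-ins that mirror only the encode/decode, value(), getData(), and getValue() roles of the real Pulsar classes; they are not the Pulsar API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the encode/decode part of Pulsar's Schema<T>.
interface MiniSchema<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
}

// Hypothetical one-field POJO, for illustration only.
final class Pojo {
    final String name;
    Pojo(String name) { this.name = name; }
}

// Hypothetical schema: the POJO's name is stored as UTF-8 bytes.
final class PojoMiniSchema implements MiniSchema<Pojo> {
    @Override
    public byte[] encode(Pojo value) {
        return value.name.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Pojo decode(byte[] bytes) {
        return new Pojo(new String(bytes, StandardCharsets.UTF_8));
    }
}

// Stand-in for TypedMessageBuilder: value() runs the schema's encode()
// and keeps the resulting bytes as the message payload.
final class MiniMessageBuilder<T> {
    private final MiniSchema<T> schema;
    byte[] payload;

    MiniMessageBuilder(MiniSchema<T> schema) { this.schema = schema; }

    MiniMessageBuilder<T> value(T value) {
        payload = schema.encode(value);
        return this;
    }
}

// Stand-in for Message: getData() exposes the raw payload,
// getValue() decodes it with the same schema.
final class MiniMessage<T> {
    private final MiniSchema<T> schema;
    private final byte[] data;

    MiniMessage(MiniSchema<T> schema, byte[] data) {
        this.schema = schema;
        this.data = data;
    }

    byte[] getData() { return data; }

    T getValue() { return schema.decode(data); }
}

// Mirrors the answer's PojoStreamingSerializer: no byte handling here,
// because all conversion lives in the schema.
final class PojoStreamingSerializerSketch {
    final MiniSchema<Pojo> schema = new PojoMiniSchema();

    void serialize(Pojo pojo, MiniMessageBuilder<Pojo> builder) {
        builder.value(pojo);
    }

    Pojo deserialize(MiniMessage<Pojo> message) {
        return message.getValue();
    }
}
```

In this sketch getData() returns the raw encoded payload, which mirrors the documented behavior of Pulsar's Message.getData(); with a custom schema in place, though, getValue() already hands back the decoded Pojo, so the serializer never touches bytes directly.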
I will update the custom encoding example in the documentation later.