[Discuss] SDK support for batching in the Protobuf format
Batching support was added to the Protobuf format. Does the SDK support it now?
We don't support batching yet, PRs welcome!
I will try
Since this is a large feature, would you mind sharing the high-level approach you're planning to follow, so that we can discuss it a bit? If there are multiple approaches, we can discuss the pros and cons of each.
Sure, I'll write a document and post it here for discussion.
Hi @pierDipi, here is my train of thought and some design ideas for the new feature. Overall, the goal is to make minimal changes to the existing code while meeting the requirements and keeping the design extensible for future work. If you have any suggestions or comments, let's discuss and exchange ideas. If any changes turn out to be necessary later on, please leave a comment on the issue explaining them. Once the plan is finalized through discussion, I will proceed with the coding. Thank you!
1. Batch Data
The specification places few restrictions on the events in a batch: they may come from different sources and producers and carry different content types. The only requirement is that all CloudEvents in the same batch have the same value for the specversion attribute. Concretely, the context attributes for each version are:
For version 0.3:
Required fields: "specversion", "id", "type", "source"
Optional fields: "datacontenttype", "datacontentencoding", "schemaurl", "subject", "time"
For version 1.0:
Required fields: "specversion", "id", "type", "source"
Optional fields: "datacontenttype", "dataschema", "subject", "time"
The extension attributes and the data field may differ from event to event.
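To make that constraint concrete, here is a minimal sketch of the single batch-wide invariant. StubEvent is a hypothetical stand-in used for illustration only; the real type is io.cloudevents.CloudEvent.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in event, for illustration only; the real type is
// io.cloudevents.CloudEvent. It carries just the attributes relevant here.
record StubEvent(String specVersion, Map<String, String> extensions, byte[] data) {}

public class BatchInvariant {

    // Per the spec, the only batch-wide constraint is a shared specversion;
    // data and extension attributes may differ per event.
    public static boolean isValidBatch(List<StubEvent> batch) {
        return batch.isEmpty()
            || batch.stream().allMatch(e -> e.specVersion().equals(batch.get(0).specVersion()));
    }

    public static void main(String[] args) {
        StubEvent a = new StubEvent("1.0", Map.of("traceid", "abc"), "one".getBytes());
        StubEvent b = new StubEvent("1.0", Map.of(), "two".getBytes());
        StubEvent c = new StubEvent("0.3", Map.of(), new byte[0]);
        System.out.println(isValidBatch(List.of(a, b))); // true: same specversion
        System.out.println(isValidBatch(List.of(a, c))); // false: mixed versions
    }
}
```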
2. Define the interface
Add a new interface CloudEventBatch that extends CloudEvent. A CloudEventBatch can essentially be treated as a CloudEvent; it is special in that all contained events share the same specversion attribute, while their data and extension attributes may differ from one another.
public interface CloudEventBatch extends CloudEvent {

    @Nullable
    List<CloudEventData> getDataBatch();

    @Nullable
    List<CloudEvent> getCloudEventBatch();
}
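As an illustration of this shape, here is a self-contained sketch using hypothetical MiniEvent/MiniEventBatch stand-ins (the real types would be io.cloudevents.CloudEvent and CloudEventData). It also shows one possible refinement: deriving getDataBatch() from getCloudEventBatch() via a default method.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins mirroring the proposed interface shape; the real
// types would be io.cloudevents.CloudEvent and io.cloudevents.CloudEventData.
interface MiniEvent { byte[] data(); }

interface MiniEventBatch extends MiniEvent {

    List<MiniEvent> getCloudEventBatch();

    // One possible refinement: derive the data list from the event list, so a
    // concrete batch class only has to store the events themselves.
    default List<byte[]> getDataBatch() {
        return getCloudEventBatch().stream().map(MiniEvent::data).collect(Collectors.toList());
    }
}

public class BatchInterfaceSketch {

    // Builds a two-event batch; note the batch itself also behaves as an event.
    public static MiniEventBatch sampleBatch() {
        return new MiniEventBatch() {
            @Override
            public byte[] data() { return new byte[0]; } // the batch's own payload, unused here
            @Override
            public List<MiniEvent> getCloudEventBatch() {
                return List.of(() -> "a".getBytes(), () -> "b".getBytes());
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(sampleBatch().getDataBatch().size()); // 2
    }
}
```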
Add a method that supports batch CloudEventData to the CloudEventWriter interface.
public interface CloudEventWriter<R> extends CloudEventContextWriter {

    /**
     * End the write with a data payload.
     *
     * @param data the data to write
     * @return an eventual return value
     * @throws CloudEventRWException if the message writer cannot be ended.
     */
    R end(CloudEventData data) throws CloudEventRWException;

    /**
     * Newly added method to support a batch of data payloads. The default
     * implementation is a fallback for writers without batch support: it ends
     * the write with the first element only (or returns null for an empty batch).
     *
     * @param datas the batch of data to write
     * @return an eventual return value
     * @throws CloudEventRWException if the message writer cannot be ended.
     */
    default R end(List<? extends CloudEventData> datas) throws CloudEventRWException {
        if (datas == null || datas.isEmpty()) {
            return null;
        }
        return end(datas.get(0));
    }

    /**
     * End the write.
     *
     * @return an eventual return value
     * @throws CloudEventRWException if the message writer cannot be ended.
     */
    R end() throws CloudEventRWException;
}
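To illustrate the intended semantics of the new default method, here is a sketch with hypothetical StubData/StubWriter stand-ins (illustration only, not the real SDK types): the default falls back to ending with the first element, while a batching-aware writer overrides it to write every event.

```java
import java.util.List;

// Hypothetical stand-ins for the SDK's CloudEventData and CloudEventWriter,
// for illustration only.
interface StubData { byte[] toBytes(); }

interface StubWriter<R> {

    R end(StubData data);

    // Mirrors the proposed default: a writer that has not implemented
    // batching falls back to ending with the first element only
    // (or null for an empty batch).
    default R end(List<? extends StubData> batch) {
        if (batch == null || batch.isEmpty()) {
            return null;
        }
        return end(batch.get(0));
    }
}

// A format that actually supports batching overrides the default so that
// every event in the batch is written, not just the first.
class ConcatWriter implements StubWriter<String> {

    private final StringBuilder out = new StringBuilder();

    @Override
    public String end(StubData data) {
        out.append(new String(data.toBytes()));
        return out.toString();
    }

    @Override
    public String end(List<? extends StubData> batch) {
        batch.forEach(d -> out.append(new String(d.toBytes())));
        return out.toString();
    }
}

public class WriterSketch {
    public static void main(String[] args) {
        StubData a = () -> "a".getBytes();
        StubData b = () -> "b".getBytes();
        System.out.println(new ConcatWriter().end(List.of(a, b))); // ab
    }
}
```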
Modify the CloudEventBuilder as follows.
@ParametersAreNullableByDefault
public interface CloudEventBuilder<R extends CloudEvent> extends CloudEventWriter<R> {

    // Newly added factory method for building CloudEvent batches
    static io.cloudevents.core.v1.CloudEventBatchBuilder batchV1() {
        return new io.cloudevents.core.v1.CloudEventBatchBuilder();
    }
}
Add CloudEventBatchBuilder implementations for CloudEventBuilder, one per spec version package.

// in package io.cloudevents.core.v1
public final class CloudEventBatchBuilder extends BaseCloudEventBuilder<CloudEventBatchBuilder, CloudEventBatchV1> {
}

// in package io.cloudevents.core.v03
public final class CloudEventBatchBuilder extends BaseCloudEventBuilder<CloudEventBatchBuilder, CloudEventBatchV03> {
}
Then define the implementations of CloudEventBatch.

public final class CloudEventBatchV1 extends BaseCloudEventBatch {
}

public final class CloudEventBatchV03 extends BaseCloudEventBatch {
}
Modify the EventFormat as follows.
@ParametersAreNonnullByDefault
public interface EventFormat {

    /**
     * Serialize a {@link CloudEvent} to a byte array.
     *
     * @param event the event to serialize.
     * @return the byte representation of the provided event.
     * @throws EventSerializationException if something goes wrong during serialization.
     */
    <T extends CloudEvent> byte[] serialize(T event) throws EventSerializationException;

    /**
     * Like {@link #deserialize(byte[], CloudEventDataMapper)}, but with the identity {@link CloudEventDataMapper}.
     *
     * @see #deserialize(byte[], CloudEventDataMapper)
     */
    default <T extends CloudEvent> T deserialize(byte[] bytes) throws EventDeserializationException {
        return this.deserialize(bytes, CloudEventDataMapper.identity());
    }

    /**
     * Deserialize a byte array to a {@link CloudEvent}.
     *
     * @param bytes the serialized event.
     * @param mapper the mapper to use to map the data.
     * @return the deserialized event.
     * @throws EventDeserializationException if something goes wrong during deserialization.
     */
    <T extends CloudEvent> T deserialize(byte[] bytes, CloudEventDataMapper<? extends CloudEventData> mapper) throws EventDeserializationException;
}
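To show why generalizing the signatures to a type parameter T extends CloudEvent is enough, here is a sketch with hypothetical Event/EventBatch stand-ins (not the real SDK types) where a single serialize() entry point dispatches on the batch sub-interface:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins (the real types are io.cloudevents.CloudEvent and
// the proposed CloudEventBatch) used to illustrate dispatch inside serialize().
interface Event { String id(); }

interface EventBatch extends Event {
    List<Event> events();
}

public class FormatSketch {

    // A single generic entry point handles both shapes: the format checks
    // for the batch sub-interface and picks the wire representation.
    public static <T extends Event> byte[] serialize(T event) {
        if (event instanceof EventBatch batch) {
            String joined = batch.events().stream().map(Event::id).collect(Collectors.joining(","));
            return ("[" + joined + "]").getBytes(StandardCharsets.UTF_8);
        }
        return event.id().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Event single = () -> "e1";
        EventBatch batch = new EventBatch() {
            public String id() { return "batch"; }
            public List<Event> events() { return List.of(() -> "e1", () -> "e2"); }
        };
        System.out.println(new String(serialize(single))); // e1
        System.out.println(new String(serialize(batch)));  // [e1,e2]
    }
}
```

This keeps the EventFormat contract unchanged for callers that only deal with single events, since T simply binds to CloudEvent in that case.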
Make the corresponding modifications to each EventFormat implementation.
3. Design Explanation
- CloudEventBatch is modeled as a special kind of CloudEvent and extends the CloudEvent interface, so the whole implementation builds on the existing CloudEvent machinery.
- To support batches of CloudEventData, the CloudEventWriter interface gains a default R end(List<? extends CloudEventData> datas) throws CloudEventRWException method.
- Corresponding CloudEventBatch and CloudEventBatchBuilder implementations are provided for both the v0.3 and v1.0 spec versions.