Support Protobuf insert
Use case
We work with Protocol Buffer data in our Beam Dataflow ingestion pipeline. We tried many ways to ingest data and the easiest was with this new client-v2. With JSONEachRow we can insert our protobuf messages through com.google.protobuf.util.JsonFormat.Printer#print(), but it is not efficient, as mentioned in FastFormats.
So I tried serializing data into the RowBinary format. It works with some adjustments to the existing code. I'm still wondering how to serialize into the Native format, so that data is written as columns and is closer to the MergeTree layout. If you have any tips on it I would be very happy to run some experiments.
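For reference, a minimal sketch of that JSONEachRow baseline, assuming client-v2's Client.insert(tableName, InputStream, format, settings) overload (package paths should be checked against the client version in use; error handling omitted):

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.data.ClickHouseFormat;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.util.JsonFormat;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class JsonEachRowInsertSketch {
    // Serialize each protobuf message as one JSON line and insert the batch as JSONEachRow.
    public static void insertAsJsonEachRow(Client client, String tableName,
                                           List<? extends GeneratedMessageV3> events) throws Exception {
        JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();
        StringBuilder body = new StringBuilder();
        for (GeneratedMessageV3 event : events) {
            body.append(printer.print(event)).append('\n'); // one JSON object per row
        }
        InputStream data = new ByteArrayInputStream(body.toString().getBytes(StandardCharsets.UTF_8));
        try (InsertResponse response = client.insert(tableName, data,
                ClickHouseFormat.JSONEachRow, new InsertSettings()).get()) {
            // response.getWrittenRows() can be checked here
        }
    }
}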
Describe the solution you'd like
- Integrate a ProtobufSerializer in the source code
- Detail how to serialize into the Native format. RowBinary is good but still row oriented. Compared to what we had when ingesting into BigQuery we are already far more efficient with ClickHouse, so maybe we can keep improving this part.
Describe the alternatives you've considered
From my experience, the serializeProto function below is enough to insert protobuf messages. This is still at a very experimental stage.
public static void serializeProto(GeneratedMessageV3 message, ClickHouseColumn column, OutputStream out) throws IOException {
    Object value = null;
    // Get the descriptor of the field to serialize
    Descriptors.FieldDescriptor descriptor = getFieldDescriptor(message, column.getColumnName());
    if (descriptor != null) {
        if (descriptor.isRepeated() || message.hasField(descriptor)) {
            // Ensure the field has a value, otherwise Protobuf returns the default value for the type, which is not
            // necessarily the default value ClickHouse uses in its table schema definition.
            value = message.getField(descriptor);
            LOGGER.debug("{}({}) - {} - nullable:{} - default:{}",
                    descriptor.getName(), descriptor.getType(), value, column.isNullable(), column.hasDefault());
            if (Set.of(ClickHouseDataType.DateTime, ClickHouseDataType.DateTime64).contains(column.getDataType())) {
                // Some events' DateTime fields are longs. We need to convert them.
                // We use `DateTime` and `DateTime64` only, but we may want to support more in the future.
                // TODO: add more checks when needed, as some Protobuf fields could be `com.google.protobuf.Timestamp`
                value = convertFromInt((Long) value);
            }
        } else if (!message.hasField(descriptor) && !column.isNullable() && !column.hasDefault()) {
            // TODO: integrate this condition with the rest to also handle default dates
            // The field exists in the proto but it is not set. If the column is not nullable and has no default
            // value in ClickHouse, then we should take the default proto value.
            value = message.getField(descriptor);
        }
    } else {
        // In this case the protobuf does not have this column. In our case it's `insertTime` and nothing needs
        // to be done: it is an extra column, not part of the original data, set at run time by ClickHouse for
        // further monitoring.
        LOGGER.debug("{}({}) - {} - nullable:{} - default:{}",
                column.getColumnName(), column.getDataType(), value, column.isNullable(), column.hasDefault());
    }
    // Same code as in `RowBinaryFormatWriter.commitRow`
    if (RowBinaryFormatSerializer.writeValuePreamble(out, true, column, value)) {
        SerializerUtils.serializeData(out, value, column);
    }
}
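The snippet assumes two small helpers, getFieldDescriptor and convertFromInt, which are not shown. A possible sketch of them (the epoch-seconds assumption in the conversion is ours, not from the original code):

// Hypothetical helpers assumed by serializeProto above.
private static Descriptors.FieldDescriptor getFieldDescriptor(GeneratedMessageV3 message, String columnName) {
    // Look up the protobuf field whose name matches the ClickHouse column name; returns null if absent.
    return message.getDescriptorForType().findFieldByName(columnName);
}

private static java.time.ZonedDateTime convertFromInt(Long epochSeconds) {
    // Assumes the long holds epoch seconds in UTC; adjust if your events carry millis or another zone.
    return java.time.Instant.ofEpochSecond(epochSeconds).atZone(java.time.ZoneOffset.UTC);
}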
This method is paired with a single modification in com.clickhouse.client.api.data_formats.internal.SerializerUtils#serializeTupleData:
else if (value instanceof GeneratedMessageV3) {
    GeneratedMessageV3 message = (GeneratedMessageV3) value;
    // Start: added section
    // Tuples coming from Protobuf messages are Protobuf messages themselves, so we need extra logic to
    // translate them.
    for (ClickHouseColumn nestedColumn : column.getNestedColumns()) {
        // TODO: support cases when the nested value is also a `DateTime` or `DateTime64`, as above.
        Descriptors.FieldDescriptor descriptor = getFieldDescriptor(message, nestedColumn.getColumnName());
        Object nestedValue = message.getField(descriptor);
        serializeData(stream, nestedValue, nestedColumn);
    }
    // End: added section
}
Then inserting using the client is very straightforward:
// Get the ClickHouse table schema
TableSchema schema = clickhouseClient.getTableSchema(tableName);

// Serialise messages into an output stream
ByteArrayOutputStream out = new ByteArrayOutputStream();
for (GeneratedMessageV3 event : c.element().getValue()) {
    List<ClickHouseColumn> columnList = schema.getColumns();
    for (ClickHouseColumn column : columnList) {
        serializeProto(event, column, out);
    }
}

// Prepare the insert
InputStream inputStream = new ByteArrayInputStream(out.toByteArray());
InsertSettings settings = new InsertSettings();
ClickHouseFormat format = ClickHouseFormat.RowBinaryWithDefaults;
Additional context
Good day, @wattache!
Thank you for the feedback!
You may look into Client.insert using a writer: https://github.com/ClickHouse/clickhouse-java/blob/1414421a11bcf1d96c0ad6c38de863f732923242/client-v2/src/main/java/com/clickhouse/client/api/Client.java#L1454
It gives you complete access to the output stream, even with custom compression.
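For example, a rough sketch of how that writer-based overload could carry the RowBinary serialization from the snippet above (the DataStreamWriter callback is taken from the linked client-v2 source and should be checked against the version in use; tableName, schema and the event list are assumed from the earlier snippets, and exception handling is omitted):

InsertSettings settings = new InsertSettings();
try (InsertResponse response = clickhouseClient.insert(tableName, new DataStreamWriter() {
        @Override
        public void onOutput(OutputStream out) throws IOException {
            // The client hands us the request output stream; serialize rows straight into it.
            for (GeneratedMessageV3 event : events) {
                for (ClickHouseColumn column : schema.getColumns()) {
                    serializeProto(event, column, out);
                }
            }
        }
    }, ClickHouseFormat.RowBinaryWithDefaults, settings).get()) {
    LOGGER.debug("Rows written: " + response.getWrittenRows());
}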
I will discuss this feature with the team.
Hello @chernser
Thank you for your response. Yes, sorry, I forgot part of the code snippet; the next rows are:
// Run the insert
try (InsertResponse response = clickhouseClient.insert(tableName, inputStream, format, settings).get()) {
    LOGGER.debug("Rows written: " + response.getWrittenRows());
} catch (ExecutionException | InterruptedException e) {
    throw new RuntimeException(e);
}
My question is more on supporting Native and/or Protobuf in this client:
- Using ClickHouseFormat.Native produces com.clickhouse.client.api.ServerException: Code: 62. DB::Exception: Empty query: While executing Native. because the input stream is not properly formatted. Hence my question about what the proper way is to format data for a Native ingest.
- Using ClickHouseFormat.Protobuf requires having our Protobuf schema on the remote ClickHouse nodes, which is not supported by ClickHouse Cloud. Hence my question about supporting Protobuf on the client side. We assume formatting on the client side won't be an issue on our end, as we have some room to use more resources in our Beam Dataflow.
Hope my feedback is clearer and makes sense to you. Thanks again for your answer! :)
Hello,
We tried the Apache Arrow format with the client and it works well. For Dataflow we just need a custom image to pass --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED to the JVM.
Also, for now we just hand-coded the conversion; I'll let you know if we come up with something automatic to map between Protobuf and Apache Arrow.
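For illustration, a minimal sketch of writing a batch in the ArrowStream IPC format that can then be passed to the client's insert; the single id column and the use of ClickHouseFormat.ArrowStream are simplifying assumptions, not the actual Protobuf-to-Arrow mapping used in the pipeline:

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.util.List;

public class ArrowStreamSketch {
    // Builds a tiny one-column Arrow batch and returns it serialized in the ArrowStream IPC format,
    // ready for Client.insert(tableName, inputStream, ClickHouseFormat.ArrowStream, settings).
    public static ByteArrayInputStream toArrowStream(List<Long> ids) throws Exception {
        Schema schema = new Schema(List.of(
                new Field("id", FieldType.notNullable(new ArrowType.Int(64, true)), null)));
        try (RootAllocator allocator = new RootAllocator();
             VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            BigIntVector idVector = (BigIntVector) root.getVector("id");
            idVector.allocateNew(ids.size());
            for (int i = 0; i < ids.size(); i++) {
                idVector.setSafe(i, ids.get(i)); // column-wise fill, one value per row
            }
            root.setRowCount(ids.size());

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
                writer.start();
                writer.writeBatch();
                writer.end();
            }
            return new ByteArrayInputStream(out.toByteArray());
        }
    }
}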