rust: optional protobuf schema and direct protobuf message writing support
Encoding protobuf messages in mcap logs is a common use case that could be streamlined. Currently, a file descriptor set must be assembled and written to the schema manually, and each message must be encoded into a buffer before that buffer is written to mcap.
There are two popular Rust protobuf crates: protobuf and prost. protobuf provides reflection and JSON encoding support, whereas prost generates more idiomatic Rust structs with fewer heap allocations. Supporting both may be relevant to this project. However, working with protobuf may be easier, since message descriptors can be accessed via the MessageDyn trait, while prost (or protoc) can be configured to generate a file descriptor set file that must then be loaded in order to write file descriptor sets to the mcap schema.
At the moment I'm using protobuf and am more familiar with it.
A protobuf file descriptor set schema may be naively built:
use std::{borrow::Cow, collections::HashSet, sync::Arc};

use mcap::Schema;

fn protobuf_schema(
    message_descriptor: &protobuf::reflect::MessageDescriptor,
) -> Result<Arc<Schema<'static>>, protobuf::Error> {
    // Collect this file and all of its transitive imports, each file once.
    fn collect_dependencies(
        descriptor: &protobuf::reflect::FileDescriptor,
        already_collected: &mut HashSet<String>,
    ) -> Vec<protobuf::descriptor::FileDescriptorProto> {
        let mut descriptors = vec![descriptor.proto().to_owned()];
        already_collected.insert(descriptor.name().to_string());
        for dep in descriptor.deps() {
            if !already_collected.contains(dep.name()) {
                descriptors.extend(collect_dependencies(dep, already_collected));
            }
        }
        descriptors
    }
    // Serialize the collected files as a binary FileDescriptorSet.
    let data = Cow::Owned(protobuf::Message::write_to_bytes(
        &protobuf::descriptor::FileDescriptorSet {
            file: collect_dependencies(message_descriptor.file_descriptor(), &mut HashSet::new()),
            ..Default::default()
        },
    )?);
    Ok(Arc::new(Schema {
        name: message_descriptor.full_name().to_string(),
        encoding: "protobuf".to_string(),
        data,
    }))
}
It would be nice for this functionality to be built into the Rust mcap library, e.g.:
Writer::add_protobuf_channel(&mut self, topic: String, message_descriptor: &protobuf::reflect::MessageDescriptor, metadata: Option<BTreeMap<String, String>>) -> Result
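The dependency walk in protobuf_schema can be illustrated without the protobuf crate. The sketch below is a toy version: the `imports` map is a hypothetical stand-in for `FileDescriptor::deps()`, and `collect_files` and `visit` are names made up for this example. Unlike the naive version above, it pushes each file after its imports. That ordering is an assumption on my part: it only matters if a consumer loads the files one by one and expects dependencies first.

```rust
use std::collections::{HashMap, HashSet};

/// Depth-first visit that records `file` only after all of its imports.
fn visit<'a>(
    file: &'a str,
    imports: &HashMap<&'a str, Vec<&'a str>>,
    seen: &mut HashSet<&'a str>,
    out: &mut Vec<&'a str>,
) {
    // `insert` returns false if the file was already visited, which also
    // stops the recursion on diamond-shaped or cyclic import graphs.
    if !seen.insert(file) {
        return;
    }
    if let Some(deps) = imports.get(file) {
        for &dep in deps {
            visit(dep, imports, seen, out);
        }
    }
    out.push(file);
}

/// Returns `root` and everything it transitively imports, each file once,
/// with dependencies listed before the files that import them.
/// `imports` is a toy stand-in for `FileDescriptor::deps()`.
fn collect_files<'a>(root: &'a str, imports: &HashMap<&'a str, Vec<&'a str>>) -> Vec<&'a str> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    visit(root, imports, &mut seen, &mut out);
    out
}

fn main() {
    // Hypothetical file names, used only for illustration.
    let imports = HashMap::from([
        ("pose.proto", vec!["vector3.proto", "quaternion.proto"]),
        ("vector3.proto", vec![]),
        ("quaternion.proto", vec!["vector3.proto"]),
    ]);
    let files = collect_files("pose.proto", &imports);
    assert_eq!(files, vec!["vector3.proto", "quaternion.proto", "pose.proto"]);
    println!("{files:?}");
}
```

Using a single `HashSet::insert` call for both the membership check and the bookkeeping keeps the walk to one lookup per file.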
In order to encode a message, we can do:
use std::io::{Seek, Write};
use std::time::SystemTime;

use anyhow::Context;

fn write_protobuf_message<W: Write + Seek>(
    writer: &mut mcap::Writer<W>,
    channel_id: u16,
    sequence_number: u32,
    log_time: SystemTime,
    publish_time: SystemTime,
    message: &dyn protobuf::MessageDyn,
) -> anyhow::Result<()> {
    // TODO: ideally use write_to_writer_dyn to avoid extra allocation and copy
    let data = message.write_to_bytes_dyn().context("encode protobuf")?;
    writer
        .write_to_known_channel(
            &mcap::records::MessageHeader {
                channel_id,
                sequence: sequence_number,
                // Timestamps are nanoseconds since the Unix epoch;
                // times before the epoch fall back to 0.
                log_time: log_time
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_nanos() as u64,
                publish_time: publish_time
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_nanos() as u64,
            },
            &data,
        )
        .context("write mcap message")
}
However, this requires encoding the message into a temporary buffer before writing it to the mcap writer.
Writer could potentially expose a method that lends out a Write object, e.g. Writer::message_writer(&mut self, channel_id, sequence, log_time, publish_time) -> &MessageWriter, where MessageWriter implements Write and computes the message length for you. This would allow using protobuf::MessageDyn::write_to_writer_dyn(&self, w: &mut dyn Write).
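One way such a MessageWriter could compute the length is sketched below, using only the standard library. This is not the mcap crate's API: the type, its `begin` and `finish` methods, and the bare u64 little-endian length prefix are all assumptions made for illustration. It reserves space for the length, streams the payload straight into the underlying sink, and patches the length afterwards, which is why it needs `Seek` (the function above already requires `W: Write + Seek`). It is handed out by value here because `Write` needs mutable access.

```rust
use std::io::{self, Cursor, Seek, SeekFrom, Write};

/// Hypothetical stand-in for the proposed `MessageWriter`.
struct MessageWriter<'a, W: Write + Seek> {
    sink: &'a mut W,
    len_pos: u64, // where the length prefix lives in the sink
    written: u64, // payload bytes streamed so far
}

impl<'a, W: Write + Seek> MessageWriter<'a, W> {
    /// Reserves a placeholder length prefix and starts a new message.
    fn begin(sink: &'a mut W) -> io::Result<Self> {
        let len_pos = sink.stream_position()?;
        sink.write_all(&0u64.to_le_bytes())?;
        Ok(Self { sink, len_pos, written: 0 })
    }

    /// Patches the real length into the prefix and restores the position.
    fn finish(self) -> io::Result<u64> {
        let Self { sink, len_pos, written } = self;
        let end = sink.stream_position()?;
        sink.seek(SeekFrom::Start(len_pos))?;
        sink.write_all(&written.to_le_bytes())?;
        sink.seek(SeekFrom::Start(end))?;
        Ok(written)
    }
}

impl<W: Write + Seek> Write for MessageWriter<'_, W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Forward to the sink and count only the bytes actually accepted.
        let n = self.sink.write(buf)?;
        self.written += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.sink.flush()
    }
}

fn main() -> io::Result<()> {
    let mut file = Cursor::new(Vec::new());
    let mut msg = MessageWriter::begin(&mut file)?;
    // An encoder such as `write_to_writer_dyn` would write here in pieces.
    msg.write_all(b"hello ")?;
    msg.write_all(b"world")?;
    let len = msg.finish()?;
    assert_eq!(len, 11);

    let bytes = file.into_inner();
    assert_eq!(&bytes[..8], &11u64.to_le_bytes());
    assert_eq!(&bytes[8..], b"hello world");
    Ok(())
}
```

Patching the length after the fact gets harder if the payload passes through a compressor or a checksum before reaching the file. An alternative, assuming the encoder can report its size up front (I believe protobuf's `compute_size_dyn` and prost's `encoded_len` do this), is to write the length first and then stream the payload with no seek.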
The library could also expose a convenience method, e.g. Writer::write_protobuf_message_to_existing_channel(channel_id, sequence, log_time, publish_time, message: &dyn protobuf::MessageDyn).
If zero-copy encoding is to be supported for prost too, we could have something like Writer<W: BufMut>::write_prost_message_to_existing_channel(channel_id, sequence, log_time, publish_time, message: &prost::Message).
This functionality could be added as optional features in the mcap crate, or as additional crate(s) implementing an extension trait on Writer.
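The extension-trait option could look roughly like the sketch below. Everything in it is a stand-in so that it compiles with the standard library alone: `RecordWriter` plays the role of the writer owned by the core crate, `Encode` plays the role of protobuf::MessageDyn or prost::Message, and `WriteMessageExt` is the hypothetical trait a separate crate would export. The toy record layout (channel id, u32 length, payload) is made up and is not the mcap format.

```rust
use std::io::{self, Write};

/// Stand-in for a writer type owned by another crate (not `mcap::Writer`).
pub struct RecordWriter<W: Write> {
    inner: W,
}

impl<W: Write> RecordWriter<W> {
    pub fn new(inner: W) -> Self {
        Self { inner }
    }

    /// The only primitive the "core" crate offers: write pre-encoded bytes.
    pub fn write_bytes(&mut self, channel_id: u16, data: &[u8]) -> io::Result<()> {
        self.inner.write_all(&channel_id.to_le_bytes())?;
        self.inner.write_all(&(data.len() as u32).to_le_bytes())?;
        self.inner.write_all(data)
    }

    pub fn into_inner(self) -> W {
        self.inner
    }
}

/// Stand-in for `protobuf::MessageDyn` or `prost::Message`.
pub trait Encode {
    fn encode(&self) -> Vec<u8>;
}

/// Extension trait that a separate crate could define and export.
pub trait WriteMessageExt {
    fn write_message(&mut self, channel_id: u16, message: &dyn Encode) -> io::Result<()>;
}

impl<W: Write> WriteMessageExt for RecordWriter<W> {
    fn write_message(&mut self, channel_id: u16, message: &dyn Encode) -> io::Result<()> {
        // Built purely on the writer's public API, so no changes to the
        // core crate are needed.
        self.write_bytes(channel_id, &message.encode())
    }
}

/// Toy message type used only for this example.
struct Ping(u8);

impl Encode for Ping {
    fn encode(&self) -> Vec<u8> {
        vec![self.0]
    }
}

fn main() -> io::Result<()> {
    let mut writer = RecordWriter::new(Vec::new());
    // The method becomes available once the extension trait is in scope.
    writer.write_message(7, &Ping(42))?;
    assert_eq!(writer.into_inner(), vec![7u8, 0, 1, 0, 0, 0, 42]);
    Ok(())
}
```

The trade-off is that an extension trait only sees the writer's public API, so it cannot avoid the temporary buffer unless the core crate also exposes something like the streaming writer discussed above. Optional Cargo features inside the mcap crate would not have that limitation.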