Support `parquet` columnar format in the `aws_s3` sink
Similar to https://github.com/timberio/vector/issues/1373, we should support the Parquet format. Parquet is a columnar format that enables faster and more efficient data access patterns, such as column selection and indexing.
Implementation
Unfortunately, I do not have as deep experience with this format as I do with ORC, but like everything else, we should start very simple. Fortunately, there appears to be a Rust library that supports basic writing of this format.
One issue with that parquet library is that it requires nightly Rust (https://github.com/sunchao/parquet-rs/blob/master/src/lib.rs#L123 and https://github.com/apache/arrow/blob/master/rust/parquet/src/lib.rs#L18); we can't use these features because we are on the stable compiler.
Any idea what the timeline is for those to be on stable?
Well, really specialization is the last one on the updated apache/arrow, but that feature has been in progress since 2016, so we will likely not see it any time soon. Our best bet would be to attempt to remove that feature from the library, but I am not sure how hard that would be or whether it is even possible.
I think that it still requires nightly, but just throwing it into the mix: https://github.com/apache/arrow/tree/master/rust/parquet
Progress in Apache Arrow project:
- PR: Remove Rust specialization - https://github.com/apache/arrow/pull/8698
- JIRA: Support stable Rust - https://issues.apache.org/jira/browse/ARROW-6717
- JIRA (comment): re. remaining dependency other than specialization - https://issues.apache.org/jira/browse/ARROW-6718?focusedCommentId=17002842&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17002842
IIRC Apache Arrow now works on stable Rust 🎉 (unless the SIMD feature is used). Would it be possible to re-evaluate this one? If the team doesn't have the bandwidth but somebody can help with review and some pointers, I can look into it.
Hi @Alexx-G. This hasn't been scheduled yet, but if you wanted to pick it up, I'd be happy to advise/review!
I've checked the `parquet` crate; it seems to have some basic support for writing data.
@binarylogic @jszwedko I see that the ORC issue has some additional context, do you have any insights/pointers for this one? Especially about schema and the changes around the current encoding logic.
I'll try to run some tests with Athena + S3 and `parquet` to see whether I can get an MVP.
Is there an update on this? I'm very interested in JSON input to parquet output to S3.
Any update on this? I am also interested in JSON input and Parquet output to s3
Any chance this can be looked at? Right now, we send logs from Vector to Kinesis just to get them converted to Parquet... I'd be happy to skip that costly step.
This codec isn't currently on our roadmap, but we could help a community PR get merged 👍
Proposal
Add a `parquet` codec and add support for it in the `aws_s3` sink.
This can be achieved with the official Rust crate, through the `Serializer` construct, tied to a custom type implementing `Encoder<Vec<Event>>`.
Each batch would be transformed into a single Parquet file with a single row group. With that, the batch configuration can be used to define the desired size of the row group/Parquet file.
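To make this concrete, here is a rough sketch of encoding one batch into an in-memory Parquet file with a single row group using the `parquet` crate's low-level writer API. The single-column schema and the exact calls are illustrative only; signatures differ between crate versions.

```rust
use std::sync::Arc;

use parquet::data_type::{ByteArray, ByteArrayType};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

/// Encode one batch of already-flattened string values into an in-memory
/// Parquet file containing a single row group (illustrative only).
fn encode_batch(messages: &[String]) -> parquet::errors::Result<Vec<u8>> {
    // Hypothetical single-column schema; the real codec would derive or accept one.
    let schema = Arc::new(parse_message_type(
        "message log { required binary message (UTF8); }",
    )?);
    let props = Arc::new(WriterProperties::builder().build());

    let mut buffer = Vec::new();
    let mut writer = SerializedFileWriter::new(&mut buffer, schema, props)?;

    // One row group per batch, as proposed above.
    let mut row_group = writer.next_row_group()?;
    if let Some(mut column) = row_group.next_column()? {
        let values: Vec<ByteArray> = messages
            .iter()
            .map(|m| ByteArray::from(m.as_str()))
            .collect();
        column
            .typed::<ByteArrayType>()
            .write_batch(&values, None, None)?;
        column.close()?;
    }
    row_group.close()?;
    writer.close()?;

    // The finished file bytes would become the body of the S3 object.
    Ok(buffer)
}
```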
Schema
Since Parquet requires a schema, we need to derive one.
Each passing event has its own implicit schema, and by joining them we get a unified one. This unified schema can be:
- Specific to each batch.
- Built up from batch to batch during runtime.

While option 2 can still result in exported schemas that differ from batch to batch, they would tend to change less than with option 1. This is relevant for streams whose events have varying schemas; for consistent ones, both options behave the same.
When joining schemas we can run into conflicts. When multiple types are used for the same field/column, some resolution is needed:
- Choose one type and drop conflicting events.
- Unify types into a more general one. This needs to be done for both primitive and logical types. For a unifying type, `String` looks like the best option.

In my opinion, option 2 is better: it's more reliable, and we can document this behavior.
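As an illustration of option 2 with type unification, a hypothetical merge step could look like the following. The `FieldType` enum and the fallback-to-`String` rule are assumptions for the sketch, not an existing Vector API.

```rust
use std::collections::BTreeMap;

/// Hypothetical logical field types inferred from events.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum FieldType {
    Bool,
    Int64,
    Float64,
    String,
}

/// A unified schema built up from batch to batch (option 2 above).
#[derive(Default, Debug)]
struct UnifiedSchema {
    fields: BTreeMap<String, FieldType>,
}

impl UnifiedSchema {
    /// Merge one event's implicit schema into the unified schema.
    /// Conflicting types are widened to `String`, the most general type.
    fn merge(&mut self, event_fields: &BTreeMap<String, FieldType>) {
        for (name, ty) in event_fields {
            self.fields
                .entry(name.clone())
                .and_modify(|existing| {
                    if *existing != *ty {
                        *existing = FieldType::String;
                    }
                })
                .or_insert(*ty);
        }
    }
}
```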
Options
Add options:
- `encoding.parquet.encoding`, which accepts a map between fields and encodings. If an encoding isn't supported for the field type, a warning would be logged and plain encoding used.
- `encoding.parquet.compression`, which applies compression to all of the columns.

By default, plain encoding and no compression are used.
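For reference, both options would map fairly directly onto the `parquet` crate's `WriterProperties`. The column name below is just an example and the translation itself is hypothetical.

```rust
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

/// Hypothetical translation of the proposed `encoding.parquet.*` options
/// into `parquet` crate writer properties.
fn writer_properties() -> WriterProperties {
    WriterProperties::builder()
        // `encoding.parquet.compression`: applied to all columns.
        .set_compression(Compression::SNAPPY)
        // `encoding.parquet.encoding`: a per-field encoding; unsupported
        // combinations would fall back to PLAIN with a logged warning.
        .set_column_encoding(
            ColumnPath::new(vec!["message".to_string()]),
            Encoding::DELTA_BYTE_ARRAY,
        )
        .build()
}
```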
Alternatives
We can expose an option for users to define their own static schema for passing events, which would try to cast or filter out conflicting values.
Hey @ktff. @spencergilbert and I discussed this a bit. I don't think anyone on the team is really an expert in Parquet, so we have a couple questions.
1. Today the encoders in Vector can only operate on individual events. We have plans to eventually have a second layer of encoders that can run on batches of events, but that doesn't exist today. So the main question here is: can a batch be built from already encoded events?
2. What's the reasoning for deriving a schema over having the user provide one (which Vector could then ensure events match)? Having the schema potentially change for each batch emitted seems undesirable, but again I don't really know enough about Parquet to fully understand the impact here.
Hey @fuchsnj
Regarding question 1: no, a Parquet batch can't be built from already encoded events. It's necessary to intercept them before that, or process them in a way suitable for Parquet. Fortunately, there is a way to do that, by implementing https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/util/encoding.rs#L9 for `Vec<Event>`.
There are similar cases:
- https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/elasticsearch/encoder.rs#L56
- https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/splunk_hec/logs/encoder.rs#L61
- https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/gcp/chronicle_unstructured.rs#L297
- https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/datadog/logs/sink.rs#L127
The current `aws_s3` sink uses https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/util/encoding.rs#L18, which would be replaced by `ParquetEncoder` when it's configured.
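As a rough sketch of that shape: the `Encoder` trait below is a paraphrase of the one in `src/sinks/util/encoding.rs` and may not match the current signature exactly, and `encode_events_to_parquet` is a hypothetical stand-in for the actual Parquet writing.

```rust
use std::io;

// Paraphrased shape of Vector's `Encoder<T>` trait from
// `src/sinks/util/encoding.rs`; the real definition may differ.
trait Encoder<T> {
    fn encode_input(&self, input: T, writer: &mut dyn io::Write) -> io::Result<usize>;
}

// Placeholder for Vector's event type in this sketch.
struct Event;

#[derive(Clone)]
struct ParquetEncoder;

impl Encoder<Vec<Event>> for ParquetEncoder {
    fn encode_input(&self, input: Vec<Event>, writer: &mut dyn io::Write) -> io::Result<usize> {
        // Hypothetical helper: derive a schema and write one Parquet file
        // with a single row group for the whole batch.
        let bytes = encode_events_to_parquet(&input)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
        writer.write_all(&bytes)?;
        Ok(bytes.len())
    }
}

// Stub so the sketch compiles on its own; the real implementation would use
// the `parquet` crate's writer API.
fn encode_events_to_parquet(_events: &[Event]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    Ok(Vec::new())
}
```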
Regarding question 2: Parquet is a file format that is usually the final/long-term destination for storing data that is later read for queries, by, for example, Amazon Athena. Such systems do encounter differences in schema over time, especially if they are performing federated queries, so they usually have the means to reconcile different schemas. That said, I came to this conclusion via research, so comments from those with experience would be much appreciated.
So my main argument is that it's better to have the event reach its destination and leave it to the user to transform the event into the desired schema before or after the sink if they wish/need to, than to require configuration of a fixed schema that drops events. The latter would force those who do have events with varying schemas to yet again define the schema in some transform before this sink, either to transform the events or to change the fixed schema.
1. The `impl Encoder<Vec<ProcessedEvent>>` is the sink-specific encoder. While you could have a setting that replaces this on the sink, this doesn't follow our current conventions and would only work for that specific sink. What we would prefer (which isn't quite supported yet) is expanding our codecs to add `parquet`, which would allow it to be used in any sink. The current (single-event-only) codecs are selected here.
2. > So my main argument is that it's better to have the event reach its destination and leave it to the user to transform the event into the desired schema before or after the sink if they wish/need to, than to require configuration of a fixed schema that drops events.

   Our current codecs define a `schema_requirement`, which can type check events at Vector startup to ensure events will match the expected Parquet schema. This would prevent events from being dropped at runtime for not matching the schema. (Note that full schema support in Vector is relatively new and not enabled by default yet; it's the global setting `schema.enabled`.)
For point 1, I'm in agreement with @fuchsnj. It'd be really nice if we could update the codec model to do the batch encoding in a generic way that works across all sinks using codecs. The lack of this came up recently with the `avro` codec not outputting the schema at the head of each batch (https://github.com/vectordotdev/vector/issues/16994) and would come up with other codecs that require headers, like `csv`.
Adding support for "batched" codecs is also discussed here: https://github.com/vectordotdev/vector/issues/16637
For point 1, while the `parquet` implementation would be a sink-specific encoder at the moment, it can be written to be generic in the sense that it can be reused in a different sink and/or in the batch-encoder abstraction/feature once that arrives.
Going by https://github.com/vectordotdev/vector/issues/16637, an envelope is not enough for the `parquet` codec; it's a column-wise format, so it needs to deconstruct all events to form those columns. That is in line with https://github.com/vectordotdev/vector/issues/16637#issuecomment-1462691621 for batch sinks.
If batch encoding were already implemented, that would simplify things. Going by the `impl` diversity of `RequestBuilder`, it's not something trivially doable. From what I see there are two parts to it: abstraction over encoders and abstraction over events. For the `parquet` codec in the `aws_s3` sink, abstraction over encoders is necessary; just using `Box<dyn Encoder<E>>` seems fine for that. The second abstraction is the one that would allow this codec to be used in other sinks. I would say that is an orthogonal concern/feature, since it would need to unify all types of events in batch sinks into a single thing and then modify, if necessary, all of the codecs to use this unified type. That's a completely separate issue.
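To illustrate the first abstraction, a simplified stand-in for the request builder could hold the boxed encoder like this (again using the paraphrased trait, not the real Vector types):

```rust
use std::io;

// Same paraphrased `Encoder` trait as in the earlier sketch.
trait Encoder<T> {
    fn encode_input(&self, input: T, writer: &mut dyn io::Write) -> io::Result<usize>;
}

// Placeholder event type for the sketch.
struct Event;

/// Simplified stand-in for the `aws_s3` request builder: it only needs to know
/// that *something* can turn a whole batch into bytes, which is exactly what
/// `Box<dyn Encoder<Vec<Event>>>` provides.
struct S3RequestBuilder {
    batch_encoder: Box<dyn Encoder<Vec<Event>> + Send + Sync>,
}

impl S3RequestBuilder {
    fn build_object_body(&self, batch: Vec<Event>) -> io::Result<Vec<u8>> {
        let mut body = Vec::new();
        self.batch_encoder.encode_input(batch, &mut body)?;
        Ok(body)
    }
}
```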
For point 2, @fuchsnj can you provide some docs for `schema_requirement`? I can't find it on the docs website. But that feature would be useful.
We currently collect logs and store them as parquet-on-S3, using fluentd and an internal plugin we manage for the Parquet conversion on our aggregators. We are very interested in migrating to Vector, and this issue is currently the remaining blocker.
FWIW, in our setup we configure a schema to use for each type of log, and we would hope that the Vector implementation would at least support an option for specifying schemas.
@fuchsnj I found the `schema_requirement`. That seems like exactly what's needed, so we can go with that.
Instead of determining the schema at runtime, add an option to specify a schema for passing events.
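With that direction, a user-provided schema could be accepted as a Parquet message-type string and parsed once at startup. This assumes the `parquet` crate's schema parser; the option name in the comment is purely illustrative.

```rust
use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::TypePtr;

/// Parse a user-supplied schema once at startup so that mismatches surface
/// as configuration errors rather than at runtime.
fn parse_user_schema(raw: &str) -> parquet::errors::Result<TypePtr> {
    Ok(Arc::new(parse_message_type(raw)?))
}

// Example value for a hypothetical `encoding.parquet.schema` option:
// message log {
//   required binary message (UTF8);
//   required int64 timestamp (TIMESTAMP_MILLIS);
//   optional binary host (UTF8);
// }
```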
Happy to see this discussion being continued. We are currently using Vector for Kubernetes cluster log collection, but we have other use cases with much higher throughput where Vector is an interesting option. I would like to add some thoughts on what would be mandatory for us:
- Since we are dividing concerns into multiple microservices, a producer (source) should be able to change a schema in a predefined manner (e.g. adding fields in JSON). Vector (transformer/aggregator) is another service with its own schema. It should simply ignore the additional fields from the source without dropping any events. Later, Vector may update its schema independently so it will pick up previously added fields sent to it. On the other hand, if fields are defined in Vector but not sent by the source, they should be `null` for that event when written to the sink. This relaxed behavior could be configurable to be more restrictive. I am mentioning this specifically because I tested the fluentd plugin with columnify and it made it difficult to migrate schemas without data loss. Still, the configuration options of the fluentd plugin may be a nice hint of what people are expecting.
- On batch encoding (which may not be a concern of the current issue), it would be quite powerful to have some additional preprocessing options. For my use case I am thinking of sorting the batch by an array of fields. Utilizing Parquet's run-length encoding (which should be done by the respective Parquet writing library?), this can make a rather large difference in output size when sorting by fields with low cardinality or fields that are functionally dependent on each other. Considering we are talking about an S3 sink, which is probably long-term storage, this would reduce costs and also improve performance when reading and filtering data with Apache Spark, Trino/Athena, etc. In my case I could reduce file sizes by up to 20%, and I have not seen this as a feature anywhere so far except Hive syntax; still, the benefits were measured without Hive.
- What I would be genuinely curious about is how acknowledgements would behave in a batched manner. To stay consistent with the current semantics, the acknowledgement would obviously only be propagated once an event gets committed to a sink. But would it be reasonable to consider an event acknowledged towards upstream sources once it is written to Vector's disk buffer, which may be utilized for batching anyway? Surely this is a difficult problem and out of scope for the Parquet encoder, but I'd still like to spark the thought.
> Since we are dividing concerns into multiple microservices, a producer (source) should be able to change a schema in a predefined manner (e.g. adding fields in JSON). Vector (transformer/aggregator) is another service with its own schema. It should simply ignore the additional fields from the source without dropping any events. Later, Vector may update its schema independently so it will pick up previously added fields sent to it. On the other hand, if fields are defined in Vector but not sent by the source, they should be `null` for that event when written to the sink. This relaxed behavior could be configurable to be more restrictive.
Yeah, that's interesting - I certainly appreciate wanting to make migrations easier. I imagine that could be done "manually" today with `remap`, checking whether your field exists and, if not, inserting a `null` value. On the flip side, I don't feel great about dropping portions of events that aren't part of the schema, but that matches some existing behavior in `csv`. An alternative could be erroring those events when encoding and being able to route them to a different sink, DLQ style.
> On batch encoding (which may not be a concern of the current issue), it would be quite powerful to have some additional preprocessing options. For my use case I am thinking of sorting the batch by an array of fields. Utilizing Parquet's run-length encoding, this can make a rather large difference in output size when sorting by fields with low cardinality or fields that are functionally dependent on each other.
Definitely an interesting feature to keep in mind, we'd definitely want to get the basic implementation in before adding additional tooling to it though 👍
> What I would be genuinely curious about is how acknowledgements would behave in a batched manner. [...] Would it be reasonable to consider an event acknowledged towards upstream sources once it is written to Vector's disk buffer, which may be utilized for batching anyway?
It's been a while, but I feel as though I remember seeing that discussion in the past; if it's not currently the behavior, I expect there are arguments that it could be.
@ktff is this still something you're planning to work on?
@spencergilbert I do. I was on vacation, hence the silence. Point 1 raised by @fuchsnj remains unresolved. Simplified, there are two ways forward:
- Implement the `parquet` codec only for the `aws_s3` sink. Once support for batched codecs lands, that limitation can be lifted.
- Wait for batched codecs support and then merge the `parquet` codec.
In both cases, my plan is to implement the codec first and, in the case of option 2, submit it once batched codecs have landed.
Hope you had a nice vacation! Sounds good to me.
@ktff I happened to find a similar solution. If you need help anywhere, I'd be happy to contribute together! If I can figure it out, that is. 🚀
@Kikkon the draft contains a functioning `parquet` codec for the `aws_s3` sink. It's missing two important things though:
- Proper handling of non-conforming events. This can be resolved by accepting a performance hit.
- Support for batched codecs. This is the only blocking issue.
I'm currently not in a position to add support for batched codecs, hence the limbo state of https://github.com/vectordotdev/vector/pull/17395. If this is something you feel confident adding to Vector, then reach out to @jszwedko. Once that lands, I'll be able to finish the PR and get it merged.
> @Kikkon the draft contains a functioning `parquet` codec for the `aws_s3` sink. It's missing two important things though:
>
> - Proper handling of non-conforming events. This can be resolved by accepting a performance hit.
> - Support for batched codecs. This is the only blocking issue.
>
> I'm currently not in a position to add support for batched codecs, hence the limbo state of #17395. If this is something you feel confident adding to Vector, then reach out to @jszwedko. Once that lands, I'll be able to finish the PR and get it merged.
@ktff I have some experience using Parquet, but I am not familiar with Vector. The issue with this PR is that Parquet does not support append writes; appending may require merging new and old files, which has performance costs. However, a batched codecs implementation does not yet exist in Vector, so for now the PR is pending. Is that right?
@jszwedko The Vector community has plans for a proposal to support batch codecs. Perhaps once I become more familiar with Vector's architecture, I can discuss with everyone how we could add batch codecs.
> @jszwedko The Vector community has plans for a proposal to support batch codecs. Perhaps once I become more familiar with Vector's architecture, I can discuss with everyone how we could add batch codecs.
Hey! Yes, we would like to add the concept of batches to codecs but haven't been able to prioritize it on our side just yet. We'd be happy to help guide a contribution for it. I believe @lukesteensen would have some thoughts about what it could look like and also be able to answer questions.
Can you give me some advice? @lukesteensen 🫡