Update Forward Protocol Specification for zstd compression support
Is your feature request related to a problem? Please describe.
The following PR supports zstd compression.
- #4657
So we need to update the CompressedPackedForward Mode section of the Forward Protocol Specification.
Describe the solution you'd like
Update the following descriptions and add `zstd` as a value of the `compressed` option.
- https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#compressedpackedforward-mode
- https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#option
Would that be Forward Protocol Specification v1.6?
Describe alternatives you've considered
Having no idea.
Additional context
No response
I propose changes to the following two sections.
CompressedPackedForward Mode
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#compressedpackedforward-mode
### CompressedPackedForward Mode
-It carries a series of events as a msgpack binary, compressed by gzip, on a single request. The supported compression algorithm is only gzip.
+It carries a series of events as a msgpack binary, compressed by gzip or zstd, on a single request.
-- `entries` is a gzipped binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip binary strings.
-- Client MUST send an option with `compressed` key with the value `gzip`.
-- Client MUST send a gzipped chunk as msgpack `bin` format.
+- `entries` is a gzip/zstd binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
+- Client MUST send an option with `compressed` key with the value `gzip` or `zstd`.
+- Client MUST send a gzip/zstd chunk as msgpack `bin` format.
- Server MUST accept `bin` format.
- Server MAY decompress and decode individual events on demand but MAY NOT do right after request arrival. It means it MAY costs less, compared to `Forward` mode, when decoding is not needed by any plugins.
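
To make the `entries` rule above concrete, here is a minimal sketch of a CompressedPackedForward-style message built from two independently gzipped parts. It uses only Python's standard `gzip` module. The byte strings are hypothetical stand-ins for msgpack-encoded entries, the tag is made up, and the msgpack encoding of the outer array is left out. zstd is not shown, only because it is not in the standard library of most Python versions.

```python
import gzip

# Hypothetical stand-ins for two msgpack-encoded MessagePackEventStream parts.
part_a = b"<msgpack entries A>"
part_b = b"<msgpack entries B>"

# Each part is compressed on its own; `entries` MAY be the plain
# concatenation of several gzip binary strings.
entries = gzip.compress(part_a) + gzip.compress(part_b)

# The message is [tag, entries, option]; the option MUST name the algorithm.
# A real client would msgpack-encode this array and send `entries` as `bin`.
message = ["tag.name", entries, {"compressed": "gzip"}]

# A server can decompress the concatenated members as a single stream.
restored = gzip.decompress(message[1])
print(restored == part_a + part_b)
```

The sketch relies on `gzip.decompress` accepting multi-member gzip data, which is what makes the "concatenated binary" wording workable on the receiving side.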
Option
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#option
- Server MAY just ignore any options given.
- `size`: Clients MAY send the `size` option to show the number of event records in an entries by an integer as a value. Server can know the number of events without unpacking entries (especially for PackedForward and CompressedPackedForward mode).
- `chunk`: Clients MAY send the `chunk` option to confirm the server receives event records. The value is a string of Base64 representation of 128 bits `unique_id` which is an ID of a set of events.
-- `compressed`: Clients MUST send the `compressed` option with value `gzip` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
+- `compressed`: Clients MUST send the `compressed` option with value `gzip` or `zstd` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
```json
{"chunk": "p8n9gmxTQVC8/nh2wlKKeQ==", "size": 4097}
```
Does this revision require a voting process?
The changes suggested here look good to me.
If there are no objections, I would like to set it to v1.6 with the agreement of both Fluentd and Fluent Bit maintainers.
@cosmo0920 Could you please check https://github.com/fluent/fluentd/issues/4758#issuecomment-2572328791 ?
LGTM
This would be good:
-- `entries` is a gzipped binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip binary strings.
-- Client MUST send an option with `compressed` key with the value `gzip`.
-- Client MUST send a gzipped chunk as msgpack `bin` format.
+- `entries` is a gzip/zstd binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
+- Client MUST send an option with `compressed` key with the value `gzip` or `zstd`.
+- Client MUST send a gzip/zstd chunk as msgpack `bin` format.
However, I have a question about these sentences. Currently, we're able to decompress compressed chunks one-by-one. So, on paper, we are also able to alternate between decompressing gzip compressed chunks and zstd compressed chunks. Does this need to be stated explicitly in the revised specification?
@cosmo0920 are you referring to the case where different sources send chunks with different compression types?
In the case of the forward protocol, the message itself carries the compression type, so decompression follows the metadata of the compressed chunk.
Yes, the case where different sources use the same in_forward. This will surely occur, because Fluentd can be used as an aggregator that collects from different Fluentd instances pointing at the same forward endpoint. So we probably need to clarify this even though the implementation already handles it.
@cosmo0920 https://github.com/fluent/fluentd/pull/4657/files#diff-71dd20388a1f90eaec72fb257002fc6d44a497d457821d0c105acc0510844be1R314 In this line the compression format is also used as an argument during decompression. I guess this should answer your question.
That doesn't answer my question. My question is: should we write down the current decompression behavior explicitly in the specification document of the forward protocol? Pointing at the actual implementation does not cover it, because the forward protocol is shared between Fluentd and Fluent Bit. So we need to describe the behavior in the specification document rather than in the implementations.
Got it, @cosmo0920.
Previously, multiple sources could already send either gzip compressed chunks or uncompressed ones. If we treat uncompressed data as one more compression type, the previous specification was essentially good enough to express that two types were supported. Now we are just adding one more type, so I'm not sure we really need to be explicit here.
@cosmo0920 @Athishpranav2003 Thanks for your review!
I see!
I didn't know about this.
> Currently, we're able to decompress compressed chunks one-by-one.

Fluentd only decompresses a CompressedMessagePackEventStream as a whole. I did not assume that compression types could be mixed as follows.

> So, on paper, we are also able to alternate between decompressing gzip compressed chunks and zstd compressed chunks.

So, it would be better to improve the text to avoid confusion.
@cosmo0920 I'm not familiar with Fluent Bit's specifications, but is that specification OK for Fluent Bit as well?
> Fluentd only decompresses a CompressedMessagePackEventStream as a whole. I did not assume that compression types could be mixed as follows.

Ah, Fluentd also handles CompressedMessagePackEventStream one-by-one. That means:
- The first CompressedMessagePackEventStream is compressed with gzip and decompressed with the gzip decompressor.
- The second CompressedMessagePackEventStream is compressed with zstd and decompressed with the zstd decompressor.
- Any later stream can be compressed with any supported compression method and decompressed with the corresponding decompressor.

This could happen if multiple senders send payloads to one aggregator. I'm not sure whether you misunderstood what I meant, but am I right here?
I mean, the above situation could happen, so a sequence with mixed compression types could occur. Note that I didn't mean that this mixing would happen within a single CompressedMessagePackEventStream.

> @cosmo0920 I'm not familiar with Fluent Bit's specifications, but is that specification OK for Fluent Bit as well?

This should work, because Fluent Bit also decompresses payloads that are compressed with particular compression methods.

> Note that I didn't mean that this mixing would happen within a single CompressedMessagePackEventStream.

Oh! Sorry, I misunderstood! I see!

> This should work, because Fluent Bit also decompresses payloads that are compressed with particular compression methods.

Thanks!
As @cosmo0920 says, it is possible for multiple senders to send in different compression types.
As @Athishpranav2003 says, there is no problem because what was originally two types will only become three types.
It would be better to clarify the description to avoid misunderstandings.
How about this?
- Server MUST accept `bin` format.
+- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option for each msgpack binary.
- Server MAY decompress and decode individual events on demand but MAY NOT do right after request arrival. It means it MAY costs less, compared to `Forward` mode, when decoding is not needed by any plugins.
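
The proposed rule, selecting the decompressor per message from the `compressed` option, can be sketched as a small dispatch table. This is only an illustration and not Fluentd's or Fluent Bit's implementation: `decode_entries` is a hypothetical helper, the payloads are stand-ins for msgpack data, and the optional zstd entry assumes Python 3.14+, where the standard library provides `compression.zstd`. Raising on an unknown value is a choice made for this sketch; the specification text says other values are ignored.

```python
import gzip

# Map each `compressed` option value to a decompressor.
DECOMPRESSORS = {"gzip": gzip.decompress}

try:
    # Assumption: Python 3.14+ provides zstd support as `compression.zstd`.
    from compression import zstd
    DECOMPRESSORS["zstd"] = zstd.decompress
except ImportError:
    pass  # zstd unavailable here; only gzip is registered


def decode_entries(entries: bytes, option: dict) -> bytes:
    """Return the msgpack stream carried by one message (hypothetical helper)."""
    fmt = option.get("compressed")
    if fmt is None:
        return entries  # no `compressed` key: entries are not compressed
    if fmt not in DECOMPRESSORS:
        # Sketch-only choice: fail loudly instead of ignoring the value.
        raise ValueError(f"unsupported compressed value: {fmt!r}")
    return DECOMPRESSORS[fmt](entries)


# Two senders hitting the same endpoint with different formats.
first = decode_entries(gzip.compress(b"<entries 1>"), {"compressed": "gzip"})
second = decode_entries(b"<entries 2>", {})
print(first, second)
```

Because the format is looked up for every message, consecutive messages from different senders can use different algorithms without the server keeping any per-connection state.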
I noticed that the descriptions in these tables should also be changed.
- https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#logs-type-3
- https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#metrics-or-traces-type-3
#### Logs Type
name | Ruby type | msgpack format | content
--- | --- | --- | ---
tag | String | str | tag name
- entries | CompressedMessagePackEventStream | bin | gzipped msgpack stream of Entry
+ entries | CompressedMessagePackEventStream | bin | compressed msgpack stream of Entry
option | Hash | map | option including key "compressed" (required)
```json
[
"tag.name",
"<<CompressedMessagePackEventStream>>",
{"compressed": "gzip"}
]
```
#### Metrics or Traces Type
name | Ruby type | msgpack format | content
--- | --- | --- | ---
tag | String | str | tag name
- entries | Msgpack stream for observabilities | bin | gzipped msgpack stream of Entry
+ entries | Msgpack stream for observabilities | bin | compressed msgpack stream of Entry
option | Hash | map | option including key "compressed" and "fluent\_signal" (required)
```json
[
"tag.name",
"<<Compressed payloads of observabilities>>",
{"compressed": "gzip", "fluent_signal": 1|2} # 1 for metrics and 2 for traces.
]
```
Based on the above, I re-propose the following change as v1.6.
CompressedPackedForward Mode
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#compressedpackedforward-mode
### CompressedPackedForward Mode
-It carries a series of events as a msgpack binary, compressed by gzip, on a single request. The supported compression algorithm is only gzip.
+It carries a series of events as a msgpack binary, compressed by gzip or zstd, on a single request.
-- `entries` is a gzipped binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip binary strings.
-- Client MUST send an option with `compressed` key with the value `gzip`.
-- Client MUST send a gzipped chunk as msgpack `bin` format.
+- `entries` is a gzip/zstd binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
+- Client MUST send an option with `compressed` key with the value `gzip` or `zstd`.
+- Client MUST send a gzip/zstd chunk as msgpack `bin` format.
- Server MUST accept `bin` format.
+- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option for each msgpack binary.
- Server MAY decompress and decode individual events on demand but MAY NOT do right after request arrival. It means it MAY costs less, compared to `Forward` mode, when decoding is not needed by any plugins.
Logs Type
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#logs-type-3
#### Logs Type
name | Ruby type | msgpack format | content
--- | --- | --- | ---
tag | String | str | tag name
- entries | CompressedMessagePackEventStream | bin | gzipped msgpack stream of Entry
+ entries | CompressedMessagePackEventStream | bin | compressed msgpack stream of Entry
option | Hash | map | option including key "compressed" (required)
Metrics or Traces Type
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#metrics-or-traces-type-3
#### Metrics or Traces Type
name | Ruby type | msgpack format | content
--- | --- | --- | ---
tag | String | str | tag name
- entries | Msgpack stream for observabilities | bin | gzipped msgpack stream of Entry
+ entries | Msgpack stream for observabilities | bin | compressed msgpack stream of Entry
option | Hash | map | option including key "compressed" and "fluent\_signal" (required)
Option
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#option
- Server MAY just ignore any options given.
- `size`: Clients MAY send the `size` option to show the number of event records in an entries by an integer as a value. Server can know the number of events without unpacking entries (especially for PackedForward and CompressedPackedForward mode).
- `chunk`: Clients MAY send the `chunk` option to confirm the server receives event records. The value is a string of Base64 representation of 128 bits `unique_id` which is an ID of a set of events.
-- `compressed`: Clients MUST send the `compressed` option with value `gzip` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
+- `compressed`: Clients MUST send the `compressed` option with value `gzip` or `zstd` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
```json
{"chunk": "p8n9gmxTQVC8/nh2wlKKeQ==", "size": 4097}
```
I propose the following:
-- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option for each msgpack binary.
+- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option which contains `gzip` value for each gzip compressed msgpack binary.
+- Server MAY decompress `entries` in the format according to the value of `compressed` key of the option which contains `zstd` value for each zstd compressed msgpack binary.
+- Server MUST decompress `entries` that are gzip or zstd compressed formats if a server supports both of decompression formats.
This is because zstd support in Fluent Bit is still at the PoC stage. So we need to make support for zstd compression and decompression optional in the forward protocol.
Hmm, but wouldn't that be an incomplete protocol? If the server does not decompress zstd, what happens to the zstd compressed data sent by the client?
> This is because zstd support status on Fluent Bit is still in PoC. So, we need to provide an option to support zstd compression and decompression in forward protocol.

Isn't it simply a matter of not being able to use zstd compression until both the server and client support the v1.6 protocol? It seems to me that this new protocol version does not have to be immediately compliant with all applications.
If we find any problems in supporting this protocol in the future, we can revise it again at that time.
How about extending HELO (and PING/PONG) in forward protocol v1.6? The current HELO just sends an options map, which can easily be extended to indicate whether a Fluent server actually supports the gzip or zstd compression method. What do you think of this way of advertising the supported compression methods?
I mean, zstd support may be delayed in Fluent Bit, so we need a way to tell this to clients that want to send their payloads over the forward protocol. Plus, in some users' deployments, the clients may be rolled out first and the aggregators later. In that case, the newly deployed clients support the new compression method while the still-running aggregators do not support the new decompression method. Thus, I wanted to suggest that when a Fluent server indicates that zstd decompression is not supported yet, the client must not use zstd compression.
- Server shows that it accepts `gzip` (and `zstd`) decompression.
- Client just shows a warning in its logs when users specify zstd compression in out_forward.
- In that case, the forward server and client just fall back to gzip compression instead of zstd compression.
> Thus, I wanted to suggest that when a fluent server responds to show zstd decompression is not supported yet, the client must not use zstd compression.

I see! Let's consider a draft in that direction.
@daipom so essentially do we do a handshake to ensure client supports zstd and allow server to send zstd chunks only when this handshake is successful?
Yup. HELO can contain options. Currently, it contains `nonce`, `auth`, and `keepalive`. So we need to add a compression option there, like `compression: ["gzip", "zstd"]`. If `compression` is omitted, the Fluent client must not use the zstd compression method.
ref: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#helo
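
The negotiation described above could look roughly like the following on the client side. This is a sketch of the idea under discussion, not an implemented API: the `compression` key in the HELO options is the proposed addition, `choose_compression` is a hypothetical helper, and the `nonce`/`auth` values are dummies.

```python
import logging

logger = logging.getLogger("forward_client_sketch")


def choose_compression(requested: str, helo_options: dict) -> str:
    """Pick the compression a client may use after receiving HELO."""
    # Assumption: a server that omits `compression` predates the proposal,
    # so the client must not use zstd and only gzip is assumed.
    supported = helo_options.get("compression", ["gzip"])
    if requested in supported:
        return requested
    logger.warning("server does not accept %s; falling back to gzip", requested)
    return "gzip"


# HELO options from a server that advertises zstd (proposed key).
new_server = {"nonce": b"n", "auth": b"", "keepalive": True,
              "compression": ["gzip", "zstd"]}
# HELO options from a server that sends only the existing keys.
old_server = {"nonce": b"n", "auth": b"", "keepalive": True}

print(choose_compression("zstd", new_server))
print(choose_compression("zstd", old_server))
```

Treating a missing `compression` key as "gzip only" keeps existing servers working unchanged, which matters when clients are rolled out before aggregators.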
@Athishpranav2003 Sorry, I haven't worked on this lately.
The protocol would be as @cosmo0920 says.
We need to implement it with in_forward and out_forward.
Let's do that with a different PR than #4657. I will merge #4657!
> Let's do that with a different PR than #4657.

https://github.com/fluent/fluentd/blob/30c3ce00ff165b1b5d9f53fc0a027074bbcab0da/lib/fluent/plugin/out_forward.rb#L680-L683
The options of HELO would be added into `ri` here.
So, for example, we can decompress the chunk if the server side does not support the compression format of the chunk here.
(For efficiency, one possible approach would be to select an appropriate compression format for each server in advance, but for now, there is no need to go that far, I think.)
@daipom so essentially we assume that the destination server is only known at runtime, so we perform this check when establishing the connection, and if the check fails we decompress the data and send it instead of raising an error, right? (I presume we emit a warning log at least.)
@Athishpranav2003 Yes!

> instead of raising an error, right? (I presume we emit a warning log at least.)

It would be preferable to fall back to the plain format rather than treat it as an error. It would be a good idea to generate a warning log at that time.
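
The fallback agreed on here can be sketched as follows. It is not the out_forward code: `prepare_payload` and its arguments are hypothetical, the chunk content is a stand-in for msgpack data, and gzip stands in for zstd so that the example runs with the standard library alone.

```python
import gzip
import logging

logger = logging.getLogger("out_forward_sketch")

# Decompressors the sender has available (gzip stands in for zstd here).
DECOMPRESSORS = {"gzip": gzip.decompress}


def prepare_payload(chunk: bytes, chunk_format: str, server_formats: list):
    """Return (entries, option) for one buffered chunk (hypothetical helper)."""
    if chunk_format in server_formats:
        # Server can decompress this format: send the chunk as it is.
        return chunk, {"compressed": chunk_format}
    # Server cannot decompress this format: warn and send plain entries
    # instead of treating the situation as an error.
    logger.warning("server lacks %s support; sending uncompressed", chunk_format)
    return DECOMPRESSORS[chunk_format](chunk), {}


buffered = gzip.compress(b"<msgpack entries>")
print(prepare_payload(buffered, "gzip", ["gzip"])[1])
print(prepare_payload(buffered, "gzip", []))
```

Decompressing on the sender costs CPU and bandwidth, but it keeps delivery working while servers that lack the new format are still deployed.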