
Support streaming inputs with delimited protobufs.

Open AlexJF opened this issue 8 years ago • 4 comments

When used with inputs that receive streamed data (such as TCP), the default implementation of protobuf parsing may fail (if a complete serialized protobuf is not received in a single call to decode), or it may silently ignore all but the first protobuf when several are included in the streamed blob.

This commit adds a 'streaming' configuration parameter. If set to true, data received from a logstash input is buffered until a protobuf message can be decoded successfully.

In this setup, protobuf messages should be sent to logstash in delimited format, that is, each serialized message should be prefixed by a varint containing the total size of the serialized message.

For reference, here's a link to the logstash line codec which does something similar, but looking for newlines: https://github.com/logstash-plugins/logstash-codec-line/blob/master/lib/logstash/codecs/line.rb#L38
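The buffering decode described above can be sketched roughly like this. This is a hypothetical illustration, not the plugin's actual code: chunks from the input are appended to an internal buffer, and complete varint-delimited messages are yielded as soon as enough bytes have arrived.

```ruby
# Minimal sketch of a buffering, varint-delimited decoder (hypothetical;
# class and method names are illustrative, not the plugin's API).
class DelimitedBuffer
  def initialize
    @buffer = ''.b
  end

  # Append a chunk from the input and yield each complete message body.
  def decode(chunk)
    @buffer << chunk.b
    loop do
      header = read_varint(@buffer)
      break if header.nil?
      len, consumed = header
      break if @buffer.bytesize < consumed + len
      yield @buffer.byteslice(consumed, len)
      @buffer = @buffer.byteslice(consumed + len, @buffer.bytesize)
    end
  end

  private

  # Read a base-128 varint from the front of +buf+; nil if incomplete.
  def read_varint(buf)
    value = 0
    shift = 0
    buf.each_byte.with_index do |byte, i|
      value |= (byte & 0x7f) << shift
      return [value, i + 1] if (byte & 0x80).zero?
      shift += 7
    end
    nil
  end
end

buf = DelimitedBuffer.new
out = []
# Two delimited messages ("ab", "cde") split across chunk boundaries.
stream = "\x02ab\x03cde".b
buf.decode(stream[0, 3]) { |m| out << m }   # yields "ab"
buf.decode(stream[3..-1]) { |m| out << m }  # yields "cde"
# out == ["ab", "cde"]
```

The key property is that a message straddling two TCP reads is held in the buffer until the rest of it arrives, rather than failing to parse.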

AlexJF avatar Nov 14 '16 10:11 AlexJF

Coming from a protobuf background and not really being into logstash, I poked around a little. Indeed, logstash is not really a friendly environment for event processing of that scheme. My opinions are the following:

a) The TCP source should be adapted to hand over control of the socket and the thread to the decoder plugin. From there, the Ruby protobuf library would get support for reading length-delimited protobufs from a blocking input stream.

b) The TCP source gets a strategy for splitting length-delimited input accordingly, and the plugin would continue to work as is.
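Option (a) can be sketched as pulling length-delimited messages straight off a blocking IO, instead of reassembling pushed chunks. This is a hypothetical illustration (the helper names are made up); a StringIO stands in for the socket.

```ruby
# Hypothetical sketch of reading varint-delimited messages from a
# blocking input stream, as option (a) suggests.
require 'stringio'

# Read one base-128 varint from +io+, byte by byte; nil at end of stream.
def read_varint(io)
  value = 0
  shift = 0
  loop do
    byte = io.getbyte
    return nil if byte.nil?
    value |= (byte & 0x7f) << shift
    return value if (byte & 0x80).zero?
    shift += 7
  end
end

# Yield each delimited message body until the stream ends.
def each_delimited(io)
  while (len = read_varint(io))
    yield io.read(len)
  end
end

# A socket would work the same way; StringIO stands in for one here.
io = StringIO.new("\x02ab\x03cde".b)
msgs = []
each_delimited(io) { |m| msgs << m }
# msgs == ["ab", "cde"]
```

Because reads block until data is available, no explicit reassembly buffer is needed in this model.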

The main reason I dislike the current approach is this: the logic for varint length-encoded messages belongs in the protobuf library more than in this plugin, and it would only exist here to make up for deficits of the TCP source.

I'd love to hear your opinion about it, @AlexJF.

Kaiserchen avatar Nov 17 '16 09:11 Kaiserchen

The logic for varint length-encoded messages belongs in the protobuf library more than in this plugin, and it would only exist here to make up for deficits of the TCP source.

I'm not sure I get this part of your reasoning. According to the protobuf docs, they pretty clearly put this responsibility on the users of the library: https://developers.google.com/protocol-buffers/docs/techniques#streaming.

Also, I don't necessarily agree that this should go into the TCP input source itself. It is the codec that should know how best to deconstruct the data. For instance, I imagine most simple streaming implementations would simply include a 2-to-4-byte length field, as opposed to the varint length field that is usually used with delimited protobufs (and that I'm using here).
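For contrast, the simpler fixed-width framing mentioned above might look like this. This is a hypothetical sketch (the function names are illustrative), using a 4-byte big-endian length field instead of a varint:

```ruby
# Hypothetical fixed-width framing: 4-byte big-endian length prefix.
def frame_fixed(msg)
  [msg.bytesize].pack('N') + msg
end

# Returns [message, bytes_consumed], or nil if the buffer is incomplete.
def deframe_fixed(buf)
  return nil if buf.bytesize < 4
  len = buf.byteslice(0, 4).unpack1('N')
  return nil if buf.bytesize < 4 + len
  [buf.byteslice(4, len), 4 + len]
end

msg, consumed = deframe_fixed(frame_fixed('hello') + 'extra')
# msg == "hello", consumed == 9
```

The fixed header always costs 4 bytes but is trivial to parse; the varint header costs only 1 byte for messages under 128 bytes, which is why delimited protobufs use it.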

I do agree that things would be a lot simpler and more flexible if we could define a chain of codecs (like we can with Netty, for example, with its chains of decoders). This would allow us to have something like:

tcp {
  codecs: [
    varint_delimited: {...},
    protobuf: {...}
  ]
}

There's apparently some work being done in this regard (https://github.com/elastic/logstash/issues/3873) but until it is finished, in my opinion, this codec is the right place to do it.

AlexJF avatar Nov 17 '16 10:11 AlexJF

Let's look at the TCP input:

https://github.com/logstash-plugins/logstash-input-tcp/blob/master/lib/logstash/inputs/tcp.rb#L168 will, for example, remove all CR/LF from the input stream. So if you ever have a CR/LF in one of your proto messages, say in a text field, it will be unparsable by the codec. The issues with the TCP input are too profound not to attack the problem there!

Regarding protobuf splitting: sure, it's the library user's responsibility to split the messages, but almost all implementations offer a CodedInputStream that would be perfect to use with suggestion (a) that I made previously.

Most of the work in the codec is only needed because the TCP source ruins too much. Given the split on CR/LF, you could use Base64 encoding over the protobufs plus some sort of separator for better message splitting.
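The Base64-plus-separator workaround described above could look like this. A hypothetical sketch: each serialized protobuf is Base64-encoded (so the wire data can never contain a raw CR/LF) and messages are joined with newlines, which a line-splitting input handles safely.

```ruby
# Hypothetical Base64 + separator framing to survive CR/LF-splitting inputs.
require 'base64'

# Example message bodies; the second deliberately contains a newline.
messages = ["\x08\x96\x01".b, "payload with\nnewline".b]

# Sender side: one Base64 line per message (strict_encode64 adds no newlines).
wire = messages.map { |m| Base64.strict_encode64(m) }.join("\n") + "\n"

# Receiver side: split on the separator, then decode each line.
decoded = wire.split("\n").map { |line| Base64.strict_decode64(line) }
# decoded == messages
```

The cost is roughly 33% size overhead from Base64, which is why length-delimited framing is usually preferable when the transport preserves the bytes.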

This really boils down to a problematic implementation of the TCP source.

Update: the TCP source gives you all the data; the line above is behind a config option I didn't see the first time. I'll try to make a proposal for how one could readjust the chunks.

Kaiserchen avatar Nov 17 '16 12:11 Kaiserchen

You're painting the TCP input worse than it really is.

The code you point at is used for parsing proxy information when proxy_protocol is being used (and it was only added 3 days ago). It will remove two CR/LFs from the beginning of the proxy protocol header, which is necessary for parsing that header. You'll notice that, ignoring proxy-related stuff, the only things it does are:

  • Read up to 16KB of data into a buffer: https://github.com/logstash-plugins/logstash-input-tcp/blob/master/lib/logstash/inputs/tcp.rb#L164 and https://github.com/logstash-plugins/logstash-input-tcp/blob/master/lib/logstash/inputs/tcp.rb#L239
  • Pass the newly read data into the codec: https://github.com/logstash-plugins/logstash-input-tcp/blob/master/lib/logstash/inputs/tcp.rb#L182

Could they give us the socket directly, so we could use some other way of reading that would feel more natural? Perhaps, but at least for the moment we have to make do with what we have.

Regarding using the library's input stream: if the Ruby protobuf library we are using in this plugin already has some class capable of directly parsing delimited protobuf streams, we can definitely use it. I just went with this implementation because I searched for a .readDelimitedFrom() (or similar) function like the one that exists in the Java protobuf library and found none. This was actually my first experience with Ruby and the Ruby protobuf library, so I have no doubt some things can be improved.

Update: sorry, I didn't see your update :) I'll eagerly await your proposal, but I don't know what you mean by readjusting chunks. I've updated my commit so that it now closes the connection on decode error, and I have tested this code in an actual deployed environment; everything seems to be working.

AlexJF avatar Nov 17 '16 13:11 AlexJF