Protect new implementation of BufferedTokenizer against OOM

Open andsel opened this issue 11 months ago • 3 comments

In #17229, an implementation change was applied to the BufferedTokenizer. It switched from RubyArray and JRuby classes to pure Java, so the extract method now returns an Iterable instead of a RubyArray.

The implementation is split into two parts:

  • an inner iterator that is responsible for accumulating the fragments coming from each invocation of the extract method
  • an outer iterator that decorates the first and simply applies the sizeLimit verification, throwing an exception when it receives a token longer than sizeLimit.
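
To make the two-part structure concrete, here is a minimal sketch of that shape. It is not the code from #17229: the class name, the `bufferedChars` helper, and the exception type are hypothetical, chosen only for illustration. The thing to notice is that the size check lives only in the outer stage, so nothing bounds the accumulator while no separator has arrived.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified illustration with hypothetical names; not the actual Logstash class.
final class TwoStageTokenizerSketch {
    private final StringBuilder accumulator = new StringBuilder();
    private final String separator;
    private final int sizeLimit;

    TwoStageTokenizerSketch(String separator, int sizeLimit) {
        this.separator = separator;
        this.sizeLimit = sizeLimit;
    }

    // Inner stage: append the fragment, then cut out every complete token.
    // No size check happens here, so the accumulator can grow without bound.
    private Iterator<String> split(String fragment) {
        accumulator.append(fragment);
        List<String> tokens = new ArrayList<>();
        int idx;
        while ((idx = accumulator.indexOf(separator)) >= 0) {
            tokens.add(accumulator.substring(0, idx));
            accumulator.delete(0, idx + separator.length());
        }
        return tokens.iterator();
    }

    // Outer stage: decorates the inner iterator and rejects oversized tokens.
    // The returned Iterable is single-use in this sketch.
    Iterable<String> extract(String fragment) {
        final Iterator<String> inner = split(fragment);
        return () -> new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return inner.hasNext();
            }

            @Override
            public String next() {
                String token = inner.next();
                if (token.length() > sizeLimit) {
                    throw new IllegalStateException("token longer than sizeLimit");
                }
                return token;
            }
        };
    }

    // Number of characters currently held in memory waiting for a separator.
    int bufferedChars() {
        return accumulator.length();
    }
}
```

Feeding this sketch fragments that never contain the separator makes `bufferedChars()` grow by the fragment length on every call, regardless of `sizeLimit`.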

Although the implementation seems more linear, it reintroduces a problem that surfaced in https://github.com/logstash-plugins/logstash-codec-json_lines/pull/43.

The inner iterator uses a StringBuilder as an accumulator, and on every call of extract it dices the tokens by separator. If a very big payload is pushed down to a codec that uses this BufferedTokenizer, and the payload requires multiple invocations of extract before resulting in a tokenization (which might never happen; suppose an offending client that never sends the separator), the memory gets filled with data that is useless and would be rejected by the next iterator, but the data never reaches it because the process goes OOM first. In the previous implementation, the tokenization part, when it first detected an overrun of sizeLimit, dropped every fragment until the next separator was presented, protecting against OOM conditions like this.
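
As a rough illustration of the protective behaviour described for the previous implementation, the sketch below checks the size limit while accumulating and, after an overrun, discards everything until the next separator. It is neither the old implementation nor the fix in #17293: all names are hypothetical, it counts dropped tokens instead of throwing, and it assumes a multi-character separator is never split across two fragments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "drop until the next separator"; not Logstash code.
final class BoundedTokenizerSketch {
    private final StringBuilder accumulator = new StringBuilder();
    private final String separator;
    private final int sizeLimit;
    private boolean dropping = false; // true while skipping an oversized token
    private long droppedTokens = 0;

    BoundedTokenizerSketch(String separator, int sizeLimit) {
        this.separator = separator;
        this.sizeLimit = sizeLimit;
    }

    List<String> extract(String fragment) {
        List<String> tokens = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = fragment.indexOf(separator, start)) >= 0) {
            if (dropping) {
                // The separator ends the oversized token; resume normal mode.
                dropping = false;
            } else if (accumulator.length() + (idx - start) > sizeLimit) {
                // The token completed in this fragment is too long: discard it.
                accumulator.setLength(0);
                droppedTokens++;
            } else {
                accumulator.append(fragment, start, idx);
                tokens.add(accumulator.toString());
                accumulator.setLength(0);
            }
            start = idx + separator.length();
        }
        if (!dropping) {
            int tail = fragment.length() - start;
            if (accumulator.length() + tail > sizeLimit) {
                // Overrun without a separator: free the memory and start dropping.
                accumulator.setLength(0);
                dropping = true;
                droppedTokens++;
            } else {
                accumulator.append(fragment, start, fragment.length());
            }
        }
        return tokens;
    }

    int bufferedChars() {
        return accumulator.length();
    }

    long droppedTokens() {
        return droppedTokens;
    }
}
```

With this shape the accumulator never holds more than `sizeLimit` characters, so a client that never sends the separator cannot exhaust the heap through the tokenizer's buffer.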

My questions are:

  • is this really a corner case, or could it happen in practice?
  • do you think it's a point to resolve in the original #17229, or could it be fixed in a follow-up PR?

andsel avatar Mar 06 '25 16:03 andsel

@andsel Did you check in the inputs (tcp/file) that use the BufferedTokenizer whether this behaviour can occur?

e.g. (from @jsvd):

For example, the tcp input gets packets that are limited by the MTU, or aggregated to 16k if they're TLS, and the file input reads in chunks. Not sure if these smaller (< 100kb) chunks are accumulated elsewhere; otherwise the buffer always gets something smaller.

robbavey avatar Mar 11 '25 16:03 robbavey

It also happens with the TCP input, using the following pipeline, with test data taken from https://github.com/logstash-plugins/logstash-codec-json_lines/pull/43

input {
  tcp {
    port => 1234

    codec => json_lines {
      decode_size_limit_bytes => 32768
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

and streaming the big JSON with netcat:

cat /path/to/big_single_line.json | netcat localhost 1234

TCP test logs:

[2025-03-12T09:43:40,293][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2025-03-12T09:45:29,133][INFO ][logstash.codecs.jsonlines][main][1b68bf0f03797ca47bad81969b1ebe54482840f6f86353b356630f06fecc2da4] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)




java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid3230.hprof ...
Heap dump file created [393883507 bytes in 0.239 secs]
[2025-03-12T09:45:45,209][ERROR][logstash.inputs.tcp      ][main][1b68bf0f03797ca47bad81969b1ebe54482840f6f86353b356630f06fecc2da4] localhost/127.0.0.1:50744: closing due:
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3541) ~[?:?]
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:242) ~[?:?]
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587) ~[?:?]
	at java.lang.StringBuilder.append(StringBuilder.java:179) ~[?:?]
	at org.logstash.common.BufferedTokenizer$DataSplitter.append(BufferedTokenizer.java:102) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizer.extract(BufferedTokenizer.java:135) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizerExt.extract(BufferedTokenizerExt.java:83) ~[logstash-core.jar:?]
	at java.lang.invoke.LambdaForm$DMH/0x00000008007dcc00.invokeVirtual(LambdaForm$DMH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007f0800.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e4000.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e4000.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.Invokers$Holder.linkToCallSite(Invokers$Holder) ~[?:?]
	at Users.andrea.workspace.logstash_plugins.logstash_minus_codec_minus_json_lines.lib.logstash.codecs.json_lines.RUBY$method$decode$0(/Users/andrea/workspace/logstash_plugins/logstash-codec-json_lines/lib/logstash/codecs/json_lines.rb:69) ~[?:?]
	at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x0000000800bda400.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x0000000800849800.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e7800.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e7800.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.Invokers$Holder.linkToCallSite(Invokers$Holder) ~[?:?]
	at Users.andrea.workspace.logstash_andsel.vendor.bundle.jruby.$3_dot_1_dot_0.gems.logstash_minus_input_minus_tcp_minus_7_dot_0_dot_2_minus_java.lib.logstash.inputs.tcp.RUBY$method$decode_buffer$0(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/3.1.0/gems/logstash-input-tcp-7.0.2-java/lib/logstash/inputs/tcp.rb:215) ~[?:?]
	at java.lang.invoke.LambdaForm$DMH/0x00000008010cb400.invokeStatic(LambdaForm$DMH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d3400.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d4400.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d4400.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d0c00.linkToCallSite(LambdaForm$MH) ~[?:?]
	at Users.andrea.workspace.logstash_andsel.vendor.bundle.jruby.$3_dot_1_dot_0.gems.logstash_minus_input_minus_tcp_minus_7_dot_0_dot_2_minus_java.lib.logstash.inputs.tcp.decoder_impl.RUBY$method$decode$0(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/3.1.0/gems/logstash-input-tcp-7.0.2-java/lib/logstash/inputs/tcp/decoder_impl.rb:23) ~[?:?]
[2025-03-12T09:45:45,295][WARN ][io.netty.util.concurrent.DefaultPromise][main][1b68bf0f03797ca47bad81969b1ebe54482840f6f86353b356630f06fecc2da4] An exception was thrown by org.logstash.tcp.InputLoop$InputHandler$FlushOnCloseListener.operationComplete()
java.lang.OutOfMemoryError: Java heap space
	at java.lang.Object.clone(Native Method) ~[?:?]
	at java.lang.String.encode8859_1(String.java:1029) ~[?:?]
	at java.lang.String.encode8859_1(String.java:1024) ~[?:?]
	at java.lang.String.encode(String.java:870) ~[?:?]
	at java.lang.String.getBytes(String.java:1818) ~[?:?]
	at org.logstash.common.BufferedTokenizerExt.toEncodedRubyString(BufferedTokenizerExt.java:96) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizerExt.flush(BufferedTokenizerExt.java:114) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizerExt$INVOKER$i$0$0$flush.call(BufferedTokenizerExt$INVOKER$i$0$0$flush.gen) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:103) ~[jruby.jar:?]
	at org.jruby.ir.instructions.CallBase.interpret(CallBase.java:545) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:363) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92) ~[jruby.jar:?]
	at org.jruby.ir.instructions.CallBase.interpret(CallBase.java:548) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:363) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115) ~[jruby.jar:?]
	at org.jruby.gen.LogStash$$Inputs$$Tcp$$DecoderImpl_1354257020.flush(org/jruby/gen/LogStash$$Inputs$$Tcp$$DecoderImpl_1354257020.gen:13) ~[?:?]
	at org.logstash.tcp.InputLoop$InputHandler$FlushOnCloseListener.operationComplete(InputLoop.java:174) ~[logstash-input-tcp-7.0.2.jar:?]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
^C[2025-03-12T09:47:36,392][WARN ][logstash.runner          ] SIGINT received. Shutting down.
[2025-03-12T09:47:40,658][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2025-03-12T09:47:41,404][WARN ][logstash.runner          ] Received shutdown signal, but pipeline is still waiting for in-flight events
to be processed. Sending another ^C will force quit Logstash, but this may cause
data loss.
[2025-03-12T09:47:41,579][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2025-03-12T09:47:41,584][INFO ][logstash.runner          ] Logstash shut down.

With the file input, Logstash fails because the code at https://github.com/logstash-plugins/logstash-input-file/blob/55a4a7099f05f29351672417036c1342850c7adc/lib/filewatch/watched_file.rb#L250 expects an array, so the Iterable has to be converted to a Ruby array; the implementation in the proposed PR throws a NoMethodError.

andsel avatar Mar 12 '25 09:03 andsel

Final decision

Given that the OOM can happen in the file input, the plan is to apply #17293 on top of #17229 once it's merged.

andsel avatar Mar 12 '25 14:03 andsel