improve ack log messages

Open sokomishalov opened this issue 2 years ago • 4 comments

When sending to Fluent Bit and exceeding its maximum message size, the log message Fluency produces is quite confusing:

com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `org.komamitsu.fluency.fluentd.ingester.Response` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (0)
 at [Source: (byte[])"

sokomishalov avatar Sep 02 '21 10:09 sokomishalov
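
As a rough illustration of what a clearer ack log message could look like: the Fluentd forward protocol encodes ack responses as a MessagePack map, and in MessagePack a first byte of 0x00 to 0x7f is a positive fixint, which would match the "Number value (0)" in the exception above. The sketch below only inspects the first byte of a payload and names its type family. The class and method names (`AckPayloadProbe`, `describeFirstByte`, `ackErrorMessage`) are hypothetical and not part of Fluency, and the actual bytes received in this issue are unknown.

```java
// Hypothetical helper, not part of Fluency: names the MessagePack type
// family implied by the first byte of a response payload.
final class AckPayloadProbe {

    static String describeFirstByte(byte[] payload) {
        if (payload == null || payload.length == 0) {
            return "empty payload";
        }
        int b = payload[0] & 0xFF; // treat the byte as unsigned
        if (b <= 0x7F) {
            return "positive fixint (" + b + ")"; // 0x00-0x7f
        }
        if (b <= 0x8F) {
            return "fixmap (size " + (b & 0x0F) + ")"; // 0x80-0x8f
        }
        if (b == 0xDE || b == 0xDF) {
            return "map"; // map16 / map32
        }
        return String.format("other type (0x%02x)", b);
    }

    // Assumed wording for a friendlier log line; adjust to taste.
    static String ackErrorMessage(byte[] payload) {
        return "Unexpected ack response: expected a MessagePack map, got "
                + describeFirstByte(payload);
    }

    public static void main(String[] args) {
        System.out.println(ackErrorMessage(new byte[] {0x00}));
        System.out.println(ackErrorMessage(new byte[] {(byte) 0x81}));
    }
}
```

A check like this would run before handing the bytes to the deserializer, so the log could name the unexpected type instead of surfacing a raw Jackson exception.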

Hi @sokomishalov,

Could you give me instructions, including the configuration, to reproduce this issue?

komamitsu avatar Sep 04 '21 07:09 komamitsu

Well, it's quite simple to reproduce: you just have to exceed Fluent Bit's buffering limits. The Fluency configuration is the default:

@Test
fun `Stress test`() {
    val builderForFluentd = FluencyBuilderForFluentd()
    val fluency = builderForFluentd.build(24224)
    val hugeMessage = "a".repeat(1 * 1024 * 1024)
    repeat(1000) { fluency.emit("test", mapOf("foo" to hugeMessage)) }
}

Also, I've discovered that the same issue was reported before (#171).

sokomishalov avatar Sep 04 '21 17:09 sokomishalov

I tried to reproduce this issue but couldn't. I lowered Fluent Bit's buffer size to 4KB, since no error occurred with the default 32KB.

Fluency test code

        FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
        Fluency fluency = builder.build();
        HashMap<String, Object> map = new HashMap<>();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("a");
        }
        map.put("name", sb.toString());
        for (int i = 0; i < 1000; i++) {
            fluency.emit("foo.bar", map);
        }
        fluency.close();

Fluent Bit

$ fluent-bit -V
Fluent Bit v1.8.6
$ cat fluentbit.conf
[INPUT]
    Name              forward
    Listen            0.0.0.0
    Port              24224
    Buffer_Chunk_Size 4K
    Buffer_Max_Size   4K

[OUTPUT]
    Name   stdout
    Match  *

$ fluent-bit -c fluentbit.conf
    :
[2021/09/05 22:29:13] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
[2021/09/05 22:29:14] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
[2021/09/05 22:29:14] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
[2021/09/05 22:29:16] [ warn] [input:forward:forward.0] fd=25 incoming data exceed limit (4000 bytes)
    :

Fluency log

    :
51593 [pool-1-thread-1] ERROR o.k.f.f.i.sender.NetworkSender - Failed to send 1015025 bytes data
51593 [pool-1-thread-1] WARN  o.k.f.f.i.sender.RetryableSender - Sender failed to send data. sender=RetryableSender{baseSender=TCPSender{config=Config{host='127.0.0.1', port=24224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}} NetworkSender{config=Config{host='127.0.0.1', port=24224, connectionTimeoutMilli=5000, readTimeoutMilli=5000, waitBeforeCloseMilli=1000} Config{senderErrorHandler=null}, failureDetector=null} org.komamitsu.fluency.fluentd.ingester.sender.TCPSender@62ffe26a, retryStrategy=ExponentialBackOffRetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}} RetryStrategy{config=Config{baseIntervalMillis=400, maxIntervalMillis=30000} Config{maxRetryCount=7}}, isClosed=false} org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender@61a17cdb, retry=7
java.io.IOException: Broken pipe
    at java.base/sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:182)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:130)
    at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
    at java.base/java.nio.channels.SocketChannel.write(SocketChannel.java:507)
    at org.komamitsu.fluency.fluentd.ingester.sender.TCPSender.sendBuffers(TCPSender.java:86)
    at org.komamitsu.fluency.fluentd.ingester.sender.TCPSender.sendBuffers(TCPSender.java:31)
    at org.komamitsu.fluency.fluentd.ingester.sender.NetworkSender.sendInternal(NetworkSender.java:102)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.send(FluentdSender.java:56)
    at org.komamitsu.fluency.fluentd.ingester.sender.RetryableSender.sendInternal(RetryableSender.java:77)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.sendInternalWithRestoreBufferPositions(FluentdSender.java:74)
    at org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender.send(FluentdSender.java:56)
    at org.komamitsu.fluency.fluentd.ingester.FluentdIngester.ingest(FluentdIngester.java:87)
    :

komamitsu avatar Sep 05 '21 13:09 komamitsu

Somehow I can't reproduce it either. I'll come back when either I or someone else has any luck with it.

sokomishalov avatar Sep 21 '21 09:09 sokomishalov