Broken hadoop_snappy compression in some cases

Open alheio opened this issue 1 year ago • 1 comments

Describe the bug

Hello!

I have logs from two apps.

- app1: in_tail input on a single file; webhdfs output with file buffer and hadoop_snappy compression
- app2: in_tail input on files matched by a mask; webhdfs output with file buffer and hadoop_snappy compression

- app1 watches 1 file; its buffer accumulates chunks of about 20 MB per 2 minutes and flushes them to HDFS on a time interval
- app2 watches 50 files; its buffer accumulates about 50 MB per 5 minutes and flushes the data to HDFS on a time interval

app1 works fine, but with app2 I get "invalid compression" on about 5% of files (chunks), from different hosts, when processing the files.

Exception in thread "main" java.lang.InternalError: Could not decompress data. Input is invalid.
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressBytesDirect(Native Method)
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:239)
        at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:93)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:61)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:121)
        at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:106)
        at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:101)
        at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:317)
        at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:289)
        at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:271)
        at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:255)
        at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:118)
        at org.apache.hadoop.fs.shell.Command.run(Command.java:165)
        at org.apache.hadoop.fs.FsShell.run(FsShell.java:315)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at org.apache.hadoop.fs.FsShell.main(FsShell.java:372)

How can I tune the config? Why does hadoop_snappy generate invalid blocks? If I set compression to "text" instead of "hadoop_snappy", everything works fine, with no invalid records when parsing the data in HDFS.
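To narrow down where a failing file goes wrong, a structural check of the block framing may help. The sketch below is an assumption-laden illustration, not part of fluent-plugin-webhdfs or the snappy gem: it assumes the Hadoop block-compressed layout as I understand it (each block starts with a 4-byte big-endian uncompressed length, followed by one or more chunks that each carry a 4-byte big-endian compressed length and a raw Snappy payload beginning with a varint of its uncompressed size). The names `read_varint` and `check_framing` are hypothetical. It does not decompress anything, so a file that passes can still contain a damaged payload.

```python
import struct
from typing import BinaryIO, Tuple


def read_varint(buf: bytes) -> int:
    """Decode the little-endian varint at the start of a raw Snappy stream."""
    result = 0
    shift = 0
    for byte in buf[:5]:
        result |= (byte & 0x7F) << shift
        if byte & 0x80 == 0:
            return result
        shift += 7
    raise ValueError("unterminated varint")


def check_framing(stream: BinaryIO) -> Tuple[int, int]:
    """Walk the assumed block framing; return (blocks, total_uncompressed_bytes).

    Raises ValueError naming the byte offset of the first inconsistency.
    """
    blocks = 0
    total = 0
    offset = 0
    while True:
        header = stream.read(4)
        if not header:
            return blocks, total  # clean end of file
        if len(header) < 4:
            raise ValueError(f"truncated block header at offset {offset}")
        (block_len,) = struct.unpack(">I", header)
        offset += 4
        seen = 0
        # Chunks belong to this block until their declared sizes add up.
        while seen < block_len:
            raw = stream.read(4)
            if len(raw) < 4:
                raise ValueError(f"truncated chunk header at offset {offset}")
            (chunk_len,) = struct.unpack(">I", raw)
            offset += 4
            chunk = stream.read(chunk_len)
            if len(chunk) < chunk_len:
                raise ValueError(f"truncated chunk at offset {offset}")
            seen += read_varint(chunk)
            offset += chunk_len
        if seen != block_len:
            raise ValueError(
                f"block ending at offset {offset} declares {block_len} bytes, "
                f"but its chunks declare {seen}"
            )
        blocks += 1
        total += block_len
```

Run against a file fetched locally (for example with `hdfs dfs -get`), `check_framing(open(path, "rb"))` either returns the block count and total declared size or raises at the first offset where the headers stop being consistent. Comparing that offset across several bad files could show whether the damage sits at block boundaries or inside payloads.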

To Reproduce

To reproduce the issue, do what I described in the "Describe the bug" section with the following Dockerfile:

FROM fluent/fluentd:v1.16.5-debian-amd64-1.0

USER root

# Timezone
ENV TZ="Europe/Moscow"
RUN ln -snf "/usr/share/zoneinfo/$TZ" "/etc/localtime" \
    && echo "$TZ" > "/etc/timezone"

# for snappy gem native libs building
RUN apt update \
    && apt -y install build-essential autoconf automake libtool libsnappy-dev \
    && apt clean

# plugins
RUN fluent-gem install \
    fluent-plugin-webhdfs \
    fluent-plugin-prometheus \
    snappy

USER fluent

Expected behavior

Valid compression of the resulting files.

Your Environment

- Official fluentd docker image fluent/fluentd:v1.16.5-debian-amd64-1.0
- Fluentd version: 1.16.5
- gem 'fluent-plugin-prometheus' version '2.1.0'
- gem 'fluent-plugin-webhdfs' version '1.6.0'

Your Configuration

  <system>
    workers 2
  </system>

  <source>
    @type monitor_agent
    bind "0.0.0.0"
    port 24220
  </source>

  <source>
    @type prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>

  <filter mytags.*>
    @type prometheus
    <metric>
      name fluentd_input_status_num_records_total
      type counter
      desc The total number of incoming records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </filter>

  <source>
    @type prometheus_monitor
  </source>

  <source>
    @type prometheus_output_monitor
    interval 10
    <labels>
      hostname ${hostname}
    </labels>
  </source>

  <worker 0>
    <source>
      tag "mytags.app-tag-1"
      @type tail
      path "/path/to/logs/app1.log"
      pos_file "/home/fluentd/pipeline_workdir/app1.log.pos"
      read_from_head false
      <parse>
        @type "none"
        unmatched_lines
      </parse>
    </source>
    <match mytags.app-tag-1>
      @type webhdfs
      namenode namenode1:50070
      standby_namenode namenode2:50070
      ignore_start_check_error true
      retry_known_errors yes
      retry_times 10000
      retry_interval 30
      open_timeout 180
      read_timeout 180
      append no
      username hdfsuser
      path "/path/in/hdfs/app1/%Y/%m/%d/%H/#{Socket.gethostname}.${chunk_id}.log"
      compress hadoop_snappy
      <format>
        @type "single_value"
      </format>
      <buffer time>
        @type "file"
        path "/home/fluentd/pipeline_workdir/app1.log.buf"
        chunk_limit_size 512MB
        flush_mode interval
        flush_interval 2m
        flush_thread_count 5
        retry_type periodic
        retry_wait 30s
        timekey_zone "+0300"
        timekey 3600
      </buffer>
    </match>
  </worker>

  <worker 1>
    <source>
      tag "mytags.app2"
      @type tail
      path "/path2/to/logs/app2*.log"
      pos_file "/home/fluentd/pipeline_workdir/app2.log.pos"
      read_from_head false
      follow_inodes true
      rotate_wait 60
      <parse>
        @type "none"
        unmatched_lines
      </parse>
    </source>
    <match mytags.app2>
      @type webhdfs
      namenode namenode1:50070
      standby_namenode namenode2:50070
      ignore_start_check_error true
      retry_known_errors yes
      retry_times 10000
      retry_interval 30
      open_timeout 180
      read_timeout 180
      append no
      username hdfsuser
      path "/path/in/hdfs/app2/%Y/%m/%d/%H/#{Socket.gethostname}.${chunk_id}.log"
      compress hadoop_snappy
      <format>
        @type "single_value"
      </format>
      <buffer time>
        @type "file"
        path "/home/fluentd/pipeline_workdir/app2.log.buf"
        chunk_limit_size 512MB
        flush_mode interval
        flush_interval 2m
        flush_thread_count 5
        retry_type periodic
        retry_wait 30s
        timekey_zone "+0300"
        timekey 3600
      </buffer>
    </match>
  </worker>

Your Error Log

No errors on the fluentd side.

Additional context

The only difference between the two pipelines is that app2's in_tail watches 50 files matched by a mask.

alheio, Apr 17 '24 13:04

If I place both pipelines on one worker, I get "invalid snappy compression" errors on both pipelines (or only on the first pipeline, if the second pipeline uses the "text" compression codec).

alheio, Apr 18 '24 10:04

According to https://github.com/fluent/fluent-plugin-webhdfs#performance-notifications, "you should configure 'path' for each node". Does your configuration actually follow that? (I guess "Your Configuration" is masked to some extent.)

kenhys, Jul 09 '24 05:07

This issue has been automatically marked as stale because it has been open 30 days with no activity. Remove stale label or comment or this issue will be closed in 7 days

github-actions[bot], Aug 08 '24 10:08

This issue was automatically closed because of stale in 7 days

github-actions[bot], Aug 15 '24 10:08