seatunnel icon indicating copy to clipboard operation
seatunnel copied to clipboard

[Improve] [connector-file-base] Improve the write performance of ORC file format

Open shfshihuafeng opened this issue 1 month ago • 1 comment

Search before asking

  • [x] I had searched in the issues and found no similar issues.

What happened

Improving ORC writing performance by batch-writing files. I tested writing the ORC format with 5,000,000 records; the test results are as follows:

before improvement


       Job Statistic Information

Start Time : 2025-11-25 09:47:46 End Time : 2025-11-25 09:48:48 Total Time(s) : 61 Total Read Count : 1 Total Write Count : 5000000 Total Failed Count : -4999999

after improvement (changed to batch writing)


       Job Statistic Information

Start Time : 2025-11-25 10:17:03 End Time : 2025-11-25 10:17:37 Total Time(s) : 33 Total Read Count : 1 Total Write Count : 5000000 Total Failed Count : -4999999

SeaTunnel Version

dev

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval=600000
}


source {
  Http {
    result_table_name = "http"
    url = "http://x/hello/love"
    method = "GET"
    socket_timeout_ms=300000
    #format = "json"
    #schema = {
     #fields {
      #  name = "string"
       # age=int
        #city=string
        #}
      #}
    }
  }

transform {
  

Replace {
    source_table_name = "http"
    result_table_name = "et3"
    replace_field = "content"
    pattern = "(\\[)|(\\])"
    replacement = ""
    is_regex = true
    replace_first = true
  }
    
    
  Replace {
    source_table_name = "et3"
    result_table_name = "et4"
    replace_field = "content"
    # Then replace the commas between JSON objects with '#'
    pattern = "\\},\\s*\\{"
    replacement = "}#{"
    is_regex = true
    replace_first = false
  }

  Sql {
    source_table_name = "et4"
    result_table_name = "et5"
    query = "SELECT * FROM et4 LATERAL VIEW  EXPLODE(SPLIT(content,'#')) as ex"
  }
    JsonPath {
    source_table_name = "et5"
    result_table_name = "et6"
    columns = [
    {
    "src_field" = "ex"
    "path" = "$.name"
    "dest_field" = "name"
    "dest_type" = "string"
    },
    {
    "src_field" = "ex"
    "path" = "$.age"
    "dest_field" = "age"
    "dest_type" = "int"
    }, 
    {
    "src_field" = "ex"
    "path" = "$.city"
    "dest_field" = "city"
    "dest_type" = "string"
    }
    ]
  }
   
   
sql {
    source_table_name = "et6"
    result_table_name = "et7"
    query = "SELECT name,age,city  FROM et6"
   }
}
sink {
 HdfsFile {
    source_table_name = "et7"
    fs.defaultFS = "hdfs://x:9000"
    #batch_size=5000000
    #have_partition = true
    #partition_by = ["city"]
    #compress_codec = "zlib"
    #partition_dir_expression = "${k0}=${v0}"
    path = "/shf"
    file_format_type = "orc"
    }

}

Running Command

bin/seatunnel.sh --config ./config/http2hdfs_orc.config  -m local

Error Exception

none

Zeta or Flink or Spark Version

zeta

Java or Scala Version

java 1.8

Screenshots

No response

Are you willing to submit PR?

  • [x] Yes I am willing to submit a PR!

Code of Conduct

shfshihuafeng avatar Nov 25 '25 02:11 shfshihuafeng

good job

davidzollo avatar Nov 26 '25 15:11 davidzollo