
Ignoring records

Open · MaksymFedorchuk opened this issue on Jul 09, 2021 · 5 comments

First of all, thanks for being so passionate about what you do. Let's say we have a file with 5 records:

    sdksdkdfksd
    dsfsdlflldfllld
    dfsdffdfsdfsf
    sdf;sd;ds;sdll
    sdfsffddsdsff

Is there a way in Cobrix to ignore n records from the beginning and k records from the end, without counting empty lines, for example with options like option("ignore_beginning_records", n) and option("ignore_end_records", k)?

MaksymFedorchuk · Jul 09 '21 09:07

One way to do it is to turn on record number generation and then filter by record id:

    import org.apache.spark.sql.functions.col

    val dfSrc = spark
      .read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("generate_record_id", true)
      .option("schema_retention_policy", "collapse_root")
      .load("/a/data/path")

    // Total number of records, used to drop the last k records
    val count = dfSrc.count()

    // Keep everything except the first n and the last k records
    val df = dfSrc.filter(col("Record_Id") >= n && col("Record_Id") < count - k)

Record_Id will start from 0 for each file. When record id generation is turned on, there will also be a File_Id column that is different for different files. Together, File_Id and Record_Id are unique for a given data load.
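
For example, since Record_Id restarts from 0 in every file, dropping the first n records of each file does not require any grouping (a minimal sketch; n is assumed to be defined by the caller):

    import org.apache.spark.sql.functions.col

    // Record_Id starts at 0 within each file, so this drops the first n
    // records of every file without a shuffle
    val withoutLeading = dfSrc.filter(col("Record_Id") >= n)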

yruslan · Jul 09 '21 10:07

Thanks!

MaksymFedorchuk · Jul 09 '21 10:07

But the problem is that in my case the files always contain a header and a trailer (each is one line). If we have multiple files, we will need a window function to remove the last k records from each file, and that causes shuffling; for the first n records no window function is needed. Something like this:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, max}

    val w = Window.partitionBy("File_Id")
    df.withColumn("max_Record_Id", max("Record_Id").over(w))
      .where(col("Record_Id") >= n && col("Record_Id") <= col("max_Record_Id") - k)

MaksymFedorchuk · Jul 10 '21 06:07

Yes, this makes sense. I'll add this feature request to the backlog. It might be helpful for our workflows as well, since we also have header and footer records. In our case these headers and footers have a specific segment id (a specific set of values of a certain field), so we just filter out those records by the value of the segment id. But your proposal is more generic.
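
In the meantime, one possible workaround that avoids shuffling the full DataFrame is to aggregate the maximum Record_Id per file and broadcast-join it back (just a sketch in plain Spark, not a Cobrix feature; n and k are assumed to be defined):

    import org.apache.spark.sql.functions.{broadcast, col, max}

    // The per-file maximum Record_Id is a tiny DataFrame, so joining it back
    // can be done with a broadcast instead of shuffling the full dataset
    val maxIds = df
      .groupBy("File_Id")
      .agg(max("Record_Id").as("max_Record_Id"))

    val trimmed = df
      .join(broadcast(maxIds), Seq("File_Id"))
      .where(col("Record_Id") >= n && col("Record_Id") <= col("max_Record_Id") - k)
      .drop("max_Record_Id")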

yruslan · Jul 10 '21 15:07

I really like this idea. For most of my files I can use the segment_filter feature as suggested, and so far all of my outlier use cases have been solvable using file_start_offset and file_end_offset, but that solution doesn't work with variable-length records.

I'm not 100% certain that we share the exact same use case, but here is what I'm currently doing to handle header and trailer records:

In cases where I have a record type field, I use segment_filter to select only the header, for example:

  val headerData = this.spark.read
    .format("cobol")
    .option("schema_retention_policy", "collapse_root")
    .option("segment_field", "RECORD-ID")
    .option("segment_filter", "HDR")
    .option("copybook_contents", copybookLoader("/copybooks/HEADER.cbl"))
    .load(inputFile)
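
The same approach can be flipped to keep only the detail records and ignore the header and trailer (a sketch; the "DTL" segment value and DETAIL.cbl copybook are placeholders for whatever the actual layout uses):

  val detailData = this.spark.read
    .format("cobol")
    .option("schema_retention_policy", "collapse_root")
    .option("segment_field", "RECORD-ID")
    .option("segment_filter", "DTL") // placeholder record type value for detail records
    .option("copybook_contents", copybookLoader("/copybooks/DETAIL.cbl"))
    .load(inputFile)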

And in cases where I don't have a record type field and the data is in a fixed-length format, I do the following:

  val recordSize = 126
  val skipByteCount = fileSize - recordSize
  val headerData = spark.read
    .format("cobol")
    .option("schema_retention_policy", "collapse_root")
    .option("file_end_offset", skipByteCount.toString) // skip all bytes after the header
    .option("copybook_contents", copybookLoader("/copybooks/HEADER.cbl"))
    .load(inputFile)
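
The snippet above assumes fileSize is already known. One way to obtain it (a sketch using the Hadoop FileSystem API, nothing Cobrix-specific, and valid only when inputFile points to a single file) is:

  import org.apache.hadoop.fs.{FileSystem, Path}

  // Length in bytes of the single input file
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val fileSize = fs.getFileStatus(new Path(inputFile)).getLen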

mark-weghorst · Aug 23 '21 18:08