cobrix
Issue with custom record parser
Hi,
We have a file with variable-length records, where the length of each record is determined by its first 6 bytes (the file has multiple copybooks, so we merged all of them). I am using a custom record parser for this file; a sample parser and the main class are below.
Issue: nothing matches those 6 bytes, and every record throws an error. We also tried reading the rejected records with a fixed size of 1000 as a test, but all the records come back blank/null. In the code below I have given 5 sample patterns for the file; however, we have around 22 different patterns in total.
Can you please suggest whether we are on the right path, and how to achieve this?
Appreciate your cooperation.
Thanks.
package com.example.spark.cobol.app
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordMetadata}
class CustomRecordHeadersParser extends Serializable with RecordHeaderParser {
  /** The custom record header is a 6 byte header */
  override def getHeaderLength: Int = 6

  override def isHeaderDefinedInCopybook: Boolean = true

  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
    val rdwHeaderBlock = getHeaderLength
    val headerString = header.map(_ & 0xFF).mkString("")
    if (header.length < rdwHeaderBlock) {
      RecordMetadata(-1, isValid = false)
    } else if (headerString == "01KJUG") {
      RecordMetadata(8034, isValid = true)
    } else if (headerString == "40NJHY") {
      RecordMetadata(18034, isValid = true)
    } else if (headerString == "87BGHO") {
      RecordMetadata(6575, isValid = true)
    } else if (headerString == "09GHGT") {
      RecordMetadata(2678, isValid = true)
    } else if (headerString.take(3) == "HDR") {
      RecordMetadata(65, isValid = false)
    } else {
      //throw new IllegalStateException(s"Custom record header does not match any of the patterns")
      RecordMetadata(1000, isValid = true)
    }
  }
}
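Side note: since there are around 22 patterns, a table-driven lookup may be easier to maintain than a long if-else chain. A minimal sketch, using the sample pattern strings and lengths from the question and assuming the header decodes to those strings (the lookup names here are hypothetical, not part of the Cobrix API):

```scala
object HeaderPatterns {
  // Hypothetical pattern -> record length table (sample values from the question)
  private val recordLengths: Map[String, Int] = Map(
    "01KJUG" -> 8034,
    "40NJHY" -> 18034,
    "87BGHO" -> 6575,
    "09GHGT" -> 2678
  )

  /** Returns the record length for a header, or None for unknown/"HDR" records. */
  def lengthFor(header: Array[Byte]): Option[Int] = {
    val key = new String(header, "ASCII") // assumes an ASCII-encoded header
    if (key.startsWith("HDR")) None       // header records are skipped
    else recordLengths.get(key)
  }
}
```

`getRecordMetadata` can then map `Some(len)` to `RecordMetadata(len, isValid = true)` and `None` to an invalid record, keeping the pattern table in one place.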
package com.example.spark.cobol.app
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object SparkCodecApp {
  def main(args: Array[String]): Unit = {
    // Switch logging level to WARN
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession
.builder()
.appName("Spark-Cobol Custom header parser example")
.getOrCreate()
val df = spark
.read
.format("cobol")
.option("copybook", "../example_data/copybook_codec.cob")
.option("is_record_sequence", "true")
.option("generate_record_id", true)
.option("schema_retention_policy", "collapse_root")
.option("record_header_parser", "com.example.spark.cobol.app.CustomRecordHeadersParser") // Custom record header parser class
.load("../example_data/data_codec/somefile.dat")
df.printSchema()
df.show
}
}
I'm not sure
header.map(_ & 0xFF).mkString("")
does what you expect. It converts each byte to its unsigned integer value and concatenates the decimal digits, so the result can only ever contain digits and will never match a pattern like "01KJUG". It also depends on the encoding of the header. If you have an example of the binary representation of your headers, I can advise on how to decode them.
For an ASCII header you can use new String(...):
val header = Array[Byte](0x54.toByte, 0x55.toByte, 0x56.toByte, 0x57.toByte)
header.map(_ & 0xFF).mkString("") // res0: String = 84858687
new String(header, "ASCII") // res1: String = TUVW
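If the file comes from a mainframe, the header bytes are likely EBCDIC rather than ASCII, in which case new String(...) with an EBCDIC charset decodes them. A minimal sketch, assuming code page 037 (the correct code page depends on the source system):

```scala
// "TUVW" encoded in EBCDIC (code page 037): T=0xE3, U=0xE4, V=0xE5, W=0xE6
val ebcdicHeader = Array[Byte](0xE3.toByte, 0xE4.toByte, 0xE5.toByte, 0xE6.toByte)

new String(ebcdicHeader, "Cp037") // res: String = TUVW
```

The decoded string can then be compared against patterns like "01KJUG" directly.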