cobrix
merge copybooks issue
I was trying to use the merge copybooks feature. We have a single copybook with several segments that start with 01, and there are no common fields between the segments. I have two issues here.
a. I divided my single copybook into 5 copybooks and used the merge copybook feature: .option("copybook", s"/user/cobtest/test_A.txt,/user/cobtest/test_B.txt,/user/cobtest/test_C.txt,/user/cobtest/test_D.txt,/user/cobtest/test_E.txt"). These files are present in HDFS at this path, but I am getting an error message that the copybook /user/cobtest/test_A.txt,/user/cobtest/test_B.txt,/user/cobtest/test_C.txt,/user/cobtest/test_D.txt,/user/cobtest/test_E.txt does not exist.
b. I have several segments that start with 01 in a single copybook, maybe 30 to 40 of them. Does Cobrix have any option to divide them into separate copybooks automatically?
For your issue (a), all you need to do is replace the 'copybook' option with 'copybooks'.
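For example, a minimal sketch (the data path here is just a placeholder):
cobolDataframe = spark
  .read
  .format("cobol")
  .option("copybooks", "/user/cobtest/test_A.txt,/user/cobtest/test_B.txt,/user/cobtest/test_C.txt,/user/cobtest/test_D.txt,/user/cobtest/test_E.txt") // comma-separated list of copybook paths
  .load("/user/cobtest/data") // placeholder path to the data file(s)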
Not sure about your requirements for (b). You can take a multisegment copybook and filter only the segments you are interested in using the segment_filter option (it supports specifying a list of segment ids). After that you can use Spark's .select(...) to project only the fields of a specific segment.
The above will give you a splitting of the copybook. For better performance you can also use the redefine-segment-id-map option (described in the README).
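A rough sketch of that approach (the copybook path, segment field name, segment id values and selected field names are all placeholders; see the README for the exact redefine-segment-id-map syntax):
val df = spark
  .read
  .format("cobol")
  .option("copybook", "/user/cobtest/copybook.txt") // placeholder copybook path
  .option("is_record_sequence", "true")
  .option("segment_field", "SEGMENT_ID") // hypothetical field that carries the segment id
  .option("segment_filter", "33,34")     // keep only the segment ids you are interested in
  .load("/user/cobtest/data")            // placeholder data path

// Then project only the fields that belong to one segment
val segment33 = df.select("FIELD_A", "FIELD_B") // illustrative field names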
Thank you. I will try your suggestions. Can you please provide any suggestions for my question (b)? I have several segments that start with 01 in a single copybook, maybe 30 to 40 of them. Does Cobrix have any option to divide them into separate copybooks automatically?
Please elaborate on your requirements for question (b), perhaps by providing an example. I have given my suggestions in the previous comment; to my current knowledge of the issue, that's all I can advise.
I have tried the copybooks option for merging copybooks. Below is my code:
cobolDataframe = spark
.read
.format("cobol")
.option("copybooks", v_copybook)
.option("schema_retention_policy", "collapse_root") //removes the root record header
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("improve_locality", false)
.option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
.option("is_rdw_big_endian", "true")
.load(v_data)
cobolDataframeM = spark
.read
.format("cobol")
.option("copybooks", s"$patha,$pathb,$pathc,$pathd,$pathe")
.option("schema_retention_policy", "collapse_root") //removes the root record headerc
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("improve_locality", false)
.option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
.option("is_rdw_big_endian", "true")
.load(v_data)
All variables are defined. I am getting the error below. Please help me resolve the issue.
Exception in thread "main" java.lang.IllegalStateException: Cannot define path to source files: missing parameter: 'path'
at za.co.absa.cobrix.spark.cobol.source.parameters.CobolParametersValidator$.validateOrThrow(CobolParametersValidator.scala:84)
at za.co.absa.cobrix.spark.cobol.source.DefaultSource.createRelation(DefaultSource.scala:52)
at za.co.absa.cobrix.spark.cobol.source.DefaultSource.createRelation(DefaultSource.scala:48)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
at com.cobrix.cobtest$.ProcessMainframeFile(cobtest.scala:140)
The error is not related to the usage of "copybooks". Cobrix does not receive the path to the actual data files for some reason. Could you please paste more of your code? I wonder how v_data is defined.
v_copybook is the single copybook that has many levels at 01. The copybook and data files are in HDFS, and their HDFS paths are passed as v_copybook and v_data.
When I check the Cobrix master code, this path error is thrown when the value PARAM_COPYBOOK_PATH is not set. I am thinking that because this value is not set, I am getting the path error.
My copybook HDFS path is "/user/cobtest/UIEWCDS.txt"
My data HDFS path is "/user/cobtest/BREQIUT"
My copybook is below:
01 TEST-PYMT-WHSE-RECORD-33.
   03 TEST-KEY-33.
      05 TEST-ACCT-33 PIC 9(18).
      05 TEST-CHECK-ID-33.
         10 TEST-IND-33 PIC X(3).
         10 TEST-PYMT-ID-33 PIC X(10).
      05 TEST-DIRECTION-33 PIC X(1).
   03 TEST-DATA-GROUP-33.
      05 TEST-DATA-GROUP-N-33 PIC S9(08) COMP.
   03 TEST-MAIN-SEGMENT.
      05 TEST-PROCESSING-DETAILS.
         10 TEST-TYPE PIC X(3).
         10 TEST-EXPEDITED PIC X(1).
         10 TEST-MANUAL-ACCEPT PIC X(1).
01 TEST-PYMT-WHSE-RECORD-34.
   03 TEST-KEY-34.
      05 TEST-ACCT-34 PIC 9(18).
      05 TEST-CHECK-ID-34.
         10 TEST-IND-ORG-34 PIC X(3).
         10 TEST-PYMT-ID-34 PIC X(10).
      05 TEST-DIRECTION-34 PIC X(1).
   03 TEST-DATA-GROUP-34.
      05 TEST-DATA-GROUP-N-34 PIC S9(08) COMP.
01 TEST-PYMT-WHSE-RECORD-35.
   03 TEST-KEY-35.
      05 TEST-ACCT-35 PIC 9(18).
      05 TEST-CHECK-ID-35.
         10 TEST-IND-ORG-35 PIC X(3).
         10 TEST-PYMT-ID-35 PIC X(10).
      05 TEST-DIRECTION-35 PIC X(1).
The 'path' variable is set automatically when you invoke load() so you don't need to set it explicitly.
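For example, in a sketch using your variable names:
val df = spark.read
  .format("cobol")
  .option("copybook", v_copybook)
  .load(v_data) // the argument of load() becomes the 'path' parameter internally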
I'm unable to reproduce the error; I see that kind of error for the first time. Could you please paste the full code (including variable definitions) that you use? I will try to reproduce the issue.
Below is my code:
package com.cobrix
import org.apache.spark.{SparkConf, SparkContext}
//import org.apache.spark.sql.{SaveMode, SparkSession}
// import za.co.absa.cobrix.spark.cobol.source
import za.co.absa.cobrix.spark.cobol._
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.spark.cobol.schema.{CobolSchema, SchemaRetentionPolicy}
import java.io.File
import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, SaveMode, SparkSession, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.ArrayType
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
object cobtest extends Serializable {
/**
 *
 * @param args
 */
def main(args: Array[String]): Unit = {
val v_copybook = args(1)
val v_data = args(0)
val v_fixedOrVariable = args(2)
val vSparkmaster = args(3)
val v_savePath = args(4)
println(v_copybook)
val v_recordLengthField = args(5)
val v_segmentField = args(6)
val v_segmentFilter = args(7)
val sparkConf: SparkConf = new SparkConf().setAppName("cobtest")
implicit val spark: SparkSession = SparkSession.builder.config(sparkConf).master(vSparkmaster).enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.codegen.wholeStage", false)
ProcessMainframeFile(v_copybook, v_data, v_fixedOrVariable,
vSparkmaster, v_savePath, v_recordLengthField,
v_segmentField, v_segmentFilter)
}
/**
 *
 * @param v_copybook
 * @param v_data
 * @param v_fixedOrVariable
 * @param vSparkmaster
 * @param v_savePath
 * @param v_recordLengthField
 * @param v_segmentField
 * @param v_segmentFilter
 * @param spark
 */
def ProcessMainframeFile(v_copybook:String, v_data:String, v_fixedOrVariable:String, vSparkmaster:String, v_savePath:String,
                         v_recordLengthField:String, v_segmentField:String, v_segmentFilter:String)
                        (implicit spark:SparkSession):Unit = {
var cobolDataframe:DataFrame = null;
var cobolDataframeM:DataFrame = null;
val patha="/user/cobtest/TEST_A.txt"
val pathb="/user/cobtest/TEST_B.txt"
val pathc="/user/cobtest/TEST_C.txt"
val pathd="/user/cobtest/TEST_D.txt"
val pathe="/user/cobtest/TEST_E.txt"
import spark.implicits._
if (v_fixedOrVariable == "fixed") {
cobolDataframe = spark
.read
.format("cobol")
.option("copybook", v_copybook)
.option("schema_retention_policy", "collapse_root") //removes the root record header
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("improve_locality", false)
.load(v_data) //"/src/main/data/X71386C.txt")
}
else if (v_fixedOrVariable == "variable") { // 1 means variable length
cobolDataframe = spark
.read
.format("cobol")
.option("copybook", v_copybook)
.option("schema_retention_policy", "collapse_root") //removes the root record header
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("improve_locality", false)
.option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
.load(v_data)
}
else if (v_fixedOrVariable == "variableBigEndian") {
cobolDataframe = spark
.read
.format("cobol")
.option("copybook", v_copybook)
.option("schema_retention_policy", "collapse_root") //removes the root record headerc
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("improve_locality", false)
.option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
.option("is_rdw_big_endian", "true")
//.option("record_header_parser", "com.example.spark.codec.app.CustomRecordHeadersParser")// Custom record header parser class
.load(v_data)
}
else if (v_fixedOrVariable == "merge") {
cobolDataframe = spark
.read
.format("cobol")
.option("copybooks", v_copybook)
.option("schema_retention_policy", "collapse_root") //removes the root record headerc
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("improve_locality", false)
.option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
.option("is_rdw_big_endian", "true")
//.option("record_header_parser", "com.example.spark.codec.app.CustomRecordHeadersParser")// Custom record header parser class
.load(v_data)
cobolDataframeM = spark
.read
.format("cobol")
.option("copybooks", s"$patha,$pathb,$pathc,$pathd,$pathe")
.option("schema_retention_policy", "collapse_root") //removes the root record headerc
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("improve_locality", false)
.option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
.option("is_rdw_big_endian", "true")
//.option("record_header_parser", "com.example.spark.codec.app.CustomRecordHeadersParser")// Custom record header parser class
.load(v_data)
cobolDataframeM.printSchema()
cobolDataframeM.show
}
else if (v_fixedOrVariable == "recordLength") {
cobolDataframe = spark
.read
.format("cobol")
.option("copybook", v_copybook)
.option("schema_retention_policy", "collapse_root") //removes the root record headerc
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("record_length_field",v_recordLengthField)
.option("improve_locality", false)
.load(v_data)
}
else if (v_fixedOrVariable == "segment") {
cobolDataframe = spark
.read
.format("cobol")
.option("copybook", v_copybook)
.option("schema_retention_policy", "collapse_root") //removes the root record headerc
.option("optimize_allocation", true)
.option("drop_group_fillers", "false")
.option("generate_record_id", false) // this adds the file id and record id
.option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
.option("is_rdw_big_endian", "true")
.option("segment_field", v_segmentField)
.option("segment_filter", v_segmentFilter)
.option("improve_locality", false)
.load(v_data)
}
val df = SparkUtils.flattenSchema(cobolDataframe)
df.printSchema()
df.show(20, false)
df.write.mode(SaveMode.Overwrite).format("ORC").save(v_savePath)
}
}
Can you please respond to my issue? As per your suggestion, I have shared the code.
Hi, I still cannot reproduce the issue, unfortunately. Could you please try the following:
- Add this profile to your POM file to enable snapshot versions
<profiles>
<profile>
<id>allow-snapshots</id>
<activation><activeByDefault>true</activeByDefault></activation>
<repositories>
<repository>
<id>snapshots-repo</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases><enabled>false</enabled></releases>
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>
</profile>
</profiles>
- Use the latest snapshot version of Cobrix
<dependency>
<groupId>za.co.absa.cobrix</groupId>
<artifactId>spark-cobol</artifactId>
<version>0.5.4-SNAPSHOT</version>
</dependency>
- Change your program to print/log the contents of all variables (v_data is the most important one); a small println sketch is shown after this list.
- Send all the output your program prints. The snapshot version contains additional logging that might be helpful to identify the issue.
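For example, a quick sketch using the variable names from your program:
println(s"v_copybook = $v_copybook")
println(s"v_data = $v_data")
println(s"v_fixedOrVariable = $v_fixedOrVariable")
println(s"v_savePath = $v_savePath")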
An additional suggestion (not related to this particular issue): please remove these 2 options, since they are advanced performance tuning features and it is best to leave them at their default values:
.option("optimize_allocation", true)
.option("improve_locality", false)
Thank you. I will try your suggestions.
In the Cobrix master branch, the test cases for the copybooks option cover fixed-length files only. Is it possible to add a variable-length file test for the copybooks option? I cannot figure out where the issue is; I have removed the variable-length options and it is still not working. In your test case you are testing with an ASCII file. Is it possible to use a binary file in the test case?
In CobolParametersValidator.scala, none of the values copyBookPathFileName, copyBookMultiPathFileNames are set. I tried to use it as fixed length like below:
cobolDataframe = spark
.read
.format("cobol")
.option("copybooks", v_copybook)
.load(v_data)
I am still getting the same error. In the code below it falls into the case (None, None, None) =>
(copyBookContents, copyBookPathFileName, copyBookMultiPathFileNames) match {
case (Some(contents), Some(fileName), _) =>
throw new IllegalStateException(s"Both '$PARAM_COPYBOOK_PATH' and '$PARAM_COPYBOOK_CONTENTS' options cannot be specified at the same time")
case (Some(contents), _, Some(filenames)) =>
throw new IllegalStateException(s"Both '$PARAM_MULTI_COPYBOOK_PATH' and '$PARAM_COPYBOOK_CONTENTS' options cannot be specified at the same time")
case (_, Some(filename), Some(filenames)) =>
throw new IllegalStateException(s"Both '$PARAM_COPYBOOK_PATH' and '$PARAM_MULTI_COPYBOOK_PATH' options cannot be specified at the same time")
case (Some(contents), None, None) =>
// This is fine
case (None, Some(fileName), None) => validatePath(fileName)
// This is fine
case (None, None, Some(fileNames)) =>
for(fileName <-fileNames.split(","))
validatePath(fileName)
case (None, None, None) =>
throw new IllegalStateException(s"Cannot define path to source files: missing parameter: '$PARAM_SOURCE_PATH'")
}