
merge copybooks issue

Open geethab123 opened this issue 6 years ago • 12 comments

I was trying to use the merge copybooks feature. We have a single copybook with several segments that start with 01, and there are no common fields between the segments. I have two issues here.

a. I divided my single copybook into 5 copybooks and used the merge copybook feature: .option("copybook", s"/user/cobtest/test_A.txt,/user/cobtest/test_B.txt,/user/cobtest/test_C.txt,/user/cobtest/test_D.txt,/user/cobtest/test_E.txt"). These files are present in HDFS at this path, but I am getting an error message that the copybook /user/cobtest/test_A.txt,/user/cobtest/test_B.txt,/user/cobtest/test_C.txt,/user/cobtest/test_D.txt,/user/cobtest/test_E.txt does not exist.

b. I have several segments that start with 01 in a single copybook, maybe 30 to 40. Does Cobrix have any option to divide them into different copybooks automatically?

geethab123 avatar Jul 04 '19 23:07 geethab123

For your issue (a), all you need to do is replace the 'copybook' option with 'copybooks'.
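For example, a minimal sketch using the copybook paths from your post (any other reader options depend on your file format and are omitted here; the data path is a placeholder):

    // Sketch: pass all copybook paths as a comma-separated list via "copybooks" (plural).
    val df = spark
      .read
      .format("cobol")
      .option("copybooks",
        "/user/cobtest/test_A.txt,/user/cobtest/test_B.txt,/user/cobtest/test_C.txt," +
        "/user/cobtest/test_D.txt,/user/cobtest/test_E.txt")
      .load("/path/to/data") // placeholder HDFS path of the data file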

I'm not sure about your requirements for (b). You can take a multisegment copybook and filter only the segments you are interested in using the segment_filter option (it supports specifying a list of segment ids). After that you can use Spark's .select(...) to project only the fields of a specific segment.

The above will effectively give you a split of the copybook. For better performance you can also use the redefine-segment-id-map option (described in the README).
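A minimal sketch of the filter-and-project approach; the copybook path, the segment id field, the segment id value and the selected group are placeholders, not taken from your copybook:

    // Sketch: keep only records of one segment id, then project that segment's fields.
    // "SEGMENT_ID", "A" and "SEGMENT_A_GROUP" are hypothetical names; use the ones from your copybook.
    val segA = spark
      .read
      .format("cobol")
      .option("copybook", "/path/to/copybook.txt") // placeholder copybook path
      .option("is_record_sequence", "true")
      .option("segment_field", "SEGMENT_ID")
      .option("segment_filter", "A")
      .load("/path/to/data")                       // placeholder data path
      .select("SEGMENT_A_GROUP")                   // project only the group holding that segment's fields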

yruslan avatar Jul 08 '19 14:07 yruslan

Thank you, I will try your suggestions. Can you please provide any suggestions for my question (b)? I have several segments that start with 01 in a single copybook, maybe 30 to 40. Does Cobrix have any option to divide them into different copybooks automatically?

geethab123 avatar Jul 08 '19 14:07 geethab123

Please elaborate on your requirements for question (b), perhaps by providing an example. I have given my suggestions in the previous comment; to my current knowledge that is all I can advise on this issue.

yruslan avatar Jul 08 '19 14:07 yruslan

I have tried the 'copybooks' option for merging copybooks. Below is my code:

  cobolDataframe = spark
    .read
    .format("cobol")
    .option("copybooks", v_copybook)
    .option("schema_retention_policy", "collapse_root") //removes the root record header
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("improve_locality", false)
    .option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
    .option("is_rdw_big_endian", "true")
    .load(v_data)

  cobolDataframeM = spark
    .read
    .format("cobol")
    .option("copybooks", s"$patha,$pathb,$pathc,$pathd,$pathe")
    .option("schema_retention_policy", "collapse_root") //removes the root record headerc
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("improve_locality", false)
    .option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
    .option("is_rdw_big_endian", "true")
    .load(v_data)

All variables are defined. I am getting the error below. Please help me resolve the issue:

  Exception in thread "main" java.lang.IllegalStateException: Cannot define path to source files: missing parameter: 'path'
    at za.co.absa.cobrix.spark.cobol.source.parameters.CobolParametersValidator$.validateOrThrow(CobolParametersValidator.scala:84)
    at za.co.absa.cobrix.spark.cobol.source.DefaultSource.createRelation(DefaultSource.scala:52)
    at za.co.absa.cobrix.spark.cobol.source.DefaultSource.createRelation(DefaultSource.scala:48)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
    at com.cobrix.cobtest$.ProcessMainframeFile(cobtest.scala:140)

geethab123 avatar Jul 09 '19 15:07 geethab123

The error is not related to the usage of "copybooks". For some reason Cobrix does not receive a path to the actual data files. Could you please paste more of your code? I wonder how v_data is defined.

yruslan avatar Jul 10 '19 07:07 yruslan

v_copybook is the single copybook that has many levels at 01. The copybook and data files are in HDFS, and the HDFS paths are passed as v_copybook and v_data. When I check the Cobrix master code, this path error comes up when the value PARAM_COPYBOOK_PATH is not set, so I am thinking that is why I am getting the path error. My copybook HDFS path is "/user/cobtest/UIEWCDS.txt" and my data HDFS path is "/user/cobtest/BREQIUT". My copybook is below:

01 TEST-PYMT-WHSE-RECORD-33.
   03 TEST-KEY-33.
      05 TEST-ACCT-33 PIC 9(18).
      05 TEST-CHECK-ID-33.
         10 TEST-IND-33 PIC X(3).
         10 TEST-PYMT-ID-33 PIC X(10).
      05 TEST-DIRECTION-33 PIC X(1).
   03 TEST-DATA-GROUP-33.
      05 TEST-DATA-GROUP-N-33 PIC S9(08) COMP.
   03 TEST-MAIN-SEGMENT.
      05 TEST-PROCESSING-DETAILS.
         10 TEST-TYPE PIC X(3).
         10 TEST-EXPEDITED PIC X(1).
         10 TEST-MANUAL-ACCEPT PIC X(1).
01 TEST-PYMT-WHSE-RECORD-34.
   03 TEST-KEY-34.
      05 TEST-ACCT-34 PIC 9(18).
      05 TEST-CHECK-ID-34.
         10 TEST-IND-ORG-34 PIC X(3).
         10 TEST-PYMT-ID-34 PIC X(10).
      05 TEST-DIRECTION-34 PIC X(1).
   03 TEST-DATA-GROUP-34.
      05 TEST-DATA-GROUP-N-34 PIC S9(08) COMP.
01 TEST-PYMT-WHSE-RECORD-35.
   03 TEST-KEY-35.
      05 TEST-ACCT-35 PIC 9(18).
      05 TEST-CHECK-ID-35.
         10 TEST-IND-ORG-35 PIC X(3).
         10 TEST-PYMT-ID-35 PIC X(10).
      05 TEST-DIRECTION-35 PIC X(1).

geethab123 avatar Jul 10 '19 16:07 geethab123

The 'path' parameter is set automatically when you invoke load(), so you don't need to set it explicitly. I'm unable to reproduce the error; I'm seeing this kind of error for the first time. Could you please paste the full code (including variable definitions) that you use, and I will try to reproduce the issue.
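To illustrate the point (a sketch of standard Spark DataFrameReader behaviour, not anything Cobrix-specific):

    // These two calls are equivalent: load(path) fills in the "path" option for you.
    val df1 = spark.read.format("cobol").option("copybook", v_copybook).load(v_data)
    val df2 = spark.read.format("cobol").option("copybook", v_copybook).option("path", v_data).load()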

yruslan avatar Jul 10 '19 17:07 yruslan

Below is my code:

package com.cobrix

import org.apache.spark.{SparkConf, SparkContext}
//import org.apache.spark.sql.{SaveMode, SparkSession}
// import za.co.absa.cobrix.spark.cobol.source
import za.co.absa.cobrix.spark.cobol._
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.spark.cobol.schema.{CobolSchema, SchemaRetentionPolicy}
import java.io.File
import java.io.PrintWriter

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, SaveMode, SparkSession, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.ArrayType

import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

object cobtest extends Serializable {

/**
 *
 * @param args
 */
def main(args: Array[String]): Unit = {

val v_copybook = args(1)
val v_data = args(0)
val v_fixedOrVariable = args(2)
val vSparkmaster = args(3)
val v_savePath = args(4)
println(v_copybook)
val v_recordLengthField = args(5)
val v_segmentField = args(6)
val v_segmentFilter = args(7)
val sparkConf: SparkConf = new SparkConf().setAppName("cobtest")
implicit val spark: SparkSession = SparkSession.builder.config(sparkConf).master(vSparkmaster).enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.codegen.wholeStage", false)

ProcessMainframeFile(v_copybook, v_data, v_fixedOrVariable,
  vSparkmaster, v_savePath, v_recordLengthField,
  v_segmentField, v_segmentFilter)

}

/**
 *
 * @param v_copybook
 * @param v_data
 * @param v_fixedOrVariable
 * @param vSparkmaster
 * @param v_savePath
 * @param v_recordLengthField
 * @param v_segmentField
 * @param v_segmentFilter
 * @param spark
 */
def ProcessMainframeFile(v_copybook: String, v_data: String, v_fixedOrVariable: String,
                         vSparkmaster: String, v_savePath: String, v_recordLengthField: String,
                         v_segmentField: String, v_segmentFilter: String)
                        (implicit spark: SparkSession): Unit = {

var cobolDataframe:DataFrame = null;
var cobolDataframeM:DataFrame = null;
val patha="/user/cobtest/TEST_A.txt"
val pathb="/user/cobtest/TEST_B.txt"
val pathc="/user/cobtest/TEST_C.txt"
val pathd="/user/cobtest/TEST_D.txt"
val pathe="/user/cobtest/TEST_E.txt"

import spark.implicits._
if (v_fixedOrVariable == "fixed") {
  cobolDataframe = spark
    .read
    .format("cobol")
    .option("copybook", v_copybook)
    .option("schema_retention_policy", "collapse_root") //removes the root record header
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("improve_locality", false)
    .load(v_data) //"/src/main/data/X71386C.txt")
}
else if (v_fixedOrVariable == "variable") { // 1 means variable length
  cobolDataframe = spark
    .read
    .format("cobol")
    .option("copybook", v_copybook)
    .option("schema_retention_policy", "collapse_root") //removes the root record header
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("improve_locality", false)
    .option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
    .load(v_data)
}
else if (v_fixedOrVariable == "variableBigEndian") {
  cobolDataframe = spark
    .read
    .format("cobol")
    .option("copybook", v_copybook)
    .option("schema_retention_policy", "collapse_root") //removes the root record headerc
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("improve_locality", false)
    .option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
    .option("is_rdw_big_endian", "true")
    //.option("record_header_parser", "com.example.spark.codec.app.CustomRecordHeadersParser")// Custom record header parser class
    .load(v_data)
}
else if (v_fixedOrVariable == "merge") {

  cobolDataframe = spark
    .read
    .format("cobol")
    .option("copybooks", v_copybook)
    .option("schema_retention_policy", "collapse_root") //removes the root record headerc
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("improve_locality", false)
    .option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
    .option("is_rdw_big_endian", "true")
    //.option("record_header_parser", "com.example.spark.codec.app.CustomRecordHeadersParser")// Custom record header parser class
    .load(v_data)

  cobolDataframeM = spark
    .read
    .format("cobol")
    .option("copybooks", s"$patha,$pathb,$pathc,$pathd,$pathe")
    .option("schema_retention_policy", "collapse_root") //removes the root record headerc
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("improve_locality", false)
    .option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
    .option("is_rdw_big_endian", "true")
    //.option("record_header_parser", "com.example.spark.codec.app.CustomRecordHeadersParser")// Custom record header parser class
    .load(v_data)
  cobolDataframeM.printSchema()
  cobolDataframeM.show
}
else if (v_fixedOrVariable == "recordLength") {
  cobolDataframe = spark
    .read
    .format("cobol")
    .option("copybook", v_copybook)
    .option("schema_retention_policy", "collapse_root") //removes the root record headerc
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("record_length_field",v_recordLengthField)
    .option("improve_locality", false)
    .load(v_data)
}
else if (v_fixedOrVariable == "segment") {
  cobolDataframe = spark
    .read
    .format("cobol")
    .option("copybook", v_copybook)
    .option("schema_retention_policy", "collapse_root") //removes the root record headerc
    .option("optimize_allocation", true)
    .option("drop_group_fillers", "false")
    .option("generate_record_id", false) // this adds the file id and record id
    .option("is_record_sequence", "true") // reader to use 4 byte record headers to extract records from a mainframe file
    .option("is_rdw_big_endian", "true")
    .option("segment_field", v_segmentField)
    .option("segment_filter", v_segmentFilter)
    .option("improve_locality", false)
    .load(v_data)
}

val df = SparkUtils.flattenSchema(cobolDataframe)
df.printSchema()
df.show(20, false)
df.write.mode(SaveMode.Overwrite).format("ORC").save(v_savePath)

}

}

geethab123 avatar Jul 10 '19 19:07 geethab123

Can you please respond to my issue? As per your suggestion, I have shared the code.

geethab123 avatar Jul 17 '19 04:07 geethab123

Hi, I still cannot reproduce the issue, unfortunately. Could you please try the following:

  1. Add this profile to your POM file to enable snapshot versions
<profiles>
  <profile>
     <id>allow-snapshots</id>
        <activation><activeByDefault>true</activeByDefault></activation>
     <repositories>
       <repository>
         <id>snapshots-repo</id>
         <url>https://oss.sonatype.org/content/repositories/snapshots</url>
         <releases><enabled>false</enabled></releases>
         <snapshots><enabled>true</enabled></snapshots>
       </repository>
     </repositories>
   </profile>
</profiles>
  2. Use the latest snapshot version of Cobrix
        <dependency>
            <groupId>za.co.absa.cobrix</groupId>
            <artifactId>spark-cobol</artifactId>
            <version>0.5.4-SNAPSHOT</version>
        </dependency>
  3. Change your program to print/log the contents of all variables (v_data is the most important one); see the sketch after this list.
  4. Send all the output your program prints. The snapshot version contains additional logging that might be helpful to identify the issue.
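A minimal logging sketch for step 3, using the variable names from the code pasted above:

    // Sketch: print the arguments so the driver log shows exactly what the reader receives.
    println(s"v_copybook = '$v_copybook'")
    println(s"v_data = '$v_data'")
    println(s"v_fixedOrVariable = '$v_fixedOrVariable'")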

An additional suggestion (not related to this particular issue): please remove these two options, since they are advanced performance-tuning features and it is best to leave them at their default values:

    .option("optimize_allocation", true)
    .option("improve_locality", false)

yruslan avatar Jul 17 '19 06:07 yruslan

Thank you. I will try your suggestions.

geethab123 avatar Jul 17 '19 14:07 geethab123

In the Cobrix master branch there are test cases for the 'copybooks' option only for fixed-length files. Is it possible to add a test with a variable-length file for the 'copybooks' option? I am not seeing where the issue is; I have removed the variable-length options and it is still not working. In your test case you are testing with an ASCII file. Is it possible to use a binary file in the test case?

In CobolParametersValidator.scala, none of the values copyBookPathFileName and copyBookMultiPathFileNames are set. I tried to use it as fixed length like below:

    cobolDataframe = spark
      .read
      .format("cobol")
      .option("copybooks", v_copybook)
      .load(v_data)

I am still getting the same error. It falls into the case (None, None, None) => in the code below:

(copyBookContents, copyBookPathFileName, copyBookMultiPathFileNames) match {
  case (Some(contents), Some(fileName), _) =>
    throw new IllegalStateException(s"Both '$PARAM_COPYBOOK_PATH' and '$PARAM_COPYBOOK_CONTENTS' options cannot be specified at the same time")
  case (Some(contents), _, Some(filenames)) =>
    throw new IllegalStateException(s"Both '$PARAM_MULTI_COPYBOOK_PATH' and '$PARAM_COPYBOOK_CONTENTS' options cannot be specified at the same time")
  case (_, Some(filename), Some(filenames)) =>
    throw new IllegalStateException(s"Both '$PARAM_COPYBOOK_PATH' and '$PARAM_MULTI_COPYBOOK_PATH' options cannot be specified at the same time")
  case (Some(contents), None, None) =>
  // This is fine
  case (None, Some(fileName), None) => validatePath(fileName)
  // This is fine
  case (None, None, Some(fileNames)) =>
    for(fileName <-fileNames.split(","))
      validatePath(fileName)
  case (None, None, None) =>
    throw new IllegalStateException(s"Cannot define path to source files: missing parameter: '$PARAM_SOURCE_PATH'")
}

geethab123 avatar Jul 18 '19 15:07 geethab123