kafka-connect-hdfs

Data in Hive table shows new columns as NULL after schema evolution with a default value specified when the format is Parquet

Open swathimocharla opened this issue 5 years ago • 12 comments

Hi there, I have Avro data that is being written to HDFS with Hive integration turned on, using "format.class":"io.confluent.connect.hdfs.parquet.ParquetFormat" and "schema.compatibility": "BACKWARD". To reproduce, add a new column with a default value to the schema and send some data. For example:

initial schema: [{"name":"a1","type":"string"}]
new schema: [{"name":"a1","type":"string"},{"name": "a2", "type": "string", "default": "a2defval"}]

There are 2 issues here:

  1. Send in data for both columns a1 and a2, e.g. {"a1":"val2","a2":"a2val2"}. Hive shows: val2 NULL. After restarting the connector, Hive shows: val2 a2val2.
  2. The older data that was pushed in before the schema evolution is not updated with the default value for a2 (the new column). Hive still shows: val1 NULL.

Is this expected? I've run the same tests with the Avro format class and didn't face either of these issues.

The Parquet Hive table description doesn't show the default values: StorageDescriptor(cols:[FieldSchema(name:a1, type:string, comment:null), FieldSchema(name:a2, type:string, comment:null), FieldSchema(name:partition, type:string, comment:)]

But the schema in the Parquet file footer in HDFS shows the default value: parquet.avro.schema {"type":"record","name":"myrecord","fields":[{"name":"a1","type":"string"},{"name":"a2","type":{"type":"string","connect.default":"a2defval"},"default":"a2defval"}]
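The contrast reported above (a default recorded in the file footer, none in the table description) can be illustrated with a small, self-contained Java sketch. It is a toy model under stated assumptions, not code from the connector, Avro, or Hive: `DefaultFillSketch`, `resolveWithDefaults`, and `projectColumns` are hypothetical names. It assumes that a reader holding the evolved Avro schema fills a missing field from its default, while a reader that only knows column names and types returns null for it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DefaultFillSketch {
    // Hypothetical reader-side view of the evolved schema: field name -> default.
    // a1 has no default in the issue's schema; a2 defaults to "a2defval".
    static final Map<String, String> READER_DEFAULTS = new LinkedHashMap<>();
    static {
        READER_DEFAULTS.put("a1", null);
        READER_DEFAULTS.put("a2", "a2defval");
    }

    // Schema-aware read: a field absent from the stored record takes the reader default.
    // (Simplification: a missing field without a default simply becomes null here.)
    public static Map<String, String> resolveWithDefaults(Map<String, String> stored) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : READER_DEFAULTS.entrySet()) {
            String name = field.getKey();
            out.put(name, stored.containsKey(name) ? stored.get(name) : field.getValue());
        }
        return out;
    }

    // Column-only read: a field absent from the stored record reads as null.
    public static Map<String, String> projectColumns(Map<String, String> stored) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String name : READER_DEFAULTS.keySet()) {
            out.put(name, stored.get(name));
        }
        return out;
    }

    public static void main(String[] args) {
        // A row written before a2 existed.
        Map<String, String> oldRow = new LinkedHashMap<>();
        oldRow.put("a1", "val1");

        System.out.println(resolveWithDefaults(oldRow)); // {a1=val1, a2=a2defval}
        System.out.println(projectColumns(oldRow));      // {a1=val1, a2=null}
    }
}
```

The second line of output corresponds to the "val1 NULL" row described in issue 2; whether Hive actually reads the Parquet table this way is an assumption of the sketch.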

swathimocharla Jul 08 '20 14:07

@ncliang could you take a look?

swathimocharla Jul 17 '20 07:07

@kkonstantine, @rhauch, can you help?

swathimocharla Jul 20 '20 09:07

Update: I changed hiveMetaStore.alterTable(table); to hiveMetaStore.alterTable(table, true); in ParquetHiveUtil.java to enable the cascade option when altering the table. After this change, new data written after schema evolution looks fine in the Hive tables.

hive> select * from test;
OK
// schema with a single column "f1"
value1  NULL    	NULL    0
value2  NULL    	NULL    0
value3  NULL    	NULL    0
// schema evolved with "f2" (String, default value "defvalue"), "f3" (int, default value: 9)
val2    a2val2  	13      0
val3    a2val3  	0       0
val4    a2val4  	4       0
// push data with old schema again "f1"
value4  defvalue        9       0
value5  defvalue        9       0
value6  defvalue        9       0
Time taken: 0.156 seconds, Fetched: 9 row(s)
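Why the cascade flag matters can be sketched with a toy model of a partitioned table. This is an illustration only, not the Hive metastore API or the connector's code: `CascadeSketch` and its methods are hypothetical. It assumes each partition keeps its own copy of the column list, that a non-cascading alter updates only the table-level list, and that a column missing from the partition's list reads as null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CascadeSketch {
    // Table-level column list.
    private final List<String> tableColumns = new ArrayList<>();
    // Each existing partition carries its own copy of the column list.
    private final Map<String, List<String>> partitionColumns = new LinkedHashMap<>();

    public CascadeSketch(List<String> columns) {
        tableColumns.addAll(columns);
    }

    // A new partition snapshots the table-level columns at creation time.
    public void addPartition(String name) {
        partitionColumns.put(name, new ArrayList<>(tableColumns));
    }

    // Without cascade only the table-level list changes;
    // with cascade every existing partition is updated too.
    public void alterTable(List<String> newColumns, boolean cascade) {
        tableColumns.clear();
        tableColumns.addAll(newColumns);
        if (cascade) {
            for (List<String> cols : partitionColumns.values()) {
                cols.clear();
                cols.addAll(newColumns);
            }
        }
    }

    // A column the partition's own list does not contain reads as null,
    // even when the stored record has a value for it.
    public List<String> readRow(String partition, Map<String, String> stored) {
        List<String> known = partitionColumns.get(partition);
        List<String> row = new ArrayList<>();
        for (String col : tableColumns) {
            row.add(known.contains(col) ? stored.get(col) : null);
        }
        return row;
    }

    public static void main(String[] args) {
        CascadeSketch table = new CascadeSketch(Arrays.asList("a1"));
        table.addPartition("p0");

        Map<String, String> stored = new LinkedHashMap<>();
        stored.put("a1", "val2");
        stored.put("a2", "a2val2");

        table.alterTable(Arrays.asList("a1", "a2"), false);
        System.out.println(table.readRow("p0", stored)); // [val2, null]

        table.alterTable(Arrays.asList("a1", "a2"), true);
        System.out.println(table.readRow("p0", stored)); // [val2, a2val2]
    }
}
```

In this model the non-cascading alter reproduces the "val2 NULL" symptom from issue 1, and the cascading alter makes the value visible. It does not model the rows written before the evolution, which still show NULL in the output above.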

@kkonstantine, can you please share your opinion?

swathimocharla Jan 25 '21 08:01

@dosvath, any thoughts?

swathimocharla Feb 08 '21 11:02

Hi @swathimocharla, thanks for raising the issue. On a first pass, it sounds like the handling for the Parquet format is different from the Avro format. hiveMetaStore.alterTable(table); is used in AvroHiveUtil as well and seems to be working fine, so the Parquet behavior will need some more investigation on our end. I will test the behavior and follow up here.

dosvath Feb 08 '21 17:02

Hi @dosvath, yes, Avro works as expected and the issue is only with Parquet. Thank you for looking into this; I will wait to hear back from you.

swathimocharla Feb 09 '21 05:02

Hi @swathimocharla, could you share which Hadoop and Hive versions you are using?

dosvath Feb 09 '21 22:02

Hi @dosvath, we are using CDH 6.3.2, with Hadoop 3.0.0 and Hive 2.1.1.

swathimocharla Feb 15 '21 05:02

Hi @swathimocharla, this connector is compatible with Hadoop 2.x and Hive 2.x. Could you try our HDFS3 connector and see if it resolves your issue? https://docs.confluent.io/kafka-connect-hdfs3-sink/current/index.html

dosvath Feb 17 '21 23:02

Hi @dosvath, we can try this, but we are in production on the current version and this clearly seems to be a bug. It would help to find a resolution on the current version.

swathimocharla Feb 19 '21 08:02

@dosvath, what is your opinion on the fix of changing hiveMetaStore.alterTable(table); to hiveMetaStore.alterTable(table, true); in ParquetHiveUtil.java to allow the cascade option when altering the table?

swathimocharla Feb 19 '21 08:02

Hi @swathimocharla, I believe the difference in behavior between the Avro and Parquet formats is that we apply the Avro schema here before altering the table. I think it would be okay to use the workaround you specified for your use case, as the cascade operation seems to be standard when using partitioned tables.

dosvath Feb 26 '21 01:02