
[SUPPORT] Problem when inserting data into a non-partitioned table created by Flink SQL via the Spark SQL CLI

Open bithw1 opened this issue 1 year ago • 10 comments

I am using Hudi 0.15.0 and Flink 1.17.1. The following are the steps to reproduce the problem.

From the flink-sql CLI, run the following SQL statements:

CREATE CATALOG hudi_catalog WITH (
    'type' = 'hudi',
    'mode' = 'hms',
    'default-database' = 'default',
    'hive.conf.dir' = '/home/hadoop/software/hive-3.1.3/conf', 
    'table.external' = 'true'
);

create database if not exists hudi_catalog.`default`;

use hudi_catalog.`default`;

CREATE TABLE test_hudi_flink_mor_2 (
  a int PRIMARY KEY NOT ENFORCED,
  b int,
  c int
  
)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/test_hudi_flink_mor_2',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',  -- using ComplexAvroKeyGenerator
  'hoodie.datasource.write.recordkey.field' = 'a',
  'write.precombine.key'='b',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir'='/home/hadoop/software/hive-3.1.3/conf'
);


insert into test_hudi_flink_mor_2 values (1,1,1),(2,2,2);

So far so good. Then I ran insert into test_hudi_flink_mor_2 select 3,3,3 on the spark-sql CLI, and an error occurred. The key exception message is as follows:


org.apache.hudi.exception.HoodieException: Config conflict(key  current value   existing value):
KeyGenerator:   org.apache.hudi.keygen.ComplexAvroKeyGenerator  org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator
        at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:229)
        at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232)
        at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)

When I run show create table test_hudi_flink_mor_2 on the spark-sql CLI, it shows that the key generator is ComplexAvroKeyGenerator. But when I look at /tmp/test_hudi_flink_mor_2/.hoodie/hoodie.properties, it contains hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator, which matches the exception. It looks to me like there is a bug here.
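For reference, the check that raises this exception compares the writer's configured key generator against the one persisted in hoodie.properties. Below is a minimal Python sketch of that validation logic; it is illustrative only, not Hudi's actual Scala implementation (the real check lives in HoodieWriterUtils.validateTableConfig).

```python
# Illustrative sketch (not Hudi's actual code) of the table-config
# validation that produces the "Config conflict" error above.

def validate_keygen(writer_config: dict, table_properties: dict) -> None:
    """Raise if the writer's key generator conflicts with the persisted one."""
    current = writer_config.get("hoodie.datasource.write.keygenerator.class")
    existing = table_properties.get("hoodie.table.keygenerator.class")
    if current and existing and current != existing:
        raise ValueError(
            "Config conflict(key\tcurrent value\texisting value):\n"
            f"KeyGenerator:\t{current}\t{existing}")

# The reported situation: Flink persisted NonpartitionedAvroKeyGenerator,
# while the Spark writer supplies ComplexAvroKeyGenerator.
writer = {"hoodie.datasource.write.keygenerator.class":
          "org.apache.hudi.keygen.ComplexAvroKeyGenerator"}
table = {"hoodie.table.keygenerator.class":
         "org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator"}
try:
    validate_keygen(writer, table)
except ValueError as err:
    print(err)  # same shape as the message in the stack trace above
```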

bithw1 avatar Sep 26 '24 10:09 bithw1

Btw, I tried a little more: when I create the Hudi table as a partitioned table, I am able to insert from the spark-sql CLI, so the problem seems to be related to non-partitioned tables.

bithw1 avatar Sep 26 '24 10:09 bithw1

The non-partitioned key generator is right; did you specify the key generator on the Spark side?

danny0405 avatar Sep 27 '24 01:09 danny0405

The non-partitioned key generator is right; did you specify the key generator on the Spark side?

@danny0405 I don't think the non-partitioned key generator is right here. When creating the Hudi table using Flink SQL, I explicitly specified the key generator as ComplexAvroKeyGenerator, but it is saved in hoodie.properties as hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.

On the Spark side, I did nothing but insert one record: insert into test_hudi_flink_mor_2 select 3,3,3.

Basically, what I am doing is creating a Hudi table from Flink SQL and using it from Spark SQL.

bithw1 avatar Sep 27 '24 01:09 bithw1

Yeah, we do have some setup logic in HoodieTableFactory and HoodieHiveCatalog. Can you dig in a little to find out why the non-partitioned key generator is set up regardless of the explicit setting? I guess it is located in HoodieTableFactory.

danny0405 avatar Sep 27 '24 01:09 danny0405

Hi @bithw1

I am able to replicate this issue from spark side as well.

CREATE DATABASE IF NOT EXISTS spark_catalog.`default`;
USE spark_catalog.`default`;
DROP TABLE IF EXISTS hudi_table;

CREATE TABLE hudi_table (a int, b int, c int)
USING HUDI
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'a',
  preCombineField = 'b',
  hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
)
PARTITIONED BY (c);

Exception:

spark-sql (hudi)> insert into hudi_table values (1,1,1),(2,2,2);
06:27:36.949 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver - Failed in [insert into hudi_table values (1,1,1),(2,2,2)]
org.apache.hudi.exception.HoodieException: Config conflict(key	current value	existing value):
KeyGenerator:	org.apache.hudi.keygen.ComplexAvroKeyGenerator	org.apache.hudi.keygen.SimpleKeyGenerator
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:229) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:100) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) ~[spark-sql_2.12-3.5.1.jar:3.5.1]
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) ~[spark-sql_2.12-3.5.1.jar:3.5.1]
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) ~[spark-sql_2.12-3.5.1.jar:3.5.1]

rangareddy avatar Oct 08 '24 06:10 rangareddy

After checking the .hoodie/hoodie.properties file, we found that the hoodie.table.keygenerator.class property defaults to org.apache.hudi.keygen.SimpleKeyGenerator and cannot be changed at runtime. To resolve this issue, set the key generator class to org.apache.hudi.keygen.ComplexAvroKeyGenerator at table creation time:

hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
DROP TABLE IF EXISTS hudi_table;
CREATE TABLE hudi_table (a int, b int, c int)
USING HUDI
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'a',
  preCombineField = 'b',
  hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  hoodie.table.keygenerator.class = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
)
PARTITIONED BY (c);
insert into hudi_table values (1,1,1),(2,2,2);

select * from hudi_table;
20241008063231261	20241008063231261_1_0	2	c=2	d685c02a-c97f-4147-a5b7-acb476e3e6c6-0_1-64-77_20241008063231261.parquet	2	2	2
20241008063231261	20241008063231261_0_0	1	c=1	85b3ae97-01a4-4b6c-840a-00a5f783f410-0_0-64-76_20241008063231261.parquet	1	1	1
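Since hoodie.properties is a plain Java-properties-style file (key=value lines), the persisted key generator can be verified directly without any Hudi tooling. A small Python sketch; the sample content below mirrors what this issue reports, and for a real table you would read /tmp/test_hudi_flink_mor_2/.hoodie/hoodie.properties instead.

```python
# Parse a hoodie.properties-style file (simple key=value lines) and
# report the persisted key generator. Sample content is taken from
# this issue's reproduction, not generated against a real table.

def load_properties(text: str) -> dict:
    """Parse key=value lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

sample = (
    "#Updated at ...\n"
    "hoodie.table.name=test_hudi_flink_mor_2\n"
    "hoodie.table.keygenerator.class="
    "org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator\n"
)
props = load_properties(sample)
print(props["hoodie.table.keygenerator.class"])
```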

rangareddy avatar Oct 08 '24 06:10 rangareddy

Thanks @rangareddy for the clarification and clue. Per your guide, I modified my test case by adding hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator to the table creation DDL (run by Flink SQL), but when I looked at hoodie.properties, Flink doesn't honor this configuration at all; it still uses hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.

This problem is one piece of evidence that Hudi configurations are confusing and redundant. Also, it looks to me like Spark and Flink use totally different configurations for almost the same thing; don't they belong to the same Hudi project?

bithw1 avatar Oct 08 '24 07:10 bithw1

> After checking the .hoodie/hoodie.properties file, we found that the hoodie.table.keygenerator.class property is set to org.apache.hudi.keygen.SimpleKeyGenerator by default, and it cannot be changed at runtime.

I think there is still a bug here. I am not sure whether hoodie.datasource.write.keygenerator.class and hoodie.table.keygenerator.class configure the same thing. If hoodie.datasource.write.keygenerator.class has been specified explicitly by the end user, why doesn't hoodie.table.keygenerator.class (saved in hoodie.properties) honor that user-configured property?
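The behavior being asked for here can be stated as a precedence rule. A hypothetical sketch of that resolution order follows; this is what a user would expect, not what Hudi 0.15.0 actually does (per this thread, the Flink writer overrides the setting for non-partitioned tables regardless).

```python
# Hypothetical resolution order (NOT Hudi 0.15.0's actual behavior):
# honor an explicit table-level property first, then the write-time
# property, then fall back to a default.

def resolve_keygen(options: dict,
                   default: str = "org.apache.hudi.keygen.SimpleKeyGenerator") -> str:
    return (options.get("hoodie.table.keygenerator.class")
            or options.get("hoodie.datasource.write.keygenerator.class")
            or default)

# With only the write-time property set, the expected result is that
# it propagates to the table config instead of being silently replaced.
print(resolve_keygen({
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.ComplexAvroKeyGenerator",
}))  # -> org.apache.hudi.keygen.ComplexAvroKeyGenerator
```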

bithw1 avatar Oct 08 '24 07:10 bithw1

Hi @bithw1

There is an existing open issue HUDI-5262 and a corresponding pull request PR #7394 that has not been merged yet.

Until the PR is merged, you need to use hoodie.table.keygenerator.class to set the key generator when creating a table in spark-sql.

https://github.com/apache/hudi/issues/7351#issuecomment-1339756510

Please let me know if anything else is required.

rangareddy avatar Oct 08 '24 11:10 rangareddy

@jonvex Any insights on this ?

ad1happy2go avatar Oct 14 '24 10:10 ad1happy2go