kafka-connect-hdfs

Enable hive integration (table creation) when data are already present on HDFS

Open RossierFl opened this issue 6 years ago • 12 comments

Hi all,

I would like to enable the Hive integration when some data has already been dumped to HDFS. When no data is present, the table creation and the schema update are done in TopicPartitionWriter: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L354-L360

But if some data is already present on HDFS, currentSchema won't be null and this condition will be skipped, resulting in an error in the connector:

[2019-02-01 15:26:51,949] ERROR Adding Hive partition threw unexpected error (io.confluent.connect.hdfs.TopicPartitionWriter:819)
io.confluent.connect.storage.errors.HiveMetaStoreException: Invalid partition for databasenamse.topicname: time=event/bucket=hourly/date=2019-02-01/hour=15
	at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:123)
	at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:108)
	at io.confluent.connect.storage.hive.HiveMetaStore.doAction(HiveMetaStore.java:98)
	at io.confluent.connect.storage.hive.HiveMetaStore.addPartition(HiveMetaStore.java:133)
	at io.confluent.connect.hdfs.TopicPartitionWriter$3.call(TopicPartitionWriter.java:817)
	at io.confluent.connect.hdfs.TopicPartitionWriter$3.call(TopicPartitionWriter.java:813)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: InvalidObjectException(message:databasenamse.topicname table not found)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result$append_partition_by_name_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:51619)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result$append_partition_by_name_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:51596)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result.read(ThriftHiveMetastore.java:51519)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_append_partition_by_name_with_environment_context(ThriftHiveMetastore.java:1667)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.append_partition_by_name_with_environment_context(ThriftHiveMetastore.java:1651)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.appendPartition(HiveMetaStoreClient.java:607)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.appendPartition(HiveMetaStoreClient.java:601)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
	at com.sun.proxy.$Proxy53.appendPartition(Unknown Source)
	at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:115)
	... 9 more

This behavior has been reported here: https://github.com/confluentinc/kafka-connect-hdfs/issues/272 and a PR has been opened to try to resolve it: https://github.com/confluentinc/kafka-connect-hdfs/pull/403 but its author reported some problems in the long term (perhaps because his condition is in a loop and, at every rebalance, a bunch of table creations are pending).

But then there is code in DataWriter that actually tries to sync with Hive: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/DataWriter.java#L382

This code is called in HdfsSinkTask.start(), so this method should normally synchronise with Hive, create the table, and sync the schema, but this is not the case, as described here and in https://github.com/confluentinc/kafka-connect-hdfs/issues/272. When HdfsSinkTask.start() is called, context.assignment() does not contain the current assignment of this task; it is actually an empty set. The assignment is only available once HdfsSinkTask.open() is called, and not before: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/DataWriter.java#L421

So the following line actually sets assignment to an empty set: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java#L52

The result is that the two following instructions are never executed, because assignment is empty: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java#L83-L86

For the recover line, that's not a problem because the same instruction is in open(), so that problem has surely already been addressed. But that's not the case for syncWithHive(), which is effectively never run the way it should be.

One possible explanation for why this problem has not been spotted by the unit tests is that the variable context, used almost everywhere in the tests, contains the assignment at all times. So there is no way to detect this behavior without forcing that assignment to be empty during start().
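To make the testing gap concrete, here is a minimal sketch of a fake context whose assignment stays empty until partitions are handed over, as the runtime does. FakeContext, FakeTask, and the partition name "topicname-0" are hypothetical stand-ins for illustration only; they are not the real Kafka Connect or connector classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins; NOT the real Kafka Connect or connector classes.
public class StartAssignmentSketch {

    /** Fake task context whose assignment is empty until assign() is called. */
    static class FakeContext {
        private final Set<String> assignment = new HashSet<>();

        Set<String> assignment() {
            // Return a snapshot, so a caller only sees what was assigned at call time.
            return Collections.unmodifiableSet(new HashSet<>(assignment));
        }

        void assign(Set<String> partitions) {
            assignment.addAll(partitions);
        }
    }

    /** Simplified task that mimics the start() logic described in the issue. */
    static class FakeTask {
        final List<String> syncedWithHive = new ArrayList<>();

        void start(FakeContext context) {
            // The assignment is read during start(), before any partition is assigned.
            Set<String> assignment = context.assignment();
            for (String tp : assignment) {
                // Stand-in for the Hive sync; never reached when the set is empty.
                syncedWithHive.add(tp);
            }
        }
    }

    /** Runs start() first and assigns partitions afterwards, like the runtime does. */
    static List<String> runStartBeforeAssignment() {
        FakeContext context = new FakeContext();
        FakeTask task = new FakeTask();
        task.start(context);
        context.assign(new HashSet<>(Collections.singletonList("topicname-0")));
        return task.syncedWithHive;
    }

    public static void main(String[] args) {
        // Prints "true": nothing was synced, because start() saw an empty assignment.
        System.out.println(runStartBeforeAssignment().isEmpty());
    }
}
```

A test context pre-populated with partitions would hide this ordering, which is probably why the existing tests pass.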

So my proposal is really simple: delete the syncWithHive part of HdfsSinkTask and place it in DataWriter.open(). It actually runs fine and creates the table if needed. I let it run for a while (more than 4 days), always with some data to dump, without any problem. I simulated multiple rebalances by changing the number of tasks and everything looks fine.

Can someone have a look and start the discussion about this? Thanks.
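The idea can be sketched roughly as follows. This is a simplified model and not the actual patch: FakeWriter, its in-memory hiveTables set, and the "create only if missing" check are assumptions made for illustration, and the real syncWithHive() may behave differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the proposal; NOT the real DataWriter.
public class SyncOnOpenSketch {

    static class FakeWriter {
        private final Set<String> assignment = new LinkedHashSet<>();
        // Stand-in for the Hive metastore: names of tables that already exist.
        private final Set<String> hiveTables = new LinkedHashSet<>();
        final List<String> createdTables = new ArrayList<>();

        /** Called with the partitions assigned to this task, so the set is populated here. */
        void open(Collection<String> partitions) {
            assignment.addAll(partitions);
            syncWithHive(); // moved here from start(), where the assignment was still empty
        }

        /** Called when partitions are revoked, for example during a rebalance. */
        void close() {
            assignment.clear();
        }

        private void syncWithHive() {
            for (String tp : assignment) {
                // Partition names are assumed to look like "<topic>-<partition>".
                String topic = tp.substring(0, tp.lastIndexOf('-'));
                // Assumption: creation is skipped when the table already exists.
                if (hiveTables.add(topic)) {
                    createdTables.add(topic);
                }
            }
        }
    }

    /** Opens, closes, and reopens the writer to mimic a rebalance. */
    static List<String> simulateRebalances() {
        FakeWriter writer = new FakeWriter();
        writer.open(Arrays.asList("topicname-0", "topicname-1"));
        writer.close();
        writer.open(Collections.singletonList("topicname-0"));
        return writer.createdTables;
    }

    public static void main(String[] args) {
        // Prints "[topicname]": the table is created once, even across rebalances.
        System.out.println(simulateRebalances());
    }
}
```

In this model, repeated rebalances do not pile up table creations because the sync only creates what is missing.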

RossierFl avatar Feb 11 '19 12:02 RossierFl

It looks like @RossierFl hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. Wikipedia

You can read and sign our full Contributor License Agreement here.

Once you've signed reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

ghost avatar Feb 11 '19 12:02 ghost

[clabot:check]

RossierFl avatar Feb 11 '19 12:02 RossierFl

@confluentinc It looks like @RossierFl just signed our Contributor License Agreement. :+1:

Always at your service,

clabot

ghost avatar Feb 11 '19 12:02 ghost

Hi there,

Could I have a review please ?

BR

RossierFl avatar Mar 18 '19 10:03 RossierFl

Hi! Can you please merge this PR? This is super useful! @miguno

nooshin-mirzadeh avatar Sep 26 '19 15:09 nooshin-mirzadeh

This can be merged then ? @kirmandi ?

RossierFl avatar Oct 29 '19 12:10 RossierFl

As I have no write permission, no, not by me. But in general, yes, I agree that this can be merged.

kirmandi avatar Oct 30 '19 07:10 kirmandi

This PR fixes a real issue and users need it. I really don't get why it takes more than 8 months to get any feedback or a merge.

fabiohecht avatar Oct 30 '19 08:10 fabiohecht

@ncliang Can you have a look at this PR? Can someone tag someone with write permission on this PR?

RossierFl avatar Nov 04 '19 10:11 RossierFl

@RossierFl sorry, I was not aware of this PR. Thanks for bringing this to my attention.

Could you add some tests using the HiveTestBase class? See for example, tests in AvroHiveUtilTest.

ncliang avatar Nov 04 '19 18:11 ncliang

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Florian Rossier, INI-DNA-DL seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

cla-assistant[bot] avatar Aug 27 '23 12:08 cla-assistant[bot]