kafka-connect-hdfs
Enable Hive integration (table creation) when data is already present on HDFS
Hi all,
I would like to enable the Hive integration when some data has already been dumped to HDFS. When no data is present, the table creation and schema update are done in TopicPartitionWriter: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L354-L360
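Roughly, the guarded logic looks like this (a paraphrased sketch of the linked lines, not the exact code; createHiveTable() and alterHiveSchema() are the methods named in that class):

```java
// Paraphrased sketch of the linked lines, not the exact code.
// The Hive table is only created inside the "schema not yet seen / schema changed" branch:
if (currentSchema == null /* or a compatible schema change was detected */) {
  currentSchema = record.valueSchema();
  if (hiveIntegration) {
    createHiveTable();   // creates the table in the Hive metastore
    alterHiveSchema();   // updates the Hive schema to match
  }
}
```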
But if some data is already present on HDFS, currentSchema won't be null and this branch is skipped, resulting in an error in the connector:
[2019-02-01 15:26:51,949] ERROR Adding Hive partition threw unexpected error (io.confluent.connect.hdfs.TopicPartitionWriter:819)
io.confluent.connect.storage.errors.HiveMetaStoreException: Invalid partition for databasenamse.topicname: time=event/bucket=hourly/date=2019-02-01/hour=15
at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:123)
at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:108)
at io.confluent.connect.storage.hive.HiveMetaStore.doAction(HiveMetaStore.java:98)
at io.confluent.connect.storage.hive.HiveMetaStore.addPartition(HiveMetaStore.java:133)
at io.confluent.connect.hdfs.TopicPartitionWriter$3.call(TopicPartitionWriter.java:817)
at io.confluent.connect.hdfs.TopicPartitionWriter$3.call(TopicPartitionWriter.java:813)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: InvalidObjectException(message:databasenamse.topicname table not found)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result$append_partition_by_name_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:51619)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result$append_partition_by_name_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:51596)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result.read(ThriftHiveMetastore.java:51519)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_append_partition_by_name_with_environment_context(ThriftHiveMetastore.java:1667)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.append_partition_by_name_with_environment_context(ThriftHiveMetastore.java:1651)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.appendPartition(HiveMetaStoreClient.java:607)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.appendPartition(HiveMetaStoreClient.java:601)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
at com.sun.proxy.$Proxy53.appendPartition(Unknown Source)
at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:115)
... 9 more
This behavior has already been reported in https://github.com/confluentinc/kafka-connect-hdfs/issues/272, and a PR was opened to try to resolve it: https://github.com/confluentinc/kafka-connect-hdfs/pull/403. However, its author reported problems in the long term (perhaps because the condition sits inside a loop, so at every rebalance a bunch of table creations are pending).
There is also code in DataWriter that actually tries to sync with Hive: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/DataWriter.java#L382
It is called from HdfsSinkTask.start(), so this method should normally synchronize with Hive, create the table, and sync the schema, but that is not the case.
As described here and in https://github.com/confluentinc/kafka-connect-hdfs/issues/272, when HdfsSinkTask.start() is called, context.assignment() does not contain the current assignment of the task; it is actually an empty set. The assignment only becomes available once HdfsSinkTask.open() is called, not before:
https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/DataWriter.java#L421
So the following line actually sets assignment to an empty set: https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java#L52
As a result, the two following instructions are effectively never exercised, because assignment is empty:
https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java#L83-L86
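In other words, the end of start() effectively boils down to this (paraphrased; only the shape matters here):

```java
// Paraphrased shape of HdfsSinkTask.start() at the linked commit, not verbatim.
Set<TopicPartition> assignment = context.assignment(); // still an empty set here

recover(assignment);           // iterates over an empty set, so it is a no-op --
                               // harmless, because open() recovers again later
if (hiveIntegration) {
  hdfsWriter.syncWithHive();   // also sees no partitions, so no table is ever created
}
```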
For the recover line, that's not a problem, because the same instruction exists in open(), so recovery is surely already handled there. But that's not the case for syncWithHive(), which is effectively never used, although it should be.
One possible explanation for why this problem has not been spotted by unit tests is that the context variable, used almost everywhere in the tests, contains the assignment at all times. So there is no way to detect this behavior without forcing the assignment to be empty during start().
So my proposal is really simple: delete the syncWithHive part of HdfsSinkTask and move it into DataWriter.open(). It runs fine and creates the table if needed. I let it run for quite a while (more than 4 days) with data continuously being dumped, without any problem. I also simulated multiple rebalances by changing the number of tasks, and everything looks fine.
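To illustrate, the change would look roughly like this (a sketch, not the exact patch; the per-partition writer creation stands in for the existing body of open()):

```java
// Illustrative sketch of the proposal, not the exact patch.
// DataWriter.open() runs after the framework has assigned partitions,
// so syncWithHive() finally sees a non-empty assignment.
public void open(Collection<TopicPartition> partitions) {
  assignment.addAll(partitions);
  for (TopicPartition tp : assignment) {
    // existing per-partition writer creation, constructor arguments elided
    topicPartitionWriters.put(tp, new TopicPartitionWriter(/* ... */));
  }
  if (hiveIntegration) {
    syncWithHive(); // creates the Hive table and syncs the schema if needed
  }
}
```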
Can someone have a look and start a discussion about this? Thanks.
It looks like @RossierFl hasn't signed our Contributor License Agreement, yet.
The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. Wikipedia
You can read and sign our full Contributor License Agreement here.
Once you've signed reply with [clabot:check]
to prove it.
Appreciation of efforts,
clabot
[clabot:check]
@confluentinc It looks like @RossierFl just signed our Contributor License Agreement. :+1:
Always at your service,
clabot
Hi there,
Could I have a review, please?
BR
Hi! Can you please merge this PR? This is super useful! @miguno
Can this be merged then, @kirmandi?
As I have no write permission, not by me; but in general, yes, I agree that this can be merged.
This PR fixes a real issue and users need it; I really don't get why it takes more than 8 months to get any feedback or a merge.
@ncliang Can you have a look at this PR? Can someone tag someone with write permission on this PR?
@RossierFl sorry, I was not aware of this PR. Thanks for bringing this to my attention.
Could you add some tests using the HiveTestBase class? See, for example, the tests in AvroHiveUtilTest.
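Something along these lines, for instance (a hypothetical skeleton; everything except HiveTestBase is made up and would need to be adapted from the existing tests):

```java
// Hypothetical test skeleton; class and method names are illustrative only.
import org.junit.Test;

public class HiveIntegrationRecoveryTest extends HiveTestBase {

  @Test
  public void createsHiveTableWhenDataAlreadyOnHdfs() throws Exception {
    // 1. write a few records so data files already exist on HDFS
    // 2. restart the task/writer with hive.integration=true
    // 3. assert that the table now exists in the Hive metastore
  }
}
```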
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
Florian Rossier, INI-DNA-DL seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.