[Bug] [sync] Error when syncing data between two Kerberos-enabled CDH-6.3.2 clusters
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
The above is the error I got.
------------------------ separator ---------------------
```json
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "path": "hdfs://ns1/user/hive/warehouse/payment_i17_ods_phq.db/city_info/*",
            "hadoopConfig": {
              "hadoop.user.name": "dataarchitecture_reader",
              "dfs.ha.namenodes.ns1": "nn1,nn2",
              "fs.defaultFS": "hdfs://ns1",
              "dfs.namenode.rpc-address.ns1.nn2": "node104:8020",
              "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns1.nn1": "node105:8020",
              "dfs.nameservices": "ns1",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
              "hadoop.security.authentication": "Kerberos",
              "dfs.namenode.kerberos.principal": "hdfs/[email protected]",
              "dfs.datanode.kerberos.principal": "hdfs/[email protected]",
              "yarn.resourcemanager.principal": "rm/[email protected]",
              "dfs.namenode.kerberos.internal.spnego.principal": "HTTP/[email protected]",
              "hadoop.security.authorization": "true",
              "dfs.namenode.keytab.file": "/app/chunjun/kerberos/dataarchitecture_reader.keytab",
              "java.security.krb5.conf": "/app/chunjun/krb5/krb5_cdh1.conf",
              "useLocalFile": "true",
              "principalFile": "/app/chunjun/kerberos/dataarchitecture_reader.keytab",
              "principal": "dataarchitecture_reader/[email protected]",
              "dfs.client.use.datanode.hostname": true,
              "dfs.client.use.namenode.hostname": true,
              "sun.security.krb5.disableReferrals": true
            },
            "column": [
              {
                "name": "city_id",
                "type": "string"
              },
              {
                "name": "city_name",
                "type": "string"
              }
            ],
            "defaultFS": "hdfs://ns1",
            "fieldDelimiter": "\u0001",
            "encoding": "utf-8",
            "fileType": "text"
          },
          "name": "hdfsreader"
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "path": "hdfs://nameservice1/user/hive/warehouse/pxb_test.db/prpjpayrefrec_oracle",
            "defaultFS": "hdfs://nameservice1",
            "column": [
              {
                "name": "city_id",
                "type": "string"
              },
              {
                "name": "city_name",
                "type": "string"
              }
            ],
            "fileType": "text",
            "maxFileSize": 10485760,
            "nextCheckRows": 20000,
            "fieldDelimiter": "\u0001",
            "encoding": "utf-8",
            "fileName": "dt=2023050813",
            "writeMode": "overwrite",
            "hadoopConfig": {
              "dfs.ha.namenodes.nameservice1": "nn10,nn20",
              "hadoop.user.name": "data_dev_pxb",
              "fs.defaultFS": "hdfs://nameservice1",
              "dfs.namenode.rpc-address.nameservice1.nn20": "twobigdata01:8020",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.nameservice1.nn10": "twobigdata02:8020",
              "dfs.nameservices": "nameservice1",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
              "hadoop.security.authentication": "Kerberos",
              "dfs.namenode.kerberos.principal": "hdfs/[email protected]",
              "dfs.datanode.kerberos.principal": "hdfs/[email protected]",
              "yarn.resourcemanager.principal": "rm/[email protected]",
              "dfs.namenode.kerberos.internal.spnego.principal": "HTTP/[email protected]",
              "hadoop.security.authorization": "true",
              "dfs.namenode.keytab.file": "/app/chunjun/kerberos/data_dev_pxb.keytab",
              "java.security.krb5.conf": "/app/chunjun/krb5/krb5_cdh2.conf",
              "useLocalFile": "true",
              "principalFile": "/app/chunjun/kerberos//data_dev_pxb.keytab",
              "principal": "data_dev_pxb/[email protected]",
              "dfs.client.use.datanode.hostname": "true",
              "dfs.client.use.namenode.hostname": "true"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}
```
-------------- separator ------------- The above is my JSON job file.
What you expected to happen
I don't understand why Kerberos authentication fails when syncing data between the two CDH clusters. Using the stream connector in sync mode, ChunJun can read from and write to the upstream and downstream CDH clusters individually, but reading and writing between the two CDH clusters never completes; it always fails with a Kerberos error. (I have confirmed that the Kerberos accounts and the keytab and krb5.conf files are all correct, because each CDH cluster passes the read/write verification on its own with the stream connector.)
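One detail worth double-checking, since each login passes on its own: the reader and writer above point `java.security.krb5.conf` at two different files (`krb5_cdh1.conf` and `krb5_cdh2.conf`), but the JVM reads that setting as a single process-wide system property, and the built-in Kerberos implementation caches whichever file it loads first. Inside one sync job, only one of the two realm configurations can therefore actually be in effect at a time. Below is a minimal sketch, not ChunJun's actual login code, of what an isolated login against the reader cluster looks like with Hadoop's `UserGroupInformation`, using the (redacted) values from the JSON above; the class name is made up for illustration:

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ReaderRealmLoginSketch {

    public static void main(String[] args) throws Exception {
        // java.security.krb5.conf is a JVM-wide system property; the built-in
        // Kerberos implementation (sun.security.krb5.Config) caches the file it
        // loads first, so a second value set later in the same process is
        // ignored unless the internal config is explicitly refreshed.
        System.setProperty("java.security.krb5.conf", "/app/chunjun/krb5/krb5_cdh1.conf");

        // HA and security settings copied from the reader's hadoopConfig.
        Configuration readerConf = new Configuration();
        readerConf.set("fs.defaultFS", "hdfs://ns1");
        readerConf.set("dfs.nameservices", "ns1");
        readerConf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
        readerConf.set("dfs.namenode.rpc-address.ns1.nn1", "node105:8020");
        readerConf.set("dfs.namenode.rpc-address.ns1.nn2", "node104:8020");
        readerConf.set("dfs.client.failover.proxy.provider.ns1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        readerConf.set("hadoop.security.authentication", "Kerberos");
        readerConf.set("dfs.namenode.kerberos.principal", "hdfs/[email protected]");

        UserGroupInformation.setConfiguration(readerConf);
        // Principal and keytab exactly as they appear (redacted) in the job JSON.
        UserGroupInformation readerUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "dataarchitecture_reader/[email protected]",
                "/app/chunjun/kerberos/dataarchitecture_reader.keytab");

        readerUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
            FileSystem fs = FileSystem.get(readerConf);
            Path table = new Path("/user/hive/warehouse/payment_i17_ods_phq.db/city_info");
            System.out.println("reader login ok, path exists: " + fs.exists(table));
            return null;
        });

        // Logging in to the writer realm from this same JVM would need
        // /app/chunjun/krb5/krb5_cdh2.conf, which the property above can no
        // longer supply once the first file has been cached. That is consistent
        // with "each cluster works on its own, the cross-cluster job fails".
    }
}
```

A common workaround under that assumption is to merge both realms and their `domain_realm` mappings into a single krb5.conf and point both the reader's and the writer's `java.security.krb5.conf` at that one file.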
How to reproduce
Run a sync job with the JSON above and the problem reproduces.
Anything else
No response
Version
master
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Hey, I'd like to ask: why does reading from HDFS report that the file does not exist after Kerberos authentication succeeds?
```
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:811)
    at com.dtstack.chunjun.environment.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:194)
    at com.dtstack.chunjun.Main.exeSyncJob(Main.java:227)
    at com.dtstack.chunjun.Main.main(Main.java:118)
    at com.dtstack.chunjun.client.local.LocalClusterClientHelper.submit(LocalClusterClientHelper.java:35)
    at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: com.dtstack.chunjun.throwable.ChunJunRuntimeException: error to create hdfs splits
    at com.dtstack.chunjun.connector.hdfs.source.BaseHdfsInputFormat.lambda$createInputSplitsInternal$0(BaseHdfsInputFormat.java:81)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at com.dtstack.chunjun.connector.hdfs.source.BaseHdfsInputFormat.createInputSplitsInternal(BaseHdfsInputFormat.java:75)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.createInputSplits(BaseRichInputFormat.java:128)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.open(BaseRichInputFormat.java:150)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:126)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.run
```
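The trace above dies in `BaseHdfsInputFormat.createInputSplitsInternal` inside a `UserGroupInformation.doAs`, i.e. while resolving the input path after the login itself. One way to separate a real authentication failure from a glob that simply matches nothing is to resolve the same pattern outside ChunJun under the same keytab. A minimal sketch, with a made-up class name and placeholder principal, keytab, and path to substitute with your own values:

```java
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class GlobCheckSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "Kerberos");

        UserGroupInformation.setConfiguration(conf);
        // Placeholder principal and keytab: substitute the ones from your job JSON.
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "user/host@REALM", "/path/to/user.keytab");

        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            // Placeholder nameservice and path: substitute your own.
            FileSystem fs = FileSystem.get(URI.create("hdfs://ns1"), conf);
            // globStatus returns null for a non-glob path that does not exist and
            // an empty array for a glob with no matches; either way, nothing to
            // split, which surfaces upstream as a file-not-found style failure.
            FileStatus[] matches = fs.globStatus(new Path("/user/hive/warehouse/db/table/*"));
            System.out.println(matches == null || matches.length == 0
                    ? "no match" : matches.length + " file(s)");
            return null;
        });
    }
}
```

If the login call throws, it is a Kerberos problem; if it prints "no match", the path itself does not resolve under that user, which also shows up as a file-does-not-exist error.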