kafka-connect-hdfs icon indicating copy to clipboard operation
kafka-connect-hdfs copied to clipboard

Creation of log directory fails because of permission issues when using keytab

Open zako opened this issue 8 years ago • 6 comments

We are experiencing an issue where HDFS connector uses incorrect permissions when creating log directories given configurations with heterogeneous set of keytabs. This happens when we run a script to create several HDFS connectors, each having a different keytab, on an empty one node Kafka Connect cluster.

It seems the problem is within DataWriter which uses the keytab to call UserGroupInformation.loginUserFromKeytab which is a static instance. Since each DataWriter will be running in a separate task/thread, this static call may interfere with other running tasks. So it is possible for the log directory to get created under myuser2 and have the FS WAL file created under a different user, i.e. myuser1 in our example.

Here are the permissions of the log directories in HDFS (scrubbed some names): $ hdfs dfs -ls /logs/mytopic/ Found 1 items drwxrwxrwx - myuser2 supergroup 0 2016-06-21 18:01 /logs/mytopic/0 $ hdfs dfs -ls /logs/mytopic/0 Found 1 items -rw-r--r-- 2 myuser1 supergroup 417 2016-06-21 18:25 /logs/mytopic/0/log

Here is the full stack trace (scrubbed some names): [2016-06-21 18:51:21,479] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter) org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://mynamenode:8020/logs/mytopic/0/log at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91) at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105) at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:441) at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:197) at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:227) at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r-- at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080) at sun.reflect.GeneratedConstructorAccessor92.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1769) at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1803) at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1796) at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:323) at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:319) at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173) at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221) at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67) at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73) ... 17 more Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r-- at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy52.append(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:313) at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy53.append(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1767) ... 27 more

zako avatar Jun 22 '16 17:06 zako

I think one easy workaround here is to use different topic and log directory for different connectors. This also needed as you want to make sure that WAL by different connectors are isolated. This is crucial to ensure the correct behavior of each connector as the connector relies on the data in HDFS to set the correct offset in case of rebalance and restart.

Ishiihara avatar Jun 22 '16 23:06 Ishiihara

We do create a connector per topic in our setup. Here is an example of the configuration for 2 connectors: { "name": "TopicA", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "topics.dir": "/data/", "topics": "TopicA", "connect.hdfs.principal": "KeytabA@domain", "connect.hdfs.keytab": "./keytabs/KeytabA.keytab", <truncated> }, "tasks": [ { "connector": "TopicA", "task": 0 } ] } { "name": "TopicB", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "topics.dir": "/data/", "topics": "TopicB", "connect.hdfs.principal": "KeytabB@domain", "connect.hdfs.keytab": "./keytabs/KeytabB.keytab", <truncated> }, "tasks": [ { "connector": "TopicB", "task": 0 } ] }

The problem is both of these are initialized and created on the same Kafka Connect node. Kafka Connect will create a task per configuration and submit them to the cached thread pool executor in org.apache.kafka.connect.runtime.Worker to run concurrently. The creation of log, tmp and data directories all happen when instantiating a new instance of DataWriter which occurs when the HDFSSinkTask runs. Since permissions are static and apply to all threads, some directories will be created with the incorrect keytab.

zako avatar Jun 23 '16 15:06 zako

@zako That makes sense and thanks for the analysis. I think to allow multiple users for different connector jobs, we need to use secure impersonation and use doAs to perform creation of topics.dir and logs.dir as well as writing data to HDFS. However, this involves relatively large change to the connector.

Another option is to use different Classloaders to load different connectors.

Ishiihara avatar Jun 23 '16 17:06 Ishiihara

I think doAs sounds reasonable, however, my familiarity with Java security is very limited.

We will forgo the alternative to use different Classloaders. Our temporary workaround will be to use a single keytab to continue testing Kafka Connect and other functionality.

zako avatar Jun 23 '16 17:06 zako

Marking this as an enhancement for later evaluation. If there are other users who require multiple users for different connector jobs in a secure environment, it would be good to know about it.

cotedm avatar Jan 09 '17 16:01 cotedm

@cotedm I've faced the same problem.

dbolshak avatar Dec 26 '18 11:12 dbolshak