浏览代码

HDFS-6776. Using distcp to copy data between insecure and secure cluster via webdhfs doesn't work. (yzhangal via tucu)

Alejandro Abdelnur 10 年之前
父节点
当前提交
16a4558fda

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -481,6 +481,9 @@ Release 2.6.0 - UNRELEASED
       HDFS-6986. DistributedFileSystem must get delegation tokens from configured 
       KeyProvider. (zhz via tucu)
 
+    HDFS-6776. Using distcp to copy data between insecure and secure cluster via webdhfs 
+    doesn't work. (yzhangal via tucu)
+
 Release 2.5.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java

@@ -402,8 +402,7 @@ public class DelegationTokenSecretManager
     final Token<DelegationTokenIdentifier> token = namenode.getRpcServer(
         ).getDelegationToken(new Text(renewer));
     if (token == null) {
-      throw new IOException("Failed to get the token for " + renewer
-          + ", user=" + ugi.getShortUserName());
+      return null;
     }
 
     final InetSocketAddress addr = namenode.getNameNodeAddress();

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -283,6 +283,9 @@ public class NamenodeWebHdfsMethods {
       final String renewer) throws IOException {
     final Credentials c = DelegationTokenSecretManager.createCredentials(
         namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
+    if (c == null) {
+      return null;
+    }
     final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
     Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND
         : SWebHdfsFileSystem.TOKEN_KIND;

+ 17 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -41,6 +41,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DelegationTokenRenewer;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -102,6 +103,11 @@ public class WebHdfsFileSystem extends FileSystem
 
   /** Delegation token kind */
   public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
+
+  @VisibleForTesting
+  public static final String CANT_FALLBACK_TO_INSECURE_MSG =
+      "The client is configured to only allow connecting to secure cluster";
+
   private boolean canRefreshDelegationToken;
 
   private UserGroupInformation ugi;
@@ -112,6 +118,7 @@ public class WebHdfsFileSystem extends FileSystem
   private Path workingDir;
   private InetSocketAddress nnAddrs[];
   private int currentNNAddrIndex;
+  private boolean disallowFallbackToInsecureCluster;
 
   /**
    * Return the protocol scheme for the FileSystem.
@@ -193,6 +200,9 @@ public class WebHdfsFileSystem extends FileSystem
 
     this.workingDir = getHomeDirectory();
     this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
+    this.disallowFallbackToInsecureCluster = !conf.getBoolean(
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.delegationToken = null;
   }
 
@@ -1294,7 +1304,13 @@ public class WebHdfsFileSystem extends FileSystem
         return JsonUtil.toDelegationToken(json);
       }
     }.run();
-    token.setService(tokenServiceName);
+    if (token != null) {
+      token.setService(tokenServiceName);
+    } else {
+      if (disallowFallbackToInsecureCluster) {
+        throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
+      }
+    }
     return token;
   }
 

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java

@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -482,4 +484,43 @@ public class TestWebHDFS {
       }
     }
   }
+
+  @Test
+  public void testDTInInsecureClusterWithFallback()
+      throws IOException, URISyntaxException {
+    MiniDFSCluster cluster = null;
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    conf.setBoolean(CommonConfigurationKeys
+        .IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+          WebHdfsFileSystem.SCHEME);
+      Assert.assertNull(webHdfs.getDelegationToken(null));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testDTInInsecureCluster() throws Exception {
+    MiniDFSCluster cluster = null;
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+          WebHdfsFileSystem.SCHEME);
+      webHdfs.getDelegationToken(null);
+      fail("No exception is thrown.");
+    } catch (AccessControlException ace) {
+      Assert.assertTrue(ace.getMessage().startsWith(
+          WebHdfsFileSystem.CANT_FALLBACK_TO_INSECURE_MSG));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }