Browse Source

HDFS-4567. Webhdfs does not need a token for token operations. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1454469 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 12 năm trước cách đây
mục cha
commit
eac52101a3

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -69,6 +69,9 @@ Release 0.23.7 - UNRELEASED
 
     HDFS-4566. Webdhfs token cancelation should use authentication (daryn)
 
+    HDFS-4567. Webhdfs does not need a token for token operations. (daryn via
+    kihwal)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 23 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -81,6 +81,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelect
 import org.apache.hadoop.util.Progressable;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 /** A FileSystem for HDFS over the web. */
@@ -104,7 +105,8 @@ public class WebHdfsFileSystem extends FileSystem
 
   private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null;
 
-  private static synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
+  @VisibleForTesting
+  protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
     if (DT_RENEWER == null) {
       DT_RENEWER = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
       DT_RENEWER.start();
@@ -124,6 +126,7 @@ public class WebHdfsFileSystem extends FileSystem
   private UserGroupInformation ugi;
   private InetSocketAddress nnAddr;
   private URI uri;
+  private boolean hasInitedToken;
   private Token<?> delegationToken;
   private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
   private Path workingDir;
@@ -150,24 +153,26 @@ public class WebHdfsFileSystem extends FileSystem
   protected void initDelegationToken() throws IOException {
     // look for webhdfs token, then try hdfs
     Token<?> token = selectDelegationToken(ugi);
-
-    //since we don't already have a token, go get one
-    boolean createdToken = false;
-    if (token == null) {
-      token = getDelegationToken(null);
-      createdToken = (token != null);
-    }
-
-    // security might be disabled
     if (token != null) {
+      LOG.debug("Found existing DT for " + token.getService());        
       setDelegationToken(token);
-      if (createdToken) {
+      hasInitedToken = true;
+    }
+  }
+
+  protected synchronized Token<?> getDelegationToken() throws IOException {
+    if (!hasInitedToken) {
+      //since we don't already have a token, go get one
+      Token<?> token = getDelegationToken(null);
+      // security might be disabled
+      if (token != null) {
+        setDelegationToken(token);
         addRenewAction(this);
         LOG.debug("Created new DT for " + token.getService());
-      } else {
-        LOG.debug("Found existing DT for " + token.getService());        
       }
+      hasInitedToken = true;
     }
+    return delegationToken;
   }
 
   protected Token<DelegationTokenIdentifier> selectDelegationToken(
@@ -294,20 +299,16 @@ public class WebHdfsFileSystem extends FileSystem
     List<Param<?,?>> authParams = Lists.newArrayList();    
     // Skip adding delegation token for token operations because these
     // operations require authentication.
-    boolean hasToken = false;
+    Token<?> token = null;
     if (UserGroupInformation.isSecurityEnabled() &&
         op != GetOpParam.Op.GETDELEGATIONTOKEN &&
         op != PutOpParam.Op.RENEWDELEGATIONTOKEN &&
         op != PutOpParam.Op.CANCELDELEGATIONTOKEN) {
-      synchronized (this) {
-        hasToken = (delegationToken != null);
-        if (hasToken) {
-          final String encoded = delegationToken.encodeToUrlString();
-          authParams.add(new DelegationParam(encoded));
-        } // else we are talking to an insecure cluster
-      }
+      token = getDelegationToken();
     }
-    if (!hasToken) {
+    if (token != null) {
+      authParams.add(new DelegationParam(token.encodeToUrlString()));
+    } else {
       UserGroupInformation userUgi = ugi;
       UserGroupInformation realUgi = userUgi.getRealUser();
       if (realUgi != null) { // proxy user

+ 338 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java

@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestWebHdfsTokens {
+  static Configuration conf;
+  static UserGroupInformation ugi;
+  
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);    
+    ugi = UserGroupInformation.getCurrentUser();
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testInitWithNoToken() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    doReturn(null).when(fs).getDelegationToken(anyString());
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+    
+    // when not in ugi, don't get one
+    verify(fs).initDelegationToken();
+    verify(fs).selectDelegationToken(ugi);
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testInitWithUGIToken() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    Token<DelegationTokenIdentifier> token = mock(Token.class);    
+    doReturn(token).when(fs).selectDelegationToken(ugi);
+    doReturn(null).when(fs).getDelegationToken(anyString());
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+    
+    // when in the ugi, store it but don't renew it
+    verify(fs).initDelegationToken();
+    verify(fs).selectDelegationToken(ugi);
+    verify(fs).setDelegationToken(token);
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).addRenewAction(fs);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testInternalGetDelegationToken() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    Token<DelegationTokenIdentifier> token = mock(Token.class);    
+    doReturn(null).when(fs).selectDelegationToken(ugi);
+    doReturn(token).when(fs).getDelegationToken(anyString());
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+
+    // get token, store it, and renew it
+    Token<?> token2 = fs.getDelegationToken();
+    assertEquals(token2, token);
+    verify(fs).getDelegationToken(null);
+    verify(fs).setDelegationToken(token);
+    verify(fs).addRenewAction(fs);
+    reset(fs);
+
+    // just return token, don't get/set/renew
+    token2 = fs.getDelegationToken();
+    assertEquals(token2, token);
+    verify(fs, never()).getDelegationToken(null);
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, never()).addRenewAction(fs);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testTokenForNonTokenOp() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    Token<DelegationTokenIdentifier> token = mock(Token.class);    
+    doReturn(null).when(fs).selectDelegationToken(ugi);
+    doReturn(token).when(fs).getDelegationToken(null);
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+
+    // should get/set/renew token
+    fs.toUrl(GetOpParam.Op.OPEN, null);
+    verify(fs).getDelegationToken();
+    verify(fs).getDelegationToken(null);
+    verify(fs).setDelegationToken(token);
+    verify(fs).addRenewAction(fs);
+    reset(fs);
+    
+    // should return prior token
+    fs.toUrl(GetOpParam.Op.OPEN, null);
+    verify(fs).getDelegationToken();
+    verify(fs, never()).getDelegationToken(null);
+    verify(fs, never()).setDelegationToken(token);
+    verify(fs, never()).addRenewAction(fs);
+  }
+  
+  @Test(timeout=1000)
+  public void testNoTokenForGetToken() throws IOException {
+    checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN);
+  }
+  
+  @Test(timeout=1000)
+  public void testNoTokenForCanclToken() throws IOException {
+    checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
+  }
+
+  @Test(timeout=1000)
+  public void testNoTokenForCancelToken() throws IOException {
+    checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    doReturn(null).when(fs).selectDelegationToken(ugi);
+    doReturn(null).when(fs).getDelegationToken(null);
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+    
+    // do not get a token!
+    fs.toUrl(op, null);
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).getDelegationToken(null);
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, never()).addRenewAction(fs);
+  }
+}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestWebHdfsTokens {
+  static Configuration conf;
+  static UserGroupInformation ugi;
+  
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);    
+    ugi = UserGroupInformation.getCurrentUser();
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testInitWithNoToken() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    doReturn(null).when(fs).getDelegationToken(anyString());
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+    
+    // when not in ugi, don't get one
+    verify(fs).initDelegationToken();
+    verify(fs).selectDelegationToken(ugi);
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testInitWithUGIToken() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    Token<DelegationTokenIdentifier> token = mock(Token.class);    
+    doReturn(token).when(fs).selectDelegationToken(ugi);
+    doReturn(null).when(fs).getDelegationToken(anyString());
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+    
+    // when in the ugi, store it but don't renew it
+    verify(fs).initDelegationToken();
+    verify(fs).selectDelegationToken(ugi);
+    verify(fs).setDelegationToken(token);
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).addRenewAction(fs);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testInternalGetDelegationToken() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    Token<DelegationTokenIdentifier> token = mock(Token.class);    
+    doReturn(null).when(fs).selectDelegationToken(ugi);
+    doReturn(token).when(fs).getDelegationToken(anyString());
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+
+    // get token, store it, and renew it
+    Token<?> token2 = fs.getDelegationToken();
+    assertEquals(token2, token);
+    verify(fs).getDelegationToken(null);
+    verify(fs).setDelegationToken(token);
+    verify(fs).addRenewAction(fs);
+    reset(fs);
+
+    // just return token, don't get/set/renew
+    token2 = fs.getDelegationToken();
+    assertEquals(token2, token);
+    verify(fs, never()).getDelegationToken(null);
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, never()).addRenewAction(fs);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=1000)
+  public void testTokenForNonTokenOp() throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    Token<DelegationTokenIdentifier> token = mock(Token.class);    
+    doReturn(null).when(fs).selectDelegationToken(ugi);
+    doReturn(token).when(fs).getDelegationToken(null);
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+
+    // should get/set/renew token
+    fs.toUrl(GetOpParam.Op.OPEN, null);
+    verify(fs).getDelegationToken();
+    verify(fs).getDelegationToken(null);
+    verify(fs).setDelegationToken(token);
+    verify(fs).addRenewAction(fs);
+    reset(fs);
+    
+    // should return prior token
+    fs.toUrl(GetOpParam.Op.OPEN, null);
+    verify(fs).getDelegationToken();
+    verify(fs, never()).getDelegationToken(null);
+    verify(fs, never()).setDelegationToken(token);
+    verify(fs, never()).addRenewAction(fs);
+  }
+  
+  @Test(timeout=1000)
+  public void testNoTokenForGetToken() throws IOException {
+    checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN);
+  }
+  
+  @Test(timeout=1000)
+  public void testNoTokenForCanclToken() throws IOException {
+    checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
+  }
+
+  @Test(timeout=1000)
+  public void testNoTokenForCancelToken() throws IOException {
+    checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException {
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    doReturn(null).when(fs).selectDelegationToken(ugi);
+    doReturn(null).when(fs).getDelegationToken(null);
+    doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
+    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
+    
+    // do not get a token!
+    fs.toUrl(op, null);
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).getDelegationToken(null);
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, never()).addRenewAction(fs);
+  }
+}

+ 2 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsUrl.java

@@ -113,7 +113,7 @@ public class TestWebHdfsUrl {
 
     WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
     Path fsPath = new Path("/");
-    String tokenString = webhdfs.getRenewToken().encodeToUrlString();
+    String tokenString = webhdfs.getDelegationToken().encodeToUrlString();
 
     // send user
     URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
@@ -194,7 +194,7 @@ public class TestWebHdfsUrl {
 
     WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
     Path fsPath = new Path("/");
-    String tokenString = webhdfs.getRenewToken().encodeToUrlString();
+    String tokenString = webhdfs.getDelegationToken().encodeToUrlString();
 
     // send real+effective
     URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
@@ -380,8 +380,5 @@ public class TestWebHdfsUrl {
     public int getDefaultPort() {
       return super.getDefaultPort();
     }
-    // don't automatically get a token
-    @Override
-    protected void initDelegationToken() throws IOException {}
   }
 }