
HDFS-4835. Port trunk WebHDFS changes to branch-0.23. Contributed by Robert Parker.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1484574 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 12 years ago
parent
commit
ed28094c20

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -23,6 +23,9 @@ Release 0.23.8 - UNRELEASED
 
     HDFS-4477. Secondary namenode may retain old tokens. (daryn via kihwal)
 
+    HDFS-4835. Port trunk WebHDFS changes to branch-0.23. Includes backport
+    of HDFS-4805 and HADOOP-9549. (Robert Parker via kihwal)
+
 Release 0.23.7 - 2013-04-18
 
   INCOMPATIBLE CHANGES

+ 19 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java

@@ -82,12 +82,8 @@ import org.xml.sax.helpers.XMLReaderFactory;
 @InterfaceStability.Evolving
 public class HftpFileSystem extends FileSystem
     implements DelegationTokenRenewer.Renewable {
-  private static final DelegationTokenRenewer<HftpFileSystem> dtRenewer
-      = new DelegationTokenRenewer<HftpFileSystem>(HftpFileSystem.class);
-  
   static {
     HttpURLConnection.setFollowRedirects(true);
-    dtRenewer.start();
   }
 
   public static final Text TOKEN_KIND = new Text("HFTP delegation");
@@ -106,6 +102,15 @@ public class HftpFileSystem extends FileSystem
   private static final HftpDelegationTokenSelector hftpTokenSelector =
       new HftpDelegationTokenSelector();
 
+  private DelegationTokenRenewer dtRenewer = null;
+
+  private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
+    if (dtRenewer == null) {
+      dtRenewer = DelegationTokenRenewer.getInstance();
+    }
+    dtRenewer.addRenewAction(hftpFs);
+  }
+
   public static final SimpleDateFormat getDateFormat() {
     final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
     df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
@@ -190,7 +195,7 @@ public class HftpFileSystem extends FileSystem
     if (token != null) {
       setDelegationToken(token);
       if (createdToken) {
-        dtRenewer.addRenewAction(this);
+        addRenewAction(this);
         LOG.debug("Created new DT for " + token.getService());
       } else {
         LOG.debug("Found existing DT for " + token.getService());
@@ -200,7 +205,7 @@ public class HftpFileSystem extends FileSystem
 
   protected Token<DelegationTokenIdentifier> selectDelegationToken(
       UserGroupInformation ugi) {
-  	return hftpTokenSelector.selectToken(nnSecureUri, ugi.getTokens(), getConf());
+    return hftpTokenSelector.selectToken(nnSecureUri, ugi.getTokens(), getConf());
   }
   
 
@@ -389,6 +394,14 @@ public class HftpFileSystem extends FileSystem
     return new FSDataInputStream(new RangeHeaderInputStream(u));
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (dtRenewer != null) {
+      dtRenewer.removeRenewAction(this); // blocks
+    }
+  }
+
   /** Class to parse and store a listing reply from the server. */
   class LsParser extends DefaultHandler {
 

+ 124 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenRenewer.java

@@ -18,23 +18,30 @@
 
 package org.apache.hadoop.hdfs.security.token.delegation;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.lang.ref.WeakReference;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.fs.FileSystem;
 
 /**
  * A daemon thread that waits for the next file system to renew.
  */
 @InterfaceAudience.Private
-public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewer.Renewable>
+public class DelegationTokenRenewer
     extends Thread {
+  private static final Log LOG = LogFactory
+      .getLog(DelegationTokenRenewer.class);
+
   /** The renewable interface used by the renewer. */
   public interface Renewable {
     /** @return the renew token. */
@@ -48,18 +55,25 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
    * An action that will renew and replace the file system's delegation 
    * tokens automatically.
    */
-  private static class RenewAction<T extends FileSystem & Renewable>
+  public static class RenewAction<T extends FileSystem & Renewable>
       implements Delayed {
     /** when should the renew happen */
     private long renewalTime;
     /** a weak reference to the file system so that it can be garbage collected */
     private final WeakReference<T> weakFs;
+    private Token<?> token;
+    boolean isValid = true;
 
     private RenewAction(final T fs) {
       this.weakFs = new WeakReference<T>(fs);
-      updateRenewalTime();
+      this.token = fs.getRenewToken();
+      updateRenewalTime(renewCycle);
     }
  
+    public boolean isValid() {
+      return isValid;
+    }
+
     /** Get the delay until this event should happen. */
     @Override
     public long getDelay(final TimeUnit unit) {
@@ -76,28 +90,32 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
 
     @Override
     public int hashCode() {
-      return (int)renewalTime ^ (int)(renewalTime >>> 32);
+      return token.hashCode();
     }
 
     @Override
     public boolean equals(final Object that) {
-      if (that == null || !(that instanceof RenewAction)) {
+      if (this == that) {
+        return true;
+      } else if (that == null || !(that instanceof RenewAction)) {
         return false;
       }
-      return compareTo((Delayed)that) == 0;
+      return token.equals(((RenewAction<?>)that).token);
     }
 
     /**
      * Set a new time for the renewal.
-     * It can only be called when the action is not in the queue.
+     * It can only be called when the action is not in the queue or any
+     * collection because the hashCode may change
      * @param newTime the new time
      */
-    private void updateRenewalTime() {
-      renewalTime = RENEW_CYCLE + System.currentTimeMillis();
+    private void updateRenewalTime(long delay) {
+      renewalTime = System.currentTimeMillis() + delay - delay/10;
     }
 
     /**
      * Renew or replace the delegation token for this file system.
+     * It can only be called when the action is not in the queue.
      * @return
      * @throws IOException
      */
@@ -107,15 +125,19 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
       if (b) {
         synchronized(fs) {
           try {
-            fs.getRenewToken().renew(fs.getConf());
+            long expires = token.renew(fs.getConf());
+            updateRenewalTime(expires - System.currentTimeMillis());
           } catch (IOException ie) {
             try {
               Token<?>[] tokens = fs.addDelegationTokens(null, null);
               if (tokens.length == 0) {
                 throw new IOException("addDelegationTokens returned no tokens");
               }
-              fs.setDelegationToken(tokens[0]);
+              token = tokens[0];
+              updateRenewalTime(renewCycle);
+              fs.setDelegationToken(token);
             } catch (IOException ie2) {
+              isValid = false;
               throw new IOException("Can't renew or get new delegation token ", ie);
             }
           }
@@ -124,44 +146,124 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
       return b;
     }
 
+    private void cancel() throws IOException, InterruptedException {
+      final T fs = weakFs.get();
+      if (fs != null) {
+        token.cancel(fs.getConf());
+      }
+    }
+
     @Override
     public String toString() {
       Renewable fs = weakFs.get();
       return fs == null? "evaporated token renew"
           : "The token will be renewed in " + getDelay(TimeUnit.SECONDS)
-            + " secs, renewToken=" + fs.getRenewToken();
+            + " secs, renewToken=" + token;
     }
   }
 
-  /** Wait for 95% of a day between renewals */
-  private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
+  /** assumes renew cycle for a token is 24 hours... */
+  private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000; 
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public static long renewCycle = RENEW_CYCLE;
 
-  private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>();
+  /** Queue to maintain the RenewActions to be processed by the {@link #run()} */
+  private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
+
+  /** For testing purposes */
+  @VisibleForTesting
+  protected int getRenewQueueLength() {
+    return queue.size();
+  }
 
-  public DelegationTokenRenewer(final Class<T> clazz) {
+  /**
+   * Create the singleton instance. However, the thread can be started lazily in
+   * {@link #addRenewAction(FileSystem)}
+   */
+  private static DelegationTokenRenewer INSTANCE = null;
+
+  private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) {
     super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
     setDaemon(true);
   }
 
+  public static synchronized DelegationTokenRenewer getInstance() {
+    if (INSTANCE == null) {
+      INSTANCE = new DelegationTokenRenewer(FileSystem.class);
+    }
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  static synchronized void reset() {
+    if (INSTANCE != null) {
+      INSTANCE.queue.clear();
+      INSTANCE.interrupt();
+      try {
+        INSTANCE.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Failed to reset renewer");
+      } finally {
+        INSTANCE = null;
+      }
+    }
+  }
+
   /** Add a renew action to the queue. */
-  public void addRenewAction(final T fs) {
-    queue.add(new RenewAction<T>(fs));
+  @SuppressWarnings("static-access")
+  public <T extends FileSystem & Renewable> RenewAction<T> addRenewAction(final T fs) {
+    synchronized (this) {
+      if (!isAlive()) {
+        start();
+      }
+    }
+    RenewAction<T> action = new RenewAction<T>(fs);
+    if (action.token != null) {
+      queue.add(action);
+    } else {
+      fs.LOG.error("does not have a token for renewal");
+    }
+    return action;
+  }
+
+  /**
+   * Remove the associated renew action from the queue
+   * 
+   * @throws IOException
+   */
+  public <T extends FileSystem & Renewable> void removeRenewAction(
+      final T fs) throws IOException {
+    RenewAction<T> action = new RenewAction<T>(fs);
+    if (queue.remove(action)) {
+      try {
+        action.cancel();
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while canceling token for " + fs.getUri()
+            + "filesystem");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(ie.getStackTrace());
+        }
+      }
+    }
   }
 
+  @SuppressWarnings("static-access")
   @Override
   public void run() {
     for(;;) {
-      RenewAction<T> action = null;
+      RenewAction<?> action = null;
       try {
         action = queue.take();
         if (action.renew()) {
-          action.updateRenewalTime();
           queue.add(action);
         }
       } catch (InterruptedException ie) {
         return;
       } catch (Exception ie) {
-        T.LOG.warn("Failed to renew token, action=" + action, ie);
+        action.weakFs.get().LOG.warn("Failed to renew token, action=" + action,
+            ie);
       }
     }
   }
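
Note on the DelegationTokenRenewer hunks above: the key scheduling change is updateRenewalTime(long delay), which places the next attempt at delay - delay/10, i.e. 90% of whatever lifetime the last renew() reported, instead of a fixed 95% of a day. The snippet below just works through that arithmetic; the 24-hour and 2-hour lifetimes are illustrative assumptions.

import java.util.concurrent.TimeUnit;

public class RenewalScheduleDemo {

  // Mirrors RenewAction#updateRenewalTime(long delay): schedule at 90% of the delay.
  static long nextRenewalTime(long nowMillis, long delayMillis) {
    return nowMillis + delayMillis - delayMillis / 10;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();

    // Before the first successful renew(), the default cycle assumes a 24-hour lifetime.
    long day = TimeUnit.DAYS.toMillis(1);
    System.out.printf("default cycle: renew after %.1f hours%n",
        (nextRenewalTime(now, day) - now) / 3_600_000.0);         // ~21.6 hours

    // After renew() returns the real expiry, the delay is the actual time left.
    long expires = now + TimeUnit.HOURS.toMillis(2);              // hypothetical expiry
    System.out.printf("2h token: renew after %.0f minutes%n",
        (nextRenewalTime(now, expires - now) - now) / 60_000.0);  // ~108 minutes
  }
}

The old RENEW_CYCLE constant baked the margin into the value (24 * 60 * 60 * 950); the new code keeps a full 24 * 60 * 60 * 1000 cycle and applies the 10% safety margin at scheduling time.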

+ 8 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -105,16 +105,17 @@ public class WebHdfsFileSystem extends FileSystem
   public static final WebHdfsDelegationTokenSelector DT_SELECTOR
       = new WebHdfsDelegationTokenSelector();
 
-  private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null;
+  private  DelegationTokenRenewer dtRenewer = null;
+  @VisibleForTesting
+  DelegationTokenRenewer.RenewAction<?> action;
 
   @VisibleForTesting
   protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
-    if (DT_RENEWER == null) {
-      DT_RENEWER = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
-      DT_RENEWER.start();
+    if (dtRenewer == null) {
+      dtRenewer = DelegationTokenRenewer.getInstance();
     }
 
-    DT_RENEWER.addRenewAction(webhdfs);
+    action = dtRenewer.addRenewAction(webhdfs);
   }
 
   /** Is WebHDFS enabled in conf? */
@@ -162,7 +163,8 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   protected synchronized Token<?> getDelegationToken() throws IOException {
-    if (!hasInitedToken) {
+    // we haven't inited yet, or we used to have a token but it expired
+    if (!hasInitedToken || (action != null && !action.isValid())) {
       //since we don't already have a token, go get one
       Token<?> token = getDelegationToken(null);
       // security might be disabled
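
Note on the WebHdfsFileSystem hunks above: the file system now caches the RenewAction returned by addRenewAction and uses its isValid() flag to decide when a fresh token is needed. The stand-alone sketch below reproduces just that decision; RenewAction here is a trimmed-down stand-in and fetchNewToken() is a hypothetical helper.

public class TokenRefetchSketch {

  // Trimmed-down stand-in for DelegationTokenRenewer.RenewAction.
  static final class RenewAction {
    volatile boolean valid = true;
    boolean isValid() { return valid; }
  }

  private boolean hasInitedToken;
  private RenewAction action;   // set when the renewer registers the token
  private String token;         // stand-in for Token<?>
  private int fetches;

  synchronized String getDelegationToken() {
    // Re-fetch if we never had a token, or the renewer abandoned the old one.
    if (!hasInitedToken || (action != null && !action.isValid())) {
      token = fetchNewToken();
      action = new RenewAction();   // a new registration yields a fresh, valid action
      hasInitedToken = true;
    }
    return token;
  }

  private String fetchNewToken() { return "token-" + (++fetches); }

  public static void main(String[] args) {
    TokenRefetchSketch fs = new TokenRefetchSketch();
    System.out.println(fs.getDelegationToken());  // token-1
    System.out.println(fs.getDelegationToken());  // token-1, action still valid
    fs.action.valid = false;                      // background renewer gave up
    System.out.println(fs.getDelegationToken());  // token-2
  }
}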

+ 21 - 19
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java

@@ -26,6 +26,8 @@ import java.net.URL;
 import java.net.HttpURLConnection;
 import java.util.Random;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
@@ -36,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -54,6 +57,8 @@ public class TestHftpFileSystem {
   private static HftpFileSystem hftpFs = null;
   private static String blockPoolId = null;
 
+  private static String hftpUri = null;
+
   private static Path[] TEST_PATHS = new Path[] {
       // URI does not encode, Request#getPathInfo returns /foo
       new Path("/foo;bar"),
@@ -95,8 +100,7 @@ public class TestHftpFileSystem {
     cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
     hdfs = cluster.getFileSystem();
     blockPoolId = cluster.getNamesystem().getBlockPoolId();
-    final String hftpUri = 
-      "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+    hftpUri = "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config);
   }
   
@@ -107,6 +111,21 @@ public class TestHftpFileSystem {
     cluster.shutdown();
   }
 
+  @Before
+  public void initFileSystems() throws IOException {
+    hdfs = cluster.getFileSystem();
+    hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config);
+    // clear out the namespace
+    for (FileStatus stat : hdfs.listStatus(new Path("/"))) {
+      hdfs.delete(stat.getPath(), true);
+    }
+  }
+
+  @After
+  public void resetFileSystems() throws IOException {
+    FileSystem.closeAll();
+  }
+
   /**
    * Test file creation and access with file names that need encoding. 
    */
@@ -274,19 +293,9 @@ public class TestHftpFileSystem {
     assertEquals("Stream closed", ioe.getMessage());
   }
   
-  public void resetFileSystem() throws IOException {
-    // filesystem caching has a quirk/bug that it caches based on the user's
-    // given uri.  the result is if a filesystem is instantiated with no port,
-    // it gets the default port.  then if the default port is changed,
-    // and another filesystem is instantiated with no port, the prior fs
-    // is returned, not a new one using the changed port.  so let's flush
-    // the cache between tests...
-    FileSystem.closeAll();
-  }
   
   @Test
   public void testHftpDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hftp://localhost");
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
@@ -303,7 +312,6 @@ public class TestHftpFileSystem {
   
   @Test
   public void testHftpCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);
@@ -323,7 +331,6 @@ public class TestHftpFileSystem {
 
   @Test
   public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hftp://localhost:123");
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
@@ -340,7 +347,6 @@ public class TestHftpFileSystem {
 
   @Test
   public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);
@@ -362,7 +368,6 @@ public class TestHftpFileSystem {
 
   @Test
   public void testHsftpDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hsftp://localhost");
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
@@ -379,7 +384,6 @@ public class TestHftpFileSystem {
 
   @Test
   public void testHsftpCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);
@@ -399,7 +403,6 @@ public class TestHftpFileSystem {
 
   @Test
   public void testHsftpCustomUriPortWithDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hsftp://localhost:123");
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
@@ -416,7 +419,6 @@ public class TestHftpFileSystem {
 
   @Test
   public void testHsftpCustomUriPortWithCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java

@@ -28,6 +28,9 @@ import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer.RenewAction;
+
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
@@ -199,4 +202,50 @@ public class TestWebHdfsTokens {
       assertFalse(op.getRequireAuth());
     }
   }
+
+  @Test
+  public void testGetTokenAfterFailure() throws Exception {
+    Configuration conf = mock(Configuration.class);
+    Token<?> token1 = mock(Token.class);
+    Token<?> token2 = mock(Token.class);
+    long renewCycle = 1000;
+
+    DelegationTokenRenewer.renewCycle = renewCycle;
+    WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
+    doReturn(conf).when(fs).getConf();
+    doReturn(token1).doReturn(token2).when(fs).getDelegationToken(null);
+    // cause token renewer to abandon the token
+    doThrow(new IOException("renew failed")).when(token1).renew(conf);
+    doThrow(new IOException("get failed")).when(fs).addDelegationTokens(null, null);
+
+    // trigger token acquisition
+    Token<?> token = fs.getDelegationToken();
+    RenewAction<?> action = fs.action;
+    assertSame(token1, token);
+    assertTrue(action.isValid());
+
+    // fetch again and make sure it's the same as before
+    token = fs.getDelegationToken();
+    assertSame(token1, token);
+    assertSame(action, fs.action);
+    assertTrue(fs.action.isValid());
+
+    // upon renewal, token will go bad based on above stubbing
+    Thread.sleep(renewCycle);
+    assertSame(action, fs.action);
+    assertFalse(fs.action.isValid());
+
+    // now that token is invalid, should get a new one
+    token = fs.getDelegationToken();
+    assertSame(token2, token);
+    assertNotSame(action, fs.action);
+    assertTrue(fs.action.isValid());
+    action = fs.action;
+
+    // should get same one again
+    token = fs.getDelegationToken();
+    assertSame(token2, token);
+    assertSame(action, fs.action);
+    assertTrue(fs.action.isValid());
+  }
 }
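
Note on the new test: it drives the failure scenario with Mockito's chained stubbing, doReturn(token1).doReturn(token2).when(fs).getDelegationToken(null), which hands out token1 on the first call and token2 afterwards. A stand-alone illustration of that pattern (the List mock is just a convenient stand-in):

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.util.List;

public class ChainedStubbingDemo {
  public static void main(String[] args) {
    @SuppressWarnings("unchecked")
    List<String> tokens = mock(List.class);

    // Consecutive stubs: first call returns "token1", later calls return "token2".
    doReturn("token1").doReturn("token2").when(tokens).get(0);

    System.out.println(tokens.get(0));  // token1
    System.out.println(tokens.get(0));  // token2
    System.out.println(tokens.get(0));  // token2 (the last stub repeats)
  }
}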