Ver Fonte

Merge -r 1412076:1412077 from trunk to branch-2. Fixes: HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems should register/deregister to/from. Contributed by Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1412079 13f79535-47bb-0310-9956-ffa450edef68
Thomas White há 12 anos atrás
pai
commit
7a8bc99858

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -152,6 +152,9 @@ Release 2.0.3-alpha - Unreleased
 
     HADOOP-6607. Add different variants of non caching HTTP headers. (tucu)
 
+    HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems
+    should register/deregister to/from. (Karthik Kambatla via tomwhite)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

+ 47 - 12
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.util.Time;
  * A daemon thread that waits for the next file system to renew.
  */
 @InterfaceAudience.Private
-public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewer.Renewable>
+public class DelegationTokenRenewer
     extends Thread {
   /** The renewable interface used by the renewer. */
   public interface Renewable {
@@ -93,7 +93,7 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
      * @param newTime the new time
      */
     private void updateRenewalTime() {
-      renewalTime = RENEW_CYCLE + Time.now();
+      renewalTime = renewCycle + Time.now();
     }
 
     /**
@@ -134,34 +134,69 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
   }
 
   /** Wait for 95% of a day between renewals */
-  private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
+  private static final int RENEW_CYCLE = 24 * 60 * 60 * 950; 
 
-  private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>();
+  @InterfaceAudience.Private
+  protected static int renewCycle = RENEW_CYCLE;
 
-  public DelegationTokenRenewer(final Class<T> clazz) {
+  /** Queue to maintain the RenewActions to be processed by the {@link #run()} */
+  private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
+  
+  /**
+   * Create the singleton instance. However, the thread can be started lazily in
+   * {@link #addRenewAction(FileSystem)}
+   */
+  private static DelegationTokenRenewer INSTANCE = null;
+
+  private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) {
     super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
     setDaemon(true);
   }
 
+  public static synchronized DelegationTokenRenewer getInstance() {
+    if (INSTANCE == null) {
+      INSTANCE = new DelegationTokenRenewer(FileSystem.class);
+    }
+    return INSTANCE;
+  }
+
   /** Add a renew action to the queue. */
-  public void addRenewAction(final T fs) {
+  public synchronized <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
     queue.add(new RenewAction<T>(fs));
+    if (!isAlive()) {
+      start();
+    }
   }
 
+  /** Remove the associated renew action from the queue */
+  public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
+      final T fs) {
+    for (RenewAction<?> action : queue) {
+      if (action.weakFs.get() == fs) {
+        queue.remove(action);
+        return;
+      }
+    }
+  }
+
+  @SuppressWarnings("static-access")
   @Override
   public void run() {
     for(;;) {
-      RenewAction<T> action = null;
+      RenewAction<?> action = null;
       try {
-        action = queue.take();
-        if (action.renew()) {
-          action.updateRenewalTime();
-          queue.add(action);
+        synchronized (this) {
+          action = queue.take();
+          if (action.renew()) {
+            action.updateRenewalTime();
+            queue.add(action);
+          }
         }
       } catch (InterruptedException ie) {
         return;
       } catch (Exception ie) {
-        T.LOG.warn("Failed to renew token, action=" + action, ie);
+        action.weakFs.get().LOG.warn("Failed to renew token, action=" + action,
+            ie);
       }
     }
   }

+ 159 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java

@@ -0,0 +1,159 @@
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDelegationTokenRenewer {
+  private static final int RENEW_CYCLE = 1000;
+  private static final int MAX_RENEWALS = 100;
+
+  @SuppressWarnings("rawtypes")
+  static class TestToken extends Token {
+    public volatile int renewCount = 0;
+
+    @Override
+    public long renew(Configuration conf) {
+      if (renewCount == MAX_RENEWALS) {
+        Thread.currentThread().interrupt();
+      } else {
+        renewCount++;
+      }
+      return renewCount;
+    }
+  }
+  
+  static class TestFileSystem extends FileSystem implements
+      DelegationTokenRenewer.Renewable {
+    private Configuration mockConf = mock(Configuration.class);;
+    private TestToken testToken = new TestToken();
+
+    @Override
+    public Configuration getConf() {
+      return mockConf;
+    }
+
+    @Override
+    public Token<?> getRenewToken() {
+      return testToken;
+    }
+
+    @Override
+    public URI getUri() {
+      return null;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      return null;
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission,
+        boolean overwrite, int bufferSize, short replication, long blockSize,
+        Progressable progress) throws IOException {
+      return null;
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize,
+        Progressable progress) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      return false;
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public void setWorkingDirectory(Path new_dir) {
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return null;
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      return false;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return null;
+    }
+
+    @Override
+    public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+      return;
+    }
+  }
+
+  private DelegationTokenRenewer renewer;
+
+  @Before
+  public void setup() {
+    DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
+    renewer = DelegationTokenRenewer.getInstance();
+  }
+
+  @Test
+  public void testAddRenewAction() throws IOException, InterruptedException {
+    TestFileSystem tfs = new TestFileSystem();
+    renewer.addRenewAction(tfs);
+
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(RENEW_CYCLE);
+      if (tfs.testToken.renewCount > 0) {
+        return;
+      }
+    }
+
+    assertTrue("Token not renewed even after 10 seconds",
+        (tfs.testToken.renewCount > 0));
+  }
+
+  @Test
+  public void testRemoveRenewAction() throws IOException, InterruptedException {
+    TestFileSystem tfs = new TestFileSystem();
+    renewer.addRenewAction(tfs);
+
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(RENEW_CYCLE);
+      if (tfs.testToken.renewCount > 0) {
+        renewer.removeRenewAction(tfs);
+        break;
+      }
+    }
+
+    assertTrue("Token not renewed even once",
+        (tfs.testToken.renewCount > 0));
+    assertTrue("Token not removed",
+        (tfs.testToken.renewCount < MAX_RENEWALS));
+  }
+}

+ 19 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java

@@ -82,12 +82,8 @@ import org.xml.sax.helpers.XMLReaderFactory;
 @InterfaceStability.Evolving
 public class HftpFileSystem extends FileSystem
     implements DelegationTokenRenewer.Renewable {
-  private static final DelegationTokenRenewer<HftpFileSystem> dtRenewer
-      = new DelegationTokenRenewer<HftpFileSystem>(HftpFileSystem.class);
-  
   static {
     HttpURLConnection.setFollowRedirects(true);
-    dtRenewer.start();
   }
 
   public static final Text TOKEN_KIND = new Text("HFTP delegation");
@@ -106,6 +102,16 @@ public class HftpFileSystem extends FileSystem
   private static final HftpDelegationTokenSelector hftpTokenSelector =
       new HftpDelegationTokenSelector();
 
+  private DelegationTokenRenewer dtRenewer = null;
+
+  private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
+    if (dtRenewer == null) {
+      dtRenewer = DelegationTokenRenewer.getInstance();
+    }
+
+    dtRenewer.addRenewAction(hftpFs);
+  }
+
   public static final SimpleDateFormat getDateFormat() {
     final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
     df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
@@ -202,7 +208,7 @@ public class HftpFileSystem extends FileSystem
     if (token != null) {
       setDelegationToken(token);
       if (createdToken) {
-        dtRenewer.addRenewAction(this);
+        addRenewAction(this);
         LOG.debug("Created new DT for " + token.getService());
       } else {
         LOG.debug("Found existing DT for " + token.getService());
@@ -395,6 +401,14 @@ public class HftpFileSystem extends FileSystem
     return new FSDataInputStream(new RangeHeaderInputStream(u));
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (dtRenewer != null) {
+      dtRenewer.removeRenewAction(this); // blocks
+    }
+  }
+
   /** Class to parse and store a listing reply from the server. */
   class LsParser extends DefaultHandler {
 

+ 13 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -124,15 +124,14 @@ public class WebHdfsFileSystem extends FileSystem
   public static final WebHdfsDelegationTokenSelector DT_SELECTOR
       = new WebHdfsDelegationTokenSelector();
 
-  private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null;
+  private DelegationTokenRenewer dtRenewer = null;
 
-  private static synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
-    if (DT_RENEWER == null) {
-      DT_RENEWER = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
-      DT_RENEWER.start();
+  private synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
+    if (dtRenewer == null) {
+      dtRenewer = DelegationTokenRenewer.getInstance();
     }
 
-    DT_RENEWER.addRenewAction(webhdfs);
+    dtRenewer.addRenewAction(webhdfs);
   }
 
   /** Is WebHDFS enabled in conf? */
@@ -766,6 +765,14 @@ public class WebHdfsFileSystem extends FileSystem
         new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (dtRenewer != null) {
+      dtRenewer.removeRenewAction(this); // blocks
+    }
+  }
+
   class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
     OffsetUrlOpener(final URL url) {
       super(url);