Prechádzať zdrojové kódy

HDFS-5291. Standby namenode after transition to active goes into safemode. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1530112 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 rokov pred
rodič
commit
1fe1942328

+ 30 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java

@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 
@@ -531,6 +532,15 @@ public class RetryPolicies {
       this.maxDelayBase = maxDelayBase;
     }
 
+    /**
+     * @return 0 if this is our first failover/retry (i.e., retry immediately),
+     *         sleep exponentially otherwise
+     */
+    private long getFailoverOrRetrySleepTime(int times) {
+      return times == 0 ? 0 : 
+        calculateExponentialTime(delayMillis, times, maxDelayBase);
+    }
+    
     @Override
     public RetryAction shouldRetry(Exception e, int retries,
         int failovers, boolean isIdempotentOrAtMostOnce) throws Exception {
@@ -546,11 +556,8 @@ public class RetryPolicies {
           e instanceof StandbyException ||
           e instanceof ConnectTimeoutException ||
           isWrappedStandbyException(e)) {
-        return new RetryAction(
-            RetryAction.RetryDecision.FAILOVER_AND_RETRY,
-            // retry immediately if this is our first failover, sleep otherwise
-            failovers == 0 ? 0 :
-                calculateExponentialTime(delayMillis, failovers, maxDelayBase));
+        return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+            getFailoverOrRetrySleepTime(failovers));
       } else if (e instanceof SocketException ||
                  (e instanceof IOException && !(e instanceof RemoteException))) {
         if (isIdempotentOrAtMostOnce) {
@@ -561,8 +568,14 @@ public class RetryPolicies {
               "whether it was invoked");
         }
       } else {
-        return fallbackPolicy.shouldRetry(e, retries, failovers,
-            isIdempotentOrAtMostOnce);
+        RetriableException re = getWrappedRetriableException(e);
+        if (re != null) {
+          return new RetryAction(RetryAction.RetryDecision.RETRY,
+              getFailoverOrRetrySleepTime(retries));
+        } else {
+          return fallbackPolicy.shouldRetry(e, retries, failovers,
+              isIdempotentOrAtMostOnce);
+        }
       }
     }
     
@@ -596,4 +609,14 @@ public class RetryPolicies {
         StandbyException.class);
     return unwrapped instanceof StandbyException;
   }
+  
+  private static RetriableException getWrappedRetriableException(Exception e) {
+    if (!(e instanceof RemoteException)) {
+      return null;
+    }
+    Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
+        RetriableException.class);
+    return unwrapped instanceof RetriableException ? 
+        (RetriableException) unwrapped : null;
+  }
 }

+ 41 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetriableException.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Exception thrown by a server typically to indicate that server is in a state
+ * where request cannot be processed temporarily (such as still starting up).
+ * Client may retry the request. If the service is up, the server may be able to
+ * process a retried request.
+ */
+@InterfaceStability.Evolving
+public class RetriableException extends IOException {
+  private static final long serialVersionUID = 1915561725516487301L;
+  
+  public RetriableException(Exception e) {
+    super(e);
+  }
+  
+  public RetriableException(String msg) {
+    super(msg);
+  }
+}

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -397,6 +397,9 @@ Release 2.2.0 - 2013-10-13
     HDFS-5307. Support both HTTP and HTTPS in jsp pages (Haohui Mai via
     branconli)
 
+    HDFS-5291. Standby namenode after transition to active goes into safemode.
+    (jing9)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

+ 73 - 130
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -209,6 +209,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.RetryCache.CacheEntry;
 import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -1050,6 +1051,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
   
+  /**
+   * @throws RetriableException
+   *           If 1) The NameNode is in SafeMode, 2) HA is enabled, and 3)
+   *           NameNode is in active state
+   * @throws SafeModeException
+   *           Otherwise if NameNode is in SafeMode.
+   */
+  private void checkNameNodeSafeMode(String errorMsg)
+      throws RetriableException, SafeModeException {
+    if (isInSafeMode()) {
+      SafeModeException se = new SafeModeException(errorMsg, safeMode);
+      if (haEnabled && haContext != null
+          && haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+        throw new RetriableException(se);
+      } else {
+        throw se;
+      }
+    }
+  }
+  
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
     return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
   }
@@ -1351,9 +1372,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set permission for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set permission for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       dir.setPermission(src, permission);
@@ -1390,9 +1409,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set owner for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set owner for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       if (!pc.isSuperUser()) {
@@ -1472,8 +1489,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       for (LocatedBlock b : ret.getLocatedBlocks()) {
         // if safemode & no block locations yet then throw safemodeException
         if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
-          throw new SafeModeException("Zero blocklocations for " + src,
-              safeMode);
+          SafeModeException se = new SafeModeException(
+              "Zero blocklocations for " + src, safeMode);
+          if (haEnabled && haContext != null && 
+              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+            throw new RetriableException(se);
+          } else {
+            throw se;
+          }
         }
       }
     }
@@ -1614,9 +1637,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot concat " + target, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot concat " + target);
       concatInternal(pc, target, srcs, logRetryCache);
       resultingStat = getAuditFileInfo(target, false);
     } finally {
@@ -1764,9 +1785,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set times " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set times " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       // Write access is required to set access and modification times
@@ -1829,9 +1848,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create symlink " + link, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create symlink " + link);
       link = FSDirectory.resolvePath(link, pathComponents, dir);
       if (!createParent) {
         verifyParentDir(link);
@@ -1889,9 +1906,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set replication for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set replication for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.WRITE);
@@ -2021,9 +2036,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create file" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create file" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       startFileInternal(pc, src, permissions, holder, clientMachine, create,
           overwrite, createParent, replication, blockSize, logRetryCache);
@@ -2242,10 +2255,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-            "Cannot recover the lease of " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot recover the lease of " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
@@ -2396,9 +2406,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot append to file" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot append to file" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
     } catch (StandbyException se) {
@@ -2548,9 +2556,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     checkBlock(previous);
     onRetryBlock[0] = null;
     checkOperation(OperationCategory.WRITE);
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot add block to " + src, safeMode);
-    }
+    checkNameNodeSafeMode("Cannot add block to " + src);
 
     // have we exceeded the configured limit of fs objects.
     checkFsObjectLimit();
@@ -2659,10 +2665,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.READ);
       //check safe mode
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot add datanode; src=" + src
-            + ", blk=" + blk, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //check lease
@@ -2707,10 +2710,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot abandon block " + b +
-                                    " for fle" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot abandon block " + b + " for fle" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //
@@ -2793,9 +2793,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot complete file " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot complete file " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       success = completeFileInternal(src, holder,
         ExtendedBlock.getLocalBlock(last), fileId);
@@ -2971,9 +2969,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename " + src);
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       checkOperation(OperationCategory.WRITE);
@@ -3044,9 +3040,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename " + src);
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       renameToInternal(pc, src, dst, cacheEntry != null, options);
@@ -3152,9 +3146,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot delete " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot delete " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (!recursive && dir.isNonEmptyDirectory(src)) {
         throw new IOException(src + " is non empty");
@@ -3373,9 +3365,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);   
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create directory " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create directory " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
@@ -3475,9 +3465,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set quota on " + path, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set quota on " + path);
       dir.setQuota(path, nsQuota, dsQuota);
     } finally {
       writeUnlock();
@@ -3500,9 +3488,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot fsync file " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot fsync file " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
       if (lastBlockLength > 0) {
@@ -3727,11 +3713,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // If a DN tries to commit to the standby, the recovery will
       // fail, and the next retry will succeed on the new NN.
   
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-          "Cannot commitBlockSynchronization while in safe mode",
-          safeMode);
-      }
+      checkNameNodeSafeMode(
+          "Cannot commitBlockSynchronization while in safe mode");
       final BlockInfo storedBlock = getStoredBlock(
           ExtendedBlock.getLocalBlock(lastblock));
       if (storedBlock == null) {
@@ -3877,9 +3860,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot renew lease for " + holder);
       leaseManager.renewLease(holder);
     } finally {
       writeUnlock();
@@ -4262,8 +4243,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.UNCHECKED);
       if (!isInSafeMode()) {
-        throw new IOException("Safe mode should be turned ON " +
-                              "in order to create namespace image.");
+        throw new IOException("Safe mode should be turned ON "
+            + "in order to create namespace image.");
       }
       getFSImage().saveNamespace(this);
       success = true;
@@ -4340,7 +4321,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * replicas, and calculates the ratio of safe blocks to the total number
    * of blocks in the system, which is the size of blocks in
    * {@link FSNamesystem#blockManager}. When the ratio reaches the
-   * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
+   * {@link #threshold} it starts the SafeModeMonitor daemon in order
    * to monitor whether the safe mode {@link #extension} is passed.
    * Then it leaves safe mode and destroys itself.
    * <p>
@@ -4348,10 +4329,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * not tracked because the name node is not intended to leave safe mode
    * automatically in the case.
    *
-   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction)
-   * @see SafeModeMonitor
+   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
    */
-  class SafeModeInfo {
+  public class SafeModeInfo {
     // configuration fields
     /** Safe mode threshold condition %.*/
     private double threshold;
@@ -5093,9 +5073,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Log not rolled", safeMode);
-      }
+      checkNameNodeSafeMode("Log not rolled");
       LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
       return getFSImage().rollEditLog();
     } finally {
@@ -5116,9 +5094,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Checkpoint not started", safeMode);
-      }
+      checkNameNodeSafeMode("Checkpoint not started");
       LOG.info("Start checkpoint for " + backupNode.getAddress());
       cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
       getEditLog().logSync();
@@ -5152,9 +5128,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Checkpoint not ended", safeMode);
-      }
+      checkNameNodeSafeMode("Checkpoint not ended");
       LOG.info("End checkpoint for " + registration.getAddress());
       getFSImage().endCheckpoint(sig);
       success = true;
@@ -5506,10 +5480,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   long nextGenerationStamp(boolean legacyBlock)
       throws IOException, SafeModeException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot get next generation stamp", safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get next generation stamp");
 
     long gs;
     if (legacyBlock) {
@@ -5562,12 +5533,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
    * Increments, logs and then returns the block ID
    */
-  private long nextBlockId() throws SafeModeException {
+  private long nextBlockId() throws IOException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot get next block ID", safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get next block ID");
     final long blockId = blockIdGenerator.nextValue();
     getEditLog().logAllocateBlockId(blockId);
     // NB: callers sync the log
@@ -5577,10 +5545,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
       String clientName) throws IOException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot get a new generation stamp and an " +
-                                "access token for block " + block, safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get a new generation stamp and an "
+        + "access token for block " + block);
     
     // check stored block state
     BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
@@ -5693,9 +5659,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Pipeline not updated", safeMode);
-      }
+      checkNameNodeSafeMode("Pipeline not updated");
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
       updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
@@ -5955,9 +5919,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot issue delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot issue delegation token");
       if (!isAllowedDelegationTokenOp()) {
         throw new IOException(
           "Delegation Token can be issued only with kerberos or web authentication");
@@ -6002,9 +5964,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.WRITE);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot renew delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot renew delegation token");
       if (!isAllowedDelegationTokenOp()) {
         throw new IOException(
             "Delegation Token can be renewed only with kerberos or web authentication");
@@ -6035,9 +5995,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.WRITE);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot cancel delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot cancel delegation token");
       String canceller = getRemoteUser().getUserName();
       DelegationTokenIdentifier id = dtSecretManager
         .cancelToken(token, canceller);
@@ -6558,10 +6516,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot allow snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot allow snapshot for " + path);
       checkSuperuserPrivilege();
 
       dir.writeLock();
@@ -6586,10 +6541,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot disallow snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
       checkSuperuserPrivilege();
 
       dir.writeLock();
@@ -6627,10 +6579,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     String snapshotPath = null;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create snapshot for "
-            + snapshotRoot, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
       if (isPermissionEnabled) {
         checkOwner(pc, snapshotRoot);
       }
@@ -6679,10 +6628,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename snapshot for " + path);
       if (isPermissionEnabled) {
         checkOwner(pc, path);
       }
@@ -6797,10 +6743,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-            "Cannot delete snapshot for " + snapshotRoot, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
       if (isPermissionEnabled) {
         checkOwner(pc, snapshotRoot);
       }

+ 1 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java

@@ -33,10 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class SafeModeException extends IOException {
   private static final long serialVersionUID = 1L;
 
-  public SafeModeException() {}
-
   public SafeModeException(String text, FSNamesystem.SafeModeInfo mode ) {
     super(text + ". Name node is in safe mode.\n" + mode.getTurnOffTip());
   }
-
-}
+}

+ 51 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java

@@ -17,12 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -101,6 +107,50 @@ public class TestHASafeMode {
     }
   }
   
+  /**
+   * Make sure the client retries when the active NN is in safemode
+   */
+  @Test (timeout=300000)
+  public void testClientRetrySafeMode() throws Exception {
+    final Map<Path, Boolean> results = Collections
+        .synchronizedMap(new HashMap<Path, Boolean>());
+    final Path test = new Path("/test");
+    // let nn0 enter safemode
+    NameNodeAdapter.enterSafeMode(nn0, false);
+    LOG.info("enter safemode");
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          boolean mkdir = fs.mkdirs(test);
+          LOG.info("mkdir finished, result is " + mkdir);
+          synchronized (TestHASafeMode.this) {
+            results.put(test, mkdir);
+            TestHASafeMode.this.notifyAll();
+          }
+        } catch (Exception e) {
+          LOG.info("Got Exception while calling mkdir", e);
+        }
+      }
+    }.start();
+    
+    // make sure the client's call has actually been handled by the active NN
+    assertFalse("The directory should not be created while NN in safemode",
+        fs.exists(test));
+    
+    Thread.sleep(1000);
+    // let nn0 leave safemode
+    NameNodeAdapter.leaveSafeMode(nn0);
+    LOG.info("leave safemode");
+    
+    synchronized (this) {
+      while (!results.containsKey(test)) {
+        this.wait();
+      }
+      assertTrue(results.get(test));
+    }
+  }
+  
   private void restartStandby() throws IOException {
     cluster.shutdownNameNode(1);
     // Set the safemode extension to be lengthy, so that the tests