HDFS-4591. HA clients can fail to fail over while Standby NN is performing long checkpoint. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1456107 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 12 years ago
parent
commit
3bf09c5150

+ 13 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java

@@ -162,6 +162,9 @@ public abstract class GenericTestUtils {
     private final CountDownLatch waitLatch = new CountDownLatch(1);
     private final CountDownLatch resultLatch = new CountDownLatch(1);
     
+    private final AtomicInteger fireCounter = new AtomicInteger(0);
+    private final AtomicInteger resultCounter = new AtomicInteger(0);
+    
     // Result fields set after proceed() is called.
     private volatile Throwable thrown;
     private volatile Object returnValue;
@@ -188,6 +191,7 @@ public abstract class GenericTestUtils {
     @Override
     public Object answer(InvocationOnMock invocation) throws Throwable {
       LOG.info("DelayAnswer firing fireLatch");
+      fireCounter.getAndIncrement();
       fireLatch.countDown();
       try {
         LOG.info("DelayAnswer waiting on waitLatch");
@@ -208,6 +212,7 @@ public abstract class GenericTestUtils {
         thrown = t;
         throw t;
       } finally {
+        resultCounter.incrementAndGet();
         resultLatch.countDown();
       }
     }
@@ -235,6 +240,14 @@ public abstract class GenericTestUtils {
     public Object getReturnValue() {
       return returnValue;
     }
+    
+    public int getFireCount() {
+      return fireCounter.get();
+    }
+    
+    public int getResultCount() {
+      return resultCounter.get();
+    }
   }
   
   /**

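The counters added above let a test observe that a mocked method has been entered but has not yet completed, which is exactly what the new regression test further down relies on. The following is a minimal sketch of that usage, assuming Mockito and the existing DelayAnswer API (waitForCall, proceed, waitForResult); the Worker class and slowOperation method are hypothetical and exist only for illustration.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;

public class DelayAnswerCounterSketch {
  private static final Log LOG = LogFactory.getLog(DelayAnswerCounterSketch.class);

  // Hypothetical collaborator, used only to illustrate the counters.
  static class Worker {
    void slowOperation() throws Exception {
      // real work elided
    }
  }

  void example() throws Exception {
    final Worker worker = Mockito.spy(new Worker());
    final DelayAnswer answerer = new DelayAnswer(LOG);
    Mockito.doAnswer(answerer).when(worker).slowOperation();

    // Run the stubbed call on another thread so the test can observe it mid-flight.
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          worker.slowOperation();
        } catch (Exception e) {
          LOG.error("slowOperation failed", e);
        }
      }
    });
    t.start();

    answerer.waitForCall();
    // The call has entered the mock but is blocked until proceed() is invoked,
    // so the fire counter has ticked while the result counter has not.
    assertEquals(1, answerer.getFireCount());
    assertEquals(0, answerer.getResultCount());

    answerer.proceed();
    answerer.waitForResult();
    // The underlying method has now actually completed.
    assertEquals(1, answerer.getResultCount());
    t.join();
  }
}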
+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -411,6 +411,9 @@ Release 2.0.5-beta - UNRELEASED
 
     HDFS-4583. TestNodeCount fails. (Ivan Mitic via suresh)
 
+    HDFS-4591. HA clients can fail to fail over while Standby NN is performing
+    long checkpoint. (atm)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java

@@ -78,6 +78,7 @@ public class DelegationTokenSecretManager
   
   @Override //SecretManager
   public void checkAvailableForRead() throws StandbyException {
+    namesystem.checkOperation(OperationCategory.READ);
     namesystem.readLock();
     try {
       namesystem.checkOperation(OperationCategory.READ);

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -874,9 +875,10 @@ public class BlockManager {
    */
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size
       ) throws IOException {
+    namesystem.checkOperation(OperationCategory.READ);
     namesystem.readLock();
     try {
-      namesystem.checkSuperuserPrivilege();
+      namesystem.checkOperation(OperationCategory.READ);
       return getBlocksWithLocations(datanode, size);  
     } finally {
       namesystem.readUnlock();

+ 64 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1129,8 +1129,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void metaSave(String filename) throws IOException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.UNCHECKED);
     writeLock();
     try {
+      checkOperation(OperationCategory.UNCHECKED);
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       PrintWriter out = new PrintWriter(new BufferedWriter(
           new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8)));
@@ -1204,6 +1206,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1241,6 +1244,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1351,13 +1355,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws FileNotFoundException, UnresolvedLinkException, IOException {
 
     for (int attempt = 0; attempt < 2; attempt++) {
-      if (attempt == 0) { // first attempt is with readlock
+      boolean isReadOp = (attempt == 0);
+      if (isReadOp) { // first attempt is with readlock
+        checkOperation(OperationCategory.READ);
         readLock();
       }  else { // second attempt is with  write lock
+        checkOperation(OperationCategory.WRITE);
         writeLock(); // writelock is needed to set accesstime
       }
       try {
-        checkOperation(OperationCategory.READ);
+        if (isReadOp) {
+          checkOperation(OperationCategory.READ);
+        } else {
+          checkOperation(OperationCategory.WRITE);
+        }
 
         // if the namenode is in safemode, then do not update access time
         if (isInSafeMode()) {
@@ -1370,7 +1381,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
             // if we have to set access time but we only have the readlock, then
             // restart this entire operation with the writeLock.
-            if (attempt == 0) {
+            if (isReadOp) {
               continue;
             }
           }
@@ -1380,7 +1391,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
             inode.computeFileSize(false), inode.isUnderConstruction(),
             offset, length, needBlockToken);
       } finally {
-        if (attempt == 0) {
+        if (isReadOp) {
           readUnlock();
         } else {
           writeUnlock();
@@ -1436,6 +1447,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1578,6 +1590,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1618,6 +1631,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1693,6 +1707,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     blockManager.verifyReplication(src, replication, null);
     final boolean isFile;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1723,6 +1738,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -1785,6 +1801,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean skipSync = false;
     final HdfsFileStatus stat;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1982,6 +1999,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws IOException {
     boolean skipSync = false;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2119,6 +2137,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     LocatedBlock lb = null;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2185,8 +2204,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     // Part I. Analyze the state of the file with respect to the input data.
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
+      checkOperation(OperationCategory.READ);
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
       final INode[] inodes = analyzeFileState(
           src, fileId, clientName, previous, onRetryBlock).getINodes();
@@ -2213,8 +2234,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     // Allocate a new block, add it to the INode and the BlocksMap. 
     Block newBlock = null;
     long offset;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       // Run the full analysis again, since things could have changed
       // while chooseTarget() was executing.
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
@@ -2368,9 +2391,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final DatanodeDescriptor clientnode;
     final long preferredblocksize;
     final List<DatanodeDescriptor> chosen;
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
-      checkOperation(OperationCategory.WRITE);
+      checkOperation(OperationCategory.READ);
       //check safe mode
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add datanode; src=" + src
@@ -2410,6 +2434,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   boolean abandonBlock(ExtendedBlock b, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2487,6 +2512,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     throws SafeModeException, UnresolvedLinkException, IOException {
     checkBlock(last);
     boolean success = false;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2653,6 +2679,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           " to " + dst);
     }
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2709,6 +2736,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           + src + " to " + dst);
     }
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2795,6 +2823,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
              IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2923,6 +2952,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
            StandbyException, IOException {
     HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2965,6 +2995,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3025,6 +3056,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       FileNotFoundException, UnresolvedLinkException, StandbyException {
     FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
         supergroup);
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -3045,6 +3077,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3068,6 +3101,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void fsync(String src, String clientName, long lastBlockLength) 
       throws IOException, UnresolvedLinkException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3272,6 +3306,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       String[] newtargetstorages)
       throws IOException, UnresolvedLinkException {
     String src = "";
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3375,6 +3410,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Renew the lease(s) held by the given client
    */
   void renewLease(String holder) throws IOException {
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3416,6 +3452,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -3702,10 +3739,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   DatanodeInfo[] datanodeReport(final DatanodeReportType type
-      ) throws AccessControlException {
+      ) throws AccessControlException, StandbyException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.UNCHECKED);
     readLock();
     try {
+      checkOperation(OperationCategory.UNCHECKED);
       final DatanodeManager dm = getBlockManager().getDatanodeManager();      
       final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
 
@@ -3729,8 +3768,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void saveNamespace() throws AccessControlException, IOException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.UNCHECKED);
     readLock();
     try {
+      checkOperation(OperationCategory.UNCHECKED);
       if (!isInSafeMode()) {
         throw new IOException("Safe mode should be turned ON " +
                               "in order to create namespace image.");
@@ -3748,10 +3789,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * 
    * @throws AccessControlException if superuser privilege is violated.
    */
-  boolean restoreFailedStorage(String arg) throws AccessControlException {
+  boolean restoreFailedStorage(String arg) throws AccessControlException,
+      StandbyException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.UNCHECKED);
     writeLock();
     try {
+      checkOperation(OperationCategory.UNCHECKED);
       
       // if it is disabled - enable it and vice versa.
       if(arg.equals("check"))
@@ -3772,6 +3816,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     
   void finalizeUpgrade() throws IOException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -4511,6 +4556,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   CheckpointSignature rollEditLog() throws IOException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.JOURNAL);
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
@@ -4528,6 +4574,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
                                 NamenodeRegistration bnReg, // backup node
                                 NamenodeRegistration nnReg) // active name-node
   throws IOException {
+    checkOperation(OperationCategory.CHECKPOINT);
     writeLock();
     try {
       checkOperation(OperationCategory.CHECKPOINT);
@@ -4546,6 +4593,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
+    checkOperation(OperationCategory.CHECKPOINT);
     readLock();
     try {
       checkOperation(OperationCategory.CHECKPOINT);
@@ -4834,6 +4882,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Client is reporting some bad block locations.
    */
   void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -4868,6 +4917,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
       String clientName) throws IOException {
     LocatedBlock locatedBlock;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -4899,6 +4949,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void updatePipeline(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -5026,8 +5077,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void releaseBackupNode(NamenodeRegistration registration)
     throws IOException {
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
       if(getFSImage().getStorage().getNamespaceID()
          != registration.getNamespaceID())
         throw new IOException("Incompatible namespaceIDs: "
@@ -5066,6 +5119,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
 	String[] cookieTab) throws IOException {
     checkSuperuserPrivilege();
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -5158,6 +5212,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
     Token<DelegationTokenIdentifier> token;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -5204,6 +5259,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     long expiryTime;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -5236,6 +5292,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);

+ 0 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -337,7 +337,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IllegalArgumentException(
         "Unexpected not positive size: "+size);
     }
-    namesystem.checkOperation(OperationCategory.READ);
     namesystem.checkSuperuserPrivilege();
     return namesystem.getBlockManager().getBlocks(datanode, size); 
   }
@@ -707,7 +706,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
   throws IOException {
-    namesystem.checkOperation(OperationCategory.UNCHECKED);
     DatanodeInfo results[] = namesystem.datanodeReport(type);
     if (results == null ) {
       throw new IOException("Cannot find datanode report");
@@ -732,19 +730,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
   @Override // ClientProtocol
   public boolean restoreFailedStorage(String arg) throws IOException { 
-    namesystem.checkOperation(OperationCategory.UNCHECKED);
     return namesystem.restoreFailedStorage(arg);
   }
 
   @Override // ClientProtocol
   public void saveNamespace() throws IOException {
-    namesystem.checkOperation(OperationCategory.UNCHECKED);
     namesystem.saveNamespace();
   }
   
   @Override // ClientProtocol
   public long rollEdits() throws AccessControlException, IOException {
-    namesystem.checkOperation(OperationCategory.JOURNAL);
     CheckpointSignature sig = namesystem.rollEditLog();
     return sig.getCurSegmentTxId();
   }
@@ -789,7 +784,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
   @Override // ClientProtocol
   public void metaSave(String filename) throws IOException {
-    namesystem.checkOperation(OperationCategory.UNCHECKED);
     namesystem.metaSave(filename);
   }
 

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java

@@ -18,7 +18,9 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 
 /** Namesystem operations. */
@@ -38,4 +40,6 @@ public interface Namesystem extends RwLock, SafeMode {
   public boolean isGenStampInFuture(long generationStamp);
 
   public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
+
+  public void checkOperation(OperationCategory read) throws StandbyException;
 }

+ 11 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java

@@ -64,9 +64,17 @@ public interface HAContext {
   void writeUnlock();
 
   /**
-   * Verify that the given operation category is allowed in the
-   * current state. This is to allow NN implementations (eg BackupNode)
-   * to override it with node-specific handling.
+   * Verify that the given operation category is allowed in the current state.
+   * This is to allow NN implementations (eg BackupNode) to override it with
+   * node-specific handling.
+   * 
+   * If the operation which is being checked will be taking the FSNS lock, it's
+   * advisable to check the operation category both immediately before and after
+   * taking the lock. This is because clients rely on the StandbyException
+   * thrown by this method in order to trigger client failover, and if a client
+   * first tries to contact the Standby NN, it could block for a long time if
+   * the Standby is holding the lock for a while, e.g. when performing a
+   * checkpoint. See HDFS-4591 for more details.
    */
   void checkOperation(OperationCategory op) throws StandbyException;
 

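The javadoc above describes the pattern the rest of this patch applies throughout FSNamesystem: check the operation category once before acquiring the namesystem lock, so a client that reaches a Standby busy with a long checkpoint fails fast with a StandbyException and fails over, and check it again after the lock is acquired, since the HA state may have changed while waiting. Below is a minimal sketch of that shape; OperationCategory and StandbyException are the real Hadoop types, but the surrounding class, the lock field, and doRead() are placeholders, not the actual FSNamesystem code.

import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.ipc.StandbyException;

// Illustrative sketch of the double-check pattern described in the javadoc above.
abstract class OperationCheckSketch {
  private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);

  /** Throws StandbyException if the current HA state does not allow the operation. */
  abstract void checkOperation(OperationCategory op) throws StandbyException;

  Object someReadOperation() throws StandbyException {
    // First check: fail fast before blocking on the lock, so a client talking to
    // a Standby that is busy checkpointing gets a StandbyException and fails over.
    checkOperation(OperationCategory.READ);
    fsLock.readLock().lock();
    try {
      // Second check: the HA state may have changed while we waited for the lock.
      checkOperation(OperationCategory.READ);
      return doRead();
    } finally {
      fsLock.readLock().unlock();
    }
  }

  abstract Object doRead();
}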
+ 51 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -26,6 +27,8 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,7 +46,10 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.apache.hadoop.util.ThreadUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,6 +65,8 @@ public class TestStandbyCheckpoints {
   protected MiniDFSCluster cluster;
   protected NameNode nn0, nn1;
   protected FileSystem fs;
+  
+  private static final Log LOG = LogFactory.getLog(TestStandbyCheckpoints.class);
 
   @SuppressWarnings("rawtypes")
   @Before
@@ -231,6 +239,49 @@ public class TestStandbyCheckpoints {
     
     assertTrue(canceledOne);
   }
+  
+  /**
+   * Make sure that clients will receive StandbyExceptions even when a
+   * checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
+   * thread will have FSNS lock. Regression test for HDFS-4591.
+   */
+  @Test(timeout=120000)
+  public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
+    
+    // Set it up so that we know when the SBN checkpoint starts and ends.
+    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
+    DelayAnswer answerer = new DelayAnswer(LOG);
+    Mockito.doAnswer(answerer).when(spyImage1)
+        .saveNamespace(Mockito.any(FSNamesystem.class),
+            Mockito.any(Canceler.class));
+    
+    // Perform some edits and wait for a checkpoint to start on the SBN.
+    doEdits(0, 2000);
+    nn0.getRpcServer().rollEditLog();
+    answerer.waitForCall();
+    answerer.proceed();
+    assertTrue("SBN is not performing checkpoint but it should be.",
+        answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
+    
+    // Make sure that the lock has actually been taken by the checkpointing
+    // thread.
+    ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
+    try {
+      // Perform an RPC to the SBN and make sure it throws a StandbyException.
+      nn1.getRpcServer().getFileInfo("/");
+      fail("Should have thrown StandbyException, but instead succeeded.");
+    } catch (StandbyException se) {
+      GenericTestUtils.assertExceptionContains("is not supported", se);
+    }
+    
+    // Make sure that the checkpoint is still going on, implying that the client
+    // RPC to the SBN happened during the checkpoint.
+    assertTrue("SBN should have still been checkpointing.",
+        answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
+    answerer.waitForResult();
+    assertTrue("SBN should have finished checkpointing.",
+        answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
+  }
 
   private void doEdits(int start, int stop) throws IOException {
     for (int i = start; i < stop; i++) {

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java

@@ -143,6 +143,7 @@ public class TestStandbyIsHot {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
     // We read from the standby to watch block locations
     HAUtil.setAllowStandbyReads(conf, true);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())