Przeglądaj źródła

HDFS-4679. Namenode operation checks should be done in a consistent manner. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466721 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 lat temu
rodzic
commit
fd1000bcef

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -399,6 +399,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-3940. Add Gset#clear method and clear the block map when namenode is
     shutdown. (suresh)
 
+    HDFS-4679. Namenode operation checks should be done in a consistent
+    manner. (suresh)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 122 - 138
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1210,7 +1210,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
       }
@@ -1248,7 +1247,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
       }
@@ -1302,9 +1300,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
-    FSPermissionChecker pc = getPermissionChecker();
     try {
-      return getBlockLocationsInt(pc, src, offset, length, doAccessTime,
+      return getBlockLocationsInt(src, offset, length, doAccessTime,
                                   needBlockToken, checkSafeMode);
     } catch (AccessControlException e) {
       logAuditEvent(false, "open", src);
@@ -1312,14 +1309,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-  private LocatedBlocks getBlockLocationsInt(FSPermissionChecker pc,
-      String src, long offset, long length, boolean doAccessTime,
-      boolean needBlockToken, boolean checkSafeMode)
+  private LocatedBlocks getBlockLocationsInt(String src, long offset,
+      long length, boolean doAccessTime, boolean needBlockToken,
+      boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
-    if (isPermissionEnabled) {
-      checkPathAccess(pc, src, FsAction.READ);
-    }
-
     if (offset < 0) {
       throw new HadoopIllegalArgumentException(
           "Negative offset is not supported. File: " + src);
@@ -1347,13 +1340,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Get block locations within the specified range, updating the
    * access times if necessary. 
    */
-  private LocatedBlocks getBlockLocationsUpdateTimes(String src,
-                                                       long offset, 
-                                                       long length,
-                                                       boolean doAccessTime, 
-                                                       boolean needBlockToken)
-      throws FileNotFoundException, UnresolvedLinkException, IOException {
-
+  private LocatedBlocks getBlockLocationsUpdateTimes(String src, long offset,
+      long length, boolean doAccessTime, boolean needBlockToken)
+      throws FileNotFoundException,
+      UnresolvedLinkException, IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     for (int attempt = 0; attempt < 2; attempt++) {
       boolean isReadOp = (attempt == 0);
       if (isReadOp) { // first attempt is with readlock
@@ -1369,6 +1360,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         } else {
           checkOperation(OperationCategory.WRITE);
         }
+        if (isPermissionEnabled) {
+          checkPathAccess(pc, src, FsAction.READ);
+        }
 
         // if the namenode is in safemode, then do not update access time
         if (isInSafeMode()) {
@@ -1411,6 +1405,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void concat(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
+    if(FSNamesystem.LOG.isDebugEnabled()) {
+      FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
+          " to " + target);
+    }
     try {
       concatInt(target, srcs);
     } catch (AccessControlException e) {
@@ -1421,11 +1419,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   private void concatInt(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
-          " to " + target);
-    }
-    
     // verify args
     if(target.isEmpty()) {
       throw new IllegalArgumentException("Target file name is empty");
@@ -1574,6 +1567,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void setTimes(String src, long mtime, long atime) 
       throws IOException, UnresolvedLinkException {
+    if (!isAccessTimeSupported() && atime != -1) {
+      throw new IOException("Access time for hdfs is not configured. " +
+                            " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
+    }
     try {
       setTimesInt(src, mtime, atime);
     } catch (AccessControlException e) {
@@ -1584,16 +1581,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   private void setTimesInt(String src, long mtime, long atime) 
     throws IOException, UnresolvedLinkException {
-    if (!isAccessTimeSupported() && atime != -1) {
-      throw new IOException("Access time for hdfs is not configured. " +
-                            " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
-    }
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set times " + src, safeMode);
+      }
 
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
@@ -1618,6 +1614,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    if (!DFSUtil.isValidName(link)) {
+      throw new InvalidPathException("Invalid file name: " + link);
+    }
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
     } catch (AccessControlException e) {
@@ -1629,17 +1628,34 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private void createSymlinkInt(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
+          + target + " link=" + link);
+    }
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot create symlink " + link, safeMode);
+      }
       if (!createParent) {
         verifyParentDir(link);
       }
-      createSymlinkInternal(pc, target, link, dirPerms, createParent);
+      if (!dir.isValidToCreate(link)) {
+        throw new IOException("failed to create link " + link 
+            +" either because the filename is invalid or the file exists");
+      }
+      if (isPermissionEnabled) {
+        checkAncestorAccess(pc, link, FsAction.WRITE);
+      }
+      // validate that we have enough inodes.
+      checkFsObjectLimit();
+
+      // add symbolic link to namespace
+      dir.addSymlink(link, target, dirPerms, createParent);
       resultingStat = getAuditFileInfo(link, false);
     } finally {
       writeUnlock();
@@ -1648,37 +1664,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     logAuditEvent(true, "createSymlink", link, target, resultingStat);
   }
 
-  /**
-   * Create a symbolic link.
-   */
-  private void createSymlinkInternal(FSPermissionChecker pc, String target,
-      String link, PermissionStatus dirPerms, boolean createParent)
-      throws IOException, UnresolvedLinkException {
-    assert hasWriteLock();
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" + 
-        target + " link=" + link);
-    }
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot create symlink " + link, safeMode);
-    }
-    if (!DFSUtil.isValidName(link)) {
-      throw new InvalidPathException("Invalid file name: " + link);
-    }
-    if (!dir.isValidToCreate(link)) {
-      throw new IOException("failed to create link " + link 
-          +" either because the filename is invalid or the file exists");
-    }
-    if (isPermissionEnabled) {
-      checkAncestorAccess(pc, link, FsAction.WRITE);
-    }
-    // validate that we have enough inodes.
-    checkFsObjectLimit();
-
-    // add symbolic link to namespace
-    dir.addSymlink(link, target, dirPerms, createParent);
-  }
-
   /**
    * Set replication for an existing file.
    * 
@@ -1798,13 +1783,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+          + ", holder=" + holder
+          + ", clientMachine=" + clientMachine
+          + ", createParent=" + createParent
+          + ", replication=" + replication
+          + ", createFlag=" + flag.toString());
+    }
+    if (!DFSUtil.isValidName(src)) {
+      throw new InvalidPathException(src);
+    }
+
     boolean skipSync = false;
     final HdfsFileStatus stat;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
       startFileInternal(pc, src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
       stat = dir.getFileInfo(src, false);
@@ -1847,21 +1843,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     assert hasWriteLock();
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
-          + ", holder=" + holder
-          + ", clientMachine=" + clientMachine
-          + ", createParent=" + createParent
-          + ", replication=" + replication
-          + ", createFlag=" + flag.toString());
-    }
+    checkOperation(OperationCategory.WRITE);
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot create file" + src, safeMode);
     }
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException(src);
-    }
-
     // Verify that the destination does not exist as a directory already.
     boolean pathExists = dir.exists(src);
     if (pathExists && dir.isDir(src)) {
@@ -1997,21 +1982,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+  
     boolean skipSync = false;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException(
             "Cannot recover the lease of " + src, safeMode);
       }
-      if (!DFSUtil.isValidName(src)) {
-        throw new IOException("Invalid file name: " + src);
-      }
-  
       final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
         return true;
@@ -2135,13 +2119,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           "Append is not enabled on this NameNode. Use the " +
           DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
     }
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
+          + ", holder=" + holder
+          + ", clientMachine=" + clientMachine);
+    }
+    if (!DFSUtil.isValidName(src)) {
+      throw new InvalidPathException(src);
+    }
+
     LocatedBlock lb = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
-
       lb = startFileInternal(pc, src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, blockManager.maxReplication, 0);
@@ -2434,21 +2425,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   boolean abandonBlock(ExtendedBlock b, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
+          + "of file " + src);
+    }
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      //
-      // Remove the block from the pending creates list
-      //
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                      +b+"of file "+src);
-      }
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot abandon block " + b +
                                     " for fle" + src, safeMode);
       }
+      //
+      // Remove the block from the pending creates list
+      //
       INodeFileUnderConstruction file = checkLease(src, holder);
       dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
       if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2510,19 +2501,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   boolean completeFile(String src, String holder, ExtendedBlock last) 
     throws SafeModeException, UnresolvedLinkException, IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
+          src + " for " + holder);
+    }
     checkBlock(last);
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
-
-      success = completeFileInternal(src, holder, 
-        ExtendedBlock.getLocalBlock(last));
+      success = completeFileInternal(src, holder,
+          ExtendedBlock.getLocalBlock(last));
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
+    NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
+        + holder);
     return success;
   }
 
@@ -2530,10 +2525,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
     assert hasWriteLock();
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
-          src + " for " + holder);
-    }
+    checkOperation(OperationCategory.WRITE);
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot complete file " + src, safeMode);
     }
@@ -2569,9 +2561,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     finalizeINodeFileUnderConstruction(src, pendingFile);
-
-    NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
-        + holder);
     return true;
   }
 
@@ -2672,18 +2661,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   private boolean renameToInt(String src, String dst) 
     throws IOException, UnresolvedLinkException {
-    boolean status = false;
-    HdfsFileStatus resultingStat = null;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
+    if (!DFSUtil.isValidName(dst)) {
+      throw new IOException("Invalid name: " + dst);
+    }
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    boolean status = false;
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
-
       status = renameToInternal(pc, src, dst);
       if (status) {
         resultingStat = getAuditFileInfo(dst, false);
@@ -2703,12 +2693,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
     throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
+      checkOperation(OperationCategory.WRITE);
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
     }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new IOException("Invalid name: " + dst);
-    }
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
       //but for now,
@@ -2730,16 +2718,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
-    HdfsFileStatus resultingStat = null;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
     }
+    if (!DFSUtil.isValidName(dst)) {
+      throw new InvalidPathException("Invalid name: " + dst);
+    }
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
       renameToInternal(pc, src, dst, options);
       resultingStat = getAuditFileInfo(dst, false);
     } finally {
@@ -2758,12 +2748,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private void renameToInternal(FSPermissionChecker pc, String src, String dst,
       Options.Rename... options) throws IOException {
     assert hasWriteLock();
+    checkOperation(OperationCategory.WRITE);
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
     }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new InvalidPathException("Invalid name: " + dst);
-    }
     if (isPermissionEnabled) {
       checkParentAccess(pc, src, FsAction.WRITE);
       checkAncestorAccess(pc, dst, FsAction.WRITE);
@@ -2950,16 +2938,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
     throws AccessControlException, UnresolvedLinkException,
            StandbyException, IOException {
+    if (!DFSUtil.isValidName(src)) {
+      throw new InvalidPathException("Invalid file name: " + src);
+    }
     HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-
-      if (!DFSUtil.isValidName(src)) {
-        throw new InvalidPathException("Invalid file name: " + src);
-      }
       if (isPermissionEnabled) {
         checkTraverse(pc, src);
       }
@@ -3016,16 +3003,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   private boolean mkdirsInt(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
-    HdfsFileStatus resultingStat = null;
-    boolean status = false;
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
+    if (!DFSUtil.isValidName(src)) {
+      throw new InvalidPathException(src);
+    }
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    HdfsFileStatus resultingStat = null;
+    boolean status = false;
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
         resultingStat = dir.getFileInfo(src, false);
@@ -3047,6 +3036,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
+    checkOperation(OperationCategory.WRITE);   
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
@@ -3058,9 +3048,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // a new directory is not created.
       return true;
     }
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException(src);
-    }
     if (isPermissionEnabled) {
       checkAncestorAccess(pc, src, FsAction.WRITE);
     }
@@ -3331,8 +3318,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
       String[] newtargetstorages)
       throws IOException, UnresolvedLinkException {
-    String src = "";
+    LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+             + ", newgenerationstamp=" + newgenerationstamp
+             + ", newlength=" + newlength
+             + ", newtargets=" + Arrays.asList(newtargets)
+             + ", closeFile=" + closeFile
+             + ", deleteBlock=" + deleteblock
+             + ")");
     checkOperation(OperationCategory.WRITE);
+    String src = "";
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3344,13 +3338,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           "Cannot commitBlockSynchronization while in safe mode",
           safeMode);
       }
-      LOG.info("commitBlockSynchronization(lastblock=" + lastblock
-               + ", newgenerationstamp=" + newgenerationstamp
-               + ", newlength=" + newlength
-               + ", newtargets=" + Arrays.asList(newtargets)
-               + ", closeFile=" + closeFile
-               + ", deleteBlock=" + deleteblock
-               + ")");
       final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
         .getLocalBlock(lastblock));
       if (storedBlock == null) {
@@ -3440,7 +3427,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
       }
@@ -4916,11 +4902,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
     checkOperation(OperationCategory.WRITE);
+    NameNode.stateChangeLog.info("*DIR* reportBadBlocks");
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      
-      NameNode.stateChangeLog.info("*DIR* reportBadBlocks");
       for (int i = 0; i < blocks.length; i++) {
         ExtendedBlock blk = blocks[i].getBlock();
         DatanodeInfo[] nodes = blocks[i].getLocations();
@@ -4983,6 +4968,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
     checkOperation(OperationCategory.WRITE);
+    LOG.info("updatePipeline(block=" + oldBlock
+             + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+             + ", newLength=" + newBlock.getNumBytes()
+             + ", newNodes=" + Arrays.asList(newNodes)
+             + ", clientName=" + clientName
+             + ")");
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -4992,12 +4983,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
-      LOG.info("updatePipeline(block=" + oldBlock
-               + ", newGenerationStamp=" + newBlock.getGenerationStamp()
-               + ", newLength=" + newBlock.getNumBytes()
-               + ", newNodes=" + Arrays.asList(newNodes)
-               + ", clientName=" + clientName
-               + ")");
       updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
     } finally {
       writeUnlock();
@@ -5249,7 +5234,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot issue delegation token", safeMode);
       }

+ 9 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java

@@ -99,7 +99,7 @@ public class TestSafeMode {
    */
   @Test
   public void testManualSafeMode() throws IOException {      
-    fs = (DistributedFileSystem)cluster.getFileSystem();
+    fs = cluster.getFileSystem();
     Path file1 = new Path("/tmp/testManualSafeMode/file1");
     Path file2 = new Path("/tmp/testManualSafeMode/file2");
     
@@ -112,7 +112,7 @@ public class TestSafeMode {
     // now bring up just the NameNode.
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
     cluster.waitActive();
-    dfs = (DistributedFileSystem)cluster.getFileSystem();
+    dfs = cluster.getFileSystem();
     
     assertTrue("No datanode is started. Should be in SafeMode", 
                dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
@@ -322,11 +322,11 @@ public class TestSafeMode {
         fs.rename(file1, new Path("file2"));
       }});
 
-    try {
-      fs.setTimes(file1, 0, 0);
-    } catch (IOException ioe) {
-      fail("Set times failed while in SM");
-    }
+    runFsFun("Set time while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.setTimes(file1, 0, 0);
+      }});
 
     try {
       DFSTestUtil.readFile(fs, file1);
@@ -350,7 +350,7 @@ public class TestSafeMode {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
 
     cluster.restartNameNode();
-    fs = (DistributedFileSystem)cluster.getFileSystem();
+    fs = cluster.getFileSystem();
 
     String tipMsg = cluster.getNamesystem().getSafemode();
     assertTrue("Safemode tip message looks right: " + tipMsg,
@@ -375,7 +375,7 @@ public class TestSafeMode {
    * @throws IOException when there's an issue connecting to the test DFS.
    */
   public void testSafeModeUtils() throws IOException {
-    dfs = (DistributedFileSystem)cluster.getFileSystem();
+    dfs = cluster.getFileSystem();
 
     // Enter safemode.
     dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);