瀏覽代碼

HDFS-4148. Disallow write/modify operations on files and directories in a snapshot. Contributed by Brandon Li.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1409023 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 年之前
父節點
當前提交
2116d0520e

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -67,3 +67,6 @@ Branch-2802 Snapshot (Unreleased)
 
 
   HDFS-4177. Add a snapshot parameter to INodeDirectory.getChildrenList() for
   HDFS-4177. Add a snapshot parameter to INodeDirectory.getChildrenList() for
   selecting particular snapshot children list views.  (szetszwo)
   selecting particular snapshot children list views.  (szetszwo)
+
+  HDFS-4148. Disallow write/modify operations on files and directories in a
+  snapshot. (Brandon Li via suresh)

+ 25 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -134,6 +134,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
@@ -1311,7 +1312,8 @@ public class DFSClient implements java.io.Closeable {
                                      ParentNotDirectoryException.class,
                                      ParentNotDirectoryException.class,
                                      NSQuotaExceededException.class, 
                                      NSQuotaExceededException.class, 
                                      DSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
 
 
@@ -1342,7 +1344,8 @@ public class DFSClient implements java.io.Closeable {
                                      SafeModeException.class,
                                      SafeModeException.class,
                                      DSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      UnsupportedOperationException.class,
                                      UnsupportedOperationException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
     return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
     return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
         lastBlock, stat, dfsClientConf.createChecksum());
         lastBlock, stat, dfsClientConf.createChecksum());
@@ -1395,7 +1398,8 @@ public class DFSClient implements java.io.Closeable {
                                      FileNotFoundException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
                                      SafeModeException.class,
                                      DSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
 
 
@@ -1413,7 +1417,8 @@ public class DFSClient implements java.io.Closeable {
       throw re.unwrapRemoteException(AccessControlException.class,
       throw re.unwrapRemoteException(AccessControlException.class,
                                      NSQuotaExceededException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
 
 
@@ -1427,7 +1432,8 @@ public class DFSClient implements java.io.Closeable {
       namenode.concat(trg, srcs);
       namenode.concat(trg, srcs);
     } catch(RemoteException re) {
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
       throw re.unwrapRemoteException(AccessControlException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
   /**
   /**
@@ -1447,7 +1453,8 @@ public class DFSClient implements java.io.Closeable {
                                      ParentNotDirectoryException.class,
                                      ParentNotDirectoryException.class,
                                      SafeModeException.class,
                                      SafeModeException.class,
                                      NSQuotaExceededException.class,
                                      NSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
   /**
   /**
@@ -1475,7 +1482,8 @@ public class DFSClient implements java.io.Closeable {
       throw re.unwrapRemoteException(AccessControlException.class,
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
   
   
@@ -1797,7 +1805,8 @@ public class DFSClient implements java.io.Closeable {
       throw re.unwrapRemoteException(AccessControlException.class,
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
 
 
@@ -1818,7 +1827,8 @@ public class DFSClient implements java.io.Closeable {
       throw re.unwrapRemoteException(AccessControlException.class,
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class);                                   
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);                                   
     }
     }
   }
   }
 
 
@@ -2049,7 +2059,8 @@ public class DFSClient implements java.io.Closeable {
                                      SafeModeException.class,
                                      SafeModeException.class,
                                      NSQuotaExceededException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
   
   
@@ -2092,7 +2103,8 @@ public class DFSClient implements java.io.Closeable {
                                      FileNotFoundException.class,
                                      FileNotFoundException.class,
                                      NSQuotaExceededException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
 
 
@@ -2108,7 +2120,8 @@ public class DFSClient implements java.io.Closeable {
     } catch(RemoteException re) {
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
                                      FileNotFoundException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
   }
   }
 
 

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RemoteException;
@@ -1301,7 +1302,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
                                      ParentNotDirectoryException.class,
                                      ParentNotDirectoryException.class,
                                      NSQuotaExceededException.class,
                                      NSQuotaExceededException.class,
                                      SafeModeException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class);
+                                     UnresolvedPathException.class,
+                                     SnapshotAccessControlException.class);
     }
     }
     streamer = new DataStreamer();
     streamer = new DataStreamer();
   }
   }

+ 27 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.io.retry.Idempotent;
@@ -163,6 +164,7 @@ public interface ClientProtocol {
    *           quota restriction
    *           quota restriction
    * @throws SafeModeException create not allowed in safemode
    * @throws SafeModeException create not allowed in safemode
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    *
    *
    * RuntimeExceptions:
    * RuntimeExceptions:
@@ -174,7 +176,7 @@ public interface ClientProtocol {
       AlreadyBeingCreatedException, DSQuotaExceededException,
       AlreadyBeingCreatedException, DSQuotaExceededException,
       FileAlreadyExistsException, FileNotFoundException,
       FileAlreadyExistsException, FileNotFoundException,
       NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
       NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
-      UnresolvedLinkException, IOException;
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Append to the end of the file. 
    * Append to the end of the file. 
@@ -194,6 +196,7 @@ public interface ClientProtocol {
    *           restriction
    *           restriction
    * @throws SafeModeException append not allowed in safemode
    * @throws SafeModeException append not allowed in safemode
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred.
    * @throws IOException If an I/O error occurred.
    *
    *
    * RuntimeExceptions:
    * RuntimeExceptions:
@@ -202,7 +205,7 @@ public interface ClientProtocol {
   public LocatedBlock append(String src, String clientName)
   public LocatedBlock append(String src, String clientName)
       throws AccessControlException, DSQuotaExceededException,
       throws AccessControlException, DSQuotaExceededException,
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
-      IOException;
+      SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Set replication for an existing file.
    * Set replication for an existing file.
@@ -224,13 +227,14 @@ public interface ClientProtocol {
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws SafeModeException not allowed in safemode
    * @throws SafeModeException not allowed in safemode
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   @Idempotent
   @Idempotent
   public boolean setReplication(String src, short replication)
   public boolean setReplication(String src, short replication)
       throws AccessControlException, DSQuotaExceededException,
       throws AccessControlException, DSQuotaExceededException,
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
-      IOException;
+      SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Set permissions for an existing file/directory.
    * Set permissions for an existing file/directory.
@@ -239,12 +243,13 @@ public interface ClientProtocol {
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws SafeModeException not allowed in safemode
    * @throws SafeModeException not allowed in safemode
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   @Idempotent
   @Idempotent
   public void setPermission(String src, FsPermission permission)
   public void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       throws AccessControlException, FileNotFoundException, SafeModeException,
-      UnresolvedLinkException, IOException;
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Set Owner of a path (i.e. a file or a directory).
    * Set Owner of a path (i.e. a file or a directory).
@@ -257,12 +262,13 @@ public interface ClientProtocol {
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws SafeModeException not allowed in safemode
    * @throws SafeModeException not allowed in safemode
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   @Idempotent
   @Idempotent
   public void setOwner(String src, String username, String groupname)
   public void setOwner(String src, String username, String groupname)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       throws AccessControlException, FileNotFoundException, SafeModeException,
-      UnresolvedLinkException, IOException;
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * The client can give up on a block by calling abandonBlock().
    * The client can give up on a block by calling abandonBlock().
@@ -385,10 +391,11 @@ public interface ClientProtocol {
    * @return true if successful, or false if the old name does not exist
    * @return true if successful, or false if the old name does not exist
    * or if the new name already belongs to the namespace.
    * or if the new name already belongs to the namespace.
    * 
    * 
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException an I/O error occurred 
    * @throws IOException an I/O error occurred 
    */
    */
   public boolean rename(String src, String dst) 
   public boolean rename(String src, String dst) 
-      throws UnresolvedLinkException, IOException;
+      throws UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Moves blocks from srcs to trg and delete srcs
    * Moves blocks from srcs to trg and delete srcs
@@ -398,9 +405,10 @@ public interface ClientProtocol {
    * @throws IOException if some arguments are invalid
    * @throws IOException if some arguments are invalid
    * @throws UnresolvedLinkException if <code>trg</code> or <code>srcs</code>
    * @throws UnresolvedLinkException if <code>trg</code> or <code>srcs</code>
    *           contains a symlink
    *           contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */
    */
   public void concat(String trg, String[] srcs) 
   public void concat(String trg, String[] srcs) 
-      throws IOException, UnresolvedLinkException;
+      throws IOException, UnresolvedLinkException, SnapshotAccessControlException;
 
 
   /**
   /**
    * Rename src to dst.
    * Rename src to dst.
@@ -434,13 +442,14 @@ public interface ClientProtocol {
    * @throws SafeModeException rename not allowed in safemode
    * @throws SafeModeException rename not allowed in safemode
    * @throws UnresolvedLinkException If <code>src</code> or
    * @throws UnresolvedLinkException If <code>src</code> or
    *           <code>dst</code> contains a symlink
    *           <code>dst</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   public void rename2(String src, String dst, Options.Rename... options)
   public void rename2(String src, String dst, Options.Rename... options)
       throws AccessControlException, DSQuotaExceededException,
       throws AccessControlException, DSQuotaExceededException,
       FileAlreadyExistsException, FileNotFoundException,
       FileAlreadyExistsException, FileNotFoundException,
       NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
       NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
-      UnresolvedLinkException, IOException;
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
   
   
   /**
   /**
    * Delete the given file or directory from the file system.
    * Delete the given file or directory from the file system.
@@ -457,11 +466,12 @@ public interface ClientProtocol {
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws FileNotFoundException If file <code>src</code> is not found
    * @throws SafeModeException create not allowed in safemode
    * @throws SafeModeException create not allowed in safemode
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   public boolean delete(String src, boolean recursive)
   public boolean delete(String src, boolean recursive)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       throws AccessControlException, FileNotFoundException, SafeModeException,
-      UnresolvedLinkException, IOException;
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
   
   
   /**
   /**
    * Create a directory (or hierarchy of directories) with the given
    * Create a directory (or hierarchy of directories) with the given
@@ -482,6 +492,7 @@ public interface ClientProtocol {
    *           is not a directory
    *           is not a directory
    * @throws SafeModeException create not allowed in safemode
    * @throws SafeModeException create not allowed in safemode
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred.
    * @throws IOException If an I/O error occurred.
    *
    *
    * RunTimeExceptions:
    * RunTimeExceptions:
@@ -492,7 +503,7 @@ public interface ClientProtocol {
       throws AccessControlException, FileAlreadyExistsException,
       throws AccessControlException, FileAlreadyExistsException,
       FileNotFoundException, NSQuotaExceededException,
       FileNotFoundException, NSQuotaExceededException,
       ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
       ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
-      IOException;
+      SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Get a partial listing of the indicated directory
    * Get a partial listing of the indicated directory
@@ -800,12 +811,13 @@ public interface ClientProtocol {
    * @throws QuotaExceededException if the directory size 
    * @throws QuotaExceededException if the directory size 
    *           is greater than the given quota
    *           is greater than the given quota
    * @throws UnresolvedLinkException if the <code>path</code> contains a symlink. 
    * @throws UnresolvedLinkException if the <code>path</code> contains a symlink. 
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   @Idempotent
   @Idempotent
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
-      UnresolvedLinkException, IOException;
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Write all metadata for this file into persistent storage.
    * Write all metadata for this file into persistent storage.
@@ -836,12 +848,13 @@ public interface ClientProtocol {
    * @throws AccessControlException permission denied
    * @throws AccessControlException permission denied
    * @throws FileNotFoundException file <code>src</code> is not found
    * @throws FileNotFoundException file <code>src</code> is not found
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink. 
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink. 
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   @Idempotent
   @Idempotent
   public void setTimes(String src, long mtime, long atime)
   public void setTimes(String src, long mtime, long atime)
       throws AccessControlException, FileNotFoundException, 
       throws AccessControlException, FileNotFoundException, 
-      UnresolvedLinkException, IOException;
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Create symlink to a file or directory.
    * Create symlink to a file or directory.
@@ -859,13 +872,14 @@ public interface ClientProtocol {
    * @throws ParentNotDirectoryException If parent of <code>link</code> is not a
    * @throws ParentNotDirectoryException If parent of <code>link</code> is not a
    *           directory.
    *           directory.
    * @throws UnresolvedLinkException if <code>link</target> contains a symlink. 
    * @throws UnresolvedLinkException if <code>link</target> contains a symlink. 
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    * @throws IOException If an I/O error occurred
    */
    */
   public void createSymlink(String target, String link, FsPermission dirPerm,
   public void createSymlink(String target, String link, FsPermission dirPerm,
       boolean createParent) throws AccessControlException,
       boolean createParent) throws AccessControlException,
       FileAlreadyExistsException, FileNotFoundException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
       ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
-      IOException;
+      SnapshotAccessControlException, IOException;
 
 
   /**
   /**
    * Return the target of the given symlink. If there is an intermediate
    * Return the target of the given symlink. If there is an intermediate

+ 107 - 39
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -220,6 +221,7 @@ public class FSDirectory implements Closeable {
    * @throws FileAlreadyExistsException
    * @throws FileAlreadyExistsException
    * @throws QuotaExceededException
    * @throws QuotaExceededException
    * @throws UnresolvedLinkException
    * @throws UnresolvedLinkException
+   * @throws SnapshotAccessControlException 
    */
    */
   INodeFileUnderConstruction addFile(String path, 
   INodeFileUnderConstruction addFile(String path, 
                 PermissionStatus permissions,
                 PermissionStatus permissions,
@@ -230,7 +232,7 @@ public class FSDirectory implements Closeable {
                 DatanodeDescriptor clientNode,
                 DatanodeDescriptor clientNode,
                 long generationStamp) 
                 long generationStamp) 
     throws FileAlreadyExistsException, QuotaExceededException,
     throws FileAlreadyExistsException, QuotaExceededException,
-      UnresolvedLinkException {
+      UnresolvedLinkException, SnapshotAccessControlException {
     waitForReady();
     waitForReady();
 
 
     // Always do an implicit mkdirs for parent directory tree.
     // Always do an implicit mkdirs for parent directory tree.
@@ -447,13 +449,14 @@ public class FSDirectory implements Closeable {
   }
   }
 
 
   /**
   /**
+   * @throws SnapshotAccessControlException 
    * @see #unprotectedRenameTo(String, String, long)
    * @see #unprotectedRenameTo(String, String, long)
    * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
    * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
    */
    */
   @Deprecated
   @Deprecated
   boolean renameTo(String src, String dst) 
   boolean renameTo(String src, String dst) 
       throws QuotaExceededException, UnresolvedLinkException, 
       throws QuotaExceededException, UnresolvedLinkException, 
-      FileAlreadyExistsException {
+      FileAlreadyExistsException, SnapshotAccessControlException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
           +src+" to "+dst);
           +src+" to "+dst);
@@ -503,14 +506,15 @@ public class FSDirectory implements Closeable {
    * @return true if rename succeeds; false otherwise
    * @return true if rename succeeds; false otherwise
    * @throws QuotaExceededException if the operation violates any quota limit
    * @throws QuotaExceededException if the operation violates any quota limit
    * @throws FileAlreadyExistsException if the src is a symlink that points to dst
    * @throws FileAlreadyExistsException if the src is a symlink that points to dst
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @deprecated See {@link #renameTo(String, String)}
    * @deprecated See {@link #renameTo(String, String)}
    */
    */
   @Deprecated
   @Deprecated
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
     throws QuotaExceededException, UnresolvedLinkException, 
-    FileAlreadyExistsException {
+    FileAlreadyExistsException, SnapshotAccessControlException {
     assert hasWriteLock();
     assert hasWriteLock();
-    INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    INodesInPath srcInodesInPath = rootDir.getMutableINodesInPath(src, false);
     INode[] srcInodes = srcInodesInPath.getINodes();
     INode[] srcInodes = srcInodesInPath.getINodes();
     INode srcInode = srcInodes[srcInodes.length-1];
     INode srcInode = srcInodes[srcInodes.length-1];
     
     
@@ -552,6 +556,10 @@ public class FSDirectory implements Closeable {
     byte[][] dstComponents = INode.getPathComponents(dst);
     byte[][] dstComponents = INode.getPathComponents(dst);
     INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
     INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
         dstComponents.length, false);
         dstComponents.length, false);
+    if (dstInodesInPath.isSnapshot()) {
+      throw new SnapshotAccessControlException(
+          "Modification on RO snapshot is disallowed");
+    }
     INode[] dstInodes = dstInodesInPath.getINodes();
     INode[] dstInodes = dstInodesInPath.getINodes();
     if (dstInodes[dstInodes.length-1] != null) {
     if (dstInodes[dstInodes.length-1] != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -635,7 +643,8 @@ public class FSDirectory implements Closeable {
       }
       }
     }
     }
     String error = null;
     String error = null;
-    final INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    final INodesInPath srcInodesInPath = rootDir.getMutableINodesInPath(src,
+        false);
     final INode[] srcInodes = srcInodesInPath.getINodes();
     final INode[] srcInodes = srcInodesInPath.getINodes();
     final INode srcInode = srcInodes[srcInodes.length - 1];
     final INode srcInode = srcInodes[srcInodes.length - 1];
     // validate source
     // validate source
@@ -672,8 +681,8 @@ public class FSDirectory implements Closeable {
       throw new IOException(error);
       throw new IOException(error);
     }
     }
     final byte[][] dstComponents = INode.getPathComponents(dst);
     final byte[][] dstComponents = INode.getPathComponents(dst);
-    INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
-        dstComponents.length, false);
+    final INodesInPath dstInodesInPath = rootDir.getMutableINodesInPath(
+        dstComponents, false);
     final INode[] dstInodes = dstInodesInPath.getINodes();
     final INode[] dstInodes = dstInodesInPath.getINodes();
     INode dstInode = dstInodes[dstInodes.length - 1];
     INode dstInode = dstInodes[dstInodes.length - 1];
     if (dstInodes.length == 1) {
     if (dstInodes.length == 1) {
@@ -804,9 +813,11 @@ public class FSDirectory implements Closeable {
    * @param oldReplication old replication - output parameter
    * @param oldReplication old replication - output parameter
    * @return array of file blocks
    * @return array of file blocks
    * @throws QuotaExceededException
    * @throws QuotaExceededException
+   * @throws SnapshotAccessControlException 
    */
    */
   Block[] setReplication(String src, short replication, short[] oldReplication)
   Block[] setReplication(String src, short replication, short[] oldReplication)
-      throws QuotaExceededException, UnresolvedLinkException {
+      throws QuotaExceededException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     waitForReady();
     waitForReady();
     Block[] fileBlocks = null;
     Block[] fileBlocks = null;
     writeLock();
     writeLock();
@@ -820,14 +831,12 @@ public class FSDirectory implements Closeable {
     }
     }
   }
   }
 
 
-  Block[] unprotectedSetReplication(String src, 
-                                    short replication,
-                                    short[] oldReplication
-                                    ) throws QuotaExceededException, 
-                                    UnresolvedLinkException {
+  Block[] unprotectedSetReplication(String src, short replication,
+      short[] oldReplication) throws QuotaExceededException,
+      UnresolvedLinkException, SnapshotAccessControlException {
     assert hasWriteLock();
     assert hasWriteLock();
 
 
-    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = rootDir.getMutableINodesInPath(src, true);
     final INode[] inodes = inodesInPath.getINodes();
     final INode[] inodes = inodesInPath.getINodes();
     INode inode = inodes[inodes.length - 1];
     INode inode = inodes[inodes.length - 1];
     if (inode == null) {
     if (inode == null) {
@@ -890,8 +899,26 @@ public class FSDirectory implements Closeable {
     }
     }
   }
   }
 
 
+  boolean existsMutable(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    src = normalizePath(src);
+    readLock();
+    try {
+      INode inode = rootDir.getMutableNode(src, false);
+      if (inode == null) {
+         return false;
+      }
+      return inode.isDirectory() || inode.isSymlink() 
+        ? true 
+        : ((INodeFile)inode).getBlocks() != null;
+    } finally {
+      readUnlock();
+    }
+  }
+  
   void setPermission(String src, FsPermission permission)
   void setPermission(String src, FsPermission permission)
-      throws FileNotFoundException, UnresolvedLinkException {
+      throws FileNotFoundException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     writeLock();
     writeLock();
     try {
     try {
       unprotectedSetPermission(src, permission);
       unprotectedSetPermission(src, permission);
@@ -900,11 +927,12 @@ public class FSDirectory implements Closeable {
     }
     }
     fsImage.getEditLog().logSetPermissions(src, permission);
     fsImage.getEditLog().logSetPermissions(src, permission);
   }
   }
-
-  void unprotectedSetPermission(String src, FsPermission permissions) 
-      throws FileNotFoundException, UnresolvedLinkException {
+  
+  void unprotectedSetPermission(String src, FsPermission permissions)
+      throws FileNotFoundException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     assert hasWriteLock();
     assert hasWriteLock();
-    INode inode = rootDir.getNode(src, true);
+    INode inode = rootDir.getMutableNode(src, true);
     if (inode == null) {
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
       throw new FileNotFoundException("File does not exist: " + src);
     }
     }
@@ -912,7 +940,8 @@ public class FSDirectory implements Closeable {
   }
   }
 
 
   void setOwner(String src, String username, String groupname)
   void setOwner(String src, String username, String groupname)
-      throws FileNotFoundException, UnresolvedLinkException {
+      throws FileNotFoundException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     writeLock();
     writeLock();
     try {
     try {
       unprotectedSetOwner(src, username, groupname);
       unprotectedSetOwner(src, username, groupname);
@@ -922,10 +951,11 @@ public class FSDirectory implements Closeable {
     fsImage.getEditLog().logSetOwner(src, username, groupname);
     fsImage.getEditLog().logSetOwner(src, username, groupname);
   }
   }
 
 
-  void unprotectedSetOwner(String src, String username, String groupname) 
-      throws FileNotFoundException, UnresolvedLinkException {
+  void unprotectedSetOwner(String src, String username, String groupname)
+      throws FileNotFoundException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     assert hasWriteLock();
     assert hasWriteLock();
-    INode inode = rootDir.getNode(src, true);
+    INode inode = rootDir.getMutableNode(src, true);
     if (inode == null) {
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
       throw new FileNotFoundException("File does not exist: " + src);
     }
     }
@@ -1020,7 +1050,7 @@ public class FSDirectory implements Closeable {
     int filesRemoved;
     int filesRemoved;
     writeLock();
     writeLock();
     try {
     try {
-      final INodesInPath inodesInPath = rootDir.getExistingPathINodes(
+      final INodesInPath inodesInPath = rootDir.getMutableINodesInPath(
           normalizePath(src), false);
           normalizePath(src), false);
       final INode[] inodes = inodesInPath.getINodes();
       final INode[] inodes = inodesInPath.getINodes();
       if (checkPathINodes(inodes, src) == 0) {
       if (checkPathINodes(inodes, src) == 0) {
@@ -1095,14 +1125,15 @@ public class FSDirectory implements Closeable {
    * <br>
    * <br>
    * @param src a string representation of a path to an inode
    * @param src a string representation of a path to an inode
    * @param mtime the time the inode is removed
    * @param mtime the time the inode is removed
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */ 
    */ 
   void unprotectedDelete(String src, long mtime) 
   void unprotectedDelete(String src, long mtime) 
-    throws UnresolvedLinkException {
+    throws UnresolvedLinkException, SnapshotAccessControlException {
     assert hasWriteLock();
     assert hasWriteLock();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     int filesRemoved = 0;
     int filesRemoved = 0;
 
 
-    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(
+    final INodesInPath inodesInPath = rootDir.getMutableINodesInPath(
         normalizePath(src), false);
         normalizePath(src), false);
     final INode[] inodes = inodesInPath.getINodes();
     final INode[] inodes = inodesInPath.getINodes();
     if (checkPathINodes(inodes, src) == 0) {
     if (checkPathINodes(inodes, src) == 0) {
@@ -1324,6 +1355,20 @@ public class FSDirectory implements Closeable {
     }
     }
   }
   }
   
   
+  /**
+   * Get {@link INode} associated with the file / directory.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  public INode getMutableINode(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    readLock();
+    try {
+      return rootDir.getMutableNode(src, true);
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
   /**
    * Get the parent node of path.
    * Get the parent node of path.
    * 
    * 
@@ -1342,14 +1387,15 @@ public class FSDirectory implements Closeable {
   
   
   /** 
   /** 
    * Check whether the filepath could be created
    * Check whether the filepath could be created
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */
    */
-  boolean isValidToCreate(String src) throws UnresolvedLinkException {
+  boolean isValidToCreate(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
     String srcs = normalizePath(src);
     String srcs = normalizePath(src);
     readLock();
     readLock();
     try {
     try {
-      if (srcs.startsWith("/") && 
-          !srcs.endsWith("/") && 
-          rootDir.getNode(srcs, false) == null) {
+      if (srcs.startsWith("/") && !srcs.endsWith("/")
+          && rootDir.getMutableNode(srcs, false) == null) {
         return true;
         return true;
       } else {
       } else {
         return false;
         return false;
@@ -1372,6 +1418,22 @@ public class FSDirectory implements Closeable {
       readUnlock();
       readUnlock();
     }
     }
   }
   }
+  
+  /**
+   * Check whether the path specifies a directory
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  boolean isDirMutable(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    src = normalizePath(src);
+    readLock();
+    try {
+      INode node = rootDir.getMutableNode(src, false);
+      return node != null && node.isDirectory();
+    } finally {
+      readUnlock();
+    }
+  }
 
 
   /** Updates namespace and diskspace consumed for all
   /** Updates namespace and diskspace consumed for all
    * directories until the parent directory of file represented by path.
    * directories until the parent directory of file represented by path.
@@ -1510,11 +1572,12 @@ public class FSDirectory implements Closeable {
    * @throws QuotaExceededException if directory creation violates 
    * @throws QuotaExceededException if directory creation violates 
    *                                any quota limit
    *                                any quota limit
    * @throws UnresolvedLinkException if a symlink is encountered in src.                      
    * @throws UnresolvedLinkException if a symlink is encountered in src.                      
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */
    */
   boolean mkdirs(String src, PermissionStatus permissions,
   boolean mkdirs(String src, PermissionStatus permissions,
       boolean inheritPermission, long now)
       boolean inheritPermission, long now)
       throws FileAlreadyExistsException, QuotaExceededException, 
       throws FileAlreadyExistsException, QuotaExceededException, 
-             UnresolvedLinkException {
+             UnresolvedLinkException, SnapshotAccessControlException {
     src = normalizePath(src);
     src = normalizePath(src);
     String[] names = INode.getPathNames(src);
     String[] names = INode.getPathNames(src);
     byte[][] components = INode.getPathComponents(names);
     byte[][] components = INode.getPathComponents(names);
@@ -1524,6 +1587,10 @@ public class FSDirectory implements Closeable {
     try {
     try {
       INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
       INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
           components.length, false);
           components.length, false);
+      if (inodesInPath.isSnapshot()) {
+        throw new SnapshotAccessControlException(
+            "Modification on RO snapshot is disallowed");
+      }
       INode[] inodes = inodesInPath.getINodes();
       INode[] inodes = inodesInPath.getINodes();
 
 
       // find the index of the first null in inodes[]
       // find the index of the first null in inodes[]
@@ -1935,10 +2002,11 @@ public class FSDirectory implements Closeable {
    * @throws QuotaExceededException if the directory tree size is 
    * @throws QuotaExceededException if the directory tree size is 
    *                                greater than the given quota
    *                                greater than the given quota
    * @throws UnresolvedLinkException if a symlink is encountered in src.
    * @throws UnresolvedLinkException if a symlink is encountered in src.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */
    */
   INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
   INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
-    throws FileNotFoundException, QuotaExceededException, 
-      UnresolvedLinkException {
+      throws FileNotFoundException, QuotaExceededException,
+      UnresolvedLinkException, SnapshotAccessControlException {
     assert hasWriteLock();
     assert hasWriteLock();
     // sanity check
     // sanity check
     if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET && 
     if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET && 
@@ -1951,9 +2019,8 @@ public class FSDirectory implements Closeable {
     }
     }
     
     
     String srcs = normalizePath(src);
     String srcs = normalizePath(src);
-
-    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
-    final INode[] inodes = inodesInPath.getINodes();
+    final INode[] inodes = rootDir.getMutableINodesInPath(srcs, true)
+        .getINodes();
     INode targetNode = inodes[inodes.length-1];
     INode targetNode = inodes[inodes.length-1];
     if (targetNode == null) {
     if (targetNode == null) {
       throw new FileNotFoundException("Directory does not exist: " + srcs);
       throw new FileNotFoundException("Directory does not exist: " + srcs);
@@ -1998,11 +2065,12 @@ public class FSDirectory implements Closeable {
   /**
   /**
    * See {@link ClientProtocol#setQuota(String, long, long)} for the 
    * See {@link ClientProtocol#setQuota(String, long, long)} for the 
    * contract.
    * contract.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @see #unprotectedSetQuota(String, long, long)
    * @see #unprotectedSetQuota(String, long, long)
    */
    */
-  void setQuota(String src, long nsQuota, long dsQuota) 
-    throws FileNotFoundException, QuotaExceededException,
-    UnresolvedLinkException { 
+  void setQuota(String src, long nsQuota, long dsQuota)
+      throws FileNotFoundException, QuotaExceededException,
+      UnresolvedLinkException, SnapshotAccessControlException {
     writeLock();
     writeLock();
     try {
     try {
       INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
       INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1396,7 +1396,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     // replication and blocks sizes should be the same for ALL the blocks
     // replication and blocks sizes should be the same for ALL the blocks
 
 
     // check the target
     // check the target
-    final INodeFile trgInode = INodeFile.valueOf(dir.getINode(target), target);
+    final INodeFile trgInode = INodeFile.valueOf(dir.getMutableINode(target),
+        target);
     if(trgInode.isUnderConstruction()) {
     if(trgInode.isUnderConstruction()) {
       throw new HadoopIllegalArgumentException("concat: target file "
       throw new HadoopIllegalArgumentException("concat: target file "
           + target + " is under construction");
           + target + " is under construction");
@@ -1431,7 +1432,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if(i==srcs.length-1)
       if(i==srcs.length-1)
         endSrc=true;
         endSrc=true;
 
 
-      final INodeFile srcInode = INodeFile.valueOf(dir.getINode(src), src);
+      final INodeFile srcInode = INodeFile.valueOf(dir.getMutableINode(src), src);
       if(src.isEmpty() 
       if(src.isEmpty() 
           || srcInode.isUnderConstruction()
           || srcInode.isUnderConstruction()
           || srcInode.numBlocks() == 0) {
           || srcInode.numBlocks() == 0) {
@@ -1514,7 +1515,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
         checkPathAccess(src, FsAction.WRITE);
         checkPathAccess(src, FsAction.WRITE);
       }
       }
-      INode inode = dir.getINode(src);
+      INode inode = dir.getMutableINode(src);
       if (inode != null) {
       if (inode != null) {
         dir.setTimes(src, inode, mtime, atime, true);
         dir.setTimes(src, inode, mtime, atime, true);
         if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         if (auditLog.isInfoEnabled() && isExternalInvocation()) {
@@ -1787,7 +1788,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     }
 
 
     // Verify that the destination does not exist as a directory already.
     // Verify that the destination does not exist as a directory already.
-    boolean pathExists = dir.exists(src);
+    boolean pathExists = dir.existsMutable(src);
     if (pathExists && dir.isDir(src)) {
     if (pathExists && dir.isDir(src)) {
       throw new FileAlreadyExistsException("Cannot create file " + src
       throw new FileAlreadyExistsException("Cannot create file " + src
           + "; already exists as a directory.");
           + "; already exists as a directory.");
@@ -2929,7 +2930,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
       checkTraverse(src);
       checkTraverse(src);
     }
     }
-    if (dir.isDir(src)) {
+    if (dir.isDirMutable(src)) {
       // all the users of mkdirs() are used to expect 'true' even if
       // all the users of mkdirs() are used to expect 'true' even if
       // a new directory is not created.
       // a new directory is not created.
       return true;
       return true;

+ 39 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
@@ -136,6 +137,44 @@ public class INodeDirectory extends INode {
     return getINodesInPath(path, resolveLink).getINode(0);
     return getINodesInPath(path, resolveLink).getINode(0);
   }
   }
 
 
+  /**
+   * @return the INode of the last component in src, or null if the last
+   * component does not exist.
+   * @throws UnresolvedLinkException if symlink can't be resolved
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  INode getMutableNode(String src, boolean resolveLink)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    INode[] inodes = getMutableINodesInPath(src, resolveLink).getINodes();
+    return inodes[inodes.length - 1];
+  }
+
+  /**
+   * @return the INodesInPath of the components in src
+   * @throws UnresolvedLinkException if symlink can't be resolved
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  INodesInPath getMutableINodesInPath(String src, boolean resolveLink)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    return getMutableINodesInPath(INode.getPathComponents(src), resolveLink);
+  }
+  
+  /**
+   * @return the INodesInPath of the components in src
+   * @throws UnresolvedLinkException if symlink can't be resolved
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  INodesInPath getMutableINodesInPath(byte[][] components, boolean resolveLink)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    INodesInPath inodesInPath = getExistingPathINodes(components,
+        components.length, resolveLink);
+    if (inodesInPath.isSnapshot()) {
+      throw new SnapshotAccessControlException(
+          "Modification on RO snapshot is disallowed");
+    }
+    return inodesInPath;
+  }
+
   /**
   /**
    * Retrieve existing INodes from a path. If existing is big enough to store
    * Retrieve existing INodes from a path. If existing is big enough to store
    * all path components (existing and non-existing), then existing INodes
    * all path components (existing and non-existing), then existing INodes

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotAccessControlException.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import org.apache.hadoop.security.AccessControlException;
+
+/** Snapshot access related exception. */
+public class SnapshotAccessControlException extends AccessControlException {
+  private static final long serialVersionUID = 1L;
+
+  public SnapshotAccessControlException(final String message) {
+    super(message);
+  }
+
+  public SnapshotAccessControlException(final Throwable cause) {
+    super(cause);
+  }
+}

+ 156 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestDisallowModifyROSnapshot.java

@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class tests snapshot functionality. One or multiple snapshots are
+ * created. The snapshotted directory is changed and verification is done to
+ * ensure snapshots remain unchanges.
+ */
+public class TestDisallowModifyROSnapshot {
+  private final static Path dir = new Path("/TestSnapshot");
+  private final static Path sub1 = new Path(dir, "sub1");
+  private final static Path sub2 = new Path(dir, "sub2");
+
+  protected static Configuration conf;
+  protected static MiniDFSCluster cluster;
+  protected static FSNamesystem fsn;
+  protected static DistributedFileSystem fs;
+
+  /**
+   * The list recording all previous snapshots. Each element in the array
+   * records a snapshot root.
+   */
+  protected static ArrayList<Path> snapshotList = new ArrayList<Path>();
+  static Path objInSnapshot = null;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    fsn = cluster.getNamesystem();
+    fs = cluster.getFileSystem();
+
+    Path path1 = new Path(sub1, "dir1");
+    assertTrue(fs.mkdirs(path1));
+    Path path2 = new Path(sub2, "dir2");
+    assertTrue(fs.mkdirs(path2));
+    SnapshotTestHelper.createSnapshot(fs, sub1, "testSnapshot");
+    objInSnapshot = SnapshotTestHelper.getSnapshotPath(sub1, "testSnapshot",
+        "dir1");
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testSetReplication() throws Exception {
+    fs.setReplication(objInSnapshot, (short) 1);
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testSetPermission() throws Exception {
+    fs.setPermission(objInSnapshot, new FsPermission("777"));
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testSetOwner() throws Exception {
+    fs.setOwner(objInSnapshot, "username", "groupname");
+  }
+
+  @Test
+  public void testRename() throws Exception {
+    try {
+      fs.rename(objInSnapshot, new Path("/invalid/path"));
+      fail("Didn't throw SnapshotAccessControlException");
+    } catch (SnapshotAccessControlException e) { /* Ignored */ }
+
+    try {
+      fs.rename(sub2, objInSnapshot);
+      fail("Didn't throw SnapshotAccessControlException");
+    } catch (SnapshotAccessControlException e) { /* Ignored */ }
+
+    try {
+      fs.rename(sub2, objInSnapshot, (Options.Rename) null);
+      fail("Didn't throw SnapshotAccessControlException");
+    } catch (SnapshotAccessControlException e) { /* Ignored */ }
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testDelete() throws Exception {
+    fs.delete(objInSnapshot, true);
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testQuota() throws Exception {
+    fs.setQuota(objInSnapshot, 100, 100);
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testSetTime() throws Exception {
+    fs.setTimes(objInSnapshot, 100, 100);
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testCreate() throws Exception {
+    @SuppressWarnings("deprecation")
+    DFSClient dfsclient = new DFSClient(conf);
+    dfsclient.create(objInSnapshot.toString(), true);
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testAppend() throws Exception {
+    fs.append(objInSnapshot, 65535, null);
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testMkdir() throws Exception {
+    fs.mkdirs(objInSnapshot, new FsPermission("777"));
+  }
+
+  @Test(expected = SnapshotAccessControlException.class)
+  public void testCreateSymlink() throws Exception {
+    @SuppressWarnings("deprecation")
+    DFSClient dfsclient = new DFSClient(conf);
+    // TODO: if link is objInSnapshot, ParentNotDirectoryException got thrown
+    // first by verifyParentDir()
+    dfsclient.createSymlink(sub2.toString(), "/TestSnapshot/sub1/.snapshot",
+        false);
+  }
+}