
HADOOP-1298. Implement permissions for HDFS, disabled by default. Contributed by Nicholas and taton.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@609029 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 17 years ago
parent
revision
3e4fa8b592
29 changed files with 1102 additions and 160 deletions
  1. CHANGES.txt (+4, -0)
  2. conf/hadoop-default.xml (+18, -0)
  3. src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (+3, -3)
  4. src/java/org/apache/hadoop/dfs/ClientProtocol.java (+37, -3)
  5. src/java/org/apache/hadoop/dfs/DFSClient.java (+53, -5)
  6. src/java/org/apache/hadoop/dfs/DFSFileInfo.java (+5, -1)
  7. src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (+18, -4)
  8. src/java/org/apache/hadoop/dfs/FSConstants.java (+2, -2)
  9. src/java/org/apache/hadoop/dfs/FSDirectory.java (+62, -13)
  10. src/java/org/apache/hadoop/dfs/FSEditLog.java (+55, -8)
  11. src/java/org/apache/hadoop/dfs/FSImage.java (+9, -2)
  12. src/java/org/apache/hadoop/dfs/FSNamesystem.java (+162, -38)
  13. src/java/org/apache/hadoop/dfs/INode.java (+135, -29)
  14. src/java/org/apache/hadoop/dfs/NameNode.java (+22, -8)
  15. src/java/org/apache/hadoop/dfs/NamenodeFsck.java (+8, -6)
  16. src/java/org/apache/hadoop/dfs/PermissionChecker.java (+178, -0)
  17. src/java/org/apache/hadoop/dfs/SerialNumberManager.java (+72, -0)
  18. src/java/org/apache/hadoop/fs/permission/FsPermission.java (+12, -0)
  19. src/java/org/apache/hadoop/fs/permission/PermissionStatus.java (+22, -0)
  20. src/java/org/apache/hadoop/ipc/Server.java (+14, -4)
  21. src/java/org/apache/hadoop/security/UnixUserGroupInformation.java (+4, -8)
  22. src/java/org/apache/hadoop/security/UserGroupInformation.java (+4, -6)
  23. src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java (+5, -3)
  24. src/test/org/apache/hadoop/dfs/TestBalancer.java (+1, -1)
  25. src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (+9, -9)
  26. src/test/org/apache/hadoop/dfs/TestEditLog.java (+5, -1)
  27. src/test/org/apache/hadoop/dfs/TestGetBlocks.java (+4, -1)
  28. src/test/org/apache/hadoop/dfs/TestReplication.java (+2, -5)
  29. src/test/org/apache/hadoop/security/TestPermission.java (+177, -0)

+ 4 - 0
CHANGES.txt

@@ -56,6 +56,10 @@ Trunk (unreleased changes)
 
     HADOOP-2336. Shell commands to modify file permissions. (rangadi)
 
+    HADOOP-1298. Implement file permissions for HDFS, disabled by
+    default.  Enable with dfs.permissions=true.
+    (Tsz Wo (Nicholas) & taton via cutting)
+
   IMPROVEMENTS
 
     HADOOP-2045.  Change committer list on website to a table, so that

+ 18 - 0
conf/hadoop-default.xml

@@ -279,6 +279,24 @@ creations/deletions), or "all".</description>
       directories, for redundancy. </description>
 </property>
 
+<property>
+  <name>dfs.permissions</name>
+  <value>false</value>
+  <description>
+    If "true", enable permission checking in HDFS.
+    If "false", permission checking is turned off,
+    but all other behavior is unchanged.
+    Switching from one parameter value to the other does not change the mode,
+    owner or group of files or directories.
+  </description>
+</property>
+
+<property>
+  <name>dfs.permissions.supergroup</name>
+  <value>supergroup</value>
+  <description>The name of the group of super-users.</description>
+</property>
+
 <property>
   <name>dfs.client.buffer.dir</name>
   <value>${hadoop.tmp.dir}/dfs/tmp</value>
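
Since checking is off by default, turning the feature on is just a matter of overriding the two keys above. A minimal client-side sketch using the stock Configuration API (the custom supergroup name is made up):

import org.apache.hadoop.conf.Configuration;

public class EnablePermissionChecking {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keys and defaults are taken from the hadoop-default.xml entries above.
    conf.set("dfs.permissions", "true");                 // default: "false"
    conf.set("dfs.permissions.supergroup", "hdfsadmin"); // default: "supergroup"
    System.out.println("dfs.permissions = "
        + conf.getBoolean("dfs.permissions", false));
  }
}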

+ 3 - 3
src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java

@@ -2074,7 +2074,7 @@ class BlockCrcUpgradeObjectNamenode extends UpgradeObjectNamenode {
  }
  
  void updateBlockLevelStats(String path, BlockLevelStats stats) {
-    DFSFileInfo[] fileArr = getFSNamesystem().getListing(path);
+    DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
    
    for (DFSFileInfo file:fileArr) {
      if (file.isDir()) {
@@ -2083,7 +2083,7 @@ class BlockCrcUpgradeObjectNamenode extends UpgradeObjectNamenode {
        // Get the all the blocks.
        LocatedBlocks blockLoc = null;
        try {
-          blockLoc = getFSNamesystem().getBlockLocations(null,
+          blockLoc = getFSNamesystem().getBlockLocationsInternal(null,
              file.getPath().toString(), 0, file.getLen());
          int numBlocks = blockLoc.locatedBlockCount();
          for (int i=0; i<numBlocks; i++) {
@@ -2142,7 +2142,7 @@ class BlockCrcUpgradeObjectNamenode extends UpgradeObjectNamenode {
  
  private int deleteCrcFiles(String path) {
    // Recursively deletes files
-    DFSFileInfo[] fileArr = getFSNamesystem().getListing(path);
+    DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
    
    int numFilesDeleted = 0;
    

+ 37 - 3
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.dfs;
 import java.io.*;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
+import org.apache.hadoop.fs.permission.*;
 
 /**********************************************************************
  * ClientProtocol is used by a piece of DFS user code to communicate 
@@ -38,8 +39,9 @@ interface ClientProtocol extends VersionedProtocol {
   * 20 : getContentLength returns the total size in bytes of a directory subtree
   * 21 : add lease holder as a parameter in abandonBlock(...)
   * 22 : Serialization of FileStatus has changed.
+   * 23 : added setOwner(...) and setPermission(...); changed create(...) and mkdir(...)
   */
-  public static final long versionID = 22L;
+  public static final long versionID = 23L;
  
  ///////////////////////////////////////
  // File contents
@@ -95,8 +97,16 @@ interface ClientProtocol extends VersionedProtocol {
   * Blocks have a maximum size.  Clients that intend to
   * create multi-block files must also use reportWrittenBlock()
   * and addBlock().
+   *
+   * If permission denied,
+   * an {@link AccessControlException} will be thrown as an
+   * {@link org.apache.hadoop.ipc.RemoteException}.
+   *
+   * @param src The path of the directory being created
+   * @param masked The masked permission
   */
  public void create(String src, 
+                     FsPermission masked,
                     String clientName, 
                     boolean overwrite, 
                     short replication,
@@ -121,6 +131,22 @@ interface ClientProtocol extends VersionedProtocol {
                                short replication
                                ) throws IOException;
 
+  /**
+   * Set permissions for an existing file/directory.
+   */
+  public void setPermission(String src, FsPermission permission
+      ) throws IOException;
+
+  /**
+   * Set owner of a path (i.e. a file or a directory).
+   * The parameters username and groupname cannot both be null.
+   * @param src
+   * @param username If it is null, the original username remains unchanged.
+   * @param groupname If it is null, the original groupname remains unchanged.
+   */
+  public void setOwner(String src, String username, String groupname
+      ) throws IOException;
+
  /**
   * If the client has not yet called reportWrittenBlock(), it can
   * give up on it by calling abandonBlock().  The client can then
@@ -203,9 +229,17 @@ interface ClientProtocol extends VersionedProtocol {
 
  /**
   * Create a directory (or hierarchy of directories) with the given
-   * name.
+   * name and permission.
+   *
+   * If permission denied,
+   * an {@link AccessControlException} will be thrown as an
+   * {@link org.apache.hadoop.ipc.RemoteException}.
+   *
+   * @param src The path of the directory being created
+   * @param masked The masked permission of the directory being created
+   * @return True if the operation success.
   */
-  public boolean mkdirs(String src) throws IOException;
+  public boolean mkdirs(String src, FsPermission masked) throws IOException;
 
  /**
   * Get a listing of the indicated directory
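
For a sense of how the two new RPCs surface to applications, here is a hedged sketch going through the FileSystem facade, which DistributedFileSystem wires to these calls later in this diff (the path and user/group names are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class ChmodChownSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/user/alice/data.txt");
    fs.setPermission(p, new FsPermission((short) 0640)); // like chmod 640
    fs.setOwner(p, "alice", null); // null groupname: group stays unchanged
    fs.setOwner(p, null, "staff"); // null username: owner stays unchanged
  }
}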

+ 53 - 5
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.conf.*;
@@ -332,18 +333,38 @@ class DFSClient implements FSConstants {
    return create(src, overwrite, replication, blockSize, progress,
        conf.getInt("io.file.buffer.size", 4096));
  }
+  /**
+   * Call
+   * {@link #create(String,FsPermission,boolean,short,long,Progressable,int)}
+   * with default permission.
+   * @see FsPermission#getDefault(Configuration)
+   */
+  public OutputStream create(String src,
+      boolean overwrite,
+      short replication,
+      long blockSize,
+      Progressable progress,
+      int buffersize
+      ) throws IOException {
+    return create(src, FsPermission.getDefault(),
+        overwrite, replication, blockSize, progress, buffersize);
+  }
  /**
   * Create a new dfs file with the specified block replication 
   * with write-progress reporting and return an output stream for writing
   * into the file.  
   * 
   * @param src stream name
+   * @param permission The permission of the directory being created.
+   * If permission == null, use {@link FsPermission#getDefault()}.
   * @param overwrite do not check for file existence if true
   * @param replication block replication
   * @return output stream
   * @throws IOException
+   * @see {@link ClientProtocol#create(String, FsPermission, String, boolean, short, long)}
   */
  public OutputStream create(String src, 
+                             FsPermission permission,
                             boolean overwrite, 
                             short replication,
                             long blockSize,
@@ -351,8 +372,13 @@ class DFSClient implements FSConstants {
                             int buffersize
                             ) throws IOException {
    checkOpen();
-    OutputStream result = new DFSOutputStream(
-        src, overwrite, replication, blockSize, progress, buffersize);
+    if (permission == null) {
+      permission = FsPermission.getDefault();
+    }
+    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
+    LOG.debug(src + ": masked=" + masked);
+    OutputStream result = new DFSOutputStream(src, masked,
+        overwrite, replication, blockSize, progress, buffersize);
    synchronized (pendingCreates) {
      pendingCreates.put(src.toString(), result);
    }
@@ -491,7 +517,27 @@ class DFSClient implements FSConstants {
   */
  public boolean mkdirs(String src) throws IOException {
    checkOpen();
-    return namenode.mkdirs(src);
+    return namenode.mkdirs(src, null);
+  }
+
+  /**
+   * Create a directory (or hierarchy of directories) with the given
+   * name and permission.
+   *
+   * @param src The path of the directory being created
+   * @param permission The permission of the directory being created.
+   * If permission == null, use {@link FsPermission#getDefault()}.
+   * @return True if the operation success.
+   * @see {@link ClientProtocol#mkdirs(String, FsPermission)}
+   */
+  public boolean mkdirs(String src, FsPermission permission)throws IOException{
+    checkOpen();
+    if (permission == null) {
+      permission = FsPermission.getDefault();
+    }
+    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
+    LOG.debug(src + ": masked=" + masked);
+    return namenode.mkdirs(src, masked);
  }
 
  /**
@@ -1394,8 +1440,10 @@ class DFSClient implements FSConstants {
    private Progressable progress;
    /**
     * Create a new output stream to the given DataNode.
+     * @see {@link ClientProtocol#create(String, FsPermission, String, boolean, short, long)}
     */
-    public DFSOutputStream(String src, boolean overwrite, 
+    public DFSOutputStream(String src, FsPermission masked,
+                           boolean overwrite,
                           short replication, long blockSize,
                           Progressable progress,
                           int buffersize
@@ -1422,7 +1470,7 @@ class DFSClient implements FSConstants {
      checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                              bytesPerChecksum);
      namenode.create(
-          src.toString(), clientName, overwrite, replication, blockSize);
+          src, masked, clientName, overwrite, replication, blockSize);
    }
 
    private void openBackupStream() throws IOException {
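
The applyUMask step in create() and mkdirs() above is plain bit-clearing; a minimal sketch of the arithmetic, assuming only the FsPermission calls already used in this diff:

import org.apache.hadoop.fs.permission.FsPermission;

public class UmaskSketch {
  public static void main(String[] args) {
    FsPermission requested = new FsPermission((short) 0777);
    FsPermission umask = new FsPermission((short) 0022);
    // Every bit set in the umask is cleared: 0777 & ~0022 == 0755.
    FsPermission masked = requested.applyUMask(umask);
    System.out.println(masked); // rwxr-xr-x
  }
}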

+ 5 - 1
src/java/org/apache/hadoop/dfs/DFSFileInfo.java

@@ -52,7 +52,11 @@ class DFSFileInfo extends FileStatus {
          node.isDirectory(), 
          node.isDirectory() ? 0 : ((INodeFile)node).getReplication(), 
          node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
-          node.getModificationTime(), new Path(path));
+          node.getModificationTime(),
+          node.getFsPermission(),
+          node.getUserName(),
+          node.getGroupName(),
+          new Path(path));
  }
 
  /**
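
Because DFSFileInfo extends FileStatus, the three new attributes come back from ordinary directory listings; a hedged sketch (the listed path is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListWithPermissions {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    for (FileStatus stat : fs.listStatus(new Path("/user/alice"))) {
      // Permission, owner and group are the fields this commit adds.
      System.out.println(stat.getPermission() + "\t" + stat.getOwner()
          + "\t" + stat.getGroup() + "\t" + stat.getPath());
    }
  }
}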

+ 18 - 4
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -119,9 +119,8 @@ public class DistributedFileSystem extends FileSystem {
    int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
 
-    return new FSDataOutputStream( dfs.create(getPathName(f), overwrite, 
-                                              replication, blockSize, 
-                                              progress, bufferSize) );
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
+        overwrite, replication, blockSize, progress, bufferSize));
  }
 
  public boolean setReplication(Path src, 
@@ -176,7 +175,7 @@ public class DistributedFileSystem extends FileSystem {
  }
 
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    return dfs.mkdirs(getPathName(f));
+    return dfs.mkdirs(getPathName(f), permission);
  }
 
  public void close() throws IOException {
@@ -333,4 +332,19 @@ public class DistributedFileSystem extends FileSystem {
      return p;
    }
  }
+
+  /** {@inheritDoc }*/
+  public void setPermission(Path p, FsPermission permission
+      ) throws IOException {
+    dfs.namenode.setPermission(getPathName(p), permission);
+  }
+
+  /** {@inheritDoc }*/
+  public void setOwner(Path p, String username, String groupname
+      ) throws IOException {
+    if (username == null && groupname == null) {
+      throw new IOException("username == null && groupname == null");
+    }
+    dfs.namenode.setOwner(getPathName(p), username, groupname);
+  }
 }

+ 2 - 2
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -179,7 +179,7 @@ public interface FSConstants {
  // Version is reflected in the data storage file.
  // Versions are negative.
  // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -10;
+  public static final int LAYOUT_VERSION = -11;
  // Current version: 
-  // a directory has a block list length of -1
+  // Added permission information to INode.
}

+ 62 - 13
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -22,6 +22,7 @@ import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -39,7 +40,7 @@ import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 class FSDirectory implements FSConstants {
 
  FSNamesystem namesystem = null;
-  INodeDirectory rootDir = new INodeDirectory(INodeDirectory.ROOT_NAME);
+  final INodeDirectory rootDir;
  FSImage fsImage;  
  boolean ready = false;
  // Metrics record
@@ -47,12 +48,12 @@ class FSDirectory implements FSConstants {
    
  /** Access an existing dfs name directory. */
  public FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
-    this.fsImage = new FSImage();
-    namesystem = ns;
-    initialize(conf);
+    this(new FSImage(), ns, conf);
  }
 
  public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
+    rootDir = new INodeDirectory(INodeDirectory.ROOT_NAME,
+        ns.createFsOwnerPermissions(new FsPermission((short)0755)));
    this.fsImage = fsImage;
    namesystem = ns;
    initialize(conf);
@@ -116,6 +117,7 @@ class FSDirectory implements FSConstants {
   * Add the given filename to the fs.
   */
  INode addFile(String path, 
+                PermissionStatus permissions,
                short replication,
                long preferredBlockSize,
                String clientName,
@@ -126,10 +128,11 @@ class FSDirectory implements FSConstants {
 
    // Always do an implicit mkdirs for parent directory tree.
    long modTime = FSNamesystem.now();
-    if (!mkdirs(new Path(path).getParent().toString(), modTime)) {
+    if (!mkdirs(new Path(path).getParent().toString(), permissions, true,
+        modTime)) {
      return null;
    }
-    INodeFile newNode = new INodeFileUnderConstruction(replication, 
+    INodeFile newNode = new INodeFileUnderConstruction(permissions,replication,
                                 preferredBlockSize, modTime, clientName, 
                                 clientMachine, clientNode);
    synchronized (rootDir) {
@@ -155,16 +158,17 @@ class FSDirectory implements FSConstants {
  /**
   */
  INode unprotectedAddFile( String path, 
+                            PermissionStatus permissions,
                            Block[] blocks, 
                            short replication,
                            long modificationTime,
                            long preferredBlockSize) {
    INode newNode;
    if (blocks == null)
-      newNode = new INodeDirectory(modificationTime);
+      newNode = new INodeDirectory(permissions, modificationTime);
    else
-      newNode = new INodeFile(blocks.length, replication, modificationTime,
-                              preferredBlockSize);
+      newNode = new INodeFile(permissions, blocks.length, replication,
+          modificationTime, preferredBlockSize);
    synchronized (rootDir) {
      try {
        newNode = rootDir.addNode(path, newNode);
@@ -376,6 +380,47 @@ class FSDirectory implements FSConstants {
      return ((INodeFile)fileNode).getPreferredBlockSize();
    }
  }
+
+  boolean exists(String src) {
+    src = normalizePath(src);
+    synchronized(rootDir) {
+      INode inode = rootDir.getNode(src);
+      if (inode == null) {
+         return false;
+      }
+      return inode.isDirectory()? true: ((INodeFile)inode).getBlocks() != null;
+    }
+  }
+
+  void setPermission(String src, FsPermission permission
+      ) throws IOException {
+    unprotectedSetPermission(src, permission);
+    fsImage.getEditLog().logSetPermissions(src, permission);
+  }
+
+  void unprotectedSetPermission(String src, FsPermission permissions) {
+    synchronized(rootDir) {
+      rootDir.getNode(src).setPermission(permissions);
+    }
+  }
+
+  void setOwner(String src, String username, String groupname
+      ) throws IOException {
+    unprotectedSetOwner(src, username, groupname);
+    fsImage.getEditLog().logSetOwner(src, username, groupname);
+  }
+
+  void unprotectedSetOwner(String src, String username, String groupname) {
+    synchronized(rootDir) {
+      INode inode = rootDir.getNode(src);
+      if (username != null) {
+        inode.setUser(username);
+      }
+      if (groupname != null) {
+        inode.setGroup(groupname);
+      }
+    }
+  }
    
  /**
   * Remove the file from management, return blocks
@@ -551,7 +596,8 @@ class FSDirectory implements FSConstants {
  /**
   * Create directory entries for every item
   */
-  boolean mkdirs(String src, long now) {
+  boolean mkdirs(String src, PermissionStatus permissions,
+      boolean inheritPermission, long now) {
    src = normalizePath(src);
 
    // Use this to collect all the dirs we need to construct
@@ -573,7 +619,8 @@ class FSDirectory implements FSConstants {
    for (int i = numElts - 1; i >= 0; i--) {
      String cur = v.get(i);
      try {
-        INode inserted = unprotectedMkdir(cur, now);
+        INode inserted = unprotectedMkdir(cur, permissions,
+            inheritPermission || i != 0, now);
        if (inserted != null) {
          NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
                                        +"created directory "+cur);
@@ -596,9 +643,11 @@ class FSDirectory implements FSConstants {
 
  /**
   */
-  INode unprotectedMkdir(String src, long timestamp) throws FileNotFoundException {
+  INodeDirectory unprotectedMkdir(String src, PermissionStatus permissions,
+      boolean inheritPermission, long timestamp) throws FileNotFoundException {
    synchronized (rootDir) {
-      return rootDir.addNode(src, new INodeDirectory(timestamp));
+      return rootDir.addNode(src, new INodeDirectory(permissions, timestamp),
+          inheritPermission);
    }
  }
 

+ 55 - 8
src/java/org/apache/hadoop/dfs/FSEditLog.java

@@ -31,9 +31,8 @@ import java.util.ArrayList;
 import java.lang.Math;
 import java.nio.channels.FileChannel;
 
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -49,6 +48,8 @@ class FSEditLog {
  //the following two are used only for backword compatibility :
  @Deprecated private static final byte OP_DATANODE_ADD = 5;
  @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
+  private static final byte OP_SET_PERMISSIONS = 7;
+  private static final byte OP_SET_OWNER = 8;
  private static int sizeFlushBuffer = 512*1024;
 
  private ArrayList<EditLogOutputStream> editStreams = null;
@@ -470,10 +471,14 @@ class FSEditLog {
                blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
              }
            }
+            PermissionStatus permissions = PermissionChecker.ANONYMOUS;
+            if (logVersion <= -11) {
+              permissions = PermissionStatus.read(in);
+            }
 
            // add to the file tree
-            fsDir.unprotectedAddFile(name.toString(), blocks, replication, 
-                                     mtime, blockSize);
+            fsDir.unprotectedAddFile(name.toString(), permissions,
+                blocks, replication, mtime, blockSize);
            break;
          }
          case OP_SET_REPLICATION: {
@@ -535,6 +540,7 @@ class FSEditLog {
          }
          case OP_MKDIR: {
            UTF8 src = null;
+            PermissionStatus permissions = PermissionChecker.ANONYMOUS;
            if (logVersion >= -4) {
              src = new UTF8();
              src.readFields(in);
@@ -550,8 +556,12 @@ class FSEditLog {
              }
              src = (UTF8) writables[0];
              timestamp = Long.parseLong(((UTF8)writables[1]).toString());
+
+              if (logVersion <= -11) {
+                permissions = PermissionStatus.read(in);
+              }
            }
-            fsDir.unprotectedMkdir(src.toString(), timestamp);
+            fsDir.unprotectedMkdir(src.toString(),permissions,false,timestamp);
            break;
          }
          case OP_DATANODE_ADD: {
@@ -572,6 +582,22 @@ class FSEditLog {
            //Datanodes are not persistent any more.
            break;
          }
+          case OP_SET_PERMISSIONS: {
+            if (logVersion > -11)
+              throw new IOException("Unexpected opcode " + opcode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetPermission(
+                readUTF8String(in), FsPermission.read(in));
+            break;
+          }
+          case OP_SET_OWNER: {
+            if (logVersion > -11)
+              throw new IOException("Unexpected opcode " + opcode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetOwner(
+                readUTF8String(in), readUTF8String(in), readUTF8String(in));
+            break;
+          }
          default: {
            throw new IOException("Never seen opcode " + opcode);
          }
@@ -587,6 +613,13 @@ class FSEditLog {
    return numEdits;
  }
  
+  private String readUTF8String(DataInputStream in) throws IOException {
+    UTF8 utf8 = new UTF8();
+    utf8.readFields(in);
+    String s = utf8.toString();
+    return s.length() == 0? null: s;
+  }
+
  static short adjustReplication(short replication) {
    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
    short minReplication = fsNamesys.getMinReplication();
@@ -750,7 +783,8 @@ class FSEditLog {
      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
    logEdit(OP_ADD,
            new ArrayWritable(UTF8.class, nameReplicationPair), 
-            new ArrayWritable(Block.class, newNode.getBlocks()));
+            new ArrayWritable(Block.class, newNode.getBlocks()),
+            newNode.getPermissionStatus());
  }
  
  /** 
@@ -761,7 +795,8 @@ class FSEditLog {
      new UTF8(path),
      FSEditLog.toLogLong(newNode.getModificationTime())
    };
-    logEdit(OP_MKDIR, new ArrayWritable(UTF8.class, info));
+    logEdit(OP_MKDIR, new ArrayWritable(UTF8.class, info),
+        newNode.getPermissionStatus());
  }
  
  /** 
@@ -785,6 +820,18 @@ class FSEditLog {
            FSEditLog.toLogReplication(replication));
  }
  
+  /**  Add set permissions record to edit log */
+  void logSetPermissions(String src, FsPermission permissions) {
+    logEdit(OP_SET_PERMISSIONS, new UTF8(src), permissions);
+  }
+
+  /**  Add set owner record to edit log */
+  void logSetOwner(String src, String username, String groupname) {
+    UTF8 u = new UTF8(username == null? "": username);
+    UTF8 g = new UTF8(groupname == null? "": groupname);
+    logEdit(OP_SET_OWNER, new UTF8(src), u, g);
+  }
+
  /** 
   * Add delete file record to edit log
   */
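
The logVersion <= -11 gates above follow the FSConstants convention that layout versions are negative and decrease as the format evolves; an illustrative sketch of that gate (class and method names are made up):

public class LayoutVersionGate {
  // Mirrors FSConstants: versions are negative, and a smaller (more
  // negative) number denotes a newer on-disk layout.
  static boolean hasPermissionInfo(int logVersion) {
    return logVersion <= -11; // layout -11 added PermissionStatus records
  }

  public static void main(String[] args) {
    System.out.println(hasPermissionInfo(-10)); // false: pre-permission log
    System.out.println(hasPermissionInfo(-11)); // true: this commit's layout
    System.out.println(hasPermissionInfo(-12)); // true: any later layout
  }
}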

+ 9 - 2
src/java/org/apache/hadoop/dfs/FSImage.java

@@ -36,6 +36,7 @@ import java.util.Properties;
 import java.util.Random;
 import java.lang.Math;
 
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.dfs.FSConstants.NodeType;
 import org.apache.hadoop.io.UTF8;
@@ -711,8 +712,12 @@ class FSImage extends Storage {
            blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
          }
        }
-        fsDir.unprotectedAddFile(name.toString(), blocks, replication,
-                                 modificationTime, blockSize);
+        PermissionStatus permissions = PermissionChecker.ANONYMOUS;
+        if (imgVersion <= -11) {
+          permissions = PermissionStatus.read(in);
+        }
+        fsDir.unprotectedAddFile(name.toString(), permissions,
+            blocks, replication, modificationTime, blockSize);
      }
      
      // load datanode info
@@ -847,6 +852,7 @@ class FSImage extends Storage {
        out.writeInt(blocks.length);
        for (Block blk : blocks)
          blk.write(out);
+        fileINode.getPermissionStatus().write(out);
        return;
      }
      // write directory inode
@@ -854,6 +860,7 @@ class FSImage extends Storage {
      out.writeLong(inode.getModificationTime());
      out.writeLong(0);   // preferred block size
      out.writeInt(-1);    // # of blocks
+      inode.getPermissionStatus().write(out);
    }
    for(INode child : ((INodeDirectory)inode).getChildren()) {
      saveImage(fullName, child, out);
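
saveImage() above persists each inode's PermissionStatus with write(out), and the loaders read it back with PermissionStatus.read(in); a minimal round-trip sketch of that pair (user and group names are made up, and the printed form is an assumption):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;

public class PermissionStatusRoundTrip {
  public static void main(String[] args) throws Exception {
    PermissionStatus before =
        new PermissionStatus("alice", "staff", new FsPermission((short) 0755));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    before.write(new DataOutputStream(bytes));   // as in saveImage()

    PermissionStatus after = PermissionStatus.read(
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(after);                   // expected: alice:staff:rwxr-xr-x
  }
}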

+ 162 - 38
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -21,11 +21,14 @@ import org.apache.commons.logging.*;
 
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server;
 
 
 import java.io.BufferedWriter;
 import java.io.BufferedWriter;
@@ -38,6 +41,8 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.Map.Entry;
 import java.text.SimpleDateFormat;
 import java.text.SimpleDateFormat;
 
 
+import javax.security.auth.login.LoginException;
+
 /***************************************************
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
  * FSNamesystem does the actual bookkeeping work for the
  * DataNode.
  * DataNode.
@@ -53,6 +58,10 @@ import java.text.SimpleDateFormat;
 class FSNamesystem implements FSConstants {
 class FSNamesystem implements FSConstants {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
 
 
+  private boolean isPermissionEnabled;
+  private UserGroupInformation fsOwner;
+  private String supergroup;
+
   //
   //
   // Stores the correct file name hierarchy
   // Stores the correct file name hierarchy
   //
   //
@@ -204,12 +213,10 @@ class FSNamesystem implements FSConstants {
   private static final SimpleDateFormat DATE_FORM =
   private static final SimpleDateFormat DATE_FORM =
     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
 
-
   /**
   /**
    * FSNamesystem constructor.
    * FSNamesystem constructor.
    */
    */
   FSNamesystem(NameNode nn, Configuration conf) throws IOException {
   FSNamesystem(NameNode nn, Configuration conf) throws IOException {
-    fsNamesystemObject = this;
     try {
     try {
       initialize(nn, conf);
       initialize(nn, conf);
     } catch(IOException e) {
     } catch(IOException e) {
@@ -285,7 +292,6 @@ class FSNamesystem implements FSConstants {
    * is stored
    * is stored
    */
    */
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
-    fsNamesystemObject = this;
     setConfigurationParameters(conf);
     setConfigurationParameters(conf);
     this.dir = new FSDirectory(fsImage, this, conf);
     this.dir = new FSDirectory(fsImage, this, conf);
   }
   }
@@ -295,6 +301,19 @@ class FSNamesystem implements FSConstants {
    */
    */
   private void setConfigurationParameters(Configuration conf) 
   private void setConfigurationParameters(Configuration conf) 
                                           throws IOException {
                                           throws IOException {
+    fsNamesystemObject = this;
+    try {
+      fsOwner = UnixUserGroupInformation.login(conf);
+    } catch (LoginException e) {
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+    LOG.info("fsOwner=" + fsOwner);
+
+    this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
+    this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true);
+    LOG.info("supergroup=" + supergroup);
+    LOG.info("isPermissionEnabled=" + isPermissionEnabled);
+
     this.replicator = new ReplicationTargetChooser(
     this.replicator = new ReplicationTargetChooser(
                          conf.getBoolean("dfs.replication.considerLoad", true),
                          conf.getBoolean("dfs.replication.considerLoad", true),
                          this,
                          this,
@@ -644,13 +663,49 @@ class FSNamesystem implements FSConstants {
   // These methods are called by HadoopFS clients
   // These methods are called by HadoopFS clients
   //
   //
   /////////////////////////////////////////////////////////
   /////////////////////////////////////////////////////////
+  /**
+   * Set permissions for an existing file.
+   * @throws IOException
+   */
+  public synchronized void setPermission(String src, FsPermission permission
+      ) throws IOException {
+    checkOwner(src);
+    dir.setPermission(src, permission);
+    getEditLog().logSync();
+  }
+
+  /**
+   * Set owner for an existing file.
+   * @throws IOException
+   */
+  public synchronized void setOwner(String src, String username, String group
+      ) throws IOException {
+    PermissionChecker pc = checkOwner(src);
+    if (!pc.isSuper) {
+      if (username != null && !pc.user.equals(username)) {
+        throw new AccessControlException("Non-super user cannot change owner.");
+      }
+      if (group != null && !pc.containsGroup(group)) {
+        throw new AccessControlException("User does not belong to " + group
+            + " .");
+      }
+    }
+    dir.setOwner(src, username, group);
+    getEditLog().logSync();
+  }
+
   /**
   /**
    * Get block locations within the specified range.
    * Get block locations within the specified range.
    * 
    * 
    * @see ClientProtocol#open(String, long, long)
    * @see ClientProtocol#open(String, long, long)
    * @see ClientProtocol#getBlockLocations(String, long, long)
    * @see ClientProtocol#getBlockLocations(String, long, long)
    */
    */
-  LocatedBlocks getBlockLocations(String clientMachine,
+  LocatedBlocks getBlockLocations(String clientMachine, String src,
+      long offset, long length) throws IOException {
+    checkPathAccess(src, FsAction.READ);
+    return getBlockLocationsInternal(clientMachine, src, offset, length);
+  }
+  LocatedBlocks getBlockLocationsInternal(String clientMachine,
                                   String src, 
                                   String src, 
                                   long offset, 
                                   long offset, 
                                   long length
                                   long length
@@ -663,7 +718,7 @@ class FSNamesystem implements FSConstants {
     }
     }
 
 
     DatanodeDescriptor client = null;
     DatanodeDescriptor client = null;
-    LocatedBlocks blocks =  getBlockLocations(dir.getFileINode(src), 
+    LocatedBlocks blocks =  getBlockLocationInternal(dir.getFileINode(src),
                                               offset, length, 
                                               offset, length, 
                                               Integer.MAX_VALUE);
                                               Integer.MAX_VALUE);
     if (blocks == null) {
     if (blocks == null) {
@@ -679,7 +734,7 @@ class FSNamesystem implements FSConstants {
     return blocks;
     return blocks;
   }
   }
   
   
-  private synchronized LocatedBlocks getBlockLocations(INodeFile inode, 
+  private synchronized LocatedBlocks getBlockLocationInternal(INodeFile inode,
                                                        long offset, 
                                                        long offset, 
                                                        long length,
                                                        long length,
                                                        int nrBlocksToReturn) {
                                                        int nrBlocksToReturn) {
@@ -760,6 +815,7 @@ class FSNamesystem implements FSConstants {
     if (isInSafeMode())
     if (isInSafeMode())
       throw new SafeModeException("Cannot set replication for " + src, safeMode);
       throw new SafeModeException("Cannot set replication for " + src, safeMode);
     verifyReplication(src, replication, null);
     verifyReplication(src, replication, null);
+    checkPathAccess(src, FsAction.WRITE);
 
 
     int[] oldReplication = new int[1];
     int[] oldReplication = new int[1];
     Block[] fileBlocks;
     Block[] fileBlocks;
@@ -786,7 +842,8 @@ class FSNamesystem implements FSConstants {
     return true;
     return true;
   }
   }
     
     
-  public long getPreferredBlockSize(String filename) throws IOException {
+  long getPreferredBlockSize(String filename) throws IOException {
+    checkTraverse(filename);
     return dir.getPreferredBlockSize(filename);
     return dir.getPreferredBlockSize(filename);
   }
   }
     
     
@@ -811,10 +868,11 @@ class FSNamesystem implements FSConstants {
                             text + " is less than the required minimum " + minReplication);
                             text + " is less than the required minimum " + minReplication);
   }
   }
 
 
-  void startFile(String src, String holder, String clientMachine, 
+  void startFile(String src, PermissionStatus permissions,
+                 String holder, String clientMachine,
                  boolean overwrite, short replication, long blockSize
                  boolean overwrite, short replication, long blockSize
                 ) throws IOException {
                 ) throws IOException {
-    startFileInternal(src, holder, clientMachine, overwrite,
+    startFileInternal(src, permissions, holder, clientMachine, overwrite,
                       replication, blockSize);
                       replication, blockSize);
     getEditLog().logSync();
     getEditLog().logSync();
   }
   }
@@ -830,7 +888,8 @@ class FSNamesystem implements FSConstants {
    * @throws IOException if the filename is invalid
    * @throws IOException if the filename is invalid
    *         {@link FSDirectory#isValidToCreate(String)}.
    *         {@link FSDirectory#isValidToCreate(String)}.
    */
    */
-  synchronized void startFileInternal(String src, 
+  private synchronized void startFileInternal(String src,
+                                              PermissionStatus permissions,
                                               String holder, 
                                               String holder, 
                                               String clientMachine, 
                                               String clientMachine, 
                                               boolean overwrite,
                                               boolean overwrite,
@@ -844,6 +903,13 @@ class FSNamesystem implements FSConstants {
     if (!isValidName(src)) {
     if (!isValidName(src)) {
       throw new IOException("Invalid file name: " + src);      	  
       throw new IOException("Invalid file name: " + src);      	  
     }
     }
+    if (overwrite && exists(src)) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+    else {
+      checkAncestorAccess(src, FsAction.WRITE);
+    }
+
     try {
     try {
       INode myFile = dir.getFileINode(src);
       INode myFile = dir.getFileINode(src);
       if (myFile != null && myFile.isUnderConstruction()) {
       if (myFile != null && myFile.isUnderConstruction()) {
@@ -934,10 +1000,8 @@ class FSNamesystem implements FSConstants {
       // Now we can add the name to the filesystem. This file has no
       // Now we can add the name to the filesystem. This file has no
       // blocks associated with it.
       // blocks associated with it.
       //
       //
-      INode newNode = dir.addFile(src, replication, blockSize,
-                                  holder, 
-                                  clientMachine, 
-                                  clientNode);
+      INode newNode = dir.addFile(src, permissions,
+          replication, blockSize, holder, clientMachine, clientNode);
       if (newNode == null) {
       if (newNode == null) {
         throw new IOException("DIR* NameSystem.startFile: " +
         throw new IOException("DIR* NameSystem.startFile: " +
                               "Unable to add file to namespace.");
                               "Unable to add file to namespace.");
@@ -1287,22 +1351,25 @@ class FSNamesystem implements FSConstants {
   // are made, edit namespace and return to client.
   // are made, edit namespace and return to client.
   ////////////////////////////////////////////////////////////////
   ////////////////////////////////////////////////////////////////
 
 
+  /** Change the indicated filename. */
   public boolean renameTo(String src, String dst) throws IOException {
   public boolean renameTo(String src, String dst) throws IOException {
     boolean status = renameToInternal(src, dst);
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
     getEditLog().logSync();
     return status;
     return status;
   }
   }
 
 
-  /**
-   * Change the indicated filename.
-   */
-  public synchronized boolean renameToInternal(String src, String dst) throws IOException {
+  private synchronized boolean renameToInternal(String src, String dst
+      ) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
     NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
     if (isInSafeMode())
     if (isInSafeMode())
       throw new SafeModeException("Cannot rename " + src, safeMode);
       throw new SafeModeException("Cannot rename " + src, safeMode);
     if (!isValidName(dst)) {
     if (!isValidName(dst)) {
       throw new IOException("Invalid name: " + dst);
       throw new IOException("Invalid name: " + dst);
     }
     }
+
+    checkParentAccess(src, FsAction.WRITE);
+    checkAncestorAccess(dst, FsAction.WRITE);
+
     return dir.renameTo(src, dst);
     return dir.renameTo(src, dst);
   }
   }
 
 
@@ -1311,7 +1378,7 @@ class FSNamesystem implements FSConstants {
    * invalidate some blocks that make up the file.
    * invalidate some blocks that make up the file.
    */
    */
   public boolean delete(String src) throws IOException {
   public boolean delete(String src) throws IOException {
-    boolean status = deleteInternal(src, true);
+    boolean status = deleteInternal(src, true, true);
     getEditLog().logSync();
     getEditLog().logSync();
     return status;
     return status;
   }
   }
@@ -1320,7 +1387,7 @@ class FSNamesystem implements FSConstants {
    * An internal delete function that does not enforce safe mode
    * An internal delete function that does not enforce safe mode
    */
    */
   boolean deleteInSafeMode(String src) throws IOException {
   boolean deleteInSafeMode(String src) throws IOException {
-    boolean status = deleteInternal(src, false);
+    boolean status = deleteInternal(src, false, false);
     getEditLog().logSync();
     getEditLog().logSync();
     return status;
     return status;
   }
   }
@@ -1329,11 +1396,14 @@ class FSNamesystem implements FSConstants {
    * invalidate some blocks that make up the file.
    * invalidate some blocks that make up the file.
    */
    */
   private synchronized boolean deleteInternal(String src, 
   private synchronized boolean deleteInternal(String src, 
-                                              boolean enforceSafeMode) 
-                                              throws IOException {
+      boolean enforceSafeMode, boolean enforcePermission) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     if (enforceSafeMode && isInSafeMode())
     if (enforceSafeMode && isInSafeMode())
       throw new SafeModeException("Cannot delete " + src, safeMode);
       throw new SafeModeException("Cannot delete " + src, safeMode);
+    if (enforcePermission) {
+      checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+    }
+
     Block deletedBlocks[] = dir.delete(src);
     Block deletedBlocks[] = dir.delete(src);
     if (deletedBlocks != null) {
     if (deletedBlocks != null) {
       for (int i = 0; i < deletedBlocks.length; i++) {
       for (int i = 0; i < deletedBlocks.length; i++) {
@@ -1356,27 +1426,26 @@ class FSNamesystem implements FSConstants {
   /**
   /**
    * Return whether the given filename exists
    * Return whether the given filename exists
    */
    */
-  public boolean exists(String src) {
-    if (dir.getFileBlocks(src) != null || dir.isDir(src)) {
-      return true;
-    } else {
-      return false;
-    }
+  public boolean exists(String src) throws AccessControlException {
+    checkTraverse(src);
+    return dir.exists(src);
   }
   }
 
 
   /**
   /**
    * Whether the given name is a directory
    * Whether the given name is a directory
    */
    */
-  public boolean isDir(String src) {
+  public boolean isDir(String src) throws AccessControlException {
+    checkTraverse(src);
     return dir.isDir(src);
     return dir.isDir(src);
   }
   }
 
 
-  /* Get the file info for a specific file.
+  /** Get the file info for a specific file.
    * @param src The string representation of the path to the file
    * @param src The string representation of the path to the file
    * @throws IOException if file does not exist
    * @throws IOException if file does not exist
    * @return object containing information regarding the file
    * @return object containing information regarding the file
    */
    */
   DFSFileInfo getFileInfo(String src) throws IOException {
   DFSFileInfo getFileInfo(String src) throws IOException {
+    checkTraverse(src);
     return dir.getFileInfo(src);
     return dir.getFileInfo(src);
   }
   }
 
 
@@ -1407,8 +1476,9 @@ class FSNamesystem implements FSConstants {
   /**
   /**
    * Create all the necessary directories
    * Create all the necessary directories
    */
    */
-  public boolean mkdirs(String src) throws IOException {
-    boolean status = mkdirsInternal(src);
+  public boolean mkdirs(String src, PermissionStatus permissions
+      ) throws IOException {
+    boolean status = mkdirsInternal(src, permissions);
     getEditLog().logSync();
     getEditLog().logSync();
     return status;
     return status;
   }
   }
@@ -1416,19 +1486,20 @@ class FSNamesystem implements FSConstants {
   /**
    * Create all the necessary directories
    */
-  private synchronized boolean mkdirsInternal(String src) throws IOException {
-    boolean    success;
+  private synchronized boolean mkdirsInternal(String src,
+      PermissionStatus permissions) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     if (isInSafeMode())
       throw new SafeModeException("Cannot create directory " + src, safeMode);
     if (!isValidName(src)) {
       throw new IOException("Invalid directory name: " + src);
     }
-    success = dir.mkdirs(src, now());
-    if (!success) {
+    checkAncestorAccess(src, FsAction.WRITE);
+
+    if (!dir.mkdirs(src, permissions, false, now())) {
       throw new IOException("Invalid directory name: " + src);
     }
-    return success;
+    return true;
   }
 
   /* Get the size of the specified directory subtree.
@@ -1437,6 +1508,7 @@ class FSNamesystem implements FSConstants {
    * @return size in bytes
    */
   long getContentLength(String src) throws IOException {
+    checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
     return dir.getContentLength(src);
   }
 
@@ -1660,7 +1732,13 @@ class FSNamesystem implements FSConstants {
    * Get a listing of all files at 'src'.  The Object[] array
    * exists so we can return file attributes (soon to be implemented)
    */
-  public DFSFileInfo[] getListing(String src) {
+  public DFSFileInfo[] getListing(String src) throws IOException {
+    if (dir.isDir(src)) {
+      checkPathAccess(src, FsAction.READ);
+    }
+    else {
+      checkTraverse(src);
+    }
     return dir.getListing(src);
   }
 
@@ -3755,4 +3833,50 @@ class FSNamesystem implements FSConstants {
   boolean startDistributedUpgradeIfNeeded() throws IOException {
     return upgradeManager.startUpgrade();
   }
+
+  PermissionStatus createFsOwnerPermissions(FsPermission permission) {
+    return new PermissionStatus(fsOwner.getUserName(), supergroup, permission);
+  }
+
+  private PermissionChecker checkOwner(String path) throws AccessControlException {
+    return checkPermission(path, true, null, null, null, null);
+  }
+
+  private PermissionChecker checkPathAccess(String path, FsAction access
+      ) throws AccessControlException {
+    return checkPermission(path, false, null, null, access, null);
+  }
+
+  private PermissionChecker checkParentAccess(String path, FsAction access
+      ) throws AccessControlException {
+    return checkPermission(path, false, null, access, null, null);
+  }
+
+  private PermissionChecker checkAncestorAccess(String path, FsAction access
+      ) throws AccessControlException {
+    return checkPermission(path, false, access, null, null, null);
+  }
+
+  private PermissionChecker checkTraverse(String path
+      ) throws AccessControlException {
+    return checkPermission(path, false, null, null, null, null);
+  }
+
+  /**
+   * Check whether the current user has permission to access the path.
+   * For more details of the parameters, see
+   * {@link PermissionChecker#checkPermission(INodeDirectory, boolean, FsAction, FsAction, FsAction)}.
+   */
+  private PermissionChecker checkPermission(String path, boolean doCheckOwner,
+      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+      FsAction subAccess) throws AccessControlException {
+    PermissionChecker pc = new PermissionChecker(
+        fsOwner.getUserName(), supergroup);
+    if (isPermissionEnabled && !pc.isSuper) {
+      dir.waitForReady();
+      pc.checkPermission(path, dir.rootDir, doCheckOwner,
+          ancestorAccess, parentAccess, access, subAccess);
+    }
+    return pc;
+  }
 }

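Every helper above fills a different slot of the same checkPermission call, and each slot is ultimately tested with FsAction.implies against a single permission class. A minimal standalone sketch of those implies semantics, using only the public org.apache.hadoop.fs.permission API (the demo class is illustrative, not part of this change):

    import org.apache.hadoop.fs.permission.FsAction;

    public class FsActionDemo {
      public static void main(String[] args) {
        // READ_WRITE grants WRITE but not EXECUTE:
        System.out.println(FsAction.READ_WRITE.implies(FsAction.WRITE));   // true
        System.out.println(FsAction.READ_WRITE.implies(FsAction.EXECUTE)); // false
        // ALL implies every action; NONE implies none:
        System.out.println(FsAction.ALL.implies(FsAction.READ_EXECUTE));   // true
        System.out.println(FsAction.NONE.implies(FsAction.READ));          // false
      }
    }

This is also why delete passes FsAction.WRITE as parentAccess and FsAction.ALL as subAccess: every directory in the deleted subtree must grant full rwx to the caller's permission class.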
+ 135 - 29
src/java/org/apache/hadoop/dfs/INode.java

@@ -26,6 +26,7 @@ import java.util.List;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 
 /**
@@ -38,15 +39,88 @@ abstract class INode implements Comparable<byte[]> {
   protected INodeDirectory parent;
   protected long modificationTime;
 
-  protected INode(String name) {
-    this(0L);
+  //Only updated by updatePermissionStatus(...).
+  //Other code should not modify it.
+  private long permission;
+
+  private static enum PermissionStatusFormat {
+    MODE(0, 16),
+    GROUP(MODE.OFFSET + MODE.LENGTH, 25),
+    USER(GROUP.OFFSET + GROUP.LENGTH, 23);
+
+    final int OFFSET;
+    final int LENGTH; //bit length
+    final long MASK;
+
+    PermissionStatusFormat(int offset, int length) {
+      OFFSET = offset;
+      LENGTH = length;
+      MASK = ((-1L) >>> (64 - LENGTH)) << OFFSET;
+    }
+
+    long retrieve(long record) {
+      return (record & MASK) >>> OFFSET;
+    }
+
+    long combine(long bits, long record) {
+      return (record & ~MASK) | (bits << OFFSET);
+    }
+  }
+
+  protected INode(String name, PermissionStatus permissions) {
+    this(permissions, 0L);
     setLocalName(name);
   }
 
-  INode(long mTime) {
+  INode(PermissionStatus permissions, long mTime) {
     this.name = null;
     this.parent = null;
     this.modificationTime = mTime;
+    setPermissionStatus(permissions);
+  }
+
+  /** Set the {@link PermissionStatus} */
+  protected void setPermissionStatus(PermissionStatus ps) {
+    setUser(ps.getUserName());
+    setGroup(ps.getGroupName());
+    setPermission(ps.getPermission());
+  }
+  /** Get the {@link PermissionStatus} */
+  protected PermissionStatus getPermissionStatus() {
+    return new PermissionStatus(getUserName(),getGroupName(),getFsPermission());
+  }
+  private synchronized void updatePermissionStatus(
+      PermissionStatusFormat f, long n) {
+    permission = f.combine(n, permission);
+  }
+  /** Get user name */
+  protected String getUserName() {
+    int n = (int)PermissionStatusFormat.USER.retrieve(permission);
+    return SerialNumberManager.INSTANCE.getUser(n);
+  }
+  /** Set user */
+  protected void setUser(String user) {
+    int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
+    updatePermissionStatus(PermissionStatusFormat.USER, n);
+  }
+  /** Get group name */
+  protected String getGroupName() {
+    int n = (int)PermissionStatusFormat.GROUP.retrieve(permission);
+    return SerialNumberManager.INSTANCE.getGroup(n);
+  }
+  /** Set group */
+  protected void setGroup(String group) {
+    int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
+    updatePermissionStatus(PermissionStatusFormat.GROUP, n);
+  }
+  /** Get the {@link FsPermission} */
+  protected FsPermission getFsPermission() {
+    return new FsPermission(
+        (short)PermissionStatusFormat.MODE.retrieve(permission));
+  }
+  /** Set the {@link FsPermission} of this {@link INode} */
+  protected void setPermission(FsPermission permission) {
+    updatePermissionStatus(PermissionStatusFormat.MODE, permission.toShort());
   }
 
   /**
@@ -71,6 +145,11 @@ abstract class INode implements Comparable<byte[]> {
     this.name = string2Bytes(name);
   }
 
+  /** {@inheritDoc} */
+  public String toString() {
+    return "\"" + getLocalName() + "\":" + getPermissionStatus();
+  }
+
   /**
    * Get the full absolute path name of this file (recursively computed).
    * 
@@ -231,13 +310,13 @@ class INodeDirectory extends INode {
 
   private List<INode> children;
 
-  INodeDirectory(String name) {
-    super(name);
+  INodeDirectory(String name, PermissionStatus permissions) {
+    super(name, permissions);
     this.children = null;
   }
 
-  INodeDirectory(long mTime) {
-    super(mTime);
+  INodeDirectory(PermissionStatus permissions, long mTime) {
+    super(permissions, mTime);
     this.children = null;
   }
 
@@ -380,19 +459,31 @@ class INodeDirectory extends INode {
     children.add(-low - 1, node);
     // update modification time of the parent directory
     setModificationTime(node.getModificationTime());
+    if (node.getGroupName() == null) {
+      node.setGroup(getGroupName());
+    }
     return node;
   }
 
+  /**
+   * Equivalent to addNode(path, newNode, false).
+   * @see #addNode(String, INode, boolean)
+   */
+  <T extends INode> T addNode(String path, T newNode) throws FileNotFoundException {
+    return addNode(path, newNode, false);
+  }
   /**
    * Add new INode to the file tree.
    * Find the parent and insert 
    * 
    * @param path file path
    * @param newNode INode to be added
+   * @param inheritPermission If true, copy the parent's permission to newNode.
    * @return null if the node already exists; inserted INode, otherwise
-   * @throws FileNotFoundException 
+   * @throws FileNotFoundException
    */
-  <T extends INode> T addNode(String path, T newNode) throws FileNotFoundException {
+  <T extends INode> T addNode(String path, T newNode, boolean inheritPermission
+      ) throws FileNotFoundException {
     byte[][] pathComponents = getPathComponents(path);
     assert pathComponents != null : "Incorrect path " + path;
     int pathLen = pathComponents.length;
@@ -409,6 +500,17 @@ class INodeDirectory extends INode {
       throw new FileNotFoundException("Parent path is not a directory: "+path);
       throw new FileNotFoundException("Parent path is not a directory: "+path);
     }
     }
     INodeDirectory parentNode = (INodeDirectory)node;
     INodeDirectory parentNode = (INodeDirectory)node;
+
+    if (inheritPermission) {
+      FsPermission p = parentNode.getFsPermission();
+      //make sure the  permission has wx for the user
+      if (!p.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
+        p = new FsPermission(p.getUserAction().or(FsAction.WRITE_EXECUTE),
+            p.getGroupAction(), p.getOtherAction());
+      }
+      newNode.setPermission(p);
+    }
+
     // insert into the parent children list
     // insert into the parent children list
     newNode.name = pathComponents[pathLen-1];
     newNode.name = pathComponents[pathLen-1];
     return parentNode.addChild(newNode);
     return parentNode.addChild(newNode);
@@ -466,28 +568,37 @@ class INodeDirectory extends INode {
 }
 
 class INodeFile extends INode {
+  static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
+
   private BlockInfo blocks[] = null;
   protected short blockReplication;
   protected long preferredBlockSize;
 
-  /**
-   */
-  INodeFile(int nrBlocks, short replication, long modificationTime,
+  INodeFile(PermissionStatus permissions,
+            int nrBlocks, short replication, long modificationTime,
             long preferredBlockSize) {
-    super(modificationTime);
-    this.blockReplication = replication;
-    this.preferredBlockSize = preferredBlockSize;
-    allocateBlocks(nrBlocks);
+    this(permissions, new BlockInfo[nrBlocks], replication,
+        modificationTime, preferredBlockSize);
   }
 
-  protected INodeFile(BlockInfo[] blklist, short replication, long modificationTime,
+  protected INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
+                      short replication, long modificationTime,
                       long preferredBlockSize) {
-    super(modificationTime);
+    super(permissions, modificationTime);
     this.blockReplication = replication;
     this.preferredBlockSize = preferredBlockSize;
     blocks = blklist;
   }
 
+  /**
+   * Set the {@link FsPermission} of this {@link INodeFile}.
+   * Since this is a file,
+   * the {@link FsAction#EXECUTE} action, if any, is ignored.
+   */
+  protected void setPermission(FsPermission permission) {
+    super.setPermission(permission.applyUMask(UMASK));
+  }
+
   boolean isDirectory() {
     return false;
   }
@@ -512,14 +623,6 @@ class INodeFile extends INode {
     return this.blocks;
   }
 
-  /**
-   * Allocate space for blocks.
-   * @param nrBlocks number of blocks
-   */
-  void allocateBlocks(int nrBlocks) {
-    this.blocks = new BlockInfo[nrBlocks];
-  }
-
   /**
    * add a block to the block list
    */
@@ -609,14 +712,16 @@ class INodeFileUnderConstruction extends INodeFile {
   protected StringBytesWritable clientMachine;
   protected DatanodeDescriptor clientNode; // if client is a cluster node too.
 
-  INodeFileUnderConstruction(short replication,
+  INodeFileUnderConstruction(PermissionStatus permissions,
+                             short replication,
                              long preferredBlockSize,
                              long modTime,
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) 
                              throws IOException {
-    super(0, replication, modTime, preferredBlockSize);
+    super(permissions.applyUMask(UMASK), 0, replication, modTime,
+        preferredBlockSize);
     this.clientName = new StringBytesWritable(clientName);
     this.clientMachine = new StringBytesWritable(clientMachine);
     this.clientNode = clientNode;
@@ -646,7 +751,8 @@ class INodeFileUnderConstruction extends INodeFile {
   // converts a INodeFileUnderConstruction into a INodeFile
   //
   INodeFile convertToInodeFile() {
-    INodeFile obj = new INodeFile(getBlocks(),
+    INodeFile obj = new INodeFile(getPermissionStatus(),
+                                  getBlocks(),
                                   getReplication(),
                                   getModificationTime(),
                                   getPreferredBlockSize());

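The PermissionStatusFormat enum above packs a whole PermissionStatus into the single long field of each inode: mode in bits 0-15, a group serial number in bits 16-40, and a user serial number in bits 41-63 (16 + 25 + 23 = 64), with the serial numbers interned by the SerialNumberManager introduced later in this change. A self-contained sketch of the same mask arithmetic, with illustrative values:

    public class PackingDemo {
      // Same masks as PermissionStatusFormat: ((-1L) >>> (64 - length)) << offset.
      static long mask(int offset, int length) {
        return ((-1L) >>> (64 - length)) << offset;
      }
      static long combine(long bits, long record, int offset, int length) {
        return (record & ~mask(offset, length)) | (bits << offset);
      }
      static long retrieve(long record, int offset, int length) {
        return (record & mask(offset, length)) >>> offset;
      }
      public static void main(String[] args) {
        long p = 0L;
        p = combine(0644, p, 0, 16);  // MODE:  permission bits (octal 644)
        p = combine(5, p, 16, 25);    // GROUP: group serial number 5
        p = combine(7, p, 41, 23);    // USER:  user serial number 7
        System.out.println(retrieve(p, 0, 16) == 0644);  // true
        System.out.println(retrieve(p, 16, 25));         // 5
        System.out.println(retrieve(p, 41, 23));         // 7
      }
    }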
+ 22 - 8
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -21,6 +21,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.StringUtils;
@@ -254,9 +255,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     return clientMachine;
   }
 
-  /**
-   */
+  /** {@inheritDoc} */
   public void create(String src, 
+                     FsPermission masked,
                              String clientName, 
                              boolean overwrite,
                              short replication,
@@ -269,8 +270,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
       throw new IOException("create: Pathname too long.  Limit " 
       throw new IOException("create: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
     }
-    namesystem.startFile(
-        src, clientName, clientMachine, overwrite, replication, blockSize);
+    namesystem.startFile(src,
+        new PermissionStatus(Server.getUserInfo().getUserName(), null, masked),
+        clientName, clientMachine, overwrite, replication, blockSize);
     myMetrics.createFile();
     myMetrics.createFile();
   }
   }
 
 
@@ -280,6 +282,18 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     return namesystem.setReplication(src, replication);
   }
     
+  /** {@inheritDoc} */
+  public void setPermission(String src, FsPermission permissions
+      ) throws IOException {
+    namesystem.setPermission(src, permissions);
+  }
+
+  /** {@inheritDoc} */
+  public void setOwner(String src, String username, String groupname
+      ) throws IOException {
+    namesystem.setOwner(src, username, groupname);
+  }
+
   /**
    */
   public LocatedBlock addBlock(String src, 
@@ -389,15 +403,15 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
             srcPath.depth() <= MAX_PATH_DEPTH);
   }
     
-  /**
-   */
-  public boolean mkdirs(String src) throws IOException {
+  /** {@inheritDoc} */
+  public boolean mkdirs(String src, FsPermission masked) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
     if (!checkPathLength(src)) {
       throw new IOException("mkdirs: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-    return namesystem.mkdirs(src);
+    return namesystem.mkdirs(src,
+        new PermissionStatus(Server.getUserInfo().getUserName(), null, masked));
   }
 
   /**

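Note that create and mkdirs now send only the client's already-masked mode over the wire; the name-node pairs it with the caller's user name and a null group, which INodeDirectory.addChild above later replaces with the parent directory's group. A small sketch of the masking itself, assuming FsPermission.getDefault() is 0777 in this version (values illustrative):

    import org.apache.hadoop.fs.permission.FsPermission;

    public class MaskedModeDemo {
      public static void main(String[] args) {
        // The client computes masked = requested & ~umask before calling create():
        FsPermission requested = new FsPermission((short)0777);
        FsPermission umask     = new FsPermission((short)0022);
        System.out.println(requested.applyUMask(umask)); // rwxr-xr-x (0755)
      }
    }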
+ 8 - 6
src/java/org/apache/hadoop/dfs/NamenodeFsck.java

@@ -68,6 +68,7 @@ public class NamenodeFsck {
   public static final int FIXING_DELETE = 2;
   
   private NameNode nn;
+  private ClientProtocol namenodeproxy;
   private String lostFound = null;
   private boolean lfInited = false;
   private boolean lfInitedOk = false;
@@ -95,6 +96,7 @@ public class NamenodeFsck {
                       HttpServletResponse response) throws IOException {
     this.conf = conf;
     this.nn = nn;
+    this.namenodeproxy =DFSClient.createNamenode(nn.getNameNodeAddress(),conf);
     this.out = response.getWriter();
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -114,7 +116,7 @@ public class NamenodeFsck {
    */
   public void fsck() throws IOException {
     try {
-      DFSFileInfo[] files = nn.getListing(path);
+      DFSFileInfo[] files = namenodeproxy.getListing(path);
       FsckResult res = new FsckResult();
       res.setReplication((short) conf.getInt("dfs.replication", 3));
       if (files != null) {
@@ -144,7 +146,7 @@ public class NamenodeFsck {
         out.println(file.getPath().toString() + " <dir>");
       }
       res.totalDirs++;
-      DFSFileInfo[] files = nn.getListing(file.getPath().toString());
+      DFSFileInfo[] files =namenodeproxy.getListing(file.getPath().toString());
       for (int i = 0; i < files.length; i++) {
         check(files[i], res);
       }
@@ -153,8 +155,8 @@ public class NamenodeFsck {
     res.totalFiles++;
     long fileLen = file.getLen();
     res.totalSize += fileLen;
-    LocatedBlocks blocks = nn.getBlockLocations(file.getPath().toString(),
-        0, fileLen);
+    LocatedBlocks blocks = namenodeproxy.getBlockLocations(
+        file.getPath().toString(), 0, fileLen);
     res.totalBlocks += blocks.locatedBlockCount();
     if (showFiles) {
       out.print(file.getPath().toString() + " " + fileLen + " bytes, " +
@@ -248,7 +250,7 @@ public class NamenodeFsck {
         lostFoundMove(file, blocks);
         break;
       case FIXING_DELETE:
-        nn.delete(file.getPath().toString());
+        namenodeproxy.delete(file.getPath().toString());
       }
     }
     if (showFiles) {
@@ -276,7 +278,7 @@ public class NamenodeFsck {
     String target = lostFound + file.getPath();
     String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
     try {
-      if (!nn.mkdirs(target)) {
+      if (!namenodeproxy.mkdirs(target, file.getPermission())) {
         LOG.warn(errmsg);
         return;
       }

+ 178 - 0
src/java/org/apache/hadoop/dfs/PermissionChecker.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** Perform permission checking in {@link FSNamesystem}. */
+class PermissionChecker {
+  static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
+  /** Anonymous PermissionStatus. */
+  static final PermissionStatus ANONYMOUS = PermissionStatus.createImmutable(
+      "HadoopAnonymous", "HadoopAnonymous", new FsPermission((short)0777));
+
+  final String user;
+  private final Set<String> groups = new HashSet<String>();
+  final boolean isSuper;
+
+  PermissionChecker(String fsOwner, String supergroup
+      ) throws AccessControlException{
+    UserGroupInformation ugi = Server.getUserInfo();
+    LOG.debug("ugi=" + ugi);
+
+    if (ugi != null) {
+      user = ugi.getUserName();
+      groups.addAll(Arrays.asList(ugi.getGroupNames()));
+      isSuper = user.equals(fsOwner) || groups.contains(supergroup);
+    }
+    else {
+      throw new AccessControlException("ugi = null");
+    }
+  }
+
+  boolean containsGroup(String group) {return groups.contains(group);}
+
+  /**
+   * Check whether the current user has permission to access the path.
+   * Traverse is always checked.
+   *
+   * Parent path means the parent directory for the path.
+   * Ancestor path means the last (the closest) existing ancestor directory
+   * of the path.
+   * Note that if the parent path exists,
+   * then the parent path and the ancestor path are the same.
+   *
+   * For example, suppose the path is "/foo/bar/baz".
+   * Whether baz is a file or a directory,
+   * the parent path is "/foo/bar".
+   * If bar exists, then the ancestor path is also "/foo/bar".
+   * If bar does not exist and foo exists,
+   * then the ancestor path is "/foo".
+   * Further, if both foo and bar do not exist,
+   * then the ancestor path is "/".
+   *
+   * @param doCheckOwner Require user to be the owner of the path?
+   * @param ancestorAccess The access required by the ancestor of the path.
+   * @param parentAccess The access required by the parent of the path.
+   * @param access The access required by the path.
+   * @param subAccess If path is a directory,
+   * it is the access required of the path and all the sub-directories.
+   * If path is not a directory, there is no effect.
+   * @throws AccessControlException
+   */
+  void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
+      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+      FsAction subAccess) throws AccessControlException {
+    LOG.debug("ACCESS CHECK: " + this
+        + ", doCheckOwner=" + doCheckOwner
+        + ", ancestorAccess=" + ancestorAccess
+        + ", parentAccess=" + parentAccess
+        + ", access=" + access
+        + ", subAccess=" + subAccess);
+
+    synchronized(root) {
+      INode[] inodes = root.getExistingPathINodes(path);
+      int ancestorIndex = inodes.length - 2;
+      for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+          ancestorIndex--);
+      checkTraverse(inodes, ancestorIndex);
+
+      if (ancestorAccess != null && inodes.length > 1) {
+        check(inodes, ancestorIndex, ancestorAccess);
+      }
+      if (parentAccess != null && inodes.length > 1) {
+        check(inodes, inodes.length - 2, parentAccess);
+      }
+      if (access != null) {
+        check(inodes[inodes.length - 1], access);
+      }
+      if (subAccess != null) {
+        checkSubAccess(inodes[inodes.length - 1], subAccess);
+      }
+      if (doCheckOwner) {
+        checkOwner(inodes[inodes.length - 1]);
+      }
+    }
+  }
+
+  private void checkOwner(INode inode) throws AccessControlException {
+    if (inode != null && user.equals(inode.getUserName())) {
+      return;
+    }
+    throw new AccessControlException("Permission denied");
+  }
+
+  private void checkTraverse(INode[] inodes, int last
+      ) throws AccessControlException {
+    for(int j = 0; j <= last; j++) {
+      check(inodes[j], FsAction.EXECUTE);
+    }
+  }
+
+  private void checkSubAccess(INode inode, FsAction access
+      ) throws AccessControlException {
+    if (inode == null || !inode.isDirectory()) {
+      return;
+    }
+
+    Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
+    for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
+      INodeDirectory d = directories.pop();
+      check(d, access);
+
+      for(INode child : d.getChildren()) {
+        if (child.isDirectory()) {
+          directories.push((INodeDirectory)child);
+        }
+      }
+    }
+  }
+
+  private void check(INode[] inodes, int i, FsAction access
+      ) throws AccessControlException {
+    check(i >= 0? inodes[i]: null, access);
+  }
+
+  private void check(INode inode, FsAction access
+      ) throws AccessControlException {
+    if (inode == null) {
+      return;
+    }
+    FsPermission mode = inode.getFsPermission();
+
+    if (user.equals(inode.getUserName())) { //user class
+      if (mode.getUserAction().implies(access)) { return; }
+    }
+    else if (groups.contains(inode.getGroupName())) { //group class
+      if (mode.getGroupAction().implies(access)) { return; }
+    }
+    else { //other class
+      if (mode.getOtherAction().implies(access)) { return; }
+    }
+    throw new AccessControlException("Permission denied: user=" + user
+        + ", access=" + access + ", inode=" + inode);
+  }
+}

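The check method above consults exactly one permission class, POSIX-style: the owner bits when the user owns the inode, otherwise the group bits when one of the user's groups matches, otherwise the other bits, with no fallthrough between classes. A standalone sketch of that selection rule (the helper and sample values are illustrative, not from this change):

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class CheckDemo {
      // Mirrors PermissionChecker.check: pick one class, then test implies().
      static boolean allowed(String user, String[] groups,
          String owner, String group, FsPermission mode, FsAction access) {
        if (user.equals(owner))  return mode.getUserAction().implies(access);
        for (String g : groups)
          if (g.equals(group))   return mode.getGroupAction().implies(access);
        return mode.getOtherAction().implies(access);
      }
      public static void main(String[] args) {
        FsPermission m = new FsPermission((short)0477);  // r--rwxrwx
        // The owner is denied WRITE even though group and other would allow it:
        System.out.println(allowed("alice", new String[]{"staff"},
            "alice", "staff", m, FsAction.WRITE));  // false (owner class)
        System.out.println(allowed("bob", new String[]{"staff"},
            "alice", "staff", m, FsAction.WRITE));  // true (group class)
      }
    }

So an owner can be denied an access that group and other would grant, exactly as on a Unix file system.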
+ 72 - 0
src/java/org/apache/hadoop/dfs/SerialNumberManager.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+
+/** Manage name-to-serial-number maps for users and groups. */
+class SerialNumberManager {
+  /** This is the only instance of {@link SerialNumberManager}.*/
+  static final SerialNumberManager INSTANCE = new SerialNumberManager();
+
+  private SerialNumberMap<String> usermap = new SerialNumberMap<String>();
+  private SerialNumberMap<String> groupmap = new SerialNumberMap<String>();
+
+  private SerialNumberManager() {}
+
+  int getUserSerialNumber(String u) {return usermap.get(u);}
+  int getGroupSerialNumber(String g) {return groupmap.get(g);}
+  String getUser(int n) {return usermap.get(n);}
+  String getGroup(int n) {return groupmap.get(n);}
+
+  {
+    getUserSerialNumber(null);
+    getGroupSerialNumber(null);
+  }
+
+  private static class SerialNumberMap<T> {
+    private int max = 0;
+    private int nextSerialNumber() {return max++;}
+
+    private Map<T, Integer> t2i = new HashMap<T, Integer>();
+    private Map<Integer, T> i2t = new HashMap<Integer, T>();
+
+    synchronized int get(T t) {
+      Integer sn = t2i.get(t);
+      if (sn == null) {
+        sn = nextSerialNumber();
+        t2i.put(t, sn);
+        i2t.put(sn, t);
+      }
+      return sn;
+    }
+
+    synchronized T get(int i) {
+      if (!i2t.containsKey(i)) {
+        throw new IllegalStateException("!i2t.containsKey(" + i
+            + "), this=" + this);
+      }
+      return i2t.get(i);
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return "max=" + max + ",\n  t2i=" + t2i + ",\n  i2t=" + i2t;
+    }
+  }
+}

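Interning each distinct user and group name once is what lets INode store two small serial numbers instead of two strings; serial 0 is pre-registered for null by the instance initializer. An illustrative use, compiled into the org.apache.hadoop.dfs package since the class is package-private (the demo class is not part of this change):

    package org.apache.hadoop.dfs;

    class SerialNumberDemo {
      public static void main(String[] args) {
        SerialNumberManager m = SerialNumberManager.INSTANCE;
        System.out.println(m.getUserSerialNumber(null));          // 0 (pre-registered)
        int a = m.getUserSerialNumber("alice");                   // interned on first use
        System.out.println(a == m.getUserSerialNumber("alice"));  // true
        System.out.println(m.getUser(a));                         // alice
      }
    }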
+ 12 - 0
src/java/org/apache/hadoop/fs/permission/FsPermission.java

@@ -35,6 +35,18 @@ public class FsPermission implements Writable {
     WritableFactories.setFactory(FsPermission.class, FACTORY);
   }
 
+  /** Create an immutable {@link FsPermission} object. */
+  public static FsPermission createImmutable(short permission) {
+    return new FsPermission(permission) {
+      public FsPermission applyUMask(FsPermission umask) {
+        throw new UnsupportedOperationException();
+      }
+      public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
   //POSIX permission style
   private FsAction useraction = null;
   private FsAction groupaction = null;

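createImmutable freezes an instance by overriding the two methods that could modify or reload its state, so both now fail fast. A tiny sketch of the contract (illustrative):

    import org.apache.hadoop.fs.permission.FsPermission;

    public class ImmutableDemo {
      public static void main(String[] args) {
        FsPermission frozen = FsPermission.createImmutable((short)0111);
        try {
          frozen.applyUMask(new FsPermission((short)0022));
        } catch (UnsupportedOperationException expected) {
          System.out.println("frozen instances reject applyUMask and readFields");
        }
      }
    }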
+ 22 - 0
src/java/org/apache/hadoop/fs/permission/PermissionStatus.java

@@ -34,6 +34,19 @@ public class PermissionStatus implements Writable {
     WritableFactories.setFactory(PermissionStatus.class, FACTORY);
   }
 
+  /** Create an immutable {@link PermissionStatus} object. */
+  public static PermissionStatus createImmutable(
+      String user, String group, FsPermission permission) {
+    return new PermissionStatus(user, group, permission) {
+      public PermissionStatus applyUMask(FsPermission umask) {
+        throw new UnsupportedOperationException();
+      }
+      public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
   private String username;
   private String groupname;
   private FsPermission permission;
@@ -56,6 +69,15 @@ public class PermissionStatus implements Writable {
   /** Return permission */
   public FsPermission getPermission() {return permission;}
 
+  /**
+   * Apply umask.
+   * @see FsPermission#applyUMask(FsPermission)
+   */
+  public PermissionStatus applyUMask(FsPermission umask) {
+    permission = permission.applyUMask(umask);
+    return this;
+  }
+
   /** {@inheritDoc} */
   public void readFields(DataInput in) throws IOException {
     username = Text.readString(in);

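Unlike FsPermission.createImmutable just above, PermissionStatus.applyUMask updates the wrapped permission in place and returns this, which is what lets INodeFileUnderConstruction chain permissions.applyUMask(UMASK) inline. A short sketch (values illustrative):

    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.fs.permission.PermissionStatus;

    public class StatusUmaskDemo {
      public static void main(String[] args) {
        PermissionStatus ps = new PermissionStatus(
            "alice", "staff", new FsPermission((short)0777));
        ps.applyUMask(new FsPermission((short)0022));  // mutates and returns ps
        System.out.println(ps.getPermission());        // rwxr-xr-x
      }
    }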
+ 14 - 4
src/java/org/apache/hadoop/ipc/Server.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.ipc;
 import java.io.IOException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 
@@ -45,6 +44,8 @@ import java.util.List;
 import java.util.Iterator;
 import java.util.Random;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -52,9 +53,9 @@ import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.ipc.SocketChannelOutputStream;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -125,7 +126,16 @@ public abstract class Server {
    */
   public static UserGroupInformation getUserInfo() {
     Call call = CurCall.get();
-    return (call == null) ? null : call.connection.ticket;
+    if (call != null)
+      return call.connection.ticket;
+    // This is to support local calls (as opposed to rpc ones) to the name-node.
+    // Currently it is name-node specific and should be placed somewhere else.
+    try {
+      return UnixUserGroupInformation.login();
+    } catch(LoginException le) {
+      LOG.info(StringUtils.stringifyException(le));
+      return null;
+    }
   }
   
   private String bindAddress; 

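getUserInfo now covers two cases: inside an RPC it returns the connection's ticket, while a local in-process call falls back to the Unix identity of the JVM's user. A sketch of that fallback path, assuming a Unix host where the whoami and groups commands are available:

    import javax.security.auth.login.LoginException;
    import org.apache.hadoop.security.UnixUserGroupInformation;

    public class LocalLoginDemo {
      public static void main(String[] args) throws LoginException {
        // Same call Server.getUserInfo falls back to when no RPC is in flight:
        UnixUserGroupInformation ugi = UnixUserGroupInformation.login();
        System.out.println("user:  " + ugi.getUserName());
        for (String g : ugi.getGroupNames()) {
          System.out.println("group: " + g);
        }
      }
    }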
+ 4 - 8
src/java/org/apache/hadoop/security/UnixUserGroupInformation.java

@@ -110,12 +110,6 @@ public class UnixUserGroupInformation implements UserGroupInformation {
     return userName;
   }
 
-  /** Return the default  group name
-   */
-  public String getDefaultGroupName() {
-    return groupNames[0];
-  }
-
   /* The following two methods implements Writable interface */
   final private static String UGI_TECHNOLOGY = "STRING_UGI"; 
   /** Deserialize this object
@@ -218,13 +212,14 @@ public class UnixUserGroupInformation implements UserGroupInformation {
     return currentUGI;
   }
   
-  /* Get current user's name and the names of all its groups from Unix.
+  /**
+   * Get current user's name and the names of all its groups from Unix.
    * It's assumed that there is only one UGI per user. If this user already
    * has a UGI in the ugi map, return the ugi in the map.
    * Otherwise get the current user's information from Unix, store it
    * in the map, and return it.
    */
-  private static UnixUserGroupInformation login() throws LoginException {
+  public static UnixUserGroupInformation login() throws LoginException {
     try {
       String userName =  getUnixUserName();
 
@@ -263,6 +258,7 @@ public class UnixUserGroupInformation implements UserGroupInformation {
     UnixUserGroupInformation ugi = readFromConf(conf, UGI_PROPERTY_NAME);
     if (ugi == null) {
       ugi = login();
+      LOG.debug("Unix Login: " + ugi);
     } 
     return ugi;
   }

+ 4 - 6
src/java/org/apache/hadoop/security/UserGroupInformation.java

@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.security;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 
 /** A {@link Writable} interface for storing user and groups information.
  */
 public interface UserGroupInformation extends Writable {
+  public static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
   /** Get username
    * 
    * @return the user's name
@@ -33,10 +37,4 @@ public interface UserGroupInformation extends Writable {
    * @return an array of group names
    */
   public String[] getGroupNames();
-  
-  /** Get the default group name.
-   * 
-   * @return the default the group name
-   */
-  public String getDefaultGroupName();
 }

+ 5 - 3
src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java

@@ -9,6 +9,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.util.StringUtils;
@@ -443,8 +444,8 @@ public class NNThroughputBenchmark {
     throws IOException {
       long start = System.currentTimeMillis();
       // dummyActionNoSynch(fileIdx);
-      nameNode.create(fileNames[daemonId][inputIdx], clientName, 
-                      true, replication, BLOCK_SIZE);
+      nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+                      clientName, true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -656,7 +657,8 @@ public class NNThroughputBenchmark {
       String clientName = getClientName(007);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName();
-        nameNode.create(fileName, clientName, true, replication, BLOCK_SIZE);
+        nameNode.create(fileName, FsPermission.getDefault(),
+                        clientName, true, replication, BLOCK_SIZE);
         addBlocks(fileName, clientName);
         nameNode.complete(fileName, clientName);
       }

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestBalancer.java

@@ -79,7 +79,7 @@ public class TestBalancer extends TestCase {
       long fileLen = size/replicationFactor;
       createFile(fileLen, replicationFactor);
 
-      List<LocatedBlock> locatedBlocks = cluster.getNameNode().
+      List<LocatedBlock> locatedBlocks = client.
       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
 
       int numOfBlocks = locatedBlocks.size();

+ 9 - 9
src/test/org/apache/hadoop/dfs/TestBlockReplacement.java

@@ -93,7 +93,11 @@ public class TestBlockReplacement extends TestCase {
           DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, r.nextLong());
       DFSTestUtil.waitReplication(fs,fileName, REPLICATION_FACTOR);
       
-      List<LocatedBlock> locatedBlocks = cluster.getNameNode().
+      // get all datanodes
+      InetSocketAddress addr = new InetSocketAddress("localhost",
+          cluster.getNameNodePort());
+      DFSClient client = new DFSClient(addr, CONF);
+      List<LocatedBlock> locatedBlocks = client.namenode.
         getBlockLocations("/tmp.txt", 0, DEFAULT_BLOCK_SIZE).getLocatedBlocks();
       assertEquals(1, locatedBlocks.size());
       LocatedBlock block = locatedBlocks.get(0);
@@ -105,10 +109,6 @@ public class TestBlockReplacement extends TestCase {
       cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
       cluster.waitActive();
       
-      // get all datanodes
-      InetSocketAddress addr = new InetSocketAddress("localhost",
-          cluster.getNameNodePort());
-      DFSClient client = new DFSClient(addr, CONF);
       DatanodeInfo[] datanodes = client.datanodeReport(DatanodeReportType.ALL);
 
       // find out the new node
@@ -157,7 +157,7 @@ public class TestBlockReplacement extends TestCase {
       // block locations should contain two proxies and newNode
       checkBlocks(new DatanodeInfo[]{newNode, proxies.get(0), proxies.get(1)},
           fileName.toString(), 
-          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR);
+          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
       // case 4: proxies.get(0) is not a valid del hint
       LOG.info("Testcase 4: invalid del hint " + proxies.get(0).getName() );
       assertTrue(replaceBlock(b, proxies.get(1), proxies.get(0), source));
@@ -166,7 +166,7 @@ public class TestBlockReplacement extends TestCase {
        */
       checkBlocks(proxies.toArray(new DatanodeInfo[proxies.size()]), 
           fileName.toString(), 
-          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR);
+          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
     } finally {
       cluster.shutdown();
     }
@@ -174,14 +174,14 @@
     
   /* check if file's blocks exist at includeNodes */
   private void checkBlocks(DatanodeInfo[] includeNodes, String fileName, 
-      long fileLen, short replFactor) throws IOException {
+      long fileLen, short replFactor, DFSClient client) throws IOException {
     Boolean notDone;
     do {
       try {
         Thread.sleep(100);
       } catch(InterruptedException e) {
       }
-      List<LocatedBlock> blocks = cluster.getNameNode().
+      List<LocatedBlock> blocks = client.namenode.
       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
       assertEquals(1, blocks.size());
       DatanodeInfo[] nodes = blocks.get(0).getLocations();

+ 5 - 1
src/test/org/apache/hadoop/dfs/TestEditLog.java

@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
 
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.UTF8;
@@ -57,8 +58,11 @@ public class TestEditLog extends TestCase {
 
     // add a bunch of transactions.
     public void run() {
+      PermissionStatus p = FSNamesystem.getFSNamesystem(
+          ).createFsOwnerPermissions(new FsPermission((short)0777));
+
       for (int i = 0; i < numTransactions; i++) {
-        INodeFile inode = new INodeFile(0, replication, 0, blockSize);
+        INodeFile inode = new INodeFile(p, 0, replication, 0, blockSize);
         editLog.logCreateFile("/filename" + i, inode);
         editLog.logSync();
       }

+ 4 - 1
src/test/org/apache/hadoop/dfs/TestGetBlocks.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
 
 import junit.framework.TestCase;
 /**
@@ -69,7 +70,9 @@ public class TestGetBlocks extends TestCase {
       DatanodeInfo[] dataNodes=null;
       boolean notWritten;
       do {
-        locatedBlocks = cluster.getNameNode().
+        DFSClient dfsclient = new DFSClient(
+            NetUtils.createSocketAddr(CONF.get("fs.default.name")), CONF);
+        locatedBlocks = dfsclient.namenode.
           getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
         assertEquals(2, locatedBlocks.size());
         notWritten = false;

+ 2 - 5
src/test/org/apache/hadoop/dfs/TestReplication.java

@@ -64,11 +64,8 @@ public class TestReplication extends TestCase {
   private void checkFile(FileSystem fileSys, Path name, int repl)
     throws IOException {
     Configuration conf = fileSys.getConf();
-    ClientProtocol namenode = (ClientProtocol) RPC.getProxy(
-                      ClientProtocol.class,
-                      ClientProtocol.versionID,
-                      NetUtils.createSocketAddr(conf.get("fs.default.name")), 
-                      conf);
+    ClientProtocol namenode = DFSClient.createNamenode(
+        NetUtils.createSocketAddr(conf.get("fs.default.name")), conf);
       
     LocatedBlocks locations;
     boolean isReplicationDone;

+ 177 - 0
src/test/org/apache/hadoop/security/TestPermission.java

@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.RemoteException;
+
+import junit.framework.TestCase;
+
+/** Unit tests for permission */
+public class TestPermission extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestPermission.class);
+
+  final private static Path ROOT_PATH = new Path("/data");
+  final private static Path CHILD_DIR1 = new Path(ROOT_PATH, "web1");
+  final private static Path CHILD_DIR2 = new Path(ROOT_PATH, "web2");
+  final private static Path CHILD_FILE1 = new Path(ROOT_PATH, "file1");
+  final private static Path CHILD_FILE2 = new Path(ROOT_PATH, "file2");
+
+  final private static int FILE_LEN = 100;
+  final private static String PERMISSION_EXCEPTION_NAME =
+    AccessControlException.class.getName();
+
+  final private static String USER_NAME = "Who";
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String[] GROUP_NAMES = {GROUP1_NAME, GROUP2_NAME};
+
+  static FsPermission checkPermission(FileSystem fs,
+      String path, FsPermission expected) throws IOException {
+    FileStatus s = fs.getFileStatus(new Path(path));
+    LOG.info(s.getPath() + ": " + s.isDir() + " " + s.getPermission()
+        + ":" + s.getOwner() + ":" + s.getGroup());
+    if (expected != null) {
+      assertEquals(expected, s.getPermission());
+      assertEquals(expected.toShort(), s.getPermission().toShort());
+    }
+    return s.getPermission();
+  }
+
+  public void testCreate() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean("dfs.permissions", true);
+    conf.setInt(FsPermission.UMASK_LABEL, 0);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+    FileSystem fs = FileSystem.get(conf);
+
+    try {
+      FsPermission rootPerm = checkPermission(fs, "/", null);
+      FsPermission inheritPerm = FsPermission.createImmutable(
+          (short)(rootPerm.toShort() | 0300));
+
+      FsPermission dirPerm = new FsPermission((short)0777);
+      fs.mkdirs(new Path("/a1/a2/a3"), dirPerm);
+      checkPermission(fs, "/a1", inheritPerm);
+      checkPermission(fs, "/a1/a2", inheritPerm);
+      checkPermission(fs, "/a1/a2/a3", dirPerm);
+
+      FsPermission filePerm = new FsPermission((short)0444);
+      FSDataOutputStream out = fs.create(new Path("/b1/b2/b3.txt"), filePerm,
+          true, conf.getInt("io.file.buffer.size", 4096),
+          fs.getDefaultReplication(), fs.getDefaultBlockSize(), null);
+      out.write(123);
+      out.close();
+      checkPermission(fs, "/b1", inheritPerm);
+      checkPermission(fs, "/b1/b2", inheritPerm);
+      checkPermission(fs, "/b1/b2/b3.txt", filePerm);
+    }
+    finally {
+      try{fs.close();} catch(Exception e) {}
+      try{cluster.shutdown();} catch(Exception e) {}
+    }
+  }
+
+  public void testFilePermision() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean("dfs.permissions", true);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+    FileSystem fs = FileSystem.get(conf);
+
+    try {
+      // following dir/file creations are legal
+      fs.mkdirs(CHILD_DIR1);
+      FSDataOutputStream out = fs.create(CHILD_FILE1);
+      byte data[] = new byte[FILE_LEN];
+      Random r = new Random();
+      r.nextBytes(data);
+      out.write(data);
+      out.close();
+      fs.setPermission(CHILD_FILE1, new FsPermission((short)0700));
+
+      // following read is legal
+      byte dataIn[] = new byte[FILE_LEN];
+      FSDataInputStream fin = fs.open(CHILD_FILE1);
+      fin.read(dataIn);
+      for(int i=0; i<FILE_LEN; i++) {
+        assertEquals(data[i], dataIn[i]);
+      }
+      fs.close();
+
+      // test illegal file/dir creation
+      UnixUserGroupInformation userGroupInfo = new UnixUserGroupInformation(
+          USER_NAME, GROUP_NAMES );
+      conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
+          userGroupInfo.toString());
+      fs = FileSystem.get(conf);
+
+      // illegal mkdir
+      assertTrue(!canMkdirs(fs, CHILD_DIR2));
+
+      // illegal file creation
+      assertTrue(!canCreate(fs, CHILD_FILE2));
+
+      // illegal file open
+      assertTrue(!canOpen(fs, CHILD_FILE1));
+    }
+    finally {
+      try{fs.close();} catch(Exception e) {}
+      try{cluster.shutdown();} catch(Exception e) {}
+    }
+  }
+
+  static boolean canMkdirs(FileSystem fs, Path p) throws IOException {
+    try {
+      fs.mkdirs(p);
+      return true;
+    } catch(RemoteException e) {
+      assertEquals(PERMISSION_EXCEPTION_NAME, e.getClassName());
+      return false;
+    }
+  }
+
+  static boolean canCreate(FileSystem fs, Path p) throws IOException {
+    try {
+      fs.create(p);
+      return true;
+    } catch(RemoteException e) {
+      assertEquals(PERMISSION_EXCEPTION_NAME, e.getClassName());
+      return false;
+    }
+  }
+
+  static boolean canOpen(FileSystem fs, Path p) throws IOException {
+    try {
+      fs.open(p);
+      return true;
+    } catch(RemoteException e) {
+      assertEquals(PERMISSION_EXCEPTION_NAME, e.getClassName());
+      return false;
+    }
+  }
+}