Browse Source

[partial-ns] Implement GetFileInfo and GetListing.

Haohui Mai 10 years ago
parent
commit
9747ed51a0

+ 162 - 188
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -21,11 +21,11 @@ package org.apache.hadoop.hdfs.server.namenode;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -35,15 +35,13 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Map;
 
 import static org.apache.hadoop.util.Time.now;
 
@@ -51,37 +49,28 @@ class FSDirStatAndListingOp {
   static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
       byte[] startAfter, boolean needLocation) throws IOException {
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    byte[][] pathComponents = FSDirectory
-        .getPathComponentsForReservedPath(srcArg);
-    final String startAfterString = new String(startAfter, Charsets.UTF_8);
-    final String src = fsd.resolvePath(pc, srcArg, pathComponents);
-    final INodesInPath iip = fsd.getINodesInPath(src, true);
-
-    // Get file name when startAfter is an INodePath
-    if (FSDirectory.isReservedName(startAfterString)) {
-      byte[][] startAfterComponents = FSDirectory
-          .getPathComponentsForReservedPath(startAfterString);
-      try {
-        String tmp = FSDirectory.resolvePath(src, startAfterComponents, fsd);
-        byte[][] regularPath = INode.getPathComponents(tmp);
-        startAfter = regularPath[regularPath.length - 1];
-      } catch (IOException e) {
-        // Possibly the inode is deleted
-        throw new DirectoryListingStartAfterNotFoundException(
-            "Can't find startAfter " + startAfterString);
+    String startAfterComponent = startAfter.length == 0 ? "" : new String(startAfter, Charsets.UTF_8);
+    try (ROTransaction tx = fsd.newROTransaction().begin()) {
+      Resolver.Result paths = Resolver.resolve(tx, srcArg);
+      if (paths.invalidPath()) {
+        throw new InvalidPathException(srcArg);
+      } else if (paths.notFound()) {
+        return null;
       }
-    }
+      final FlatINodesInPath iip = paths.inodesInPath();
 
-    boolean isSuperUser = true;
-    if (fsd.isPermissionEnabled()) {
-      if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
-        fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
-      } else {
-        fsd.checkTraverse(pc, iip);
+      boolean isSuperUser = true;
+      if (fsd.isPermissionEnabled()) {
+        if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
+          fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
+        } else {
+          fsd.checkTraverse(pc, paths);
+        }
+        isSuperUser = pc.isSuperUser();
       }
-      isSuperUser = pc.isSuperUser();
+      return getListing(tx, fsd, iip, srcArg, startAfterComponent, needLocation,
+                        isSuperUser);
     }
-    return getListing(fsd, iip, src, startAfter, needLocation, isSuperUser);
   }
 
   /**
@@ -102,16 +91,25 @@ class FSDirStatAndListingOp {
       throw new InvalidPathException("Invalid file name: " + src);
     }
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    src = fsd.resolvePath(pc, src, pathComponents);
-    final INodesInPath iip = fsd.getINodesInPath(src, resolveLink);
-    boolean isSuperUser = true;
-    if (fsd.isPermissionEnabled()) {
-      fsd.checkPermission(pc, iip, false, null, null, null, null, false);
-      isSuperUser = pc.isSuperUser();
+    try (ROTransaction tx = fsd.newROTransaction().begin()) {
+      Resolver.Result paths = Resolver.resolve(tx, src);
+      if (!paths.ok()) {
+        return null;
+      }
+
+      // TODO: Handle storage policy ID
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkTraverse(pc, paths);
+      }
+
+//      byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
+//          i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
+
+      byte policyId = HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+      return createFileStatus(tx, fsd, paths.inodesInPath().getLastINode(),
+                              HdfsFileStatus.EMPTY_NAME,
+                              policyId);
     }
-    return getFileInfo(fsd, src, resolveLink,
-        FSDirectory.isReservedRawName(srcArg), isSuperUser);
   }
 
   /**
@@ -136,7 +134,7 @@ class FSDirStatAndListingOp {
     final INodesInPath iip = fsd.getINodesInPath(src, false);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPermission(pc, iip, false, null, null, null,
-          FsAction.READ_EXECUTE);
+                          FsAction.READ_EXECUTE);
     }
     return getContentSummaryInt(fsd, iip);
   }
@@ -222,118 +220,76 @@ class FSDirStatAndListingOp {
    * @param needLocation if block locations are returned
    * @return a partial listing starting after startAfter
    */
-  private static DirectoryListing getListing(FSDirectory fsd, INodesInPath iip,
-      String src, byte[] startAfter, boolean needLocation, boolean isSuperUser)
+  private static DirectoryListing getListing(
+      ROTransaction tx,
+      FSDirectory fsd, FlatINodesInPath iip,
+      String src, String startAfter, boolean needLocation, boolean isSuperUser)
       throws IOException {
     String srcs = FSDirectory.normalizePath(src);
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
+    final FlatINode targetNode = iip.getLastINode();
 
-    fsd.readLock();
-    try {
-      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
-        return getSnapshotsListing(fsd, srcs, startAfter);
-      }
-      final int snapshot = iip.getPathSnapshotId();
-      final INode targetNode = iip.getLastINode();
-      if (targetNode == null)
-        return null;
-      byte parentStoragePolicy = isSuperUser ?
-          targetNode.getStoragePolicyID() : HdfsConstants
-          .BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-
-      if (!targetNode.isDirectory()) {
-        INodeAttributes nodeAttrs = getINodeAttributes(
-            fsd, src, HdfsFileStatus.EMPTY_NAME, targetNode,
-            snapshot);
-        return new DirectoryListing(
-            new HdfsFileStatus[]{ createFileStatus(
-                fsd, HdfsFileStatus.EMPTY_NAME, targetNode, nodeAttrs,
-                needLocation, parentStoragePolicy, snapshot, isRawPath, iip)
-            }, 0);
-      }
+      // TODO: Handle storage policy
+//      byte parentStoragePolicy = isSuperUser ?
+//          targetNode.getStoragePolicyID() : HdfsConstants
+//          .BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+    byte parentStoragePolicy = HdfsConstants
+        .BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
 
-      final INodeDirectory dirInode = targetNode.asDirectory();
-      final ReadOnlyList<INode> contents = dirInode.getChildrenList(snapshot);
-      int startChild = INodeDirectory.nextChild(contents, startAfter);
-      int totalNumChildren = contents.size();
-      int numOfListing = Math.min(totalNumChildren - startChild,
-          fsd.getLsLimit());
-      int locationBudget = fsd.getLsLimit();
-      int listingCnt = 0;
-      HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
-      for (int i=0; i<numOfListing && locationBudget>0; i++) {
-        INode cur = contents.get(startChild+i);
-        byte curPolicy = isSuperUser && !cur.isSymlink()?
-            cur.getLocalStoragePolicyID():
-            HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-        INodeAttributes nodeAttrs = getINodeAttributes(
-            fsd, src, cur.getLocalNameBytes(), cur,
-            snapshot);
-        listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(),
-            cur, nodeAttrs, needLocation, getStoragePolicyID(curPolicy,
-                parentStoragePolicy), snapshot, isRawPath, iip);
-        listingCnt++;
-        if (needLocation) {
-            // Once we  hit lsLimit locations, stop.
-            // This helps to prevent excessively large response payloads.
-            // Approximate #locations with locatedBlockCount() * repl_factor
-            LocatedBlocks blks =
-                ((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
-            locationBudget -= (blks == null) ? 0 :
-               blks.locatedBlockCount() * listing[i].getReplication();
-        }
-      }
-      // truncate return array if necessary
-      if (listingCnt < numOfListing) {
-          listing = Arrays.copyOf(listing, listingCnt);
-      }
+    if (!targetNode.isDirectory()) {
       return new DirectoryListing(
-          listing, totalNumChildren-startChild-listingCnt);
-    } finally {
-      fsd.readUnlock();
+          new HdfsFileStatus[]{ createFileStatus(
+              tx, fsd, targetNode, HdfsFileStatus.EMPTY_NAME,
+              parentStoragePolicy)
+          }, 0);
     }
-  }
 
-  /**
-   * Get a listing of all the snapshots of a snapshottable directory
-   */
-  private static DirectoryListing getSnapshotsListing(
-      FSDirectory fsd, String src, byte[] startAfter)
-      throws IOException {
-    Preconditions.checkState(fsd.hasReadLock());
-    Preconditions.checkArgument(
-        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
-        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-
-    final String dirPath = FSDirectory.normalizePath(src.substring(0,
-        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
-
-    final INode node = fsd.getINode(dirPath);
-    final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
-    final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
-    if (sf == null) {
-      throw new SnapshotException(
-          "Directory is not a snapshottable directory: " + dirPath);
+    Map<ByteBuffer, Long> children = tx.childrenView(targetNode.id()).tailMap(
+        ByteBuffer.wrap(startAfter.getBytes(Charsets.UTF_8)));
+    int numOfListing = Math.min(children.size(), fsd.getLsLimit());
+    int locationBudget = fsd.getLsLimit();
+    int listingCnt = 0;
+    int i = 0;
+    HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+
+    for (Map.Entry<ByteBuffer, Long> e : children.entrySet()) {
+      if (locationBudget < 0 && i >= listing.length) {
+        break;
+      }
+
+      FlatINode cur = tx.getINode(e.getValue());
+      // TODO: Handle Storage policy
+//      byte curPolicy = isSuperUser && !cur.isSymlink()?
+//          cur.getLocalStoragePolicyID():
+//          HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+      byte curPolicy = HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+      ByteBuffer b =e.getKey().duplicate();
+      byte[] localName = new byte[b.remaining()];
+      b.get(localName);
+      listing[i] =
+          createFileStatus(tx, fsd, cur, localName, needLocation,
+                           getStoragePolicyID(curPolicy, parentStoragePolicy));
+      if (needLocation) {
+        // Once we  hit lsLimit locations, stop.
+        // This helps to prevent excessively large response payloads.
+        // Approximate #locations with locatedBlockCount() * repl_factor
+        LocatedBlocks blks =
+            ((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
+        locationBudget -= (blks == null) ? 0 :
+            blks.locatedBlockCount() * listing[i].getReplication();
+      }
+      ++i;
+      ++listingCnt;
     }
-    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
-    int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
-    skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
-    int numOfListing = Math.min(snapshots.size() - skipSize, fsd.getLsLimit());
-    final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
-    for (int i = 0; i < numOfListing; i++) {
-      Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
-      INodeAttributes nodeAttrs = getINodeAttributes(
-          fsd, src, sRoot.getLocalNameBytes(),
-          node, Snapshot.CURRENT_STATE_ID);
-      listing[i] = createFileStatus(
-          fsd, sRoot.getLocalNameBytes(),
-          sRoot, nodeAttrs,
-          HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
-          Snapshot.CURRENT_STATE_ID, false,
-          INodesInPath.fromINode(sRoot));
+
+    // truncate return array if necessary
+    if (listingCnt < numOfListing) {
+      listing = Arrays.copyOf(listing, listingCnt);
     }
+
     return new DirectoryListing(
-        listing, snapshots.size() - skipSize - numOfListing);
+        listing,
+        listingCnt < numOfListing ? 0 : children.size() - listingCnt);
   }
 
   /** Get the file info for a specific file.
@@ -348,72 +304,35 @@ class FSDirStatAndListingOp {
       FSDirectory fsd, String path, INodesInPath src, boolean isRawPath,
       boolean includeStoragePolicy)
       throws IOException {
-    fsd.readLock();
-    try {
-      final INode i = src.getLastINode();
-      if (i == null) {
-        return null;
-      }
-
-      byte policyId = includeStoragePolicy && !i.isSymlink() ?
-          i.getStoragePolicyID() :
-          HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      INodeAttributes nodeAttrs = getINodeAttributes(fsd, path,
-                                                     HdfsFileStatus.EMPTY_NAME,
-                                                     i, src.getPathSnapshotId());
-      return createFileStatus(fsd, HdfsFileStatus.EMPTY_NAME, i, nodeAttrs,
-                              policyId, src.getPathSnapshotId(), isRawPath, src);
-    } finally {
-      fsd.readUnlock();
-    }
+    throw new IllegalArgumentException("Unimplemented");
   }
 
   static HdfsFileStatus getFileInfo(
       FSDirectory fsd, String src, boolean resolveLink, boolean isRawPath,
       boolean includeStoragePolicy)
     throws IOException {
-    String srcs = FSDirectory.normalizePath(src);
-    if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
-      if (fsd.getINode4DotSnapshot(srcs) != null) {
-        return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
-            HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
-            HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
-      }
-      return null;
-    }
-
-    fsd.readLock();
-    try {
-      final INodesInPath iip = fsd.getINodesInPath(srcs, resolveLink);
-      return getFileInfo(fsd, src, iip, isRawPath, includeStoragePolicy);
-    } finally {
-      fsd.readUnlock();
-    }
+    throw new IllegalArgumentException("Unimplemented");
   }
 
   /**
    * create an hdfs file status from an inode
    *
    * @param fsd FSDirectory
-   * @param path the local name
    * @param node inode
    * @param needLocation if block locations need to be included or not
-   * @param isRawPath true if this is being called on behalf of a path in
-   *                  /.reserved/raw
    * @return a file status
    * @throws java.io.IOException if any error occurs
    */
   private static HdfsFileStatus createFileStatus(
-      FSDirectory fsd, byte[] path, INode node, INodeAttributes nodeAttrs,
-      boolean needLocation, byte storagePolicy, int snapshot, boolean isRawPath,
-      INodesInPath iip)
+      ROTransaction tx, FSDirectory fsd, FlatINode node,
+      byte[] localName, boolean needLocation, byte storagePolicy)
       throws IOException {
     if (needLocation) {
-      return createLocatedFileStatus(fsd, path, node, nodeAttrs, storagePolicy,
-                                     snapshot, isRawPath, iip);
+      throw new IllegalStateException("Unimplemented");
+//      return createLocatedFileStatus(fsd, path, node, nodeAttrs, storagePolicy,
+//                                     snapshot, isRawPath, iip);
     } else {
-      return createFileStatus(fsd, path, node, nodeAttrs, storagePolicy,
-                              snapshot, isRawPath, iip);
+      return createFileStatus(tx, fsd, node, localName, storagePolicy);
     }
   }
 
@@ -482,6 +401,61 @@ class FSDirStatAndListingOp {
     return fsd.getAttributes(fullPath, path, node, snapshot);
   }
 
+  /**
+   * Create FileStatus by file INode
+   */
+  static HdfsFileStatus createFileStatus(
+      Transaction tx, FSDirectory fsd, FlatINode node,
+      byte[] localName, byte storagePolicy)
+      throws IOException {
+    long size = 0;     // length is zero for directories
+    short replication = 0;
+    long blocksize = 0;
+    final boolean isEncrypted;
+
+    // TODO: Handle FileEncryptionInfo
+    final FileEncryptionInfo feInfo = null;
+//    final FileEncryptionInfo feInfo = isRawPath ? null :
+//        fsd.getFileEncryptionInfo(node, snapshot, iip);
+
+    if (node.isFile()) {
+      FlatINodeFileFeature f = node.feature(FlatINodeFileFeature.class);
+      size = f.fileSize();
+      replication = f.replication();
+      blocksize = f.blockSize();
+      isEncrypted = false;
+    } else {
+      isEncrypted = false;
+    }
+
+    int childrenNum = node.isDirectory()
+        ? tx.childrenView(node.id()).size()
+        : 0;
+
+    PermissionStatus perm = node.permissionStatus(fsd.ugid());
+
+    // TODO:
+    // <ul>
+    // <li>If the INode has ACL / encryption, the function returns
+    // FsPermissionExtension instead of FsPermisson</li>
+    // <li>Handle symlink</li>
+    // </ul>
+    //
+    return new HdfsFileStatus(size,
+        node.isDirectory(), replication, blocksize,
+        node.mtime(),
+        node.atime(),
+        perm.getPermission(),
+        perm.getUserName(),
+        perm.getGroupName(),
+        null,
+        localName,
+        node.id(),
+        childrenNum,
+        feInfo,
+        storagePolicy);
+  }
+
   /**
    * Create FileStatus with location info by file INode
    */

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1568,6 +1568,11 @@ public class FSDirectory implements Closeable {
     checkPermission(pc, iip, false, null, null, null, null);
   }
 
+  void checkPathAccess(FSPermissionChecker pc, FlatINodesInPath iip,
+      FsAction access) throws AccessControlException {
+    // TODO
+  }
+
   void checkTraverse(FSPermissionChecker pc, Resolver.Result iip)
       throws AccessControlException {
     // TODO

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FlatNSUtil.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.common.base.Preconditions;
+
 public class FlatNSUtil {
   public static String getNextComponent(String src, int offset) {
     if (offset >= src.length()) {
@@ -35,4 +37,10 @@ public class FlatNSUtil {
   public static boolean hasNextLevelInPath(String src, int offset) {
     return src.indexOf('/', offset + 1) != -1;
   }
+
+  public static String getLastComponent(String src) {
+    int next = src.lastIndexOf('/');
+    Preconditions.checkState(next != -1);
+    return getNextComponent(src, next);
+  }
 }