
[partial-ns] Implement getBlockLocations().

Haohui Mai · 10 years ago
commit a56b2a3114

+ 11 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -811,7 +811,7 @@ public class BlockManager {
    *
    * @throws IOException
    */
-  public LocatedBlocks createLocatedBlocks(BlockInfoContiguous[] blocks,
+  public LocatedBlocks createLocatedBlocks(Iterable<Block> blocks,
       final long start, final long length, long visibleFileLength,
       final boolean needBlockToken) throws IOException {
     assert namesystem.hasReadLock();
@@ -819,7 +819,8 @@ public class BlockManager {
     long pos = 0;
     long end = Math.min(start + length, visibleFileLength);
     BlockInfoContiguous last = null;
-    for (BlockInfoContiguous b : blocks) {
+    for (Block bid : blocks) {
+      BlockInfoContiguous b = getStoredBlock(bid);
       last = b;
       long old = pos;
       pos += b.getNumBytes();
@@ -888,8 +889,7 @@ public class BlockManager {
    * replication levels.
    */
   public short adjustReplication(short replication) {
-    return replication < minReplication? minReplication
-        : replication > maxReplication? maxReplication: replication;
+    return replication < minReplication? minReplication : replication > maxReplication? maxReplication: replication;
   }
 
   /**
@@ -974,7 +974,7 @@ public class BlockManager {
     BlockInfoContiguous curBlock;
     while(totalSize<size && iter.hasNext()) {
       curBlock = iter.next();
-      if(!curBlock.isComplete())  continue;
+      if(!curBlock.isComplete()) continue;
       totalSize += addBlock(curBlock, results);
     }
     if(totalSize<size) {
@@ -1443,8 +1443,8 @@ public class BlockManager {
   }
 
   /** Choose target for WebHDFS redirection. */
-  public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
-      DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
+  public DatanodeStorageInfo[] chooseTarget4WebHDFS(
+      String src, DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
     return blockplacement.chooseTarget(src, 1, clientnode,
         Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
         blocksize, storagePolicySuite.getDefaultPolicy());
@@ -3039,7 +3039,7 @@ public class BlockManager {
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
     final List<DatanodeStorageInfo> locations = getValidLocations(block);
-    if(locations.size() == 0) {
+    if (locations.size() == 0) {
       return 0;
     } else {
       final String[] datanodeUuids = new String[locations.size()];
@@ -3142,8 +3142,9 @@ public class BlockManager {
     int receiving = 0;
     final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
     if (node == null || !node.isAlive) {
-      blockLog.warn("BLOCK* processIncrementalBlockReport"
-              + " is received from dead or unregistered node {}", nodeID);
+      blockLog.warn(
+          "BLOCK* processIncrementalBlockReport" + " is received from dead or unregistered node {}",
+          nodeID);
       throw new IOException(
           "Got incremental block report from unregistered or dead node");
     }

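Note: the hunk above changes createLocatedBlocks() to take Iterable<Block> and to resolve each ID through getStoredBlock() itself, instead of requiring callers to pre-resolve BlockInfoContiguous[]. A minimal caller sketch under that contract (blockManager, blockA/blockB, and fileLength are illustrative placeholders, not names from this commit; per the assert in the method, the namesystem read lock must be held):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    // Plain block IDs; BlockManager resolves the stored metadata internally.
    List<Block> blockIds = Arrays.asList(blockA, blockB);
    LocatedBlocks located = blockManager.createLocatedBlocks(
        blockIds,
        0L,          // start: offset of the requested range
        fileLength,  // length: size of the requested range
        fileLength,  // visibleFileLength: caps the readable portion
        false);      // needBlockToken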
+ 54 - 59
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -155,34 +155,36 @@ class FSDirStatAndListingOp {
     BlockManager bm = fsd.getBlockManager();
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsd.resolvePath(pc, src, pathComponents);
-    fsd.readLock();
-    try {
-      final INodesInPath iip = fsd.getINodesInPath(src, true);
-      final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkPathAccess(pc, iip, FsAction.READ);
-        fsd.checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
+    try (ROTransaction tx = fsd.newROTransaction().begin()){
+      Resolver.Result paths = Resolver.resolve(tx, src);
+      if (paths.invalidPath()) {
+        throw new InvalidPathException(src);
+      } else if (paths.notFound()) {
+        throw new FileNotFoundException(src);
       }
 
-      final long fileSize = iip.isSnapshot()
-          ? inode.computeFileSize(iip.getPathSnapshotId())
-          : inode.computeFileSizeNotIncludingLastUcBlock();
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, paths.inodesInPath(), FsAction.READ);
+        fsd.checkUnreadableBySuperuser(pc, paths.inodesInPath());
+      }
 
-      boolean isUc = inode.isUnderConstruction();
-      if (iip.isSnapshot()) {
-        // if src indicates a snapshot file, we need to make sure the returned
-        // blocks do not exceed the size of the snapshot file.
-        length = Math.min(length, fileSize - offset);
-        isUc = false;
+      FlatINode inode = paths.inodesInPath().getLastINode();
+      FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+      if (file == null) {
+        throw new FileNotFoundException(src);
       }
 
-      final FileEncryptionInfo feInfo = FSDirectory.isReservedRawName(src)
-          ? null
-          : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+      final long fileSize = file.fileSize();
+      final FileEncryptionInfo feInfo = null;
+//      final FileEncryptionInfo feInfo = FSDirectory.isReservedRawName(src)
+//          ? null
+//          : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
 
-      final LocatedBlocks blocks = bm.createLocatedBlocks(
-          inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
-          length, needBlockToken, iip.isSnapshot(), feInfo);
+      LocatedBlocks blocks = bm.createLocatedBlocks(
+          file.blocks(), offset, length, fileSize, needBlockToken);
+      // TODO: Fix snapshot
+      blocks = FSDirStatAndListingOp.attachFileInfo(
+          blocks, fileSize, file.inConstruction(), false, feInfo);
 
       // Set caching information for the located blocks.
       for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@@ -191,11 +193,8 @@ class FSDirStatAndListingOp {
 
       final long now = now();
       boolean updateAccessTime = fsd.isAccessTimeSupported()
-          && !iip.isSnapshot()
-          && now > inode.getAccessTime() + fsd.getAccessTimePrecision();
+          && now > inode.atime() + fsd.getAccessTimePrecision();
       return new GetBlockLocationsResult(updateAccessTime, blocks);
-    } finally {
-      fsd.readUnlock();
     }
   }
 
@@ -328,9 +327,7 @@ class FSDirStatAndListingOp {
       byte[] localName, boolean needLocation, byte storagePolicy)
       throws IOException {
     if (needLocation) {
-      throw new IllegalStateException("Unimplemented");
-//      return createLocatedFileStatus(fsd, path, node, nodeAttrs, storagePolicy,
-//                                     snapshot, isRawPath, iip);
+      return createLocatedFileStatus(tx, fsd, node, localName, storagePolicy);
     } else {
       return createFileStatus(tx, fsd, node, localName, storagePolicy);
     }
@@ -460,47 +457,45 @@ class FSDirStatAndListingOp {
    * Create FileStatus with location info by file INode
    */
   private static HdfsLocatedFileStatus createLocatedFileStatus(
-      FSDirectory fsd, byte[] path, INode node, INodeAttributes nodeAttrs,
-      byte storagePolicy, int snapshot,
-      boolean isRawPath, INodesInPath iip) throws IOException {
+      ROTransaction tx, FSDirectory fsd, FlatINode node, byte[] path,
+      byte storagePolicy) throws IOException {
     assert fsd.hasReadLock();
     long size = 0; // length is zero for directories
     short replication = 0;
     long blocksize = 0;
     LocatedBlocks loc = null;
-    final boolean isEncrypted;
-    final FileEncryptionInfo feInfo = isRawPath ? null :
-        fsd.getFileEncryptionInfo(node, snapshot, iip);
+
+    // TODO
+    final FileEncryptionInfo feInfo = null;
+//    final FileEncryptionInfo feInfo = isRawPath ? null :
+//        fsd.getFileEncryptionInfo(node, snapshot, iip);
+
     if (node.isFile()) {
-      final INodeFile fileNode = node.asFile();
-      size = fileNode.computeFileSize(snapshot);
-      replication = fileNode.getFileReplication(snapshot);
-      blocksize = fileNode.getPreferredBlockSize();
+      FlatINodeFileFeature file = node.feature(FlatINodeFileFeature.class);
+      size = file.fileSize();
+      replication = file.replication();
+      blocksize = file.blockSize();
 
-      final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID;
-      final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
-      final long fileSize = !inSnapshot && isUc ?
-          fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
+      final boolean isUc = file.inConstruction();
+      final long fileSize = file.inConstruction()
+          ? size - file.lastBlock().getNumBytes()
+          : size;
 
       loc = fsd.getBlockManager().createLocatedBlocks(
-          fileNode.getBlocks(snapshot), 0L, size, fileSize, false);
-      loc = attachFileInfo(loc, fileSize, isUc, inSnapshot, feInfo);
-      isEncrypted = (feInfo != null) ||
-          (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
-    } else {
-      isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node));
+          file.blocks(), 0L, size, fileSize, false);
+      // TODO: Snapshot
+      loc = attachFileInfo(loc, fileSize, isUc, false, feInfo);
     }
-    int childrenNum = node.isDirectory() ?
-        node.asDirectory().getChildrenNum(snapshot) : 0;
 
-    HdfsLocatedFileStatus status =
-        new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
-          blocksize, node.getModificationTime(snapshot),
-          node.getAccessTime(snapshot),
-          getPermissionForFileStatus(nodeAttrs, isEncrypted),
-          nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
-          node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum, feInfo, storagePolicy);
+    int childrenNum = node.isDirectory()
+        ? tx.childrenView(node.id()).size()
+        : 0;
+
+    PermissionStatus perm = node.permissionStatus(fsd.ugid());
+    HdfsLocatedFileStatus status = new HdfsLocatedFileStatus(
+        size, node.isDirectory(), replication, blocksize, node.mtime(), node.atime(),
+        perm.getPermission(), perm.getUserName(), perm.getGroupName(),
+        null, path, node.id(), loc, childrenNum, feInfo, storagePolicy);
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();

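Note: the rewritten read path in this file is easier to follow end to end. Condensed from the hunks above (permission checks and cache wiring elided; ROTransaction, Resolver, FlatINode, and FlatINodeFileFeature are partial-ns types), location assembly is now a two-step affair: BlockManager builds the locations from the flat inode's block list, then attachFileInfo() layers on the file-level metadata:

    try (ROTransaction tx = fsd.newROTransaction().begin()) {
      Resolver.Result paths = Resolver.resolve(tx, src);
      if (paths.invalidPath()) {
        throw new InvalidPathException(src);
      } else if (paths.notFound()) {
        throw new FileNotFoundException(src);
      }
      FlatINode inode = paths.inodesInPath().getLastINode();
      FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
      if (file == null) {
        throw new FileNotFoundException(src); // resolved to a directory
      }
      // Step 1: locations from the block list stored on the flat inode.
      LocatedBlocks blocks = bm.createLocatedBlocks(
          file.blocks(), offset, length, file.fileSize(), needBlockToken);
      // Step 2: attach file size, under-construction flag, and encryption
      // info (snapshot handling and feInfo are still TODO in this commit).
      blocks = FSDirStatAndListingOp.attachFileInfo(
          blocks, file.fileSize(), file.inConstruction(), false, null);
    }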
+ 12 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1617,18 +1617,19 @@ public class FSDirectory implements Closeable {
   }
 
   void checkUnreadableBySuperuser(
-      FSPermissionChecker pc, INode inode, int snapshotId)
+      FSPermissionChecker pc, FlatINodesInPath iip)
       throws IOException {
-    if (pc.isSuperUser()) {
-      for (XAttr xattr : FSDirXAttrOp.getXAttrs(this, inode, snapshotId)) {
-        if (XAttrHelper.getPrefixName(xattr).
-            equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
-          throw new AccessControlException(
-              "Access is denied for " + pc.getUser() + " since the superuser "
-              + "is not allowed to perform this operation.");
-        }
-      }
-    }
+    // TODO
+//    if (pc.isSuperUser()) {
+//      for (XAttr xattr : FSDirXAttrOp.getXAttrs(this, inode, snapshotId)) {
+//        if (XAttrHelper.getPrefixName(xattr).
+//            equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
+//          throw new AccessControlException(
+//              "Access is denied for " + pc.getUser() + " since the superuser "
+//              + "is not allowed to perform this operation.");
+//        }
+//      }
+//    }
   }
 
   HdfsFileStatus getAuditFileInfo(INodesInPath iip)

+ 0 - 84
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1779,90 +1779,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return blocks;
   }
 
-  /**
-   * Get block locations within the specified range.
-   * @see ClientProtocol#getBlockLocations(String, long, long)
-   * @throws IOException
-   */
-  GetBlockLocationsResult getBlockLocations(
-      FSPermissionChecker pc, String src, long offset, long length,
-      boolean needBlockToken, boolean checkSafeMode) throws IOException {
-    if (offset < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Negative offset is not supported. File: " + src);
-    }
-    if (length < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Negative length is not supported. File: " + src);
-    }
-    final GetBlockLocationsResult ret = getBlockLocationsInt(
-        pc, src, offset, length, needBlockToken);
-
-    if (checkSafeMode && isInSafeMode()) {
-      for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
-        // if safemode & no block locations yet then throw safemodeException
-        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
-          SafeModeException se = new SafeModeException(
-              "Zero blocklocations for " + src, safeMode);
-          if (haEnabled && haContext != null &&
-              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
-            throw new RetriableException(se);
-          } else {
-            throw se;
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
-  private GetBlockLocationsResult getBlockLocationsInt(
-      FSPermissionChecker pc, final String srcArg, long offset, long length,
-      boolean needBlockToken)
-      throws IOException {
-    String src = srcArg;
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    src = dir.resolvePath(pc, srcArg, pathComponents);
-    final INodesInPath iip = dir.getINodesInPath(src, true);
-    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
-    if (isPermissionEnabled) {
-      dir.checkPathAccess(pc, iip, FsAction.READ);
-      checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
-    }
-
-    final long fileSize = iip.isSnapshot()
-        ? inode.computeFileSize(iip.getPathSnapshotId())
-        : inode.computeFileSizeNotIncludingLastUcBlock();
-    boolean isUc = inode.isUnderConstruction();
-    if (iip.isSnapshot()) {
-      // if src indicates a snapshot file, we need to make sure the returned
-      // blocks do not exceed the size of the snapshot file.
-      length = Math.min(length, fileSize - offset);
-      isUc = false;
-    }
-
-    final FileEncryptionInfo feInfo =
-        FSDirectory.isReservedRawName(srcArg) ? null
-            : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
-
-    LocatedBlocks blocks = blockManager.createLocatedBlocks(
-        inode.getBlocks(iip.getPathSnapshotId()), offset, length, fileSize,
-        needBlockToken);
-    blocks = FSDirStatAndListingOp.attachFileInfo(blocks, fileSize, isUc,
-                                                  iip.isSnapshot(), feInfo);
-
-    // Set caching information for the located blocks.
-    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
-      cacheManager.setCachedLocations(lb);
-    }
-
-    final long now = now();
-    boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
-        && !iip.isSnapshot()
-        && now > inode.getAccessTime() + getAccessTimePrecision();
-    return new GetBlockLocationsResult(updateAccessTime, blocks);
-  }
-
   /**
    * Moves all the blocks from {@code srcs} and appends them to {@code target}
    * To avoid rollbacks we will verify validity of ALL of the args

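Note: the deleted FSNamesystem method carried more than the path-resolution logic that moved into FSDirStatAndListingOp. It also held the safe-mode guard sketched below (reconstructed from the removed lines); this commit does not show it being re-added at the new call site:

    // In safe mode, a block with no reported locations yet is surfaced as
    // SafeModeException; on an active HA namenode it is wrapped in
    // RetriableException so clients retry rather than fail hard.
    if (checkSafeMode && isInSafeMode()) {
      for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
        if (b.getLocations() == null || b.getLocations().length == 0) {
          SafeModeException se = new SafeModeException(
              "Zero blocklocations for " + src, safeMode);
          if (haEnabled && haContext != null &&
              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
            throw new RetriableException(se);
          }
          throw se;
        }
      }
    }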
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java

@@ -184,7 +184,8 @@ public class TestPendingReplication {
           DatanodeStorageInfo.toDatanodeDescriptors(
               DFSTestUtil.createDatanodeStorageInfos(1)));
       BlockCollection bc = Mockito.mock(BlockCollection.class);
-      Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
+      Mockito.when(bc.getId()).thenReturn(1L);
+      //Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
       // Place into blocksmap with GenerationStamp = 1
       blockInfo.setGenerationStamp(1);
       blocksMap.addBlockCollection(blockInfo, bc);