
HDFS-8447. Decouple information of files in GetLocatedBlocks.

Haohui Mai, 10 years ago
Parent commit: c6228398d9

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Collections;
 import java.util.Comparator;
@@ -58,6 +59,12 @@ public class LocatedBlocks {
     this.fileEncryptionInfo = feInfo;
   }
 
+  public LocatedBlocks(
+      List<LocatedBlock> blks, LocatedBlock lastBlock,
+      boolean isLastBlockCompleted) {
+    this(0, false, blks, lastBlock, isLastBlockCompleted, null);
+  }
+
   /**
    * Get located blocks.
    */
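The new three-argument constructor builds a LocatedBlocks from block information only, delegating to the existing six-argument constructor with neutral file-level values (zero length, not under construction, no encryption info). A minimal usage sketch, assuming the class and variable names below are only illustrative:

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    class LocatedBlocksSketch {
      static LocatedBlocks emptyFile() {
        // No blocks yet: empty list, no last block, last block trivially complete.
        // File-level fields are left at their defaults and attached later.
        List<LocatedBlock> blks = Collections.emptyList();
        return new LocatedBlocks(blks, null, true);
      }
    }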

+ 71 - 136
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -52,11 +52,9 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
@@ -549,12 +547,12 @@ public class BlockManager {
       out.print(fileName + ": ");
     }
     // l: == live:, d: == decommissioned c: == corrupt e: == excess
-    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+    out.print(block + ((usableReplicas > 0) ? "" : " MISSING") +
               " (replicas:" +
               " l: " + numReplicas.liveReplicas() +
               " d: " + numReplicas.decommissionedAndDecommissioning() +
               " c: " + numReplicas.corruptReplicas() +
-              " e: " + numReplicas.excessReplicas() + ") "); 
+              " e: " + numReplicas.excessReplicas() + ") ");
 
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
@@ -750,7 +748,7 @@ public class BlockManager {
 
     final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength();
     final long pos = fileLength - ucBlock.getNumBytes();
-    return createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE);
+    return createLocatedBlock(ucBlock, pos);
   }
 
   /**
@@ -767,151 +765,88 @@ public class BlockManager {
     }
     return locations;
   }
-  
-  private List<LocatedBlock> createLocatedBlockList(
-      final BlockInfoContiguous[] blocks,
-      final long offset, final long length, final int nrBlocksToReturn,
-      final AccessMode mode) throws IOException {
-    int curBlk = 0;
-    long curPos = 0, blkSize = 0;
-    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
-    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
-      blkSize = blocks[curBlk].getNumBytes();
-      assert blkSize > 0 : "Block of size 0";
-      if (curPos + blkSize > offset) {
-        break;
-      }
-      curPos += blkSize;
-    }
 
-    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return Collections.<LocatedBlock>emptyList();
-
-    long endOff = offset + length;
-    List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
-    do {
-      results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
-      curPos += blocks[curBlk].getNumBytes();
-      curBlk++;
-    } while (curPos < endOff 
-          && curBlk < blocks.length
-          && results.size() < nrBlocksToReturn);
-    return results;
-  }
-
-  private LocatedBlock createLocatedBlock(final BlockInfoContiguous[] blocks,
-      final long endPos, final AccessMode mode) throws IOException {
-    int curBlk = 0;
-    long curPos = 0;
-    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
-    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
-      long blkSize = blocks[curBlk].getNumBytes();
-      if (curPos + blkSize >= endPos) {
-        break;
-      }
-      curPos += blkSize;
-    }
-    
-    return createLocatedBlock(blocks[curBlk], curPos, mode);
-  }
-  
-  private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos,
-    final AccessMode mode) throws IOException {
-    final LocatedBlock lb = createLocatedBlock(blk, pos);
-    if (mode != null) {
-      setBlockToken(lb, mode);
-    }
-    return lb;
-  }
+  private LocatedBlock createLocatedBlock(
+      final BlockInfoContiguous blk, final long pos) throws IOException {
+    final String bpId = namesystem.getBlockPoolId();
+    final ExtendedBlock eb = new ExtendedBlock(bpId, blk);
 
-  /** @return a LocatedBlock for the given block */
-  private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos
-      ) throws IOException {
-    if (blk instanceof BlockInfoContiguousUnderConstruction) {
-      if (blk.isComplete()) {
-        throw new IOException(
-            "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
-            + ", blk=" + blk);
-      }
+    if (!blk.isComplete()) {
       final BlockInfoContiguousUnderConstruction uc =
           (BlockInfoContiguousUnderConstruction) blk;
       final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
-      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
       return newLocatedBlock(eb, storages, pos, false);
     }
 
-    // get block locations
-    final int numCorruptNodes = countNodes(blk).corruptReplicas();
-    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
-    if (numCorruptNodes != numCorruptReplicas) {
-      LOG.warn("Inconsistent number of corrupt replicas for "
-          + blk + " blockMap has " + numCorruptNodes
-          + " but corrupt replicas map has " + numCorruptReplicas);
-    }
-
-    final int numNodes = blocksMap.numNodes(blk);
-    final boolean isCorrupt = numCorruptNodes != 0 &&
-        numCorruptNodes == numNodes;
-    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
-    final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
-    int j = 0;
-    if (numMachines > 0) {
-      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
-        final DatanodeDescriptor d = storage.getDatanodeDescriptor();
-        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
-        if (isCorrupt || (!replicaCorrupt))
-          machines[j++] = storage;
-      }
-    }
-    assert j == machines.length :
-      "isCorrupt: " + isCorrupt + 
-      " numMachines: " + numMachines +
-      " numNodes: " + numNodes +
-      " numCorrupt: " + numCorruptNodes +
-      " numCorruptRepls: " + numCorruptReplicas;
-    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+    ArrayList<DatanodeStorageInfo> storages = new ArrayList<>();
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
+      final DatanodeDescriptor d = storage.getDatanodeDescriptor();
+      final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+      if (!replicaCorrupt) {
+        storages.add(storage);
+      }
+    }
+    boolean isCorrupt = storages.isEmpty();
+    DatanodeStorageInfo[] machines = storages.toArray(
+        new DatanodeStorageInfo[storages.size()]);
     return newLocatedBlock(eb, machines, pos, isCorrupt);
   }
 
-  /** Create a LocatedBlocks. */
-  public LocatedBlocks createLocatedBlocks(final BlockInfoContiguous[] blocks,
-      final long fileSizeExcludeBlocksUnderConstruction,
-      final boolean isFileUnderConstruction, final long offset,
-      final long length, final boolean needBlockToken,
-      final boolean inSnapshot, FileEncryptionInfo feInfo)
-      throws IOException {
+  /**
+   * Return blocks and their physical locations.
+   *
+   * @param blocks The list of blocks
+   * @param start the starting offset
+   * @param length the length
+   * @param visibleFileLength
+   * the visible length of the file. Note that the length differs from the
+   * real length if the file is in a snapshot.
+   * @param needBlockToken
+   * whether to include the block tokens in the LocatedBlock
+   *
+   * @return
+   * the list of blocks that cover the range of
+   * [start, min(start + length, visibleFileLength)) plus the last block that
+   * does not go beyond visibleFileLength.
+   *
+   * @throws IOException
+   */
+  public LocatedBlocks createLocatedBlocks(BlockInfoContiguous[] blocks,
+      final long start, final long length, long visibleFileLength,
+      final boolean needBlockToken) throws IOException {
     assert namesystem.hasReadLock();
-    if (blocks == null) {
-      return null;
-    } else if (blocks.length == 0) {
-      return new LocatedBlocks(0, isFileUnderConstruction,
-          Collections.<LocatedBlock>emptyList(), null, false, feInfo);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
-      }
-      final AccessMode mode = needBlockToken? BlockTokenIdentifier.AccessMode.READ: null;
-      final List<LocatedBlock> locatedblocks = createLocatedBlockList(
-          blocks, offset, length, Integer.MAX_VALUE, mode);
-
-      final LocatedBlock lastlb;
-      final boolean isComplete;
-      if (!inSnapshot) {
-        final BlockInfoContiguous last = blocks[blocks.length - 1];
-        final long lastPos = last.isComplete()?
-            fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
-            : fileSizeExcludeBlocksUnderConstruction;
-        lastlb = createLocatedBlock(last, lastPos, mode);
-        isComplete = last.isComplete();
-      } else {
-        lastlb = createLocatedBlock(blocks,
-            fileSizeExcludeBlocksUnderConstruction, mode);
-        isComplete = true;
+    ArrayList<LocatedBlock> lbs = new ArrayList<>();
+    long pos = 0;
+    long end = Math.min(start + length, visibleFileLength);
+    BlockInfoContiguous last = null;
+    for (BlockInfoContiguous b : blocks) {
+      last = b;
+      long old = pos;
+      pos += b.getNumBytes();
+      if (pos > visibleFileLength) {
+        break;
+      } else if (pos <= start) {
+        continue;
+      } else if (old < end) {
+        lbs.add(createLocatedBlock(b, old));
+      }
+    }
+
+    LocatedBlock lastlb = last == null ? null
+        : createLocatedBlock(last, pos - last.getNumBytes());
+    boolean isComplete = last != null && last.isComplete();
+
+    if (needBlockToken) {
+      for (LocatedBlock lb : lbs) {
+        setBlockToken(lb, AccessMode.READ);
+      }
+      if (lastlb != null) {
+        setBlockToken(lastlb, AccessMode.READ);
       }
-      return new LocatedBlocks(
-          fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
-          locatedblocks, lastlb, isComplete, feInfo);
     }
+
+    LocatedBlocks res =  new LocatedBlocks(lbs, lastlb, isComplete);
+    return res;
   }
 
   /** @return current access keys. */
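The rewritten createLocatedBlocks above walks the block list once, keeping every block that overlaps [start, min(start + length, visibleFileLength)) and stopping at the first block that would extend past the visible file length. A small standalone sketch of the same selection logic over plain block sizes, using hypothetical names (selectOverlapping, sizes) purely for illustration:

    import java.util.ArrayList;
    import java.util.List;

    class BlockRangeSketch {
      /** Indices of blocks overlapping [start, min(start + length, visibleLen)). */
      static List<Integer> selectOverlapping(long[] sizes, long start, long length,
                                             long visibleLen) {
        List<Integer> picked = new ArrayList<>();
        long pos = 0;
        long end = Math.min(start + length, visibleLen);
        for (int i = 0; i < sizes.length; i++) {
          long old = pos;
          pos += sizes[i];
          if (pos > visibleLen) {
            break;            // block would go beyond the visible length; stop
          } else if (pos <= start) {
            continue;         // block ends before the requested range
          } else if (old < end) {
            picked.add(i);    // block overlaps the requested range
          }
        }
        return picked;
      }

      public static void main(String[] args) {
        // Three 100-byte blocks, requesting bytes [150, 250) of a 300-byte file:
        // blocks 1 ([100,200)) and 2 ([200,300)) overlap the range -> [1, 2].
        System.out.println(selectOverlapping(new long[] {100, 100, 100}, 150, 100, 300));
      }
    }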

+ 3 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -508,12 +508,9 @@ class FSDirStatAndListingOp {
       final long fileSize = !inSnapshot && isUc ?
           fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
 
-      loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
-          fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
-          inSnapshot, feInfo);
-      if (loc == null) {
-        loc = new LocatedBlocks();
-      }
+      loc = fsd.getBlockManager().createLocatedBlocks(
+          fileNode.getBlocks(snapshot), 0L, size, fileSize, false);
+      loc = attachFileInfo(loc, fileSize, isUc, inSnapshot, feInfo);
       isEncrypted = (feInfo != null) ||
           (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
     } else {
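The call above replaces the old createLocatedBlocks overload that threaded file-level state through BlockManager; the block list is now built first and the file-level fields are attached afterwards via attachFileInfo. Its hunk is not shown in this view, so the following is only a plausible sketch, assuming the helper rewraps the block list with the file metadata using the existing six-argument LocatedBlocks constructor; the real body may differ:

    import java.util.List;
    import org.apache.hadoop.fs.FileEncryptionInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    class AttachFileInfoSketch {
      // Guessed shape of FSDirStatAndListingOp#attachFileInfo, for illustration only.
      static LocatedBlocks attachFileInfo(LocatedBlocks blocks, long fileSize,
          boolean isUnderConstruction, boolean inSnapshot,
          FileEncryptionInfo feInfo) {
        List<LocatedBlock> blks = blocks.getLocatedBlocks();
        return new LocatedBlocks(fileSize, isUnderConstruction, blks,
            blocks.getLastLocatedBlock(),
            inSnapshot || blocks.isLastBlockComplete(), feInfo);
      }
    }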

+ 84 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1779,6 +1779,90 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return blocks;
   }
 
+  /**
+   * Get block locations within the specified range.
+   * @see ClientProtocol#getBlockLocations(String, long, long)
+   * @throws IOException
+   */
+  GetBlockLocationsResult getBlockLocations(
+      FSPermissionChecker pc, String src, long offset, long length,
+      boolean needBlockToken, boolean checkSafeMode) throws IOException {
+    if (offset < 0) {
+      throw new HadoopIllegalArgumentException(
+          "Negative offset is not supported. File: " + src);
+    }
+    if (length < 0) {
+      throw new HadoopIllegalArgumentException(
+          "Negative length is not supported. File: " + src);
+    }
+    final GetBlockLocationsResult ret = getBlockLocationsInt(
+        pc, src, offset, length, needBlockToken);
+
+    if (checkSafeMode && isInSafeMode()) {
+      for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
+        // if safemode & no block locations yet then throw safemodeException
+        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
+          SafeModeException se = new SafeModeException(
+              "Zero blocklocations for " + src, safeMode);
+          if (haEnabled && haContext != null &&
+              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+            throw new RetriableException(se);
+          } else {
+            throw se;
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+  private GetBlockLocationsResult getBlockLocationsInt(
+      FSPermissionChecker pc, final String srcArg, long offset, long length,
+      boolean needBlockToken)
+      throws IOException {
+    String src = srcArg;
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = dir.resolvePath(pc, srcArg, pathComponents);
+    final INodesInPath iip = dir.getINodesInPath(src, true);
+    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
+    if (isPermissionEnabled) {
+      dir.checkPathAccess(pc, iip, FsAction.READ);
+      checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
+    }
+
+    final long fileSize = iip.isSnapshot()
+        ? inode.computeFileSize(iip.getPathSnapshotId())
+        : inode.computeFileSizeNotIncludingLastUcBlock();
+    boolean isUc = inode.isUnderConstruction();
+    if (iip.isSnapshot()) {
+      // if src indicates a snapshot file, we need to make sure the returned
+      // blocks do not exceed the size of the snapshot file.
+      length = Math.min(length, fileSize - offset);
+      isUc = false;
+    }
+
+    final FileEncryptionInfo feInfo =
+        FSDirectory.isReservedRawName(srcArg) ? null
+            : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+
+    LocatedBlocks blocks = blockManager.createLocatedBlocks(
+        inode.getBlocks(iip.getPathSnapshotId()), offset, length, fileSize,
+        needBlockToken);
+    blocks = FSDirStatAndListingOp.attachFileInfo(blocks, fileSize, isUc,
+                                                  iip.isSnapshot(), feInfo);
+
+    // Set caching information for the located blocks.
+    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
+      cacheManager.setCachedLocations(lb);
+    }
+
+    final long now = now();
+    boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
+        && !iip.isSnapshot()
+        && now > inode.getAccessTime() + getAccessTimePrecision();
+    return new GetBlockLocationsResult(updateAccessTime, blocks);
+  }
+
   /**
    * Moves all the blocks from {@code srcs} and appends them to {@code target}
    * To avoid rollbacks we will verify validity of ALL of the args
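The new FSNamesystem path validates the offset and length, resolves the path, computes the visible (snapshot-aware) file size, and only then asks BlockManager#createLocatedBlocks for the block list before attaching file-level info and cached locations. A minimal caller-side sketch of the resulting behavior through the public API, with the cluster URI and file path below being hypothetical:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockLocationsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
        Path file = new Path("/tmp/example.txt");
        FileStatus status = fs.getFileStatus(file);
        // On a DistributedFileSystem this reaches FSNamesystem#getBlockLocations
        // on the NameNode, which builds the list via BlockManager#createLocatedBlocks.
        BlockLocation[] locations =
            fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation loc : locations) {
          System.out.println(loc.getOffset() + "+" + loc.getLength()
              + " -> " + String.join(",", loc.getHosts()));
        }
      }
    }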