浏览代码

HDFS-9542. Move BlockIdManager from FSNamesystem to BlockManager. Contributed by Jing Zhao.

(cherry picked from commit c304890c8c7782d835896859f5b7f60b96c306c0)
Jing Zhao 9 年之前
父节点
当前提交
c19bdc1926
共有 17 个文件被更改,包括 110 次插入109 次删除
  1. 2 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 6 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
  3. 33 32
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  4. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java
  5. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
  6. 6 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  7. 14 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  8. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
  9. 29 26
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  10. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  11. 0 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
  12. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
  13. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java
  14. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
  15. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockId.java
  16. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java
  17. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -45,6 +45,8 @@ Release 2.9.0 - UNRELEASED
     HDFS-9576: HTrace: collect position/length information on read operations
     (zhz via cmccabe)
 
+    HDFS-9542. Move BlockIdManager from FSNamesystem to BlockManager. (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java

@@ -144,7 +144,7 @@ public class BlockIdManager {
   /**
    * Increments, logs and then returns the stamp
    */
-  public long nextGenerationStamp(boolean legacyBlock) throws IOException {
+  long nextGenerationStamp(boolean legacyBlock) throws IOException {
     return legacyBlock ? getNextGenerationStampV1() :
       getNextGenerationStampV2();
   }
@@ -180,18 +180,18 @@ public class BlockIdManager {
    *
    * @return true if the block ID was randomly generated, false otherwise.
    */
-  public boolean isLegacyBlock(Block block) {
+  boolean isLegacyBlock(Block block) {
     return block.getGenerationStamp() < getGenerationStampV1Limit();
   }
 
   /**
    * Increments, logs and then returns the block ID
    */
-  public long nextBlockId() {
+  long nextBlockId() {
     return blockIdGenerator.nextValue();
   }
 
-  public boolean isGenStampInFuture(Block block) {
+  boolean isGenStampInFuture(Block block) {
     if (isLegacyBlock(block)) {
       return block.getGenerationStamp() > getGenerationStampV1();
     } else {
@@ -199,11 +199,11 @@ public class BlockIdManager {
     }
   }
 
-  public void clear() {
+  void clear() {
     generationStampV1.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
     generationStampV2.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
     getBlockIdGenerator().setCurrentValue(SequentialBlockIdGenerator
       .LAST_RESERVED_BLOCK_ID);
     generationStampV1Limit = HdfsConstants.GRANDFATHER_GENERATION_STAMP;
   }
-}
+}

+ 33 - 32
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -75,7 +75,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -292,11 +291,15 @@ public class BlockManager implements BlockStatsMXBean {
   /** Check whether name system is running before terminating */
   private boolean checkNSRunning = true;
 
-  public BlockManager(final Namesystem namesystem, final Configuration conf)
+  private final BlockIdManager blockIdManager;
+
+  public BlockManager(final Namesystem namesystem, boolean haEnabled,
+      final Configuration conf)
     throws IOException {
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
+    this.blockIdManager = new BlockIdManager(this);
 
     startupDelayBlockDeletionInMs = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
@@ -371,7 +374,7 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
     this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
 
-    bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf);
+    bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
 
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
@@ -475,8 +478,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Should the access keys be updated? */
   boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
-    return isBlockTokenEnabled()? blockTokenSecretManager.updateKeys(updateTime)
-        : false;
+    return isBlockTokenEnabled() && blockTokenSecretManager.updateKeys(updateTime);
   }
 
   public void activate(Configuration conf, long blockTotal) {
@@ -523,7 +525,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(); // TODO: block manager read lock and NS write lock
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
     datanodeManager.fetchDatanodes(live, dead, false);
@@ -1033,27 +1035,8 @@ public class BlockManager implements BlockStatsMXBean {
     return countNodes(b).liveReplicas() >= replication;
   }
 
-  /**
-   * return a list of blocks & their locations on <code>datanode</code> whose
-   * total size is <code>size</code>
-   * 
-   * @param datanode on which blocks are located
-   * @param size total size of blocks
-   */
-  public BlocksWithLocations getBlocks(DatanodeID datanode, long size
-      ) throws IOException {
-    namesystem.checkOperation(OperationCategory.READ);
-    namesystem.readLock();
-    try {
-      namesystem.checkOperation(OperationCategory.READ);
-      return getBlocksWithLocations(datanode, size);  
-    } finally {
-      namesystem.readUnlock();
-    }
-  }
-
   /** Get all blocks with location information from a datanode. */
-  private BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
+  public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
       final long size) throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
@@ -2183,8 +2166,7 @@ public class BlockManager implements BlockStatsMXBean {
             + " on " + storageInfo.getDatanodeDescriptor() + " size " +
             iblk.getNumBytes() + " replicaState = " + reportedState);
       }
-      if (shouldPostponeBlocksFromFuture &&
-          namesystem.isGenStampInFuture(iblk)) {
+      if (shouldPostponeBlocksFromFuture && isGenStampInFuture(iblk)) {
         queueReportedBlock(storageInfo, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
@@ -2326,8 +2308,7 @@ public class BlockManager implements BlockStatsMXBean {
           + " replicaState = " + reportedState);
     }
   
-    if (shouldPostponeBlocksFromFuture &&
-        namesystem.isGenStampInFuture(block)) {
+    if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
@@ -3054,8 +3035,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private void removeStoredBlock(DatanodeStorageInfo storageInfo, Block block,
       DatanodeDescriptor node) {
-    if (shouldPostponeBlocksFromFuture &&
-        namesystem.isGenStampInFuture(block)) {
+    if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, null,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return;
@@ -3814,6 +3794,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
   
   public void clear() {
+    blockIdManager.clear();
     clearQueues();
     blocksMap.clear();
   }
@@ -3977,4 +3958,24 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
   }
+
+  public BlockIdManager getBlockIdManager() {
+    return blockIdManager;
+  }
+
+  public long nextGenerationStamp(boolean legacyBlock) throws IOException {
+    return blockIdManager.nextGenerationStamp(legacyBlock);
+  }
+
+  public boolean isLegacyBlock(Block block) {
+    return blockIdManager.isLegacyBlock(block);
+  }
+
+  public long nextBlockId() {
+    return blockIdManager.nextBlockId();
+  }
+
+  boolean isGenStampInFuture(Block block) {
+    return blockIdManager.isGenStampInFuture(block);
+  }
 }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java

@@ -115,10 +115,10 @@ class BlockManagerSafeMode {
   private final boolean inRollBack;
 
   BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem,
-      Configuration conf) {
+      boolean haEnabled, Configuration conf) {
     this.blockManager = blockManager;
     this.namesystem = namesystem;
-    this.haEnabled = namesystem.isHaEnabled();
+    this.haEnabled = haEnabled;
     this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
         DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
     if (this.threshold > 1.0) {
@@ -468,7 +468,7 @@ class BlockManagerSafeMode {
 
     if (!blockManager.getShouldPostponeBlocksFromFuture() &&
         !inRollBack &&
-        namesystem.isGenStampInFuture(brr)) {
+        blockManager.isGenStampInFuture(brr)) {
       numberOfBytesInFutureBlocks.addAndGet(brr.getBytesOnDisk());
     }
   }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java

@@ -219,7 +219,7 @@ final class FSDirTruncateOp {
     if (newBlock == null) {
       newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock() : new Block(
           oldBlock.getBlockId(), oldBlock.getNumBytes(),
-          fsn.nextGenerationStamp(fsn.getBlockIdManager().isLegacyBlock(
+          fsn.nextGenerationStamp(fsn.getBlockManager().isLegacyBlock(
               oldBlock)));
     }
 

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -112,12 +113,14 @@ public class FSEditLogLoader {
   static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
 
   private final FSNamesystem fsNamesys;
+  private final BlockManager blockManager;
   private long lastAppliedTxId;
   /** Total number of end transactions loaded. */
   private int totalEdits = 0;
   
   public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
     this.fsNamesys = fsNamesys;
+    this.blockManager = fsNamesys.getBlockManager();
     this.lastAppliedTxId = lastAppliedTxId;
   }
   
@@ -573,7 +576,7 @@ public class FSEditLogLoader {
     }
     case OP_SET_GENSTAMP_V1: {
       SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op;
-      fsNamesys.getBlockIdManager().setGenerationStampV1(
+      blockManager.getBlockIdManager().setGenerationStampV1(
           setGenstampV1Op.genStampV1);
       break;
     }
@@ -781,13 +784,13 @@ public class FSEditLogLoader {
     }
     case OP_SET_GENSTAMP_V2: {
       SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
-      fsNamesys.getBlockIdManager().setGenerationStampV2(
+      blockManager.getBlockIdManager().setGenerationStampV2(
           setGenstampV2Op.genStampV2);
       break;
     }
     case OP_ALLOCATE_BLOCK_ID: {
       AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
-      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
+      blockManager.getBlockIdManager().setLastAllocatedBlockId(
           allocateBlockIdOp.blockId);
       break;
     }

+ 14 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -343,26 +344,26 @@ public class FSImageFormat {
 
         // read in the last generation stamp for legacy blocks.
         long genstamp = in.readLong();
-        namesystem.getBlockIdManager().setGenerationStampV1(genstamp);
+        final BlockIdManager blockIdManager = namesystem.getBlockManager()
+            .getBlockIdManager();
+        blockIdManager.setGenerationStampV1(genstamp);
 
         if (NameNodeLayoutVersion.supports(
             LayoutVersion.Feature.SEQUENTIAL_BLOCK_ID, imgVersion)) {
           // read the starting generation stamp for sequential block IDs
           genstamp = in.readLong();
-          namesystem.getBlockIdManager().setGenerationStampV2(genstamp);
+          blockIdManager.setGenerationStampV2(genstamp);
 
           // read the last generation stamp for blocks created after
           // the switch to sequential block IDs.
           long stampAtIdSwitch = in.readLong();
-          namesystem.getBlockIdManager().setGenerationStampV1Limit(stampAtIdSwitch);
+          blockIdManager.setGenerationStampV1Limit(stampAtIdSwitch);
 
           // read the max sequential block ID.
           long maxSequentialBlockId = in.readLong();
-          namesystem.getBlockIdManager().setLastAllocatedBlockId(maxSequentialBlockId);
+          blockIdManager.setLastAllocatedBlockId(maxSequentialBlockId);
         } else {
-
-          long startingGenStamp = namesystem.getBlockIdManager()
-            .upgradeGenerationStampToV2();
+          long startingGenStamp = blockIdManager.upgradeGenerationStampToV2();
           // This is an upgrade.
           LOG.info("Upgrading to sequential block IDs. Generation stamp " +
                    "for new blocks set to " + startingGenStamp);
@@ -1266,10 +1267,12 @@ public class FSImageFormat {
         out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
             .getNamespaceID());
         out.writeLong(numINodes);
-        out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV1());
-        out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV2());
-        out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
-        out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
+        final BlockIdManager blockIdManager = sourceNamesystem.getBlockManager()
+            .getBlockIdManager();
+        out.writeLong(blockIdManager.getGenerationStampV1());
+        out.writeLong(blockIdManager.getGenerationStampV2());
+        out.writeLong(blockIdManager.getGenerationStampAtblockIdSwitch());
+        out.writeLong(blockIdManager.getLastAllocatedBlockId());
         out.writeLong(context.getTxId());
         out.writeLong(sourceNamesystem.dir.getLastInodeId());
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -293,7 +293,7 @@ public final class FSImageFormatProtobuf {
 
     private void loadNameSystemSection(InputStream in) throws IOException {
       NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
-      BlockIdManager blockIdManager = fsn.getBlockIdManager();
+      BlockIdManager blockIdManager = fsn.getBlockManager().getBlockIdManager();
       blockIdManager.setGenerationStampV1(s.getGenstampV1());
       blockIdManager.setGenerationStampV2(s.getGenstampV2());
       blockIdManager.setGenerationStampV1Limit(s.getGenstampV1Limit());
@@ -544,7 +544,7 @@ public final class FSImageFormatProtobuf {
         throws IOException {
       final FSNamesystem fsn = context.getSourceNamesystem();
       OutputStream out = sectionOutputStream;
-      BlockIdManager blockIdManager = fsn.getBlockIdManager();
+      BlockIdManager blockIdManager = fsn.getBlockManager().getBlockIdManager();
       NameSystemSection.Builder b = NameSystemSection.newBuilder()
           .setGenstampV1(blockIdManager.getGenerationStampV1())
           .setGenstampV1Limit(blockIdManager.getGenerationStampV1Limit())

+ 29 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -198,7 +198,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
@@ -238,6 +237,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -303,8 +303,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   NameNodeMXBean {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
 
-  private final BlockIdManager blockIdManager;
-
   boolean isAuditEnabled() {
     return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
         && !auditLoggers.isEmpty();
@@ -543,7 +541,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void clear() {
     dir.reset();
     dtSecretManager.reset();
-    blockIdManager.clear();
     leaseManager.removeAllLeases();
     snapshotManager.clearSnapshottableDirs();
     cacheManager.clear();
@@ -555,8 +552,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   LeaseManager getLeaseManager() {
     return leaseManager;
   }
-  
-  @Override
+
   public boolean isHaEnabled() {
     return haEnabled;
   }
@@ -713,9 +709,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
 
       // block manager needs the haEnabled initialized
-      this.blockManager = new BlockManager(this, conf);
+      this.blockManager = new BlockManager(this, haEnabled, conf);
       this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
-      this.blockIdManager = new BlockIdManager(blockManager);
 
       // Get the checksum type from config
       String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY,
@@ -1240,8 +1235,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       getFSImage().editLog.close();
     }
   }
-  
-  @Override
+
   public void checkOperation(OperationCategory op) throws StandbyException {
     if (haContext != null) {
       // null in some unit tests
@@ -1529,8 +1523,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   public boolean isRunning() {
     return fsRunning;
   }
-  
-  @Override
+
   public boolean isInStandbyState() {
     if (haContext == null || haContext.getState() == null) {
       // We're still starting up. In this case, if HA is
@@ -1542,6 +1535,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return HAServiceState.STANDBY == haContext.getState().getServiceState();
   }
 
+  /**
+   * return a list of blocks & their locations on <code>datanode</code> whose
+   * total size is <code>size</code>
+   *
+   * @param datanode on which blocks are located
+   * @param size total size of blocks
+   */
+  public BlocksWithLocations getBlocks(DatanodeID datanode, long size)
+      throws IOException {
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return getBlockManager().getBlocksWithLocations(datanode, size);
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Dump all metadata into specified file
    */
@@ -3010,7 +3022,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       // start recovery of the last block for this file
       long blockRecoveryId = nextGenerationStamp(
-          blockIdManager.isLegacyBlock(lastBlock));
+          blockManager.isLegacyBlock(lastBlock));
       lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
       if(copyOnTruncate) {
         lastBlock.setGenerationStamp(blockRecoveryId);
@@ -4443,11 +4455,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Increments, logs and then returns the stamp
    */
   long nextGenerationStamp(boolean legacyBlock)
-      throws IOException, SafeModeException {
+      throws IOException {
     assert hasWriteLock();
     checkNameNodeSafeMode("Cannot get next generation stamp");
 
-    long gs = blockIdManager.nextGenerationStamp(legacyBlock);
+    long gs = blockManager.nextGenerationStamp(legacyBlock);
     if (legacyBlock) {
       getEditLog().logGenerationStampV1(gs);
     } else {
@@ -4464,7 +4476,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private long nextBlockId() throws IOException {
     assert hasWriteLock();
     checkNameNodeSafeMode("Cannot get next block ID");
-    final long blockId = blockIdManager.nextBlockId();
+    final long blockId = blockManager.nextBlockId();
     getEditLog().logAllocateBlockId(blockId);
     // NB: callers sync the log
     return blockId;
@@ -4591,7 +4603,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkUCBlock(block, clientName);
   
       // get a new generation stamp and an access token
-      block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
+      block.setGenerationStamp(nextGenerationStamp(blockManager.isLegacyBlock(block.getLocalBlock())));
       locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
       blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE);
     } finally {
@@ -5437,10 +5449,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return blockManager;
   }
 
-  public BlockIdManager getBlockIdManager() {
-    return blockIdManager;
-  }
-
   /** @return the FSDirectory. */
   public FSDirectory getFSDirectory() {
     return dir;
@@ -5569,11 +5577,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw it;
     }
   }
-  
-  @Override
-  public boolean isGenStampInFuture(Block block) {
-    return blockIdManager.isGenStampInFuture(block);
-  }
 
   @VisibleForTesting
   public EditLogTailer getEditLogTailer() {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -555,7 +555,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
     checkNNStartup();
     namesystem.checkSuperuserPrivilege();
-    return namesystem.getBlockManager().getBlocks(datanode, size); 
+    return namesystem.getBlocks(datanode, size);
   }
 
   @Override // NamenodeProtocol

+ 0 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java

@@ -38,14 +38,8 @@ public interface Namesystem extends RwLock, SafeMode {
   /** @return the block pool ID */
   String getBlockPoolId();
 
-  boolean isInStandbyState();
-
-  boolean isGenStampInFuture(Block block);
-
   BlockCollection getBlockCollection(long id);
 
-  void checkOperation(OperationCategory read) throws StandbyException;
-
   void startSecretManagerIfNecessary();
 
   boolean isInSnapshot(long blockCollectionID);
@@ -53,11 +47,6 @@ public interface Namesystem extends RwLock, SafeMode {
   CacheManager getCacheManager();
   HAContext getHAContext();
 
-  /**
-   * @return true if the HA is enabled else false
-   */
-  boolean isHaEnabled();
-
   /**
    * @return Whether the namenode is transitioning to active state and is in the
    *         middle of the starting active services.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -130,7 +130,7 @@ public class TestBlockManager {
     Mockito.doReturn(true).when(fsn).hasWriteLock();
     Mockito.doReturn(true).when(fsn).hasReadLock();
     Mockito.doReturn(true).when(fsn).isRunning();
-    bm = new BlockManager(fsn, conf);
+    bm = new BlockManager(fsn, false, conf);
     final String[] racks = {
         "/rackA",
         "/rackA",

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java

@@ -93,16 +93,16 @@ public class TestBlockManagerSafeMode {
     doReturn(true).when(fsn).hasWriteLock();
     doReturn(true).when(fsn).hasReadLock();
     doReturn(true).when(fsn).isRunning();
-    doReturn(true).when(fsn).isGenStampInFuture(any(Block.class));
     NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
 
-    bm = spy(new BlockManager(fsn, conf));
+    bm = spy(new BlockManager(fsn, false, conf));
+    doReturn(true).when(bm).isGenStampInFuture(any(Block.class));
     dn = spy(bm.getDatanodeManager());
     Whitebox.setInternalState(bm, "datanodeManager", dn);
     // the datanode threshold is always met
     when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
 
-    bmSafeMode = new BlockManagerSafeMode(bm, fsn, conf);
+    bmSafeMode = new BlockManagerSafeMode(bm, fsn, false, conf);
   }
 
   /**

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -1315,7 +1315,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     FSNamesystem mockNS = mock(FSNamesystem.class);
     when(mockNS.hasWriteLock()).thenReturn(true);
     when(mockNS.hasReadLock()).thenReturn(true);
-    BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
@@ -1365,7 +1365,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     Namesystem mockNS = mock(Namesystem.class);
     when(mockNS.hasWriteLock()).thenReturn(true);
 
-    BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
@@ -1427,7 +1427,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     Namesystem mockNS = mock(Namesystem.class);
     when(mockNS.hasReadLock()).thenReturn(true);
 
-    BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockId.java

@@ -116,8 +116,8 @@ public class TestSequentialBlockId {
 
       // Rewind the block ID counter in the name system object. This will result
       // in block ID collisions when we try to allocate new blocks.
-      SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockIdManager()
-        .getBlockIdGenerator();
+      SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockManager()
+          .getBlockIdManager().getBlockIdGenerator();
       blockIdGenerator.setCurrentValue(blockIdGenerator.getCurrentValue() - 5);
 
       // Trigger collisions by creating a new file.

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java

@@ -1019,7 +1019,7 @@ public class TestFileTruncate {
       assertThat(truncateBlock.getNumBytes(),
           is(oldBlock.getNumBytes()));
       assertThat(truncateBlock.getGenerationStamp(),
-          is(fsn.getBlockIdManager().getGenerationStampV2()));
+          is(fsn.getBlockManager().getBlockIdManager().getGenerationStampV2()));
       assertThat(file.getLastBlock().getBlockUCState(),
           is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
       long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()
@@ -1052,7 +1052,7 @@ public class TestFileTruncate {
       assertThat(truncateBlock.getNumBytes() < oldBlock.getNumBytes(),
           is(true));
       assertThat(truncateBlock.getGenerationStamp(),
-          is(fsn.getBlockIdManager().getGenerationStampV2()));
+          is(fsn.getBlockManager().getBlockIdManager().getGenerationStampV2()));
       assertThat(file.getLastBlock().getBlockUCState(),
           is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
       long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java

@@ -544,8 +544,8 @@ public class TestSaveNamespace {
     FSNamesystem spyFsn = spy(fsn);
     final FSNamesystem finalFsn = spyFsn;
     DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
-    BlockIdManager bid = spy(spyFsn.getBlockIdManager());
-    Whitebox.setInternalState(finalFsn, "blockIdManager", bid);
+    BlockIdManager bid = spy(spyFsn.getBlockManager().getBlockIdManager());
+    Whitebox.setInternalState(finalFsn.getBlockManager(), "blockIdManager", bid);
     doAnswer(delayer).when(bid).getGenerationStampV2();
 
     ExecutorService pool = Executors.newFixedThreadPool(2);