
HDFS-1639. Add block pool management to FSDataset. Contributed by Suresh Srinivas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1074727 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
901e29b307
34 files changed with 746 additions and 453 deletions
  1. CHANGES.txt (+2 -0)
  2. src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (+0 -10)
  3. src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (+2 -3)
  4. src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (+1 -1)
  5. src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (+1 -1)
  6. src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (+2 -1)
  7. src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (+6 -3)
  8. src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+2 -4)
  9. src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (+5 -3)
  10. src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (+369 -156)
  11. src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (+19 -15)
  12. src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (+16 -7)
  13. src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (+70 -32)
  14. src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (+2 -7)
  15. src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+13 -3)
  16. src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java (+1 -1)
  17. src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (+6 -4)
  18. src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (+2 -5)
  19. src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (+2 -2)
  20. src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (+3 -3)
  21. src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (+1 -1)
  22. src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (+1 -4)
  23. src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java (+2 -8)
  24. src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java (+2 -1)
  25. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java (+3 -1)
  26. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (+101 -64)
  27. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (+4 -4)
  28. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (+3 -3)
  29. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (+6 -5)
  30. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (+10 -8)
  31. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (+16 -12)
  32. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java (+16 -15)
  33. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (+35 -48)
  34. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (+22 -18)

+ 2 - 0
CHANGES.txt

@@ -39,6 +39,8 @@ Trunk (unreleased changes)
 
     HDFS-1647. Federation: Multiple namenode configuration. (jitendra)
 
+    HDFS-1639. Add block pool management to FSDataset. (suresh)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

+ 0 - 10
src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java

@@ -49,16 +49,6 @@ public class ExtendedBlock implements Writable {
     this(null, 0, 0, 0);
   }
 
-  // TODO:FEDERATION To remove when block pool ID related coding is complete
-  public ExtendedBlock(final Block b) {
-    this("TODO", b);
-  }
-  
-  // TODO:FEDERATION To remove when block pool ID related coding is complete
-  public ExtendedBlock(final long blkId) {
-    this("TODO", new Block(blkId));
-  }
-  
   public ExtendedBlock(final ExtendedBlock b) {
     this(b.poolId, b.block);
   }
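
The two removed constructors above filled in the block pool id with the "TODO" placeholder; after this change callers must supply the pool id themselves. A minimal sketch of the resulting call pattern (the local variables here are illustrative, not part of the patch):

    // Illustrative only: wrap a pool-local Block with an explicit block pool id.
    String bpid = cluster.getNamesystem().getPoolId();       // pool id, e.g. obtained in a test
    Block local = locatedBlock.getBlock().getLocalBlock();   // pool-local block
    ExtendedBlock eb = new ExtendedBlock(bpid, local);       // pool id is now mandatory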

+ 2 - 3
src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java

@@ -55,9 +55,8 @@ public class LocatedBlock implements Writable {
     this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
   }
 
-  // TODO:FEDERATION To remove when block pool ID related coding is complete
-  public LocatedBlock(Block b, DatanodeInfo[] locs) {
-    this(new ExtendedBlock(b), locs, -1, false); // startOffset is unknown
+  public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
+    this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
   }
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {

+ 1 - 1
src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java

@@ -195,7 +195,7 @@ public class BlockTokenSecretManager extends
    */
   public void checkAccess(Token<BlockTokenIdentifier> blockToken, String userId,
       Block block, AccessMode mode) throws InvalidToken {
-    checkAccess(blockToken, userId, new ExtendedBlock(block), mode);
+    checkAccess(blockToken, userId, new ExtendedBlock("TODO", block), mode);
   }
   
   /**

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -379,7 +379,7 @@ public class Balancer implements Tool {
       }
       // TODO:FEDERATION use ExtendedBlock in BalancerBlock
       DataTransferProtocol.Sender.opReplaceBlock(out,
-          new ExtendedBlock(block.getBlock()), source.getStorageID(), 
+          new ExtendedBlock("TODO", block.getBlock()), source.getStorageID(), 
           proxySource.getDatanode(), accessToken);
     }
     

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -94,7 +94,8 @@ class BlockSender implements java.io.Closeable, FSConstants {
     try {
       this.block = block;
       synchronized(datanode.data) { 
-        this.replica = datanode.data.getReplica(block.getBlockId());
+        this.replica = datanode.data.getReplica(block.getPoolId(), 
+            block.getBlockId());
         if (replica == null) {
           throw new ReplicaNotFoundException(block);
         }

+ 6 - 3
src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java

@@ -213,8 +213,9 @@ class DataBlockScanner implements Runnable {
 
   private void init() {
     
+    // TODO:FEDERATION block scanner must work one BP at a time
     // get the list of blocks and arrange them in random order
-    List<Block> arr = dataset.getFinalizedBlocks();
+    List<Block> arr = dataset.getFinalizedBlocks("TODO");
     Collections.shuffle(arr);
     
     blockInfoSet = new TreeSet<BlockScanInfo>();
@@ -231,6 +232,8 @@ class DataBlockScanner implements Runnable {
     /* Pick the first directory that has any existing scanner log.
      * otherwise, pick the first directory.
      */
+    // TODO:FEDERATION currently picking only one block pool directory
+    // This needs to change to include all the block pool directories
     File dir = null;
     FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
     for(FSDataset.FSVolume vol : volumes) {
@@ -462,7 +465,7 @@ class DataBlockScanner implements Runnable {
         updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
 
         // If the block does not exists anymore, then its not an error
-        if ( dataset.getFile(block.getLocalBlock()) == null ) {
+        if ( dataset.getFile(block.getPoolId(), block.getLocalBlock()) == null ) {
           LOG.info("Verification failed for " + block + ". Its ok since " +
           "it not in datanode dataset anymore.");
           deleteBlock(block.getPoolId(), block.getLocalBlock());
@@ -506,7 +509,7 @@ class DataBlockScanner implements Runnable {
     
     if ( block != null ) {
       // TODO:FEDERATION blockInfoSet should use ExtendedBlock
-      verifyBlock(new ExtendedBlock(block));
+      verifyBlock(new ExtendedBlock("TODO", block));
     }
   }
   

+ 2 - 4
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -581,7 +581,7 @@ public class DataNode extends Configured
         bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
       }
       initFsDataSet(conf, dataDirs);
-      //data.addStorage(blockPoolId, storage);
+      data.addBlockPool(blockPoolId, conf);
     }
 
     /**
@@ -681,9 +681,7 @@ public class DataNode extends Configured
         // and can be safely GC'ed.
         //
         long brStartTime = now();
-        BlockListAsLongs bReport = data.getBlockReport(/* TODO:FEDERATION pass blockPoolId*/);
-
-        // TODO:FEDERATION add support for pool ID
+        BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
         cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
             .getBlockListAsLongs());
         long brTime = now() - brStartTime;

+ 5 - 3
src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -173,8 +173,9 @@ public class DirectoryScanner {
   void reconcile() {
     scan();
     for (ScanInfo info : diff) {
-      dataset.checkAndUpdate(info.getBlockId(), info.getBlockFile(), info
-          .getMetaFile(), info.getVolume());
+      // TODO:FEDERATION use right block pool Id
+      dataset.checkAndUpdate("TODO", info.getBlockId(), info.getBlockFile(),
+          info.getMetaFile(), info.getVolume());
     }
   }
 
@@ -188,7 +189,8 @@ public class DirectoryScanner {
 
     // Hold FSDataset lock to prevent further changes to the block map
     synchronized(dataset) {
-      Block[] memReport = dataset.getBlockList(false);
+      // TODO:FEDERATION use right block pool Id
+      Block[] memReport = dataset.getBlockList("TODO", false);
       Arrays.sort(memReport); // Sort based on blockId
 
       int d = 0; // index for diskReport

File diff is too large to display
+ 369 - 156
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java


+ 19 - 15
src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java

@@ -147,12 +147,12 @@ class FSDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FSDataset.FSVolume volume, File blockFile,
+  void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
       File metaFile, long dfsBytes, String blockName) {
     DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
         + " for deletion");
     ReplicaFileDeleteTask deletionTask = 
-        new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes,
+        new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile, dfsBytes,
             blockName);
     execute(volume.getCurrentDir(), deletionTask);
   }
@@ -161,16 +161,17 @@ class FSDatasetAsyncDiskService {
    *  as decrement the dfs usage of the volume. 
    */
   static class ReplicaFileDeleteTask implements Runnable {
-
-    FSDataset.FSVolume volume;
-    File blockFile;
-    File metaFile;
-    long dfsBytes;
-    String blockName;
+    final FSDataset.FSVolume volume;
+    final String blockPoolId;
+    final File blockFile;
+    final File metaFile;
+    final long dfsBytes;
+    final String blockName;
     
-    ReplicaFileDeleteTask(FSDataset.FSVolume volume, File blockFile,
-        File metaFile, long dfsBytes, String blockName) {
+    ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
+        File blockFile, File metaFile, long dfsBytes, String blockName) {
       this.volume = volume;
+      this.blockPoolId = bpid;
       this.blockFile = blockFile;
       this.metaFile = metaFile;
       this.dfsBytes = dfsBytes;
@@ -184,18 +185,21 @@ class FSDatasetAsyncDiskService {
     @Override
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
-      return "deletion of block " + blockName + " with block file " + blockFile
-          + " and meta file " + metaFile + " from volume " + volume;
+      return "deletion of block " + blockPoolId + " " + blockName
+          + " with block file " + blockFile + " and meta file " + metaFile
+          + " from volume " + volume;
     }
 
     @Override
     public void run() {
       if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
-            + blockName + " at file " + blockFile + ". Ignored.");
+            + blockPoolId + " " + blockName + " at file " + blockFile
+            + ". Ignored.");
       } else {
-        volume.decDfsUsed(dfsBytes);
-        DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile);
+        volume.decDfsUsed(blockPoolId, dfsBytes);
+        DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
+            + " at file " + blockFile);
       }
     }
   };

+ 16 - 7
src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -103,12 +104,12 @@ public interface FSDatasetInterface extends FSDatasetMBean {
    * @return replica from the replicas map
    */
   @Deprecated
-  public Replica getReplica(long blockId);
+  public Replica getReplica(String bpid, long blockId);
 
   /**
    * @return the generation stamp stored with the block.
    */
-  public Block getStoredBlock(String poolId, long blkid)
+  public Block getStoredBlock(String bpid, long blkid)
       throws IOException;
 
   /**
@@ -271,10 +272,12 @@ public interface FSDatasetInterface extends FSDatasetMBean {
   public void unfinalizeBlock(ExtendedBlock b) throws IOException;
 
   /**
-   * Returns the block report - the full list of blocks stored
+   * Returns the block report - the full list of blocks stored under a 
+   * block pool
+   * @param bpid Block Pool Id
    * @return - the block report - the full list of blocks stored
    */
-  public BlockListAsLongs getBlockReport();
+  public BlockListAsLongs getBlockReport(String bpid);
 
   /**
    * Is the block valid?
@@ -285,10 +288,11 @@ public interface FSDatasetInterface extends FSDatasetMBean {
 
   /**
    * Invalidates the specified blocks
+   * @param bpid Block pool Id
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException
    */
-  public void invalidate(String poolId, Block invalidBlks[]) throws IOException;
+  public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
 
     /**
      * Check if all the data directories are healthy
@@ -330,12 +334,11 @@ public interface FSDatasetInterface extends FSDatasetMBean {
 
   /**
    * Initialize a replica recovery.
-   * 
    * @return actual state of the replica on this data-node or 
    * null if data-node does not have the replica.
    */
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
-  throws IOException;
+      throws IOException;
 
   /**
    * Update replica's generation stamp and length and finalize it.
@@ -344,4 +347,10 @@ public interface FSDatasetInterface extends FSDatasetMBean {
                                           ExtendedBlock oldBlock,
                                           long recoveryId,
                                           long newLength) throws IOException;
+  /**
+   * add new block pool ID
+   * @param bpid Block pool Id
+   * @param conf Configuration
+   */
+  public void addBlockPool(String bpid, Configuration conf) throws IOException;
 }
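
With these signature changes the dataset is addressed per block pool: a pool is registered once through addBlockPool, and replica lookups, block reports, and invalidations are then scoped to that pool id. A rough sketch of how a caller such as the datanode drives the reworked interface (it mirrors the DataNode.java hunks above; error handling omitted, variable names illustrative):

    // Sketch only: register a block pool, then work against it by id.
    data.addBlockPool(blockPoolId, conf);                        // one-time setup per pool
    BlockListAsLongs report = data.getBlockReport(blockPoolId);  // pool-scoped block report
    Replica r = data.getReplica(blockPoolId, blockId);           // replica lookup is per pool
    data.invalidate(blockPoolId, invalidBlks);                   // invalidate blocks within the pool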

+ 70 - 32
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java

@@ -19,24 +19,39 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 
 class ReplicasMap {
-  // HashMap: maps a block id to the replica's meta info
-  private HashMap<Long, ReplicaInfo> map = new HashMap<Long, ReplicaInfo>();
+  // Map of block pool Id to another map of block Id to ReplicaInfo.
+  private Map<String, Map<Long, ReplicaInfo>> map = 
+    new HashMap<String, Map<Long, ReplicaInfo>>();
+  
+  private void checkBlockPool(String bpid) {
+    if (bpid == null) {
+      throw new IllegalArgumentException("Block Pool Id is null");
+    }
+  }
+  
+  private void checkBlock(Block b) {
+    if (b == null) {
+      throw new IllegalArgumentException("Block is null");
+    }
+  }
+  
   /**
    * Get the meta information of the replica that matches both block id 
    * and generation stamp
+   * @param bpid block pool id
    * @param block block with its id as the key
    * @return the replica's meta information
-   * @throws IllegalArgumentException if the input block is null
+   * @throws IllegalArgumentException if the input block or block pool is null
    */
-  ReplicaInfo get(Block block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Do not expect null block");
-    }
-    ReplicaInfo replicaInfo = get(block.getBlockId());
+  ReplicaInfo get(String bpid, Block block) {
+    checkBlockPool(bpid);
+    checkBlock(block);
+    ReplicaInfo replicaInfo = get(bpid, block.getBlockId());
     if (replicaInfo != null && 
         block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
       return replicaInfo;
@@ -44,72 +59,95 @@ class ReplicasMap {
     return null;
   }
   
+  
   /**
    * Get the meta information of the replica that matches the block id
+   * @param bpid block pool id
    * @param blockId a block's id
    * @return the replica's meta information
    */
-  ReplicaInfo get(long blockId) {
-    return map.get(blockId);
+  ReplicaInfo get(String bpid, long blockId) {
+    checkBlockPool(bpid);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    return m != null ? m.get(blockId) : null;
   }
   
   /**
    * Add a replica's meta information into the map 
    * 
+   * @param bpid block pool id
    * @param replicaInfo a replica's meta information
    * @return previous meta information of the replica
    * @throws IllegalArgumentException if the input parameter is null
    */
-  ReplicaInfo add(ReplicaInfo replicaInfo) {
-    if (replicaInfo == null) {
-      throw new IllegalArgumentException("Do not expect null block");
+  ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
+    checkBlockPool(bpid);
+    checkBlock(replicaInfo);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    if (m == null) {
+      // Add an entry for block pool if it does not exist already
+      m = new HashMap<Long, ReplicaInfo>();
+      map.put(bpid, m);
     }
-    return  map.put(replicaInfo.getBlockId(), replicaInfo);
+    return  m.put(replicaInfo.getBlockId(), replicaInfo);
   }
   
   /**
    * Remove the replica's meta information from the map that matches
    * the input block's id and generation stamp
+   * @param bpid block pool id
    * @param block block with its id as the key
    * @return the removed replica's meta information
    * @throws IllegalArgumentException if the input block is null
    */
-  ReplicaInfo remove(Block block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Do not expect null block");
+  ReplicaInfo remove(String bpid, Block block) {
+    checkBlockPool(bpid);
+    checkBlock(block);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    if (m != null) {
+      Long key = Long.valueOf(block.getBlockId());
+      ReplicaInfo replicaInfo = m.get(key);
+      if (replicaInfo != null &&
+          block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+        return m.remove(key);
+      } 
     }
-    Long key = Long.valueOf(block.getBlockId());
-    ReplicaInfo replicaInfo = map.get(key);
-    if (replicaInfo != null &&
-        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-      return remove(key);
-    } 
     
     return null;
   }
   
   /**
    * Remove the replica's meta information from the map if present
+   * @param bpid block pool id
    * @param the block id of the replica to be removed
    * @return the removed replica's meta information
    */
-  ReplicaInfo remove(long blockId) {
-    return map.remove(blockId);
+  ReplicaInfo remove(String bpid, long blockId) {
+    checkBlockPool(bpid);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    if (m != null) {
+      return m.remove(blockId);
+    }
+    return null;
   }
  
   /**
-   * Get the size of the map
+   * Get the size of the map for given block pool
+   * @param bpid block pool id
    * @return the number of replicas in the map
    */
-  int size() {
-    return map.size();
+  int size(String bpid) {
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    return m != null ? m.size() : 0;
   }
   
   /**
-   * Get a collection of the replicas
+   * Get a collection of the replicas for given block pool
+   * @param bpid block pool id
    * @return a collection of the replicas
    */
-  Collection<ReplicaInfo> replicas() {
-    return map.values();
+  Collection<ReplicaInfo> replicas(String bpid) {
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    return m != null ? m.values() : null;
   }
-}
+}
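
ReplicasMap is now a two-level structure keyed first by block pool id and then by block id, so every accessor takes a bpid and an unknown pool yields an empty or null result rather than a match. A small usage sketch with a made-up pool id, in the spirit of the TestInterDatanodeProtocol changes further down:

    // Sketch only: every ReplicasMap operation is keyed by block pool id.
    ReplicasMap map = new ReplicasMap();
    String bpid = "BP-TEST";                           // hypothetical pool id
    map.add(bpid, replicaInfo);                        // stored under that pool
    ReplicaInfo found = map.get(bpid, block);          // must match id and generation stamp
    int count = map.size(bpid);                        // 0 for an unknown pool
    Collection<ReplicaInfo> all = map.replicas(bpid);  // null for an unknown pool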

+ 2 - 7
src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java

@@ -354,16 +354,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return replicateBlocks.poll(maxTransfers);
   }
 
-  BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+  BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
     List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
       return null;
-    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
-    for(BlockInfoUnderConstruction b : blocks) {
-      brCommand.add(new RecoveringBlock(
-          new ExtendedBlock(b), b.getExpectedLocations(), b.getBlockRecoveryId()));
-    }
-    return brCommand;
+    return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]);
   }
 
   /**

+ 13 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -2731,9 +2733,17 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
         updateStats(nodeinfo, true);
         
         //check lease recovery
-        cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (cmd != null) {
-          return new DatanodeCommand[] {cmd};
+        BlockInfoUnderConstruction[] blocks = nodeinfo
+            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        if (blocks != null) {
+          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
+              blocks.length);
+          for (BlockInfoUnderConstruction b : blocks) {
+            brCommand.add(new RecoveringBlock(
+                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
+                    .getBlockRecoveryId()));
+          }
+          return new DatanodeCommand[] { brCommand };
         }
       
         ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);

+ 1 - 1
src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java

@@ -161,7 +161,7 @@ public class TestFiHftp {
     final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
     LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
     final FSDataset data = (FSDataset)dn.getFSDataset();
-    final File blkfile = data.getBlockFile(blk.getLocalBlock());
+    final File blkfile = data.getBlockFile(blk);
     Assert.assertTrue(blkfile.delete());
 
     //read again by hftp, should get an exception 

+ 6 - 4
src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -827,6 +827,7 @@ public class MiniDFSCluster {
 
   /*
    * Corrupt a block on a particular datanode
+   * Types: delete, write bad data, truncate
    */
   boolean corruptBlockOnDataNode(int i, ExtendedBlock blk) throws Exception {
     Random random = new Random();
@@ -1094,11 +1095,12 @@ public class MiniDFSCluster {
    * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
    * @return the block report for the specified data node
    */
-  public Iterable<Block> getBlockReport(int dataNodeIndex) {
+  public Iterable<Block> getBlockReport(String bpid, int dataNodeIndex) {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport();
+    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(
+        bpid);
   }
   
   
@@ -1107,11 +1109,11 @@ public class MiniDFSCluster {
    * @return block reports from all data nodes
    *    BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes()
    */
-  public Iterable<Block>[] getAllBlockReports() {
+  public Iterable<Block>[] getAllBlockReports(String bpid) {
     int numDataNodes = dataNodes.size();
     Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
     for (int i = 0; i < numDataNodes; ++i) {
-     result[i] = getBlockReport(i);
+     result[i] = getBlockReport(bpid, i);
     }
     return result;
   }

+ 2 - 5
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java

@@ -45,7 +45,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.shell.Count;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset;
 import org.apache.hadoop.io.IOUtils;
@@ -1108,14 +1107,12 @@ public class TestDFSShell extends TestCase {
   static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
     List<File> files = new ArrayList<File>();
     List<DataNode> datanodes = cluster.getDataNodes();
-    Iterable<Block>[] blocks = cluster.getAllBlockReports();
-    ExtendedBlock blk = new ExtendedBlock();
     String poolId = cluster.getNamesystem().getPoolId();
+    Iterable<Block>[] blocks = cluster.getAllBlockReports(poolId);
     for(int i = 0; i < blocks.length; i++) {
       FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
       for(Block b : blocks[i]) {
-        blk.set(poolId, b);
-        files.add(ds.getBlockFile(blk.getLocalBlock()));
+        files.add(ds.getBlockFile(poolId, b));
       }        
     }
     return files;

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -420,7 +419,8 @@ public class TestDataTransferProtocol extends TestCase {
     
     /* Test OP_READ_BLOCK */
 
-    ExtendedBlock blk = new ExtendedBlock(firstBlock.getLocalBlock());
+    String bpid = cluster.getNamesystem().getPoolId();
+    ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
     long blkid = blk.getBlockId();
     // bad block id
     sendBuf.reset();

+ 3 - 3
src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java

@@ -135,7 +135,7 @@ public class TestFileAppend extends TestCase {
       //
       for (int i = 0; i < blocks.size(); i = i + 2) {
         ExtendedBlock b = blocks.get(i).getBlock();
-        File f = dataset.getFile(b.getLocalBlock());
+        File f = dataset.getFile(b.getPoolId(), b.getLocalBlock());
         File link = new File(f.toString() + ".link");
         System.out.println("Creating hardlink for File " + f + " to " + link);
         HardLink.createHardLink(f, link);
@@ -148,7 +148,7 @@ public class TestFileAppend extends TestCase {
         ExtendedBlock b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned true",
-            dataset.unlinkBlock(b.getLocalBlock(), 1));
+            dataset.unlinkBlock(b, 1));
       }
 
       // Since the blocks were already detached earlier, these calls should
@@ -158,7 +158,7 @@ public class TestFileAppend extends TestCase {
         ExtendedBlock b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned false",
-            !dataset.unlinkBlock(b.getLocalBlock(), 1));
+            !dataset.unlinkBlock(b, 1));
       }
 
     } finally {

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java

@@ -199,7 +199,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     assertEquals(repl, datanodeinfos.length);
     final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
     final FSDataset data = (FSDataset)dn.getFSDataset();
-    final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk.getLocalBlock()), "rw");
+    final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk), "rw");
     AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
     assertEquals(len1, raf.length());
     raf.setLength(0);

+ 1 - 4
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -27,12 +27,9 @@ import java.util.EnumSet;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -768,7 +765,7 @@ public class TestFileCreation extends junit.framework.TestCase {
         FSDataset dataset = (FSDataset)datanode.data;
         ExtendedBlock blk = locatedblock.getBlock();
         Block b = dataset.getStoredBlock(blk.getPoolId(), blk.getBlockId());
-        File blockfile = dataset.findBlockFile(b.getBlockId());
+        File blockfile = dataset.findBlockFile(blk.getPoolId(), b.getBlockId());
         System.out.println("blockfile=" + blockfile);
         if (blockfile != null) {
           BufferedReader in = new BufferedReader(new FileReader(blockfile));

+ 2 - 8
src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java

@@ -140,24 +140,18 @@ public class TestInjectionForSimulatedStorage extends TestCase {
       //first time format
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
       cluster.waitActive();
+      String bpid = cluster.getNamesystem().getPoolId();
       DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                             cluster.getNameNodePort()),
                                             conf);
       
       writeFile(cluster.getFileSystem(), testPath, numDataNodes);
-
-      
       waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
-
-      
-      Iterable<Block>[] blocksList = cluster.getAllBlockReports();
-                    
+      Iterable<Block>[] blocksList = cluster.getAllBlockReports(bpid);
       
       cluster.shutdown();
       cluster = null;
       
-
-      
       /* Start the MiniDFSCluster with more datanodes since once a writeBlock
        * to a datanode node fails, same block can not be written to it
        * immediately. In our case some replication attempts will fail.

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java

@@ -103,8 +103,9 @@ public class TestPipelines {
     List<LocatedBlock> lb = cluster.getNameNode().getBlockLocations(
       filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
 
+    String bpid = cluster.getNamesystem().getPoolId();
     Replica r = DataNodeAdapter.fetchReplicaInfo(cluster.getDataNodes().get(0),
-      lb.get(0).getBlock().getBlockId());
+        bpid, lb.get(0).getBlock().getBlockId());
     assertTrue("Replica shouldn'e be null", r != null);
     assertEquals(
       "Should be RBW replica after sequence of calls append()/write()/hflush()",

+ 3 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java

@@ -28,11 +28,13 @@ public class DataNodeAdapter {
   /**
    * Fetch a copy of ReplicaInfo from a datanode by block id
    * @param dn datanode to retrieve a replicainfo object from
+   * @param bpid Block pool Id
    * @param blkId id of the replica's block
    * @return copy of ReplicaInfo object @link{FSDataset#fetchReplicaInfo}
    */
   public static ReplicaInfo fetchReplicaInfo (final DataNode dn,
+                                              final String bpid,
                                               final long blkId) {
-    return ((FSDataset)dn.data).fetchReplicaInfo(blkId);
+    return ((FSDataset)dn.data).fetchReplicaInfo(bpid, blkId);
   }
 }

+ 101 - 64
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
@@ -295,7 +296,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     }
   }
   
-  private HashMap<Block, BInfo> blockMap = null;
+  private Map<String, Map<Block, BInfo>> blockMap = null;
   private SimulatedStorage storage = null;
   private String storageId;
   
@@ -320,10 +321,10 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + 
     //    "Used = " + getDfsUsed() + "Free =" + getRemaining());
 
-    blockMap = new HashMap<Block,BInfo>(); 
+    blockMap = new HashMap<String, Map<Block,BInfo>>(); 
   }
 
-  public synchronized void injectBlocks(String poolId,
+  public synchronized void injectBlocks(String bpid,
       Iterable<Block> injectBlocks) throws IOException {
     ExtendedBlock blk = new ExtendedBlock();
     if (injectBlocks != null) {
@@ -333,31 +334,41 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
         if (b == null) {
           throw new NullPointerException("Null blocks in block list");
         }
-        blk.set(poolId, b);
+        blk.set(bpid, b);
         if (isValidBlock(blk)) {
           throw new IOException("Block already exists in  block list");
         }
       }
-      HashMap<Block, BInfo> oldBlockMap = blockMap;
-      blockMap = new HashMap<Block,BInfo>(
-          numInjectedBlocks + oldBlockMap.size());
-      blockMap.putAll(oldBlockMap);
+      Map<Block, BInfo> map = blockMap.get(bpid);
+      if (map == null) {
+        map = new HashMap<Block, BInfo>();
+        blockMap.put(bpid, map);
+      }
+      
       for (Block b: injectBlocks) {
-          BInfo binfo = new BInfo(b, false);
-          blockMap.put(binfo.theBlock, binfo);
+        BInfo binfo = new BInfo(b, false);
+        map.put(binfo.theBlock, binfo);
       }
     }
   }
+  
+  /** Get a map for a given block pool Id */
+  private Map<Block, BInfo> getMap(String bpid) throws IOException {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map == null) {
+      throw new IOException("Non existent blockpool " + bpid);
+    }
+    return map;
+  }
 
   @Override // FSDatasetInterface
   public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
-    // TODO:FEDERATION use ExtendedBlock
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
     binfo.finalizeBlock(b.getNumBytes());
-
   }
 
   @Override // FSDatasetInterface
@@ -368,16 +379,21 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   }
 
   @Override
-  public synchronized BlockListAsLongs getBlockReport() {
-    Block[] blockTable = new Block[blockMap.size()];
-    int count = 0;
-    for (BInfo b : blockMap.values()) {
-      if (b.isFinalized()) {
-        blockTable[count++] = b.theBlock;
+  public synchronized BlockListAsLongs getBlockReport(String bpid) {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    Block[] blockTable = new Block[map.size()];
+    if (map != null) {
+      int count = 0;
+      for (BInfo b : map.values()) {
+        if (b.isFinalized()) {
+          blockTable[count++] = b.theBlock;
+        }
       }
-    }
-    if (count != blockTable.length) {
-      blockTable = Arrays.copyOf(blockTable, count);
+      if (count != blockTable.length) {
+        blockTable = Arrays.copyOf(blockTable, count);
+      }
+    } else {
+      blockTable = new Block[0];
     }
     return new BlockListAsLongs(
         new ArrayList<Block>(Arrays.asList(blockTable)), null);
@@ -397,7 +413,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
 
   @Override // FSDatasetInterface
   public synchronized long getLength(ExtendedBlock b) throws IOException {
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
@@ -406,35 +423,40 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
 
   @Override
   @Deprecated
-  public Replica getReplica(long blockId) {
-    return blockMap.get(new Block(blockId));
+  public Replica getReplica(String bpid, long blockId) {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      return map.get(new Block(blockId));
+    }
+    return null;
   }
 
   @Override // FSDatasetInterface
-  public Block getStoredBlock(String poolId, long blkid) throws IOException {
-    ExtendedBlock b = new ExtendedBlock(poolId, blkid);
-    // TODO:FEDERATION use ExtendedBlock
-    BInfo binfo = blockMap.get(b.getLocalBlock());
-    if (binfo == null) {
-      return null;
+  public Block getStoredBlock(String bpid, long blkid) throws IOException {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      BInfo binfo = map.get(new Block(blkid));
+      if (binfo == null) {
+        return null;
+      }
+      return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
     }
-    b.setGenerationStamp(binfo.getGenerationStamp());
-    b.setNumBytes(binfo.getNumBytes());
-    return b.getLocalBlock();
+    return null;
   }
 
   @Override // FSDatasetInterface
-  public synchronized void invalidate(String poolId, Block[] invalidBlks)
+  public synchronized void invalidate(String bpid, Block[] invalidBlks)
       throws IOException {
     boolean error = false;
     if (invalidBlks == null) {
       return;
     }
+    final Map<Block, BInfo> map = getMap(bpid);
     for (Block b: invalidBlks) {
       if (b == null) {
         continue;
       }
-      BInfo binfo = blockMap.get(b);
+      BInfo binfo = map.get(b);
       if (binfo == null) {
         error = true;
         DataNode.LOG.warn("Invalidate: Missing block");
@@ -443,16 +465,18 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
       storage.free(binfo.getNumBytes());
       blockMap.remove(b);
     }
-      if (error) {
-          throw new IOException("Invalidate: Missing blocks.");
-      }
+    if (error) {
+      throw new IOException("Invalidate: Missing blocks.");
+    }
   }
 
   @Override // FSDatasetInterface
   public synchronized boolean isValidBlock(ExtendedBlock b) {
-    // return (blockMap.containsKey(b));
-    // TODO:FEDERATION use ExtendedBlock
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = blockMap.get(b.getPoolId());
+    if (map == null) {
+      return false;
+    }
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       return false;
     }
@@ -461,8 +485,11 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
 
   /* check if a block is created but not finalized */
   private synchronized boolean isBeingWritten(ExtendedBlock b) {
-    // TODO:FEDERATION use ExtendedBlock
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = blockMap.get(b.getPoolId());
+    if (map == null) {
+      return false;
+    }
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       return false;
     }
@@ -476,7 +503,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null || !binfo.isFinalized()) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -488,7 +516,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -496,16 +525,17 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     if (binfo.isFinalized()) {
       binfo.unfinalizeBlock();
     }
-    blockMap.remove(b);
+    map.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
   @Override // FSDatasetInterface
   public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
       throws IOException {
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -513,15 +543,16 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     if (!binfo.isFinalized()) {
       binfo.finalizeBlock(binfo.getNumBytes());
     }
-    blockMap.remove(b.getLocalBlock());
+    map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
   }
   
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
       long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if ( binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " does not exist, and cannot be appended to.");
@@ -530,9 +561,9 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
       throw new ReplicaAlreadyExistsException("Block " + b
           + " is valid, and cannot be written to.");
     }
-    blockMap.remove(b);
+    map.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
@@ -553,22 +584,21 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
         throw new ReplicaAlreadyExistsException("Block " + b + 
             " is being written, and cannot be written to.");
     }
-    // TODO:FEDERATION use ExtendedBlock
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
     BInfo binfo = new BInfo(b.getLocalBlock(), true);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
   @Override // FSDatasetInterface
   public synchronized InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
-    // TODO:FEDERATION use ExtendedBlock
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
     
-    //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len);
     return binfo.getIStream();
   }
   
@@ -596,8 +626,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
    */
   private synchronized InputStream getMetaDataInStream(ExtendedBlock b)
                                               throws IOException {
-    // TODO:FEDERATION use ExtendedBlock
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -611,8 +641,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   @Override // FSDatasetInterface
   public synchronized long getMetaDataLength(ExtendedBlock b)
       throws IOException {
-    // TODO:FEDERATION use ExtendedBlock
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -818,7 +848,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
   throws IOException {
     ExtendedBlock b = rBlock.getBlock();
-    BInfo binfo = blockMap.get(b.getLocalBlock());
+    final Map<Block, BInfo> map = getMap(b.getPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -840,4 +871,10 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
     return block.getNumBytes();
   }
+
+  @Override // FSDatasetInterface
+  public void addBlockPool(String bpid, Configuration conf) {
+    Map<Block, BInfo> map = new HashMap<Block, BInfo>();
+    blockMap.put(bpid, map);
+  }
 }

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

@@ -490,15 +490,15 @@ public class TestBlockReport {
       LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
     }
     // Look about specified DN for the replica of the block from 1st DN
-    Replica r;
-    r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-      fetchReplicaInfo(bl.getBlockId());
+    String bpid = cluster.getNamesystem().getPoolId();
+    Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
+      fetchReplicaInfo(bpid, bl.getBlockId());
     long start = System.currentTimeMillis();
     int count = 0;
     while (r == null) {
       waitTil(50);
       r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-        fetchReplicaInfo(bl.getBlockId());
+        fetchReplicaInfo(bpid, bl.getBlockId());
       long waiting_period = System.currentTimeMillis() - start;
       if (count++ % 10 == 0)
         if(LOG.isDebugEnabled()) {

+ 3 - 3
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -118,9 +118,9 @@ public class TestDataNodeVolumeFailure extends TestCase{
     
     // make sure a block report is sent 
     DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
-    long[] bReport = dn.getFSDataset().getBlockReport().getBlockListAsLongs();
-    String poolId = cluster.getNamesystem().getPoolId();
-    cluster.getNameNode().blockReport(dn.dnRegistration, poolId, bReport);
+    String bpid = cluster.getNamesystem().getPoolId();
+    long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
+    cluster.getNameNode().blockReport(dn.dnRegistration, bpid, bReport);
 
     // verify number of blocks and files...
     verify(filename, filesize);

+ 6 - 5
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java

@@ -112,16 +112,16 @@ public class TestDatanodeRestart {
       dn = cluster.getDataNodes().get(0);
 
       // check volumeMap: one rwr replica
+      String bpid = cluster.getNamesystem().getPoolId();
       ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
-      Assert.assertEquals(1, replicas.size());
-      ReplicaInfo replica = replicas.replicas().iterator().next();
+      Assert.assertEquals(1, replicas.size(bpid));
+      ReplicaInfo replica = replicas.replicas(bpid).iterator().next();
       Assert.assertEquals(ReplicaState.RWR, replica.getState());
       if (isCorrupt) {
         Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes());
       } else {
         Assert.assertEquals(fileLen, replica.getNumBytes());
       }
-      String bpid = cluster.getNamesystem().getPoolId();
       dn.data.invalidate(bpid, new Block[]{replica});
     } finally {
       IOUtils.closeStream(out);
@@ -147,9 +147,10 @@ public class TestDatanodeRestart {
         DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
         DFSTestUtil.waitReplication(fs, fileName, (short)1);
       }
+      String bpid = cluster.getNamesystem().getPoolId();
       DataNode dn = cluster.getDataNodes().get(0);
       Iterator<ReplicaInfo> replicasItor = 
-        ((FSDataset)dn.data).volumeMap.replicas().iterator();
+        ((FSDataset)dn.data).volumeMap.replicas(bpid).iterator();
       ReplicaInfo replica = replicasItor.next();
       createUnlinkTmpFile(replica, true, true); // rename block file
       createUnlinkTmpFile(replica, false, true); // rename meta file
@@ -166,7 +167,7 @@ public class TestDatanodeRestart {
 
       // check volumeMap: 4 finalized replica
       Collection<ReplicaInfo> replicas = 
-        ((FSDataset)(dn.data)).volumeMap.replicas();
+        ((FSDataset)(dn.data)).volumeMap.replicas(bpid);
       Assert.assertEquals(4, replicas.size());
       replicasItor = replicas.iterator();
       while (replicasItor.hasNext()) {

+ 10 - 8
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -48,6 +48,7 @@ public class TestDirectoryScanner extends TestCase {
   private static final int DEFAULT_GEN_STAMP = 9999;
 
   private MiniDFSCluster cluster;
+  private String bpid;
   private FSDataset fds = null;
   private DirectoryScanner scanner = null;
   private Random rand = new Random();
@@ -69,7 +70,7 @@ public class TestDirectoryScanner extends TestCase {
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
     synchronized (fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Truncate a block file that has a corresponding metadata file
@@ -88,7 +89,7 @@ public class TestDirectoryScanner extends TestCase {
   /** Delete a block file */
   private long deleteBlockFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Delete a block file that has corresponding metadata file
@@ -104,7 +105,7 @@ public class TestDirectoryScanner extends TestCase {
   /** Delete block meta file */
   private long deleteMetaFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
         if (file.exists() && file.delete()) {
@@ -121,7 +122,7 @@ public class TestDirectoryScanner extends TestCase {
     long id = rand.nextLong();
     while (true) {
       id = rand.nextLong();
-      if (fds.fetchReplicaInfo(id) == null) {
+      if (fds.fetchReplicaInfo(bpid, id) == null) {
         break;
       }
     }
@@ -215,6 +216,7 @@ public class TestDirectoryScanner extends TestCase {
     cluster = new MiniDFSCluster.Builder(CONF).build();
     try {
       cluster.waitActive();
+      bpid = cluster.getNamesystem().getPoolId();
       fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
@@ -325,12 +327,12 @@ public class TestDirectoryScanner extends TestCase {
 
   private void verifyAddition(long blockId, long genStamp, long size) {
     final ReplicaInfo replicainfo;
-    replicainfo = fds.fetchReplicaInfo(blockId);
+    replicainfo = fds.fetchReplicaInfo(bpid, blockId);
     assertNotNull(replicainfo);
 
     // Added block has the same file as the one created by the test
     File file = new File(getBlockFile(blockId));
-    assertEquals(file.getName(), fds.findBlockFile(blockId).getName());
+    assertEquals(file.getName(), fds.findBlockFile(bpid, blockId).getName());
 
     // Generation stamp is same as that of created file
     assertEquals(genStamp, replicainfo.getGenerationStamp());
@@ -341,12 +343,12 @@ public class TestDirectoryScanner extends TestCase {
 
   private void verifyDeletion(long blockId) {
     // Ensure block does not exist in memory
-    assertNull(fds.fetchReplicaInfo(blockId));
+    assertNull(fds.fetchReplicaInfo(bpid, blockId));
   }
 
   private void verifyGenStamp(long blockId, long genStamp) {
     final ReplicaInfo memBlock;
-    memBlock = fds.fetchReplicaInfo(blockId);
+    memBlock = fds.fetchReplicaInfo(bpid, blockId);
     assertNotNull(memBlock);
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }

+ 16 - 12
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

@@ -130,44 +130,47 @@ public class TestInterDatanodeProtocol {
     Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState());
   }
 
-  /** Test {@link FSDataset#initReplicaRecovery(ReplicasMap, Block, long)} */
+  /** Test 
+   * {@link FSDataset#initReplicaRecovery(String, ReplicasMap, Block, long)}
+   */
   @Test
   public void testInitReplicaRecovery() throws IOException {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
     final ReplicasMap map = new ReplicasMap();
+    String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {
       blocks[i] = new Block(firstblockid + i, length, gs);
-      map.add(createReplicaInfo(blocks[i]));
+      map.add(bpid, createReplicaInfo(blocks[i]));
     }
     
     { 
       //normal case
       final Block b = blocks[0];
-      final ReplicaInfo originalInfo = map.get(b);
+      final ReplicaInfo originalInfo = map.get(bpid, b);
 
       final long recoveryid = gs + 1;
-      final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid);
+      final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid);
       assertEquals(originalInfo, recoveryInfo);
 
-      final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(b);
+      final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
       Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId());
       Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID());
 
       //recover one more time 
       final long recoveryid2 = gs + 2;
-      final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid2);
+      final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
       assertEquals(originalInfo, recoveryInfo2);
 
-      final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(b);
+      final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
       Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId());
       Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID());
       
       //case RecoveryInProgressException
       try {
-        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
         Assert.fail();
       }
       catch(RecoveryInProgressException ripe) {
@@ -178,7 +181,7 @@ public class TestInterDatanodeProtocol {
     { // BlockRecoveryFI_01: replica not found
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid - 1, length, gs);
-      ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(map, b, recoveryid);
+      ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
       Assert.assertNull("Data-node should not have this replica.", r);
     }
     
@@ -186,7 +189,7 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs - 1;
       final Block b = new Block(firstblockid + 1, length, gs);
       try {
-        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
         Assert.fail();
       }
       catch(IOException ioe) {
@@ -199,7 +202,7 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid, length, gs+1);
       try {
-        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
         fail("InitReplicaRecovery should fail because replica's " +
         		"gs is less than the block's gs");
       } catch (IOException e) {
@@ -221,6 +224,7 @@ public class TestInterDatanodeProtocol {
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
       cluster.waitActive();
+      String bpid = cluster.getNamesystem().getPoolId();
 
       //create a file
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
@@ -248,7 +252,7 @@ public class TestInterDatanodeProtocol {
           new RecoveringBlock(b, null, recoveryid));
 
       //check replica
-      final ReplicaInfo replica = fsdataset.fetchReplicaInfo(b.getBlockId());
+      final ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, b.getBlockId());
       Assert.assertEquals(ReplicaState.RUR, replica.getState());
 
       //check meta data before update
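
For reference, the calling convention that testInitReplicaRecovery now exercises, condensed into one snippet; bpid, map, blocks, gs and createReplicaInfo are assumed to be set up exactly as in the test body above, so this is an illustration of the new signature rather than additional test code:

    String bpid = "BP-TEST";
    ReplicasMap map = new ReplicasMap();
    map.add(bpid, createReplicaInfo(blocks[0]));                    // replicas registered per pool
    ReplicaRecoveryInfo info =
        FSDataset.initReplicaRecovery(bpid, map, blocks[0], gs + 1);  // normal recovery case
    // Repeating recovery with a stale (smaller) recovery id throws
    // RecoveryInProgressException; a block that was never added returns null.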

+ 16 - 15
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java

@@ -27,11 +27,12 @@ import org.junit.Test;
  */
 public class TestReplicasMap {
   private static final ReplicasMap map = new ReplicasMap();
+  private static final String bpid = "BP-TEST";
   private static final  Block block = new Block(1234, 1234, 1234);
   
   @BeforeClass
   public static void setup() {
-    map.add(new FinalizedReplica(block, null, null));
+    map.add(bpid, new FinalizedReplica(block, null, null));
   }
   
   /**
@@ -41,35 +42,35 @@ public class TestReplicasMap {
   public void testGet() {
     // Test 1: null argument throws invalid argument exception
     try {
-      map.get(null);
+      map.get(bpid, null);
       fail("Expected exception not thrown");
     } catch (IllegalArgumentException expected) { }
     
     // Test 2: successful lookup based on block
-    assertNotNull(map.get(block));
+    assertNotNull(map.get(bpid, block));
     
     // Test 3: Lookup failure - generation stamp mismatch 
     Block b = new Block(block);
     b.setGenerationStamp(0);
-    assertNull(map.get(b));
+    assertNull(map.get(bpid, b));
     
     // Test 4: Lookup failure - blockID mismatch
     b.setGenerationStamp(block.getGenerationStamp());
     b.setBlockId(0);
-    assertNull(map.get(b));
+    assertNull(map.get(bpid, b));
     
     // Test 5: successful lookup based on block ID
-    assertNotNull(map.get(block.getBlockId()));
+    assertNotNull(map.get(bpid, block.getBlockId()));
     
     // Test 6: failed lookup for invalid block ID
-    assertNull(map.get(0));
+    assertNull(map.get(bpid, 0));
   }
   
   @Test
   public void testAdd() {
     // Test 1: null argument throws invalid argument exception
     try {
-      map.add(null);
+      map.add(bpid, null);
       fail("Expected exception not thrown");
     } catch (IllegalArgumentException expected) { }
   }
@@ -78,28 +79,28 @@ public class TestReplicasMap {
   public void testRemove() {
     // Test 1: null argument throws invalid argument exception
     try {
-      map.remove(null);
+      map.remove(bpid, null);
       fail("Expected exception not thrown");
     } catch (IllegalArgumentException expected) { }
     
     // Test 2: remove failure - generation stamp mismatch 
     Block b = new Block(block);
     b.setGenerationStamp(0);
-    assertNull(map.remove(b));
+    assertNull(map.remove(bpid, b));
     
     // Test 3: remove failure - blockID mismatch
     b.setGenerationStamp(block.getGenerationStamp());
     b.setBlockId(0);
-    assertNull(map.remove(b));
+    assertNull(map.remove(bpid, b));
     
     // Test 4: remove success
-    assertNotNull(map.remove(block));
+    assertNotNull(map.remove(bpid, block));
     
     // Test 5: remove failure - invalid blockID
-    assertNull(map.remove(0));
+    assertNull(map.remove(bpid, 0));
     
     // Test 6: remove success
-    map.add(new FinalizedReplica(block, null, null));
-    assertNotNull(map.remove(block.getBlockId()));
+    map.add(bpid, new FinalizedReplica(block, null, null));
+    assertNotNull(map.remove(bpid, block.getBlockId()));
   }
 }
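
The ReplicasMap API is now keyed by block pool ID first and block second, which is why every get/add/remove call in this test gains a bpid argument. A small, self-contained model of that two-level keying (illustration only, not the actual Hadoop class):

    import java.util.HashMap;
    import java.util.Map;

    // Toy two-level map: pool id -> block id -> replica. Mirrors the keying
    // scheme only; the real ReplicasMap also checks generation stamps on lookup.
    class PoolKeyedMap<R> {
      private final Map<String, Map<Long, R>> pools = new HashMap<String, Map<Long, R>>();

      R add(String bpid, long blockId, R replica) {
        if (bpid == null || replica == null) {
          throw new IllegalArgumentException("bpid and replica must be non-null");
        }
        Map<Long, R> pool = pools.get(bpid);
        if (pool == null) {
          pool = new HashMap<Long, R>();
          pools.put(bpid, pool);
        }
        return pool.put(blockId, replica);
      }

      R get(String bpid, long blockId) {
        Map<Long, R> pool = pools.get(bpid);
        return pool == null ? null : pool.get(blockId);
      }

      R remove(String bpid, long blockId) {
        Map<Long, R> pool = pools.get(bpid);
        return pool == null ? null : pool.remove(blockId);
      }
    }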

+ 35 - 48
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -35,14 +35,10 @@ import org.apache.hadoop.util.DataChecksum;
 
 /**
  * this class tests the methods of the  SimulatedFSDataset.
- *
  */
-
 public class TestSimulatedFSDataset extends TestCase {
   Configuration conf = null;
-  
-  // TODO:FEDERATION initialize this
-  static String bpid;
+  static final String bpid = "BP-TEST";
   static final int NUMBLOCKS = 20;
   static final int BLOCK_LENGTH_MULTIPLIER = 79;
 
@@ -50,7 +46,6 @@ public class TestSimulatedFSDataset extends TestCase {
     super.setUp();
       conf = new HdfsConfiguration();
       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
- 
   }
 
   protected void tearDown() throws Exception {
@@ -92,7 +87,7 @@ public class TestSimulatedFSDataset extends TestCase {
   }
 
   public void testGetMetaData() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
     ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
     try {
       assertFalse(fsdataset.metaFileExists(b));
@@ -113,19 +108,18 @@ public class TestSimulatedFSDataset extends TestCase {
 
 
   public void testStorageUsage() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
     assertEquals(fsdataset.getDfsUsed(), 0);
     assertEquals(fsdataset.getRemaining(), fsdataset.getCapacity());
     int bytesAdded = addSomeBlocks(fsdataset);
     assertEquals(bytesAdded, fsdataset.getDfsUsed());
     assertEquals(fsdataset.getCapacity()-bytesAdded,  fsdataset.getRemaining());
-    
   }
 
 
 
-  void  checkBlockDataAndSize(FSDatasetInterface fsdataset, 
-              ExtendedBlock b, long expectedLen) throws IOException { 
+  void checkBlockDataAndSize(FSDatasetInterface fsdataset, ExtendedBlock b,
+      long expectedLen) throws IOException { 
     InputStream input = fsdataset.getBlockInputStream(b);
     long lengthRead = 0;
     int data;
@@ -137,7 +131,7 @@ public class TestSimulatedFSDataset extends TestCase {
   }
   
   public void testWriteRead() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
     addSomeBlocks(fsdataset);
     for (int i=1; i <= NUMBLOCKS; ++i) {
       ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
@@ -147,26 +141,25 @@ public class TestSimulatedFSDataset extends TestCase {
     }
   }
 
-
-
   public void testGetBlockReport() throws IOException {
-    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
-    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    SimulatedFSDataset fsdataset = getSimulatedFSDataset(); 
+    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
     assertEquals(0, blockReport.getNumberOfBlocks());
-    int bytesAdded = addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport();
+    addSomeBlocks(fsdataset);
+    blockReport = fsdataset.getBlockReport(bpid);
     assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
     }
   }
+  
   public void testInjectionEmpty() throws IOException {
-    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
-    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    SimulatedFSDataset fsdataset = getSimulatedFSDataset(); 
+    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
     assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport();
+    blockReport = fsdataset.getBlockReport(bpid);
     assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
@@ -175,11 +168,9 @@ public class TestSimulatedFSDataset extends TestCase {
     
     // Inject blocks into an empty fsdataset
     //  - injecting the blocks we got above.
-  
-   
-    SimulatedFSDataset sfsdataset = new SimulatedFSDataset(conf);
+    SimulatedFSDataset sfsdataset = getSimulatedFSDataset();
     sfsdataset.injectBlocks(bpid, blockReport);
-    blockReport = sfsdataset.getBlockReport();
+    blockReport = sfsdataset.getBlockReport(bpid);
     assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
@@ -192,12 +183,11 @@ public class TestSimulatedFSDataset extends TestCase {
   }
 
   public void testInjectionNonEmpty() throws IOException {
-    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
-    
-    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    SimulatedFSDataset fsdataset = getSimulatedFSDataset(); 
+    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
     assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport();
+    blockReport = fsdataset.getBlockReport(bpid);
     assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
@@ -207,18 +197,16 @@ public class TestSimulatedFSDataset extends TestCase {
     
     // Inject blocks into an non-empty fsdataset
     //  - injecting the blocks we got above.
-  
-   
-    SimulatedFSDataset sfsdataset = new SimulatedFSDataset(conf);
+    SimulatedFSDataset sfsdataset = getSimulatedFSDataset();
     // Add some blocks whose block ids do not conflict with
     // the ones we are going to inject.
     bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1);
-    BlockListAsLongs blockReport2 = sfsdataset.getBlockReport();
+    sfsdataset.getBlockReport(bpid);
     assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    blockReport2 = sfsdataset.getBlockReport();
+    sfsdataset.getBlockReport(bpid);
     assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     sfsdataset.injectBlocks(bpid, blockReport);
-    blockReport = sfsdataset.getBlockReport();
+    blockReport = sfsdataset.getBlockReport(bpid);
     assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
@@ -229,23 +217,21 @@ public class TestSimulatedFSDataset extends TestCase {
     assertEquals(bytesAdded, sfsdataset.getDfsUsed());
     assertEquals(sfsdataset.getCapacity()-bytesAdded,  sfsdataset.getRemaining());
     
-    
     // Now test that the dataset cannot be created if it does not have sufficient cap
-
     conf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, 10);
  
     try {
-      sfsdataset = new SimulatedFSDataset(conf);
+      sfsdataset = getSimulatedFSDataset();
+      sfsdataset.addBlockPool(bpid, conf);
       sfsdataset.injectBlocks(bpid, blockReport);
       assertTrue("Expected an IO exception", false);
     } catch (IOException e) {
       // ok - as expected
     }
-
   }
 
   public void checkInvalidBlock(ExtendedBlock b) throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
     assertFalse(fsdataset.isValidBlock(b));
     try {
       fsdataset.getLength(b);
@@ -267,11 +253,10 @@ public class TestSimulatedFSDataset extends TestCase {
     } catch (IOException e) {
       // ok - as expected
     }
-    
   }
   
   public void testInValidBlocks() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
     ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
     checkInvalidBlock(b);
     
@@ -279,29 +264,31 @@ public class TestSimulatedFSDataset extends TestCase {
     addSomeBlocks(fsdataset);
     b = new ExtendedBlock(bpid, NUMBLOCKS + 99, 5, 0);
     checkInvalidBlock(b);
-    
   }
 
   public void testInvalidate() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
     int bytesAdded = addSomeBlocks(fsdataset);
     Block[] deleteBlocks = new Block[2];
     deleteBlocks[0] = new Block(1, 0, 0);
     deleteBlocks[1] = new Block(2, 0, 0);
     fsdataset.invalidate(bpid, deleteBlocks);
     checkInvalidBlock(new ExtendedBlock(bpid, deleteBlocks[0]));
-    checkInvalidBlock(new ExtendedBlock(deleteBlocks[1]));
+    checkInvalidBlock(new ExtendedBlock(bpid, deleteBlocks[1]));
     long sizeDeleted = blockIdToLen(1) + blockIdToLen(2);
     assertEquals(bytesAdded-sizeDeleted, fsdataset.getDfsUsed());
     assertEquals(fsdataset.getCapacity()-bytesAdded+sizeDeleted,  fsdataset.getRemaining());
     
-    
-    
     // Now make sure the rest of the blocks are valid
     for (int i=3; i <= NUMBLOCKS; ++i) {
       Block b = new Block(i, 0, 0);
       assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b)));
     }
   }
-
+  
+  private SimulatedFSDataset getSimulatedFSDataset() throws IOException {
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
+    fsdataset.addBlockPool(bpid, conf);
+    return fsdataset;
+  }
 }
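
The recurring change in this file is that a SimulatedFSDataset is no longer usable immediately after construction: the test's new getSimulatedFSDataset() helper registers the block pool with addBlockPool(bpid, conf) before any blocks are added, and block reports are requested per pool. A minimal sketch of that construction pattern, assuming the same configuration and imports the test already uses:

    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf);
    fsdataset.addBlockPool("BP-TEST", conf);                         // register the pool first
    BlockListAsLongs report = fsdataset.getBlockReport("BP-TEST");   // per-pool block report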

+ 22 - 18
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java

@@ -54,7 +54,8 @@ public class TestWriteToReplica {
       FSDataset dataSet = (FSDataset)dn.data;
 
       // set up replicasMap
-      setup(dataSet);
+      String bpid = cluster.getNamesystem().getPoolId();
+      setup(bpid, dataSet);
 
       // test close
       testClose(dataSet);
@@ -73,10 +74,11 @@ public class TestWriteToReplica {
       FSDataset dataSet = (FSDataset)dn.data;
 
       // set up replicasMap
-      setup(dataSet);
+      String bpid = cluster.getNamesystem().getPoolId();
+      setup(bpid, dataSet);
 
       // test append
-      testAppend(dataSet);
+      testAppend(bpid, dataSet);
     } finally {
       cluster.shutdown();
     }
@@ -92,7 +94,8 @@ public class TestWriteToReplica {
       FSDataset dataSet = (FSDataset)dn.data;
 
       // set up replicasMap
-      setup(dataSet);
+      String bpid = cluster.getNamesystem().getPoolId();
+      setup(bpid, dataSet);
 
       // test writeToRbw
       testWriteToRbw(dataSet);
@@ -111,7 +114,8 @@ public class TestWriteToReplica {
       FSDataset dataSet = (FSDataset)dn.data;
 
       // set up replicasMap
-      setup(dataSet);
+      String bpid = cluster.getNamesystem().getPoolId();
+      setup(bpid, dataSet);
 
       // test writeToTemporary
       testWriteToTemporary(dataSet);
@@ -120,42 +124,42 @@ public class TestWriteToReplica {
     }
   }
   
-  private void setup(FSDataset dataSet) throws IOException {
+  private void setup(String bpid, FSDataset dataSet) throws IOException {
     // setup replicas map
     ReplicasMap replicasMap = dataSet.volumeMap;
     FSVolume vol = dataSet.volumes.getNextVolume(0);
     ReplicaInfo replicaInfo = new FinalizedReplica(
         blocks[FINALIZED].getLocalBlock(), vol, vol.getDir());
-    replicasMap.add(replicaInfo);
+    replicasMap.add(bpid, replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
     replicaInfo.getMetaFile().createNewFile();
     
-    replicasMap.add(new ReplicaInPipeline(
+    replicasMap.add(bpid, new ReplicaInPipeline(
         blocks[TEMPORARY].getBlockId(),
         blocks[TEMPORARY].getGenerationStamp(), vol, 
-        vol.createTmpFile(blocks[TEMPORARY].getLocalBlock()).getParentFile()));
+        vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile()));
     
     replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol, 
-        vol.createRbwFile(blocks[RBW].getLocalBlock()).getParentFile(), null);
-    replicasMap.add(replicaInfo);
+        vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);
+    replicasMap.add(bpid, replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
     replicaInfo.getMetaFile().createNewFile();
     
-    replicasMap.add(new ReplicaWaitingToBeRecovered(
-        blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(
+    replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
+        blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
             blocks[RWR].getLocalBlock()).getParentFile()));
-    replicasMap.add(new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
+    replicasMap.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
         .getLocalBlock(), vol, vol.getDir()), 2007));    
   }
   
-  private void testAppend(FSDataset dataSet) throws IOException {
+  private void testAppend(String bpid, FSDataset dataSet) throws IOException {
     long newGS = blocks[FINALIZED].getGenerationStamp()+1;
-    FSVolume v = dataSet.volumeMap.get(blocks[FINALIZED].getLocalBlock())
+    FSVolume v = dataSet.volumeMap.get(bpid, blocks[FINALIZED].getLocalBlock())
         .getVolume();
     long available = v.getCapacity()-v.getDfsUsed();
     long expectedLen = blocks[FINALIZED].getNumBytes();
     try {
-      v.decDfsUsed(-available);
+      v.decDfsUsed(bpid, -available);
       blocks[FINALIZED].setNumBytes(expectedLen+100);
       dataSet.append(blocks[FINALIZED], newGS, expectedLen);
       Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
@@ -163,7 +167,7 @@ public class TestWriteToReplica {
       Assert.assertTrue(e.getMessage().startsWith(
           "Insufficient space for appending to "));
     }
-    v.decDfsUsed(available);
+    v.decDfsUsed(bpid, available);
     blocks[FINALIZED].setNumBytes(expectedLen);
 
     newGS = blocks[RBW].getGenerationStamp()+1;
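
The TestWriteToReplica setup changes follow the same rule: replicas are registered in the map under a block pool ID, and the volume helpers that create temporary/RBW files take the pool ID as well. A condensed, single-replica version of the patched setup (illustration only; the field and method names come from the diff above, the helper name setupOneFinalized is hypothetical):

    private void setupOneFinalized(String bpid, FSDataset dataSet, ExtendedBlock block)
        throws IOException {
      ReplicasMap replicasMap = dataSet.volumeMap;
      FSVolume vol = dataSet.volumes.getNextVolume(0);
      ReplicaInfo replicaInfo =
          new FinalizedReplica(block.getLocalBlock(), vol, vol.getDir());
      replicasMap.add(bpid, replicaInfo);          // register the replica under the pool
      replicaInfo.getBlockFile().createNewFile();  // back the replica with on-disk files
      replicaInfo.getMetaFile().createNewFile();
    }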

Some files are not shown because too many files changed in this diff.