
HDFS-1400. HDFS federation: DataTransferProtocol uses ExtendedBlockPool to include BlockPoolID in the protocol. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1000541 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas, 15 years ago
Commit a90991b264
23 changed files with 175 additions and 121 deletions
  1. CHANGES.txt (+5 -1)
  2. src/java/org/apache/hadoop/hdfs/BlockReader.java (+18 -11)
  3. src/java/org/apache/hadoop/hdfs/DFSClient.java (+1 -1)
  4. src/java/org/apache/hadoop/hdfs/DFSInputStream.java (+2 -2)
  5. src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (+1 -1)
  6. src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (+18 -18)
  7. src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (+17 -0)
  8. src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (+3 -1)
  9. src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (+4 -3)
  10. src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (+21 -16)
  11. src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (+10 -9)
  12. src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (+3 -1)
  13. src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+23 -19)
  14. src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (+15 -11)
  15. src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (+11 -5)
  16. src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (+6 -0)
  17. src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+4 -4)
  18. src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (+1 -2)
  19. src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (+7 -6)
  20. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (+2 -7)
  21. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (+1 -1)
  22. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (+1 -1)
  23. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (+1 -1)
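
The gist of the change, visible in the DFSClient/DFSInputStream/DFSOutputStream hunks below: callers no longer strip the block pool ID off an ExtendedBlock before talking to a datanode, so the BlockPoolID travels with every DataTransferProtocol operation. A simplified before/after sketch (variable names such as sock, file, token, offset, len and bufSize are illustrative placeholders, not code from this patch):

    // Before this patch: the pool id was dropped at the client boundary.
    Block local = extendedBlock.getLocalBlock();
    BlockReader.newBlockReader(sock, file, local, token, offset, len, bufSize);

    // With this patch: the ExtendedBlock, and therefore the BlockPoolID,
    // is passed end to end and written onto the wire by the Sender ops.
    BlockReader.newBlockReader(sock, file, extendedBlock, token, offset, len, bufSize);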

+ 5 - 1
CHANGES.txt

@@ -37,11 +37,15 @@ Trunk (unreleased changes)
 
     HDFS-1361. Add -fileStatus operation to NNThroughputBenchmark. (shv)
 
-    HDFS-1365.HDFS federation: propose ClusterID and BlockPoolID format (tanping via boryas)
+    HDFS-1365.HDFS federation: propose ClusterID and BlockPoolID format 
+    (tanping via boryas)
 
     HDFS-1394. modify -format option for namenode to generated new blockpool id 
     and accept newcluster (boryas)
 
+    HDFS-1400. HDFS federation: DataTransferProtocol uses ExtendedBlockPool to 
+    include BlockPoolID in the protocol. (suresh)
+
   IMPROVEMENTS
 
     HDFS-1096. fix for prev. commit. (boryas)

+ 18 - 11
src/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -35,7 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -349,7 +349,7 @@ public class BlockReader extends FSInputChecker {
   }
 
   public static BlockReader newBlockReader(Socket sock, String file,
-      Block block, Token<BlockTokenIdentifier> blockToken, 
+      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
       long startOffset, long len, int bufferSize) throws IOException {
     return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
         true);
@@ -357,7 +357,7 @@ public class BlockReader extends FSInputChecker {
 
   /** Java Doc required */
   public static BlockReader newBlockReader( Socket sock, String file, 
-                                     Block block, 
+                                     ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum)
@@ -367,7 +367,7 @@ public class BlockReader extends FSInputChecker {
   }
 
   public static BlockReader newBlockReader( Socket sock, String file,
-                                     Block block, 
+                                     ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
@@ -394,14 +394,14 @@ public class BlockReader extends FSInputChecker {
            "Got access token error for OP_READ_BLOCK, self="
                + sock.getLocalSocketAddress() + ", remote="
                + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for block " + block.getBlockId() 
-                + "_" + block.getGenerationStamp());
+                + ", for pool " + block.getPoolId() + " block " 
+                + block.getBlockId() + "_" + block.getGenerationStamp());
       } else {
         throw new IOException("Got error for OP_READ_BLOCK, self="
            + sock.getLocalSocketAddress() + ", remote="
            + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for block " + block.getBlockId() + "_" 
-            + block.getGenerationStamp());
+            + ", for pool " + block.getPoolId() + " block " 
+            + block.getBlockId() + "_" + block.getGenerationStamp());
       }
     }
     DataChecksum checksum = DataChecksum.newDataChecksum( in );
@@ -417,6 +417,7 @@ public class BlockReader extends FSInputChecker {
                            startOffset + " for file " + file);
     }
 
+    // TODO:FEDERATION use poolId
    return new BlockReader(file, block.getBlockId(), in, checksum,
        verifyChecksum, startOffset, firstChunkOffset, len, sock);
   }
@@ -453,9 +454,15 @@ public class BlockReader extends FSInputChecker {
     }
   }
   
-  // File name to print when accessing a block directory from servlets
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
   public static String getFileName(final InetSocketAddress s,
-      final long blockId) {
-    return s.toString() + ":" + blockId;
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
   }
 }

+ 1 - 1
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -967,7 +967,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                + BLOCK_CHECKSUM + ", block=" + block);
           }
           // get block MD5
-          DataTransferProtocol.Sender.opBlockChecksum(out, block.getLocalBlock(),
+          DataTransferProtocol.Sender.opBlockChecksum(out, block,
              lb.getBlockToken());
 
           final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);

+ 2 - 2
src/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -387,7 +387,7 @@ public class DFSInputStream extends FSInputStream {
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         
-        blockReader = BlockReader.newBlockReader(s, src, blk.getLocalBlock(), 
+        blockReader = BlockReader.newBlockReader(s, src, blk,
            accessToken, 
            offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
            buffersize, verifyChecksum, dfsClient.clientName);
@@ -629,7 +629,7 @@ public class DFSInputStream extends FSInputStream {
         int len = (int) (end - start + 1);
             
         reader = BlockReader.newBlockReader(dn, src, 
-                                            block.getBlock().getLocalBlock(),
+                                            block.getBlock(),
                                            blockToken,
                                            start, len, buffersize, 
                                            verifyChecksum, dfsClient.clientName);

+ 1 - 1
src/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -892,7 +892,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
         blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
         // send the request
-        DataTransferProtocol.Sender.opWriteBlock(out, block.getLocalBlock(), 
+        DataTransferProtocol.Sender.opWriteBlock(out, block,
            nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS, 
            block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
            accessToken);

+ 18 - 18
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -46,9 +46,9 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 19:
-   *    Change the block packet ack protocol to include seqno,
-   *    numberOfReplies, reply0, reply1, ...
+   * Version 20:
+   *    Changed the protocol methods to use ExtendedBlock instead
+   *    of Block.
    */
   public static final int DATA_TRANSFER_VERSION = 19;
 
@@ -229,7 +229,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_READ_BLOCK */
-    public static void opReadBlock(DataOutputStream out, Block blk,
+    public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
        long blockOffset, long blockLen, String clientName,
        Token<BlockTokenIdentifier> blockToken)
        throws IOException {
@@ -244,7 +244,7 @@ public interface DataTransferProtocol {
     }
     
     /** Send OP_WRITE_BLOCK */
-    public static void opWriteBlock(DataOutputStream out, Block blk,
+    public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
        int pipelineSize, BlockConstructionStage stage, long newGs,
        long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
        DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -273,7 +273,7 @@ public interface DataTransferProtocol {
     
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
-        Block blk, String storageId, DatanodeInfo src,
+        ExtendedBlock blk, String storageId, DatanodeInfo src,
        Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.REPLACE_BLOCK);
 
@@ -285,7 +285,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_COPY_BLOCK */
-    public static void opCopyBlock(DataOutputStream out, Block blk,
+    public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
        Token<BlockTokenIdentifier> blockToken)
        throws IOException {
       op(out, Op.COPY_BLOCK);
@@ -296,7 +296,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_BLOCK_CHECKSUM */
-    public static void opBlockChecksum(DataOutputStream out, Block blk,
+    public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
        Token<BlockTokenIdentifier> blockToken)
        throws IOException {
       op(out, Op.BLOCK_CHECKSUM);
@@ -346,7 +346,7 @@ public interface DataTransferProtocol {
 
     /** Receive OP_READ_BLOCK */
     private void opReadBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final long offset = in.readLong();
       final long length = in.readLong();
@@ -359,13 +359,13 @@ public interface DataTransferProtocol {
     /**
      * Abstract OP_READ_BLOCK method. Read a block.
      */
-    protected abstract void opReadBlock(DataInputStream in, Block blk,
+    protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
        long offset, long length, String client,
        Token<BlockTokenIdentifier> blockToken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       final BlockConstructionStage stage = 
@@ -394,7 +394,7 @@ public interface DataTransferProtocol {
      * Abstract OP_WRITE_BLOCK method. 
      * Write a block.
      */
-    protected abstract void opWriteBlock(DataInputStream in, Block blk,
+    protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
        int pipelineSize, BlockConstructionStage stage, long newGs,
        long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
        DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -402,7 +402,7 @@ public interface DataTransferProtocol {
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final String sourceId = Text.readString(in); // read del hint
       final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
@@ -416,12 +416,12 @@ public interface DataTransferProtocol {
      * It is used for balancing purpose; send to a destination
      */
     protected abstract void opReplaceBlock(DataInputStream in,
-        Block blk, String sourceId, DatanodeInfo src,
+        ExtendedBlock blk, String sourceId, DatanodeInfo src,
        Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -432,13 +432,13 @@ public interface DataTransferProtocol {
      * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
      * a proxy source.
      */
-    protected abstract void opCopyBlock(DataInputStream in, Block blk,
+    protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
        Token<BlockTokenIdentifier> blockToken)
        throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -450,7 +450,7 @@ public interface DataTransferProtocol {
      * Get the checksum of a block 
      */
     protected abstract void opBlockChecksum(DataInputStream in,
-        Block blk, Token<BlockTokenIdentifier> blockToken)
+        ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
        throws IOException;
 
     /** Read an AccessToken */
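
For reference, the Receiver.opReadBlock shown above now reads the ExtendedBlock identity first (blk.readId), then the offset and length. A sender written against that framing would look roughly like the sketch below; it is inferred from the receiver code in this hunk, not copied from the patch, and the handling of the client name and block token is assumed to mirror Text.readString and readBlockToken on the receive side:

    // Hedged sketch of the sender-side framing implied by Receiver.opReadBlock above.
    public static void opReadBlockSketch(DataOutputStream out, ExtendedBlock blk,
        long blockOffset, long blockLen, String clientName,
        Token<BlockTokenIdentifier> blockToken) throws IOException {
      op(out, Op.READ_BLOCK);             // opcode, as in the other Sender methods
      blk.writeId(out);                   // pool id + block id + generation stamp
      out.writeLong(blockOffset);
      out.writeLong(blockLen);
      Text.writeString(out, clientName);  // assumed counterpart of Text.readString
      blockToken.write(out);              // assumed counterpart of readBlockToken
      out.flush();
    }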

+ 17 - 0
src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java

@@ -79,6 +79,18 @@ public class ExtendedBlock implements Writable {
     block.readHelper(in);
   }
 
+  // Write only the identifier part of the block
+  public void writeId(DataOutput out) throws IOException {
+    DeprecatedUTF8.writeString(out, poolId);
+    block.writeId(out);
+  }
+
+  // Read only the identifier part of the block
+  public void readId(DataInput in) throws IOException {
+    this.poolId = DeprecatedUTF8.readString(in);
+    block.readId(in);
+  }
+  
   public String getPoolId() {
     return poolId;
   }
@@ -110,6 +122,11 @@
   public void setNumBytes(final long len) {
     block.setNumBytes(len);
   }
+  
+  public void set(String poolId, long blkid, long gs, long len) {
+    this.poolId = poolId;
+    block.set(blkid, gs, len);
+  }
 
   public static Block getLocalBlock(final ExtendedBlock b) {
     return b == null ? null : b.getLocalBlock();
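
The writeId/readId pair added above serializes only the block identity (pool ID, block ID, generation stamp), which is what the protocol Receiver reads back with blk.readId(in). A small round-trip usage sketch based on the methods in this patch; the pool ID and numbers are made-up example values, and the snippet assumes java.io imports (ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream) inside a method that may throw IOException:

    // Hedged usage sketch: serialize the identity of an ExtendedBlock and read it back.
    ExtendedBlock b = new ExtendedBlock();
    b.set("BP-example-1", 4567L, 1001L, 0L);   // example values only

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    b.writeId(new DataOutputStream(bytes));     // pool id, then block id + generation stamp

    ExtendedBlock copy = new ExtendedBlock();
    copy.readId(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    assert copy.getPoolId().equals(b.getPoolId());
    assert copy.getBlockId() == b.getBlockId();
    assert copy.getGenerationStamp() == b.getGenerationStamp();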

+ 3 - 1
src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -376,8 +377,9 @@ public class Balancer implements Tool {
            .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
            BlockTokenSecretManager.AccessMode.COPY));
       }
+      // TODO:FEDERATION use ExtendedBlock in BalancerBlock
       DataTransferProtocol.Sender.opReplaceBlock(out,
-          block.getBlock(), source.getStorageID(), 
+          new ExtendedBlock(block.getBlock()), source.getStorageID(), 
          proxySource.getDatanode(), accessToken);
     }
     

+ 4 - 3
src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -180,7 +181,7 @@ public class JspHelper {
     return chosenNode;
   }
 
-  public static void streamBlockInAscii(InetSocketAddress addr, 
+  public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
      long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
      long blockSize, long offsetIntoBlock, long chunkSizeToView,
      JspWriter out, Configuration conf) throws IOException {
@@ -192,9 +193,9 @@
      long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
      
      // Use the block name for file name. 
-      String file = BlockReader.getFileName(addr, blockId);
+      String file = BlockReader.getFileName(addr, poolId, blockId);
      BlockReader blockReader = BlockReader.newBlockReader(s, file,
-        new Block(blockId, 0, genStamp), blockToken,
+        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
        offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
        
     byte[] buf = new byte[(int)amtToRead];

+ 21 - 16
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
@@ -57,7 +58,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private Block block; // the block to receive
+  private ExtendedBlock block; // the block to receive
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
@@ -81,7 +82,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   final private ReplicaInPipelineInterface replicaInfo;
   volatile private boolean mirrorError;
 
-  BlockReceiver(Block block, DataInputStream in, String inAddr,
+  BlockReceiver(ExtendedBlock block, DataInputStream in, String inAddr,
                String myAddr, BlockConstructionStage stage, 
                long newGs, long minBytesRcvd, long maxBytesRcvd, 
                String clientName, DatanodeInfo srcDataNode, DataNode datanode)
@@ -97,29 +98,30 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       //
       // Open local disk out
       //
+      // TODO:FEDERATION use ExtendedBlock in the following method calls
       if (clientName.length() == 0) { //replication or move
-        replicaInfo = datanode.data.createTemporary(block);
+        replicaInfo = datanode.data.createTemporary(block.getLocalBlock());
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaInfo = datanode.data.createRbw(block);
+          replicaInfo = datanode.data.createRbw(block.getLocalBlock());
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
           replicaInfo = datanode.data.recoverRbw(
-              block, newGs, minBytesRcvd, maxBytesRcvd);
+              block.getLocalBlock(), newGs, minBytesRcvd, maxBytesRcvd);
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND:
-          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+          replicaInfo = datanode.data.append(block.getLocalBlock(), newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
-          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+          replicaInfo = datanode.data.recoverAppend(block.getLocalBlock(), newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           break;
@@ -613,9 +615,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     try {
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
-                               new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets,
-                                                   Thread.currentThread()));
+            new PacketResponder(this, block.getLocalBlock(), mirrIn, replyOut, 
+                                numTargets, Thread.currentThread()));
         responder.start(); // start thread to processes reponses
       }
 
@@ -641,7 +642,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
 
         // Finalize the block. Does this fsync()?
         block.setNumBytes(replicaInfo.getNumBytes());
-        datanode.data.finalizeBlock(block);
+        // TODO:FEDERATION use ExtendedBlock
+        datanode.data.finalizeBlock(block.getLocalBlock());
         datanode.myMetrics.blocksWritten.inc();
       }
 
@@ -673,7 +675,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
    */
   private void cleanupBlock() throws IOException {
     if (clientName.length() == 0) { // not client write
-      datanode.data.unfinalizeBlock(block);
+      // TODO:FEDERATION use ExtendedBlock
+      datanode.data.unfinalizeBlock(block.getLocalBlock());
     }
   }
 
@@ -690,7 +693,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     }
 
     // rollback the position of the meta file
-    datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
+    // TODO:FEDERATION use ExtendedBlock
+    datanode.data.adjustCrcChannelPosition(block.getLocalBlock(), streams, checksumSize);
   }
 
   /**
@@ -718,7 +722,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     byte[] crcbuf = new byte[checksumSize];
     FSDataset.BlockInputStreams instr = null;
     try { 
-      instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+      // TODO:FEDERATION use ExtendedBlock
+      instr = datanode.data.getTmpInputStreams(block.getLocalBlock(), blkoff, ckoff);
       IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
 
       // open meta file and read in crc value computer earlier

+ 10 - 9
src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -31,7 +31,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.io.IOUtils;
@@ -46,7 +46,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private Block block; // the block to read from
+  private ExtendedBlock block; // the block to read from
 
   /** the replica to read from */
   private final Replica replica;
@@ -80,14 +80,14 @@ class BlockSender implements java.io.Closeable, FSConstants {
   private volatile ChunkChecksum lastChunkChecksum = null;
 
   
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
              boolean corruptChecksumOk, boolean chunkOffsetOK,
              boolean verifyChecksum, DataNode datanode) throws IOException {
     this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
         verifyChecksum, datanode, null);
   }
 
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
              boolean corruptChecksumOk, boolean chunkOffsetOK,
              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
       throws IOException {
@@ -145,10 +145,10 @@ class BlockSender implements java.io.Closeable, FSConstants {
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
 
-      if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
-        checksumIn = new DataInputStream(
-                new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
-                                        BUFFER_SIZE));
+      // TODO:FEDERATION metaFileExists and getMetaDataInputStream should take ExtendedBlock
+      if ( !corruptChecksumOk || datanode.data.metaFileExists(block.getLocalBlock()) ) {
+        checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
+            .getMetaDataInputStream(block.getLocalBlock()), BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -230,7 +230,8 @@ class BlockSender implements java.io.Closeable, FSConstants {
         DataNode.LOG.debug("replica=" + replica);
       }
 
-      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      // TODO:FEDERATION getBlockInputStream must acccept ExtendedBlock
+      blockIn = datanode.data.getBlockInputStream(block.getLocalBlock(), offset); // seek to offset
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       IOUtils.closeStream(blockIn);

+ 3 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.io.IOUtils;
@@ -435,7 +436,8 @@ class DataBlockScanner implements Runnable {
       try {
         adjustThrottler();
         
-        blockSender = new BlockSender(block, 0, -1, false, 
+        // TODO:FEDERATION use ExtendedBlock
+        blockSender = new BlockSender(new ExtendedBlock(block), 0, -1, false, 
                                               false, true, datanode);
 
         DataOutputStream out = 

+ 23 - 19
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -25,7 +25,6 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -50,9 +49,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,7 +65,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -77,16 +72,17 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -96,7 +92,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -106,6 +101,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -121,15 +117,20 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
+import java.lang.management.ManagementFactory;  
+
+import javax.management.MBeanServer; 
+import javax.management.ObjectName;
+
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -363,7 +364,6 @@ public class DataNode extends Configured
     } else { // real storage
       // read storage info, lock data dirs and transition fs state if necessary
       storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-      
       // adjust
       this.dnRegistration.setStorageInfo(storage);
       // initialize data node internal structure
@@ -1143,10 +1143,11 @@ public class DataNode extends Configured
     return;
   }
 
-  private void transferBlock( Block block, 
+  private void transferBlock( ExtendedBlock block, 
                              DatanodeInfo xferTargets[] 
                              ) throws IOException {
-    if (!data.isValidBlock(block)) {
+    // TODO:FEDERATION use ExtendedBlock
+    if (!data.isValidBlock(block.getLocalBlock())) {
       // block does not exist or is under-construction
       String errStr = "Can't send invalid block " + block;
       LOG.info(errStr);
@@ -1157,7 +1158,8 @@ public class DataNode extends Configured
     }
 
     // Check if NN recorded length matches on-disk length 
-    long onDiskLength = data.getLength(block);
+    // TODO:FEDERATION use ExtendedBlock
+    long onDiskLength = data.getLength(block.getLocalBlock());
     if (block.getNumBytes() > onDiskLength) {
       // Shorter on-disk len indicates corruption so report NN the corrupt block
       namenode.reportBadBlocks(new LocatedBlock[]{
@@ -1190,7 +1192,8 @@ public class DataNode extends Configured
                               ) {
     for (int i = 0; i < blocks.length; i++) {
       try {
-        transferBlock(blocks[i], xferTargets[i]);
+        // TODO:FEDERATION cleanup
+        transferBlock(new ExtendedBlock(blocks[i]), xferTargets[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
@@ -1306,14 +1309,15 @@ public class DataNode extends Configured
    */
   class DataTransfer implements Runnable {
     DatanodeInfo targets[];
-    Block b;
+    ExtendedBlock b;
     DataNode datanode;
 
     /**
     * Connect to the first item in the target list.  Pass along the 
    * entire target list, the block, and the data.
    */
-    public DataTransfer(DatanodeInfo targets[], Block b, DataNode datanode) throws IOException {
+    public DataTransfer(DatanodeInfo targets[], ExtendedBlock b,
+        DataNode datanode) throws IOException {
       this.targets = targets;
       this.b = b;
       this.datanode = datanode;
@@ -1350,8 +1354,8 @@ public class DataNode extends Configured
         //
         Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
         if (isBlockTokenEnabled) {
-          accessToken = blockTokenSecretManager.generateToken(null, b,
-          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+          accessToken = blockTokenSecretManager.generateToken(b, 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
         DataTransferProtocol.Sender.opWriteBlock(out,
            b, 0, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",

+ 15 - 11
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -34,9 +34,9 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketException;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -129,7 +129,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * Read a block from the disk.
    * Read a block from the disk.
    */
    */
   @Override
   @Override
-  protected void opReadBlock(DataInputStream in, Block block,
+  protected void opReadBlock(DataInputStream in, ExtendedBlock block,
       long startOffset, long length, String clientName,
       long startOffset, long length, String clientName,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     OutputStream baseStream = NetUtils.getOutputStream(s, 
     OutputStream baseStream = NetUtils.getOutputStream(s, 
@@ -182,7 +182,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
         try {
         try {
           if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
           if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
               && datanode.blockScanner != null) {
               && datanode.blockScanner != null) {
-            datanode.blockScanner.verifiedByClient(block);
+            datanode.blockScanner.verifiedByClient(block.getLocalBlock());
           }
           }
         } catch (IOException ignored) {}
         } catch (IOException ignored) {}
       }
       }
@@ -216,7 +216,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
   * Write a block to disk.
   */
  @Override
-  protected void opWriteBlock(DataInputStream in, Block block, 
+  protected void opWriteBlock(DataInputStream in, ExtendedBlock block, 
      int pipelineSize, BlockConstructionStage stage,
      long newGs, long minBytesRcvd, long maxBytesRcvd,
      String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
@@ -273,7 +273,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
            stage, newGs, minBytesRcvd, maxBytesRcvd,
            client, srcDataNode, datanode);
      } else {
-        datanode.data.recoverClose(block, newGs, minBytesRcvd);
+        // TODO:FEDERATION use ExtendedBlock
+        datanode.data.recoverClose(block.getLocalBlock(), newGs, minBytesRcvd);
      }
 
      //
@@ -376,7 +377,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
      // the block is finalized in the PacketResponder.
      if (client.length() == 0 || 
          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+        // TODO:FEDERATION use ExtendedBlock
+        datanode.closeBlock(block.getLocalBlock(), DataNode.EMPTY_DEL_HINT);
        LOG.info("Received block " + block + 
                 " src: " + remoteAddress +
                 " dest: " + localAddress +
@@ -406,7 +408,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
   * Get block checksum (MD5 of CRC32).
   */
  @Override
-  protected void opBlockChecksum(DataInputStream in, Block block,
+  protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
        datanode.socketWriteTimeout));
@@ -429,8 +431,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
      }
    }
 
+    // TODO:FEDERATION use ExtendedBlock
    final MetaDataInputStream metadataIn = 
-      datanode.data.getMetaDataInputStream(block);
+      datanode.data.getMetaDataInputStream(block.getLocalBlock());
    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
        metadataIn, BUFFER_SIZE));
 
@@ -470,7 +473,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
   * Read a block from the disk and then sends it to a destination.
   */
  @Override
-  protected void opCopyBlock(DataInputStream in, Block block,
+  protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    // Read in the header
    if (datanode.isBlockTokenEnabled) {
@@ -545,7 +548,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
   */
  @Override
  protected void opReplaceBlock(DataInputStream in,
-      Block block, String sourceID, DatanodeInfo proxySource,
+      ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    /* read header */
    block.setNumBytes(dataXceiverServer.estimateBlockSize);
@@ -616,7 +619,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
          dataXceiverServer.balanceThrottler, -1);
                    
      // notify name node
-      datanode.notifyNamenodeReceivedBlock(block, sourceID);
+      // TODO:FEDERATION use ExtendedBlock
+      datanode.notifyNamenodeReceivedBlock(block.getLocalBlock(), sourceID);
 
      LOG.info("Moved block " + block + 
          " from " + s.getRemoteSocketAddress());

+ 11 - 5
src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java

@@ -246,13 +246,12 @@ public class DatanodeJspHelper {
      return;
    }
 
-    String blockSizeStr = req.getParameter("blockSize");
-    long blockSize = 0;
+    final String blockSizeStr = req.getParameter("blockSize");
    if (blockSizeStr == null || blockSizeStr.length() == 0) {
      out.print("Invalid input");
      return;
    }
-    blockSize = Long.parseLong(blockSizeStr);
+    long blockSize = Long.parseLong(blockSizeStr);
 
    final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
    List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
@@ -378,6 +377,12 @@ public class DatanodeJspHelper {
      out.print("Invalid input (blockId absent)");
      return;
    }
+    
+    final String poolId = req.getParameter("poolId");
+    if (poolId == null) {
+      out.print("Invalid input (poolId absent)");
+      return;
+    }
 
    final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
 
@@ -559,7 +564,7 @@ public class DatanodeJspHelper {
    out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
    try {
      JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
-          datanodePort), blockId, blockToken, genStamp, blockSize,
+          datanodePort), poolId, blockId, blockToken, genStamp, blockSize,
          startOffset, chunkSizeToView, out, conf);
    } catch (Exception e) {
      out.print(e);
@@ -626,6 +631,7 @@ public class DatanodeJspHelper {
      return;
    }
    LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
+    String poolId = lastBlk.getBlock().getPoolId();
    long blockSize = lastBlk.getBlock().getNumBytes();
    long blockId = lastBlk.getBlock().getBlockId();
    Token<BlockTokenIdentifier> accessToken = lastBlk.getBlockToken();
@@ -644,7 +650,7 @@ public class DatanodeJspHelper {
        - chunkSizeToView : 0;
 
    out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
-    JspHelper.streamBlockInAscii(addr, blockId, accessToken, genStamp,
+    JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
        blockSize, startOffset, chunkSizeToView, out, conf);
    out.print("</textarea>");
    dfs.close();

+ 6 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * Exception indicating that DataNode does not have a replica
@@ -43,6 +44,11 @@ public class ReplicaNotFoundException extends IOException {
    super();
  }
 
+  ReplicaNotFoundException(ExtendedBlock b) {
+    super("Replica not found for " + b);
+  }
+  
+  // TODO:FEDERATION remove this later
  ReplicaNotFoundException(Block b) {
    super("Replica not found for " + b);
  }

+ 4 - 4
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -502,10 +502,10 @@ public class NamenodeFsck {
        s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
        s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
        
-        String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-        blockReader = BlockReader.newBlockReader(s, file,
-            block.getLocalBlock(), lblock.getBlockToken(), 0, -1, conf.getInt(
-                "io.file.buffer.size", 4096));
+        String file = BlockReader.getFileName(targetAddr, block.getPoolId(),
+            block.getBlockId());
+        blockReader = BlockReader.newBlockReader(s, file, block, lblock
+            .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
        
      }  catch (IOException ex) {
        // Put chosen node into dead list, continue
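
The hunk above, together with the BlockReader changes listed at the top of this commit, implies that BlockReader.getFileName now takes the block pool id and that BlockReader.newBlockReader accepts an ExtendedBlock rather than a Block. A sketch of the expected client call sequence, reusing the variable names from the fsck code above (the exact signatures live in BlockReader.java and may differ):

    // Inferred call sequence based on the fsck hunk above.
    String file = BlockReader.getFileName(targetAddr, block.getPoolId(),
        block.getBlockId());
    BlockReader blockReader = BlockReader.newBlockReader(s, file,
        block,                                  // ExtendedBlock, not Block
        lblock.getBlockToken(), 0, -1,
        conf.getInt("io.file.buffer.size", 4096));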

+ 1 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -26,7 +26,6 @@ import java.util.List;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -83,7 +82,7 @@ public class TestClientBlockVerification {
    int offset, int lenToRead) throws IOException {
    InetSocketAddress targetAddr = null;
    Socket s = null;
-    Block block = testBlock.getBlock().getLocalBlock();
+    ExtendedBlock block = testBlock.getBlock();
    DatanodeInfo[] nodes = testBlock.getLocations();
    targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
    s = new Socket();

+ 7 - 6
src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -172,7 +172,7 @@ public class TestDataTransferProtocol extends TestCase {
      String description, Boolean eofExcepted) throws IOException {
    sendBuf.reset();
    recvBuf.reset();
-    DataTransferProtocol.Sender.opWriteBlock(sendOut, block.getLocalBlock(), 0,
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0,
        stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
    if (eofExcepted) {
@@ -338,7 +338,8 @@ public class TestDataTransferProtocol extends TestCase {
    createFile(fileSys, file, fileLen);
 
    // get the first blockid for the file
-    ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+    final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+    final String poolId = firstBlock.getPoolId();
    long newBlockId = firstBlock.getBlockId() + 1;
 
    recvBuf.reset();
@@ -358,7 +359,7 @@ public class TestDataTransferProtocol extends TestCase {
    /* Test OP_WRITE_BLOCK */
    sendBuf.reset();
    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
-        new Block(newBlockId), 0,
+        new ExtendedBlock(poolId, newBlockId), 0,
        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -372,7 +373,7 @@ public class TestDataTransferProtocol extends TestCase {
    sendBuf.reset();
    recvBuf.reset();
    DataTransferProtocol.Sender.opWriteBlock(sendOut,
-        new Block(++newBlockId), 0,
+        new ExtendedBlock(poolId, ++newBlockId), 0,
        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -396,7 +397,7 @@ public class TestDataTransferProtocol extends TestCase {
    sendBuf.reset();
    recvBuf.reset();
    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
-        new Block(++newBlockId), 0,
+        new ExtendedBlock(poolId, ++newBlockId), 0,
        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -419,7 +420,7 @@ public class TestDataTransferProtocol extends TestCase {
     
     
    /* Test OP_READ_BLOCK */
 
-    Block blk = new Block(firstBlock.getLocalBlock());
+    ExtendedBlock blk = new ExtendedBlock(firstBlock.getLocalBlock());
    long blkid = blk.getBlockId();
    // bad block id
    sendBuf.reset();
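
Taken together, these hunks show the pattern the updated test follows: the block pool id is taken from a real block of the file and reused whenever the test fabricates block ids to send over the wire. Condensed from the calls already shown above:

    // Fabricated block ids stay inside the pool of an existing block.
    final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
    final String poolId = firstBlock.getPoolId();
    long newBlockId = firstBlock.getBlockId() + 1;

    DataTransferProtocol.Sender.opWriteBlock(sendOut,
        new ExtendedBlock(poolId, newBlockId), 0,
        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);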

+ 2 - 7
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

@@ -230,13 +230,8 @@ public class TestBlockReplacement extends TestCase {
    sock.setKeepAlive(true);
    // sendRequest
    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    REPLACE_BLOCK.write(out);
-    out.writeLong(block.getBlockId());
-    out.writeLong(block.getGenerationStamp());
-    Text.writeString(out, source.getStorageID());
-    sourceProxy.write(out);
-    BlockTokenSecretManager.DUMMY_TOKEN.write(out);
+    DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+        .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
    out.flush();
    // receiveResponse
    DataInputStream reply = new DataInputStream(sock.getInputStream());

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -250,7 +250,7 @@ public class TestDataNodeVolumeFailure extends TestCase{
 
 
    String file = BlockReader.getFileName(targetAddr, block.getBlockId());
    BlockReader blockReader = 
-      BlockReader.newBlockReader(s, file, block.getLocalBlock(), lblock
+      BlockReader.newBlockReader(s, file, block, lblock
        .getBlockToken(), 0, -1, 4096);
 
    // nothing - if it fails - it will throw and exception

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -117,7 +117,7 @@ public class TestDiskError extends TestCase {
      DataOutputStream out = new DataOutputStream(
          s.getOutputStream());
 
-      Sender.opWriteBlock(out, block.getBlock().getLocalBlock(), 1, 
+      Sender.opWriteBlock(out, block.getBlock(), 1, 
          BlockConstructionStage.PIPELINE_SETUP_CREATE, 
          0L, 0L, 0L, "", null, new DatanodeInfo[0], 
          BlockTokenSecretManager.DUMMY_TOKEN);

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java

@@ -131,7 +131,7 @@ public class TestBlockTokenWithDFS extends TestCase {
      s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
      String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-      blockReader = BlockReader.newBlockReader(s, file, block.getLocalBlock(), 
+      blockReader = BlockReader.newBlockReader(s, file, block, 
          lblock.getBlockToken(), 0, -1, 
          conf.getInt("io.file.buffer.size", 4096));