
HDFS-1642. HDFS Federation: add Datanode.getDNRegistration(String bpid) method

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1074754 13f79535-47bb-0310-9956-ffa450edef68
Boris Shkolnik 14 years ago
Parent
Commit
7c9d060c1f
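
In short, this change replaces call sites that read the datanode's single dnRegistration field with a per-block-pool lookup, since a federated datanode registers once per block pool. A minimal sketch of the pattern, with the surrounding types reduced to illustrative stand-ins (only the body of getDNRegistrationForBP is taken from the DataNode.java diff below):

    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative stand-ins for the real HDFS classes.
    class DatanodeRegistration { String name; String storageID; }
    class BPOfferService { DatanodeRegistration bpRegistration; }

    class DataNodeSketch {
      // One offer service (and hence one registration) per block pool id.
      private final Map<String, BPOfferService> bpMapping =
          new ConcurrentHashMap<String, BPOfferService>();

      // The accessor this commit adds: resolve the registration for the
      // block pool a given block belongs to, failing fast if it is unknown.
      public DatanodeRegistration getDNRegistrationForBP(String bpid)
          throws IOException {
        BPOfferService bpos = bpMapping.get(bpid);
        if (bpos == null || bpos.bpRegistration == null) {
          throw new IOException("cannot find BPOfferService for bpid=" + bpid);
        }
        return bpos.bpRegistration;
      }
    }

Callers resolve the registration from the pool id carried by the block, e.g. datanode.getDNRegistrationForBP(block.getPoolId()), instead of sharing one dnRegistration across all pools.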

+ 3 - 0
CHANGES.txt

@@ -46,6 +46,9 @@ Trunk (unreleased changes)
 
     HDFS-1641. Datanode fields that are no longer used should be removed (boryas)
 
+    HDFS-1642. HDFS Federation: add Datanode.getDNRegistration(String bpid) 
+    method  (boryas)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

+ 6 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -226,7 +227,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
    * affect this datanode unless it is caused by interruption.
    */
   private void handleMirrorOutError(IOException ioe) throws IOException {
-    LOG.info(datanode.dnRegistration + ":Exception writing block " +
+    String bpid = block.getPoolId();
+    LOG.info(datanode.getDNRegistrationForBP(bpid) + ":Exception writing block " +
              block + " to mirror " + mirrorAddr + "\n" +
              StringUtils.stringifyException(ioe));
     if (Thread.interrupted()) { // shut down if the thread is interrupted
@@ -903,10 +905,12 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               if (ClientTraceLog.isInfoEnabled() &&
                   receiver.clientName.length() > 0) {
                 long offset = 0;
+                DatanodeRegistration dnR = 
+                  datanode.getDNRegistrationForBP(block.getPoolId());
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                       receiver.inAddr, receiver.myAddr, block.getNumBytes(),
                       "HDFS_WRITE", receiver.clientName, offset,
-                      datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+                      dnR.getStorageID(), block, endTime-startTime));
               } else {
                 LOG.info("Received block " + block + 
                          " of size " + block.getNumBytes() + 

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -194,7 +194,8 @@ class BlockSender implements java.io.Closeable, FSConstants {
           || (length + startOffset) > endOffset) {
         String msg = " Offset " + startOffset + " and length " + length
         + " don't match block " + block + " ( blockLen " + endOffset + " )";
-        LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
+        LOG.warn(datanode.getDNRegistrationForBP(block.getPoolId()) +
+            ":sendBlock() : " + msg);
         throw new IOException(msg);
       }
       

+ 22 - 4
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -634,7 +634,7 @@ public class DataNode extends Configured
           if (numBlocks > 0) {
             if(numBlocks!=delHints.size()) {
               LOG.warn("Panic: receiveBlockList and delHints are not of " +
-              		"the same length" );
+              "the same length" );
             }
             //
             // Send newly-received blockids to namenode
@@ -854,11 +854,19 @@ public class DataNode extends Configured
         try {
           // reset name to machineName. Mainly for web interface. Same for all DB
           bpRegistration.name = machineName + ":" + bpRegistration.getPort();
-          LOG.info("bpReg before =" + bpRegistration.storageInfo + 
-              ";sid=" + bpRegistration.storageID);
+          LOG.info("bpReg before =" + bpRegistration.storageInfo +           
+              ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
+
           bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+          // make sure we got the machine name right (same as NN sees it)
+          String [] mNames = bpRegistration.getName().split(":");
+          synchronized (dnRegistration) {
+            dnRegistration.name = mNames[0] + ":" + dnRegistration.getPort();
+          }
+
           LOG.info("bpReg after =" + bpRegistration.storageInfo + 
-              ";sid=" + bpRegistration.storageID);
+              ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
+
           break;
         } catch(SocketTimeoutException e) {  // namenode is busy
           LOG.info("Problem connecting to server: " + nn_addr);
@@ -1186,6 +1194,16 @@ public class DataNode extends Configured
       LOG.warn("Failed to register NameNode MXBean", e);
     }
   }
+  
+  public DatanodeRegistration getDNRegistrationForBP(String bpid) 
+  throws IOException {
+    BPOfferService bpos = bpMapping.get(bpid);
+    if(bpos==null || bpos.bpRegistration==null) {
+      throw new IOException("cannot find BPOfferService for bpid="+bpid);
+    }
+    return bpos.bpRegistration;
+  }
+  
 
   /**
    * Creates either NIO or regular depending on socketWriteTimeout.

+ 7 - 3
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
@@ -157,12 +158,13 @@ class DataXceiver extends DataTransferProtocol.Receiver
   
     // send the block
     BlockSender blockSender = null;
+    DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block.getPoolId());
     final String clientTraceFmt =
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
             "%d", "HDFS_READ", clientName, "%d",
-            datanode.dnRegistration.getStorageID(), block, "%d")
-        : datanode.dnRegistration + " Served block " + block + " to " +
+            dnR.getStorageID(), block, "%d")
+        : dnR + " Served block " + block + " to " +
             s.getInetAddress();
     try {
       try {
@@ -243,7 +245,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
         try {
           if (client.length() != 0) {
             ERROR_ACCESS_TOKEN.write(replyOut);
-            Text.writeString(replyOut, datanode.dnRegistration.getName());
+            DatanodeRegistration dnR = 
+              datanode.getDNRegistrationForBP(block.getPoolId()); 
+            Text.writeString(replyOut, dnR.getName());
             replyOut.flush();
           }
           LOG.warn("Block token verification failed, for client "

+ 1 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java

@@ -138,6 +138,7 @@ class UpgradeManagerDatanode extends UpgradeManager {
     upgradeState = false;
     currentUpgrades = null;
     upgradeDaemon = null;
+    // TODO:FEDERATION which dnRegistration do we want to use here?
     DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
         + dataNode.dnRegistration.getName() 
         + " version " + getUpgradeVersion() + " to current LV " 

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java

@@ -86,8 +86,9 @@ public abstract class UpgradeObjectDatanode extends UpgradeObject implements Run
             + "\n   " + getDescription() + "."
             + " Name-node version = " + nsInfo.getLayoutVersion() + ".";
     DataNode.LOG.fatal( errorMsg );
+    String bpid = nsInfo.getBlockPoolID();
     try {
-      dataNode.namenode.errorReport(dataNode.dnRegistration,
+      dataNode.namenode.errorReport(dataNode.getDNRegistrationForBP(bpid),
                                     DatanodeProtocol.NOTIFY, errorMsg);
     } catch(SocketTimeoutException e) {  // namenode is busy
       DataNode.LOG.info("Problem connecting to server: " 

+ 5 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -1243,6 +1243,11 @@ public class NameNode implements NamenodeProtocols, FSConstants {
         xceiverCount, xmitsInProgress);
   }
 
+  /**
+   * Sends a block report to the corresponding namenode (for the given poolId).
+   * @return DatanodeCommand from the namenode
+   * @throws IOException
+   */
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
                                      String poolId,
                                      long[] blocks) throws IOException {
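
The tests below converge on the same call pattern against this signature; a condensed sketch, assuming a running MiniDFSCluster named cluster and an ArrayList<Block> named blocks from the surrounding test scaffolding:

    // Condensed from the TestBlockReport changes below: look up the
    // per-pool registration, then report all blocks for that pool.
    DataNode dn = cluster.getDataNodes().get(0);
    String poolId = cluster.getNamesystem().getPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    cluster.getNameNode().blockReport(dnR, poolId,
        new BlockListAsLongs(blocks, null).getBlockListAsLongs());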

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -194,7 +194,8 @@ public class TestDataTransferProtocol extends TestCase {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
     try {
       cluster.waitActive();
-      datanode = cluster.getDataNodes().get(0).dnRegistration;
+      String poolId = cluster.getNamesystem().getBlockpoolId(); 
+      datanode = cluster.getDataNodes().get(0).getDNRegistrationForBP(poolId);
       dnAddr = NetUtils.createSocketAddr(datanode.getName());
       FileSystem fileSys = cluster.getFileSystem();
 

+ 3 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.log4j.Level;
 
 /**
@@ -138,8 +139,9 @@ public class TestFileCorruption extends TestCase {
       DataNode dataNode = datanodes.get(2);
       
       // report corrupted block by the third datanode
+      DatanodeRegistration dnR = dataNode.getDNRegistrationForBP(blk.getPoolId());
       cluster.getNamesystem().markBlockAsCorrupt(blk, 
-          new DatanodeInfo(dataNode.dnRegistration ));
+          new DatanodeInfo(dnR));
       
       // open the file
       fs.open(FILE_PATH);

+ 45 - 31
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -133,9 +134,11 @@ public class TestBlockReport {
             b.getNumBytes());
       }
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N0).dnRegistration,
-      cluster.getNamesystem().getPoolId(),
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     List<LocatedBlock> blocksAfterReport =
@@ -210,9 +213,11 @@ public class TestBlockReport {
 
     waitTil(DN_RESCAN_EXTRA_WAIT);
 
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N0).dnRegistration,
-      cluster.getNamesystem().getPoolId(),
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     cluster.getNamesystem().computeDatanodeWork();
@@ -246,10 +251,12 @@ public class TestBlockReport {
     blocks.get(0).setGenerationStamp(rand.nextLong());
     // This new block is unknown to NN and will be mark for deletion.
     blocks.add(new Block());
-    DatanodeCommand dnCmd =
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N0).dnRegistration,
-        cluster.getNamesystem().getPoolId(),
+    
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Got the command: " + dnCmd);
@@ -297,10 +304,12 @@ public class TestBlockReport {
     ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
     startDNandWait(filePath, true);
 
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      cluster.getNamesystem().getPoolId(),
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
       0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -345,10 +354,12 @@ public class TestBlockReport {
       LOG.debug("BlockGS after " + blocks.get(randIndex).getGenerationStamp());
       LOG.debug("Done corrupting GS of " + corruptedBlock.getBlockName());
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      cluster.getNamesystem().getPoolId(),
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of Corrupted blocks",
       1, cluster.getNamesystem().getCorruptReplicaBlocks() +
@@ -368,10 +379,9 @@ public class TestBlockReport {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      cluster.getNamesystem().getPoolId(),
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
 
     assertEquals("Wrong number of Corrupted blocks",
@@ -415,11 +425,13 @@ public class TestBlockReport {
       bc.start();
 
       waitForTempReplica(bl, DN_N1);
-
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N1).dnRegistration,
-        cluster.getNamesystem().getPoolId(),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+      
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      cluster.getNameNode().blockReport(dnR, poolId,
+          new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -460,10 +472,12 @@ public class TestBlockReport {
 
       waitForTempReplica(bl, DN_N1);
                                                 
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N1).dnRegistration,
-        cluster.getNamesystem().getPoolId(),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      cluster.getNameNode().blockReport(dnR, poolId,
+          new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         2, cluster.getNamesystem().getPendingReplicationBlocks());

+ 3 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
 import org.junit.After;
@@ -119,8 +120,9 @@ public class TestDataNodeVolumeFailure extends TestCase{
     // make sure a block report is sent 
     DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
     String bpid = cluster.getNamesystem().getPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
     long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
-    cluster.getNameNode().blockReport(dn.dnRegistration, bpid, bReport);
+    cluster.getNameNode().blockReport(dnR, bpid, bReport);
 
     // verify number of blocks and files...
     verify(filename, filesize);

+ 4 - 3
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java

@@ -86,10 +86,11 @@ public class TestDeadDatanode {
     cluster = new MiniDFSCluster.Builder(conf).build();
     cluster.waitActive();
 
+    String poolId = cluster.getNamesystem().getPoolId();
     // wait for datanode to be marked live
     DataNode dn = cluster.getDataNodes().get(0);
-    DatanodeRegistration reg = cluster.getDataNodes().get(0)
-        .getDatanodeRegistration();
+    DatanodeRegistration reg = 
+      cluster.getDataNodes().get(0).getDNRegistrationForBP(poolId);
     waitForDatanodeState(reg.getStorageID(), true, 20000);
 
     // Shutdown and wait for datanode to be marked dead
@@ -97,7 +98,7 @@ public class TestDeadDatanode {
     waitForDatanodeState(reg.getStorageID(), false, 20000);
 
     DatanodeProtocol dnp = cluster.getNameNode();
-    String poolId = cluster.getNamesystem().getPoolId();
+    
     Block[] blocks = new Block[] { new Block(0) };
     String[] delHints = new String[] { "" };
     

+ 3 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java

@@ -47,7 +47,9 @@ public class TestHeartbeatHandling extends TestCase {
     try {
       cluster.waitActive();
       final FSNamesystem namesystem = cluster.getNamesystem();
-      final DatanodeRegistration nodeReg = cluster.getDataNodes().get(0).dnRegistration;
+      final String poolId = namesystem.getBlockpoolId();
+      final DatanodeRegistration nodeReg = 
+        cluster.getDataNodes().get(0).getDNRegistrationForBP(poolId);
       DatanodeDescriptor dd = namesystem.getDatanode(nodeReg);
       
       final int REMAINING_BLOCKS = 1;

+ 5 - 4
src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -337,10 +337,10 @@ public class TestBlockRecovery {
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);    
   }  
 
-  private Collection<RecoveringBlock> initRecoveringBlocks() {
+  private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
     Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
     DatanodeInfo[] locs = new DatanodeInfo[] {
-        new DatanodeInfo(dn.dnRegistration),
+        new DatanodeInfo(dn.getDNRegistrationForBP(block.getPoolId())),
         mock(DatanodeInfo.class) };
     RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
     blocks.add(rBlock);
@@ -407,10 +407,11 @@ public class TestBlockRecovery {
         block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY);
   }
 
-  private List<BlockRecord> initBlockRecords(DataNode spyDN) {
+  private List<BlockRecord> initBlockRecords(DataNode spyDN) throws IOException {
     List<BlockRecord> blocks = new ArrayList<BlockRecord>(1);
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(block.getPoolId());
     BlockRecord blockRecord = new BlockRecord(
-        new DatanodeID(dn.dnRegistration), spyDN,
+        new DatanodeID(dnR), spyDN,
         new ReplicaRecoveryInfo(block.getBlockId(), block.getNumBytes(),
             block.getGenerationStamp(), ReplicaState.FINALIZED));
     blocks.add(blockRecord);