
HDFS-1643. HDFS Federation: remove namenode argument from DataNode constructor

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1074763 13f79535-47bb-0310-9956-ffa450edef68
Boris Shkolnik 14 years ago
commit 4565ffa500
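
A note on the shape of the change: before this patch every DataNode RPC went through the single DatanodeProtocol proxy injected via the constructor; with federation a DataNode talks to one namenode per block pool, so callers now resolve the proxy by block pool id through bpMapping. A minimal compilable sketch of that lookup, mirroring the getBPNamenode(String) hunk in DataNode.java below (BPOfferService and DatanodeProtocol are reduced to stand-ins, not the real HDFS types):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch only: mirrors the per-block-pool lookup this patch introduces.
class PerPoolNamenodeLookup {
  interface DatanodeProtocol {}            // stand-in for the real RPC interface
  static class BPOfferService {
    DatanodeProtocol bpNamenode;           // proxy to this block pool's namenode
  }

  private final Map<String, BPOfferService> bpMapping =
      new HashMap<String, BPOfferService>();

  // Same contract as DataNode.getBPNamenode(String) in the diff below.
  DatanodeProtocol getBPNamenode(String bpid) throws IOException {
    BPOfferService bpos = bpMapping.get(bpid);
    if (bpos == null || bpos.bpNamenode == null) {
      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
    }
    return bpos.bpNamenode;
  }
}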

+ 3 - 0
CHANGES.txt

@@ -49,6 +49,9 @@ Trunk (unreleased changes)
     HDFS-1642. HDFS Federation: add Datanode.getDNRegistration(String bpid) 
     method  (boryas)
 
+    HDFS-1643. HDFS Federation: remove namenode argument from DataNode 
+    constructor (boryas)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -247,7 +247,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
                              byte[] checksumBuf, int checksumOff ) 
                              throws IOException {
-    DatanodeProtocol nn = datanode.getNamenode(block);
+    DatanodeProtocol nn = datanode.getBPNamenode(block.getPoolId());
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
       

+ 12 - 21
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -104,7 +104,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
@@ -200,7 +199,7 @@ public class DataNode extends Configured
   BPOfferService[] nameNodeThreads;
   private Map<String, BPOfferService> bpMapping = 
     new HashMap<String, BPOfferService>();
-  public DatanodeProtocol namenode = null;
+  public DatanodeProtocol namenodeTODO_FED = null; //TODO:FEDERATION needs to be taken out.
   public FSDatasetInterface data = null;
   public DatanodeRegistration dnRegistration = null;
   private String clusterId = null;
@@ -260,6 +259,7 @@ public class DataNode extends Configured
    * where they are run with privileged ports and injected from a higher
    * level of capability
    */
+  /*
   DataNode(final Configuration conf,
            final AbstractList<File> dataDirs, final SecureResources resources) throws IOException {  
     this(conf, dataDirs, (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
@@ -267,6 +267,7 @@ public class DataNode extends Configured
                        NameNode.getServiceAddress(conf, true), 
                        conf), resources);
   }
+  */
   
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
@@ -274,13 +275,13 @@ public class DataNode extends Configured
    */
   DataNode(final Configuration conf, 
            final AbstractList<File> dataDirs,
-           final DatanodeProtocol namenode, final SecureResources resources) throws IOException {
+           final SecureResources resources) throws IOException {
     super(conf);
 
     DataNode.setDataNode(this);
     
     try {
-      startDataNode(conf, dataDirs, namenode, resources);
+      startDataNode(conf, dataDirs, resources);
     } catch (IOException ie) {
       shutdown();
       throw ie;
@@ -451,7 +452,6 @@ public class DataNode extends Configured
           + block.getPoolId());
     }
   }
-
   
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = bpMapping.get(block.getPoolId());
@@ -1042,6 +1042,7 @@ public class DataNode extends Configured
         break;
       case UpgradeCommand.UC_ACTION_START_UPGRADE:
         // start distributed upgrade here
+        datanodeObject.namenodeTODO_FED = bpNamenode; //TODO:FEDERATION - this needs to be converted.
         processDistributedUpgradeCommand((UpgradeCommand)cmd);
         break;
       case DatanodeProtocol.DNA_RECOVERBLOCK:
@@ -1074,7 +1075,8 @@ public class DataNode extends Configured
    */
   void startDataNode(Configuration conf, 
                      AbstractList<File> dataDirs,
-                     DatanodeProtocol namenode, SecureResources resources
+                    // DatanodeProtocol namenode,
+                     SecureResources resources
                      ) throws IOException {
     if(UserGroupInformation.isSecurityEnabled() && resources == null)
       throw new RuntimeException("Cannot start secure cluster without " +
@@ -1098,7 +1100,6 @@ public class DataNode extends Configured
 
     // get all the NNs configured
     nameNodeThreads = getAllNamenodes(conf);
-    this.namenode = namenode;
   }
   
   /**
@@ -1488,7 +1489,7 @@ public class DataNode extends Configured
   private void transferBlock( ExtendedBlock block, 
                               DatanodeInfo xferTargets[] 
                               ) throws IOException {
-    DatanodeProtocol nn = getNamenode(block);
+    DatanodeProtocol nn = getBPNamenode(block.getPoolId());
     
     if (!data.isValidBlock(block)) {
       // block does not exist or is under-construction
@@ -1504,7 +1505,7 @@ public class DataNode extends Configured
     long onDiskLength = data.getLength(block);
     if (block.getNumBytes() > onDiskLength) {
       // Shorter on-disk len indicates corruption so report NN the corrupt block
-      namenode.reportBadBlocks(new LocatedBlock[]{
+      nn.reportBadBlocks(new LocatedBlock[]{
           new LocatedBlock(block, new DatanodeInfo[] {
               new DatanodeInfo(dnRegistration)})});
       LOG.info("Can't replicate block " + block
@@ -2085,23 +2086,13 @@ public class DataNode extends Configured
     syncBlock(rBlock, syncList);
   }
 
-  /**
-   * 
-   * @param eblock
-   * @return namenode corresponding to the Extended block
-   * @throws IOException
-   */
-  public DatanodeProtocol getNamenode(ExtendedBlock eblock) throws IOException {
-    return getNamenodeForBP(eblock.getPoolId());
-  }
-  
   /**
    * 
    * @param bpid
    * @return Namenode corresponding to the bpid
    * @throws IOException
    */
-  public DatanodeProtocol getNamenodeForBP(String bpid) throws IOException {
+  public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
     BPOfferService bpos = bpMapping.get(bpid);
     if(bpos == null || bpos.bpNamenode == null) {
      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
@@ -2113,7 +2104,7 @@ public class DataNode extends Configured
   void syncBlock(RecoveringBlock rBlock,
                          List<BlockRecord> syncList) throws IOException {
     ExtendedBlock block = rBlock.getBlock();
-    DatanodeProtocol nn = getNamenode(block);
+    DatanodeProtocol nn = getBPNamenode(block.getPoolId());
     
     long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
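
The callers in this file all move to the same pattern: derive the pool id from the block in hand, then fetch that pool's proxy for the RPC, rather than reading a constructor-injected field. A hedged caller-side sketch, with the HDFS types reduced to only the members the pattern touches:

import java.io.IOException;

// Stand-ins for ExtendedBlock, DatanodeProtocol and DataNode; only the
// members the per-call lookup uses are modeled here.
class PerCallProxySketch {
  interface DatanodeProtocol {
    void reportBadBlocks(Object[] blocks) throws IOException;
  }
  static class ExtendedBlock {
    String getPoolId() { return "BP-1"; }
  }

  static abstract class DataNode {
    abstract DatanodeProtocol getBPNamenode(String bpid) throws IOException;

    // As in transferBlock/syncBlock above: resolve the proxy per call,
    // then report to the block pool's own namenode.
    void reportCorrupt(ExtendedBlock block, Object[] located) throws IOException {
      DatanodeProtocol nn = getBPNamenode(block.getPoolId());
      nn.reportBadBlocks(located);
    }
  }
}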

+ 4 - 5
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -2170,12 +2171,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
 
     // Send corrupt block report outside the lock
     if (corruptBlock != null) {
-      DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
-      LocatedBlock[] blocks = { new LocatedBlock(bpid, corruptBlock, dnArr) };
+      DataNode.LOG.warn("Reporting the block " + corruptBlock
+          + " as corrupt due to length mismatch");
       try {
-        datanode.namenode.reportBadBlocks(blocks);
-        DataNode.LOG.warn("Reporting the block " + corruptBlock
-            + " as corrupt due to length mismatch");
+        datanode.reportBadBlocks(new ExtendedBlock(bpid, corruptBlock));  
       } catch (IOException e) {
         DataNode.LOG.warn("Failed to repot bad block " + corruptBlock
             + "Exception:" + StringUtils.stringifyException(e));

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java

@@ -89,7 +89,7 @@ class UpgradeManagerDatanode extends UpgradeManager {
           "UpgradeManagerDatanode.currentUpgrades is not null.";
         assert upgradeDaemon == null : 
           "UpgradeManagerDatanode.upgradeDaemon is not null.";
-        dataNode.namenode.processUpgradeCommand(broadcastCommand);
+        dataNode.namenodeTODO_FED.processUpgradeCommand(broadcastCommand); // TODO:FEDERATION needs to be fixed
         return true;
       }
     }

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java

@@ -87,8 +87,9 @@ public abstract class UpgradeObjectDatanode extends UpgradeObject implements Run
             + " Name-node version = " + nsInfo.getLayoutVersion() + ".";
     DataNode.LOG.fatal( errorMsg );
     String bpid = nsInfo.getBlockPoolID();
+    DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
     try {
-      dataNode.namenode.errorReport(dataNode.getDNRegistrationForBP(bpid),
+      nn.errorReport(dataNode.getDNRegistrationForBP(bpid),
                                     DatanodeProtocol.NOTIFY, errorMsg);
     } catch(SocketTimeoutException e) {  // namenode is busy
       DataNode.LOG.info("Problem connecting to server: " 

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java

@@ -173,7 +173,7 @@ class UO_Datanode extends UpgradeObjectDatanode {
 
   public void doUpgrade() throws IOException {
     this.status = (short)100;
-    getDatanode().namenode.processUpgradeCommand(
+    getDatanode().namenodeTODO_FED.processUpgradeCommand(
         new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS, 
             getVersion(), getUpgradeStatus()));
   }

+ 5 - 4
src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -105,7 +105,8 @@ public class TestBlockRecovery {
     when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(), 
         anyLong(), anyLong(), anyInt(), anyInt())).thenReturn(
             new DatanodeCommand[0]);
-    dn = new DataNode(conf, dirs, namenode, null);
+    dn = new DataNode(conf, dirs, null);
+    dn.namenodeTODO_FED = namenode; // TODO:FEDERATION - should go to a specific bpid
   }
 
   /**
@@ -403,7 +404,7 @@ public class TestBlockRecovery {
         initReplicaRecovery(any(RecoveringBlock.class));
     Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
     d.join();
-    verify(dn.namenode).commitBlockSynchronization(
+    verify(dn.namenodeTODO_FED).commitBlockSynchronization(
         block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY);
   }
 
@@ -459,7 +460,7 @@ public class TestBlockRecovery {
     } catch (IOException e) {
       e.getMessage().startsWith("Cannot recover ");
     }
-    verify(dn.namenode, never()).commitBlockSynchronization(
+    verify(dn.namenodeTODO_FED, never()).commitBlockSynchronization(
         any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
         anyBoolean(), any(DatanodeID[].class));
   }
@@ -486,7 +487,7 @@ public class TestBlockRecovery {
       } catch (IOException e) {
         e.getMessage().startsWith("Cannot recover ");
       }
-      verify(dn.namenode, never()).commitBlockSynchronization(
+      verify(dn.namenodeTODO_FED, never()).commitBlockSynchronization(
           any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
           anyBoolean(), any(DatanodeID[].class));
     } finally {
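
For the tests, losing the constructor argument means the Mockito proxy is injected after construction through the temporary public namenodeTODO_FED field, as the TestBlockRecovery hunks above show. A hedged sketch of that field-injection pattern in isolation (Mockito assumed on the classpath; both nested types are stand-ins, not the real HDFS classes):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

class FieldInjectionSketch {
  interface DatanodeProtocol { void processUpgradeCommand(Object cmd); }

  static class DataNode {
    public DatanodeProtocol namenodeTODO_FED;   // temporary field, as in the patch
    void upgrade(Object cmd) { namenodeTODO_FED.processUpgradeCommand(cmd); }
  }

  static void example() {
    DatanodeProtocol nn = mock(DatanodeProtocol.class);
    DataNode dn = new DataNode();     // no namenode argument any more
    dn.namenodeTODO_FED = nn;         // field injection replaces the constructor arg
    dn.upgrade("cmd");
    verify(nn).processUpgradeCommand("cmd");
  }
}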