
HDFS-1641. Datanode fields that are no longer used should be removed

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1074746 13f79535-47bb-0310-9956-ffa450edef68
Boris Shkolnik, 14 years ago
parent commit 0d019ac9c6

+ 2 - 0
CHANGES.txt

@@ -44,6 +44,8 @@ Trunk (unreleased changes)
     HDFS-1648. Only DataStorage must be locked using in_use.lock and no
     locks must be associated with BlockPoolStorage. (tanping via suresh)
 
+    HDFS-1641. Datanode fields that are no longer used should be removed (boryas)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

+ 3 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionSta
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -244,6 +245,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
                              byte[] checksumBuf, int checksumOff ) 
                              throws IOException {
+    DatanodeProtocol nn = datanode.getNamenode(block);
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
       
@@ -256,7 +258,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                       srcDataNode + " to namenode");
             LocatedBlock lb = new LocatedBlock(block, 
                                             new DatanodeInfo[] {srcDataNode});
-            datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
+            nn.reportBadBlocks(new LocatedBlock[] {lb});
           } catch (IOException e) {
             LOG.warn("Failed to report bad block " + block + 
                      " from datanode " + srcDataNode + " to namenode");

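Editor's note: on the federation branch a datanode talks to one namenode per block pool, so BlockReceiver can no longer reach through a single shared DataNode.namenode field; it resolves the proxy for this block's pool once, up front, and reuses it on the corruption path. A minimal sketch of that lookup-then-report shape, using hypothetical stand-in types (NamenodeProxy and PerPoolDatanode are illustrative, not the real Hadoop classes):

import java.io.IOException;

// Hypothetical stand-ins; the real types are DatanodeProtocol and DataNode.
interface NamenodeProxy {
  void reportBadBlocks(String[] blockIds) throws IOException;
}

interface PerPoolDatanode {
  // Mirrors DataNode.getNamenode(ExtendedBlock) added by this patch.
  NamenodeProxy getNamenode(String blockPoolId) throws IOException;
}

class VerifySketch {
  void verify(PerPoolDatanode datanode, String poolId, String blockId)
      throws IOException {
    // Resolve the proxy for this block's pool once, outside any per-chunk loop...
    NamenodeProxy nn = datanode.getNamenode(poolId);
    try {
      // ...then reuse it on the (rare) checksum-failure path.
      nn.reportBadBlocks(new String[] { blockId });
    } catch (IOException e) {
      // Best-effort: in the real code a failed report is logged, not rethrown.
    }
  }
}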
+ 5 - 14
src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java

@@ -48,9 +48,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -366,19 +364,12 @@ class DataBlockScanner implements Runnable {
   }
   
   private void handleScanFailure(ExtendedBlock block) {
-    
-    LOG.info("Reporting bad block " + block + " to namenode.");
-    
+    LOG.info("Reporting bad block " + block);
     try {
-      DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
-      LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
-      datanode.namenode.reportBadBlocks(blocks);
-    } catch (IOException e){
-      /* One common reason is that NameNode could be in safe mode.
-       * Should we keep on retrying in that case?
-       */
-      LOG.warn("Failed to report bad block " + block + " to namenode : " +
-               " Exception : " + StringUtils.stringifyException(e));
+      datanode.reportBadBlocks(block);
+    } catch (IOException ie) {
+      // it is bad, but not bad enough to shut down the scanner
+      LOG.warn("Cannot report bad block=" + block.getBlockId());
     }
   }
   

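Editor's note: the scanner no longer builds the LocatedBlock or holds a namenode reference itself; it delegates to the new DataNode.reportBadBlocks(ExtendedBlock) and deliberately swallows the IOException (the namenode may be in safe mode, for instance) so a failed report cannot kill the scanner thread. A sketch of that fire-and-log pattern, again with hypothetical stand-ins:

import java.io.IOException;
import java.util.logging.Logger;

// Hypothetical stand-in for the slice of DataNode the scanner needs.
interface BadBlockReporter {
  void reportBadBlocks(String blockId) throws IOException;
}

class ScanFailureHandler {
  private static final Logger LOG = Logger.getLogger("ScanFailureHandler");

  // Mirrors the reworked handleScanFailure: delegate the report and
  // swallow the IOException so the scanning loop keeps running.
  void handleScanFailure(BadBlockReporter datanode, String blockId) {
    LOG.info("Reporting bad block " + blockId);
    try {
      datanode.reportBadBlocks(blockId);
    } catch (IOException ie) {
      // Bad, but not bad enough to shut down the scanner.
      LOG.warning("Cannot report bad block=" + blockId);
    }
  }
}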
+ 56 - 6
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -304,9 +304,6 @@ public class DataNode extends Configured
                                      conf.get("dfs.datanode.dns.nameserver","default"));
     }
 
-    // TODO:FEDERATION this.nameNodeAddr = NameNode.getServiceAddress(conf, true);
-    // FEDDERATION this.nameNodeAddrForClient = NameNode.getAddress(conf);
-
     this.socketTimeout =  conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
                                       HdfsConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -455,6 +452,15 @@ public class DataNode extends Configured
     }
   }
 
+  
+  public void reportBadBlocks(ExtendedBlock block) throws IOException{
+    BPOfferService bpos = bpMapping.get(block.getPoolId());
+    if(bpos == null || bpos.bpNamenode == null) {
+      throw new IOException("cannot locate OfferService thread for bp="+block.getPoolId());
+    }
+    bpos.reportBadBlocks(block);
+  }
+  
   /**
    * A thread per namenode to perform:
    * <ul>
@@ -598,6 +604,22 @@ public class DataNode extends Configured
       resetBlockReportTime = true; // reset future BRs for randomness
     }
 
+    private void reportBadBlocks(ExtendedBlock block) {
+      DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
+      LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
+      
+      try {
+        bpNamenode.reportBadBlocks(blocks);  
+      } catch (IOException e){
+        /* One common reason is that NameNode could be in safe mode.
+         * Should we keep on retrying in that case?
+         */
+        LOG.warn("Failed to report bad block " + block + " to namenode : " +
+                 " Exception : " + StringUtils.stringifyException(e));
+      }
+      
+    }
+    
     /**
      * Report received blocks and delete hints to the Namenode
      * @throws IOException
@@ -1448,11 +1470,13 @@ public class DataNode extends Configured
   private void transferBlock( ExtendedBlock block, 
                               DatanodeInfo xferTargets[] 
                               ) throws IOException {
+    DatanodeProtocol nn = getNamenode(block);
+    
     if (!data.isValidBlock(block)) {
       // block does not exist or is under-construction
       String errStr = "Can't send invalid block " + block;
       LOG.info(errStr);
-      namenode.errorReport(dnRegistration, 
+      nn.errorReport(dnRegistration, 
                            DatanodeProtocol.INVALID_BLOCK, 
                            errStr);
       return;
@@ -2043,10 +2067,36 @@ public class DataNode extends Configured
     syncBlock(rBlock, syncList);
   }
 
+  /**
+   * 
+   * @param eblock
+   * @return namenode corresponding to the Extended block
+   * @throws IOException
+   */
+  public DatanodeProtocol getNamenode(ExtendedBlock eblock) throws IOException {
+    return getNamenodeForBP(eblock.getPoolId());
+  }
+  
+  /**
+   * 
+   * @param bpid
+   * @return Namenode corresponding to the bpid
+   * @throws IOException
+   */
+  public DatanodeProtocol getNamenodeForBP(String bpid) throws IOException {
+    BPOfferService bpos = bpMapping.get(bpid);
+      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
+    }
+    return bpos.bpNamenode;
+  }
+
   /** Block synchronization */
   void syncBlock(RecoveringBlock rBlock,
                          List<BlockRecord> syncList) throws IOException {
     ExtendedBlock block = rBlock.getBlock();
+    DatanodeProtocol nn = getNamenode(block);
+    
     long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
@@ -2057,7 +2107,7 @@ public class DataNode extends Configured
     // or their replicas have 0 length.
     // The block can be deleted.
     if (syncList.isEmpty()) {
-      namenode.commitBlockSynchronization(block, recoveryId, 0,
+      nn.commitBlockSynchronization(block, recoveryId, 0,
           true, true, DatanodeID.EMPTY_ARRAY);
       return;
     }
@@ -2144,7 +2194,7 @@ public class DataNode extends Configured
 
     // Notify the name-node about successfully recovered replicas.
     DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
-    namenode.commitBlockSynchronization(block,
+    nn.commitBlockSynchronization(block,
         newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
         nlist);
   }

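Editor's note: the removed namenode/nameNodeAddr fields are replaced by a lookup in bpMapping, the per-block-pool map of BPOfferService threads, and getNamenodeForBP fails fast rather than handing out a null proxy. A compilable sketch of that registry shape (NamenodeHandle and BlockPoolRegistry are hypothetical stand-ins, not the real Hadoop classes):

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the real DatanodeProtocol proxy.
interface NamenodeHandle {}

class BlockPoolRegistry {
  // One namenode handle per block pool, keyed by block pool ID,
  // analogous to DataNode.bpMapping in this patch.
  private final Map<String, NamenodeHandle> bpMapping = new ConcurrentHashMap<>();

  void register(String bpid, NamenodeHandle nn) {
    bpMapping.put(bpid, nn);
  }

  // Analogous to getNamenodeForBP: throw when the pool is unknown,
  // so callers never cache or dereference a null proxy.
  NamenodeHandle getNamenodeForBP(String bpid) throws IOException {
    NamenodeHandle nn = bpMapping.get(bpid);
    if (nn == null) {
      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
    }
    return nn;
  }
}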
+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java

@@ -261,7 +261,7 @@ public class TestDataNodeMultipleRegistrations {
     
 
     for (BPOfferService bpos : dn.nameNodeThreads) {
-      LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name
+      LOG.debug("reg: bpid=" + "; name=" + bpos.bpRegistration.name
           + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr);
     }