Parcourir la source

HDFS-1697. HDFS federation: fix testBlockRecovery

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1076478 13f79535-47bb-0310-9956-ffa450edef68
Boris Shkolnik il y a 14 ans
Parent
commit
fe3d748a28

+ 1 - 0
CHANGES.txt

@@ -154,6 +154,7 @@ Trunk (unreleased changes)
     HDFS-1682. Change Balancer CLI for multiple namenodes and balancing
     HDFS-1682. Change Balancer CLI for multiple namenodes and balancing
     policy.  (szetszwo)
     policy.  (szetszwo)
 
 
+    HDFS-1697. HDFS federation: fix testBlockRecovery (boryas)
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

+ 6 - 9
src/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -317,13 +317,11 @@ public class BlockReader extends FSInputChecker {
     return bytesToRead;
     return bytesToRead;
   }
   }
   
   
-  private BlockReader( String file, long blockId, DataInputStream in, 
-                       DataChecksum checksum, boolean verifyChecksum,
-                       long startOffset, long firstChunkOffset,
-                       long bytesToRead,
-                       Socket dnSock ) {
+  private BlockReader(String file, String bpid, long blockId,
+      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
     // Path is used only for printing block and file information in debug
     // Path is used only for printing block and file information in debug
-    super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
+    super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
           1, verifyChecksum,
           1, verifyChecksum,
           checksum.getChecksumSize() > 0? checksum : null, 
           checksum.getChecksumSize() > 0? checksum : null, 
           checksum.getBytesPerChecksum(),
           checksum.getBytesPerChecksum(),
@@ -417,9 +415,8 @@ public class BlockReader extends FSInputChecker {
                             startOffset + " for file " + file);
                             startOffset + " for file " + file);
     }
     }
 
 
-    // TODO:FEDERATION use poolId
-    return new BlockReader(file, block.getBlockId(), in, checksum,
-        verifyChecksum, startOffset, firstChunkOffset, len, sock);
+    return new BlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
   }
   }
 
 
   @Override
   @Override

+ 22 - 14
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -679,7 +679,7 @@ public class DataNode extends Configured
     }
     }
 
 
     void setNameNode(DatanodeProtocol dnProtocol) {
     void setNameNode(DatanodeProtocol dnProtocol) {
-      this.bpNamenode = dnProtocol;
+        bpNamenode = dnProtocol;
     }
     }
 
 
     private NamespaceInfo handshake() throws IOException {
     private NamespaceInfo handshake() throws IOException {
@@ -720,7 +720,6 @@ public class DataNode extends Configured
       return nsInfo;
       return nsInfo;
     }
     }
 
 
-
     void setupBP(Configuration conf, AbstractList<File> dataDirs) 
     void setupBP(Configuration conf, AbstractList<File> dataDirs) 
     throws IOException {
     throws IOException {
       // get NN proxy
       // get NN proxy
@@ -734,6 +733,11 @@ public class DataNode extends Configured
       setNamespaceInfo(nsInfo);
       setNamespaceInfo(nsInfo);
       setClusterId(nsInfo.clusterID);
       setClusterId(nsInfo.clusterID);
       
       
+      setupBPStorage();
+      initPeriodicScanners(conf);
+    }
+    
+    void setupBPStorage() throws IOException {
       StartupOption startOpt = getStartupOption(conf);
       StartupOption startOpt = getStartupOption(conf);
       assert startOpt != null : "Startup option must be set.";
       assert startOpt != null : "Startup option must be set.";
 
 
@@ -757,7 +761,6 @@ public class DataNode extends Configured
       }
       }
       initFsDataSet(conf, dataDirs);
       initFsDataSet(conf, dataDirs);
       data.addBlockPool(blockPoolId, conf);
       data.addBlockPool(blockPoolId, conf);
-      initPeriodicScanners(conf);
     }
     }
 
 
     /**
     /**
@@ -1055,9 +1058,7 @@ public class DataNode extends Configured
         try {
         try {
           // reset name to machineName. Mainly for web interface. Same for all DB
           // reset name to machineName. Mainly for web interface. Same for all DB
           bpRegistration.name = machineName + ":" + bpRegistration.getPort();
           bpRegistration.name = machineName + ":" + bpRegistration.getPort();
-          LOG.info("bpReg before =" + bpRegistration.storageInfo +           
-              ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
-
+          
           bpRegistration = bpNamenode.registerDatanode(bpRegistration);
           bpRegistration = bpNamenode.registerDatanode(bpRegistration);
           // make sure we got the machine name right (same as NN sees it)
           // make sure we got the machine name right (same as NN sees it)
           String [] mNames = bpRegistration.getName().split(":");
           String [] mNames = bpRegistration.getName().split(":");
@@ -1250,7 +1251,6 @@ public class DataNode extends Configured
         processDistributedUpgradeCommand((UpgradeCommand)cmd);
         processDistributedUpgradeCommand((UpgradeCommand)cmd);
         break;
         break;
       case DatanodeProtocol.DNA_RECOVERBLOCK:
       case DatanodeProtocol.DNA_RECOVERBLOCK:
-        // TODO:FEDERATION - global storage????? or per BP storage
         recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
         recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
         break;
         break;
       case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
@@ -2123,8 +2123,7 @@ public class DataNode extends Configured
       bpos.scheduleBlockReport(delay);
       bpos.scheduleBlockReport(delay);
     }
     }
   }
   }
-  
-  
+
   /**
   /**
    * This method is used for testing. 
    * This method is used for testing. 
    * Examples are adding and deleting blocks directly.
    * Examples are adding and deleting blocks directly.
@@ -2136,7 +2135,6 @@ public class DataNode extends Configured
     return data;
     return data;
   }
   }
 
 
-
   public static void secureMain(String args[], SecureResources resources) {
   public static void secureMain(String args[], SecureResources resources) {
     try {
     try {
       StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
       StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
@@ -2159,7 +2157,7 @@ public class DataNode extends Configured
       public void run() {
       public void run() {
         for(RecoveringBlock b : blocks) {
         for(RecoveringBlock b : blocks) {
           try {
           try {
-            logRecoverBlock("NameNode", b.getBlock().getLocalBlock(), b.getLocations());
+            logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
             recoverBlock(b);
             recoverBlock(b);
           } catch (IOException e) {
           } catch (IOException e) {
             LOG.warn("recoverBlocks FAILED: " + b, e);
             LOG.warn("recoverBlocks FAILED: " + b, e);
@@ -2282,8 +2280,8 @@ public class DataNode extends Configured
   }
   }
 
 
   /**
   /**
-   * 
-   * @param bpid
+   * Get namenode corresponding to a block pool
+   * @param bpid Block pool Id
    * @return Namenode corresponding to the bpid
    * @return Namenode corresponding to the bpid
    * @throws IOException
    * @throws IOException
    */
    */
@@ -2295,6 +2293,16 @@ public class DataNode extends Configured
     return bpos.bpNamenode;
     return bpos.bpNamenode;
   }
   }
 
 
+  /**
+   * To be used by tests only to set a mock namenode in BPOfferService
+   */
+  void setBPNamenode(String bpid, DatanodeProtocol namenode) {
+    BPOfferService bp = blockPoolManager.get(bpid);
+    if (bp != null) {
+      bp.setNameNode(namenode);
+    }
+  }
+
   /** Block synchronization */
   /** Block synchronization */
   void syncBlock(RecoveringBlock rBlock,
   void syncBlock(RecoveringBlock rBlock,
                          List<BlockRecord> syncList) throws IOException {
                          List<BlockRecord> syncList) throws IOException {
@@ -2404,7 +2412,7 @@ public class DataNode extends Configured
   }
   }
   
   
   private static void logRecoverBlock(String who,
   private static void logRecoverBlock(String who,
-      Block block, DatanodeID[] targets) {
+      ExtendedBlock block, DatanodeID[] targets) {
     StringBuilder msg = new StringBuilder(targets[0].getName());
     StringBuilder msg = new StringBuilder(targets[0].getName());
     for (int i = 1; i < targets.length; i++) {
     for (int i = 1; i < targets.length; i++) {
       msg.append(", " + targets[i].getName());
       msg.append(", " + targets[i].getName());

+ 27 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -21,7 +21,10 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 
 import java.io.IOException;
 import java.io.IOException;
 
 
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
 
 /**
 /**
  * Utility class for accessing package-private DataNode information during tests.
  * Utility class for accessing package-private DataNode information during tests.
@@ -37,4 +40,28 @@ public class DataNodeTestUtils {
   getDNRegistrationForBP(DataNode dn, String bpid) throws IOException {
   getDNRegistrationForBP(DataNode dn, String bpid) throws IOException {
     return dn.getDNRegistrationForBP(bpid);
     return dn.getDNRegistrationForBP(bpid);
   }
   }
+  
+  /**
+   * manually setup datanode to testing
+   * @param dn - datanode
+   * @param nsifno - namenode info
+   * @param bpid - block pool id
+   * @param nn - namenode object
+   * @throws IOException
+   */
+  public static void setBPNamenodeByIndex(DataNode dn,
+      NamespaceInfo nsifno, String bpid, DatanodeProtocol nn) 
+  throws IOException {
+    // setup the right BPOS..
+    BPOfferService [] bposs = dn.getAllBpOs();
+    if(bposs.length<0) {
+      throw new IOException("Datanode wasn't initializes with at least one NN");
+    }
+    for(BPOfferService bpos : bposs) {
+      bpos.setNamespaceInfo(nsifno);
+
+      dn.setBPNamenode(bpid, nn);
+      bpos.setupBPStorage();
+    }
+  }
 }
 }

+ 14 - 5
src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -70,6 +71,7 @@ public class TestBlockRecovery {
   private DataNode dn;
   private DataNode dn;
   private Configuration conf;
   private Configuration conf;
   private final static long RECOVERY_ID = 3000L;
   private final static long RECOVERY_ID = 3000L;
+  private final static String CLUSTER_ID = "testClusterID";
   private final static String POOL_ID = "BP-TEST";
   private final static String POOL_ID = "BP-TEST";
   private final static long BLOCK_ID = 1000L;
   private final static long BLOCK_ID = 1000L;
   private final static long GEN_STAMP = 2000L;
   private final static long GEN_STAMP = 2000L;
@@ -78,6 +80,9 @@ public class TestBlockRecovery {
   private final static long REPLICA_LEN2 = 5000L;
   private final static long REPLICA_LEN2 = 5000L;
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
+  
+  private final NamespaceInfo nsifno = 
+    new NamespaceInfo(1,CLUSTER_ID, POOL_ID, 2, 3);
 
 
   static {
   static {
     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
@@ -100,12 +105,13 @@ public class TestBlockRecovery {
     dirs.add(dataDir);
     dirs.add(dataDir);
     DatanodeProtocol namenode = mock(DatanodeProtocol.class);
     DatanodeProtocol namenode = mock(DatanodeProtocol.class);
     when(namenode.versionRequest()).thenReturn(new NamespaceInfo
     when(namenode.versionRequest()).thenReturn(new NamespaceInfo
-        (1, "cid-test", "bpid-test", 1L, 1));
+        (1, CLUSTER_ID, POOL_ID, 1L, 1));
     when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(), 
     when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(), 
         anyLong(), anyLong(), anyLong(), anyInt(), anyInt())).thenReturn(
         anyLong(), anyLong(), anyLong(), anyInt(), anyInt())).thenReturn(
             new DatanodeCommand[0]);
             new DatanodeCommand[0]);
     dn = new DataNode(conf, dirs, null);
     dn = new DataNode(conf, dirs, null);
-    dn.namenodeTODO_FED = namenode; // TODO:FEDERATION - should go to a specific bpid
+    
+    DataNodeTestUtils.setBPNamenodeByIndex(dn, nsifno, POOL_ID, namenode);
   }
   }
 
 
   /**
   /**
@@ -403,7 +409,8 @@ public class TestBlockRecovery {
         initReplicaRecovery(any(RecoveringBlock.class));
         initReplicaRecovery(any(RecoveringBlock.class));
     Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
     Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
     d.join();
     d.join();
-    verify(dn.namenodeTODO_FED).commitBlockSynchronization(
+    DatanodeProtocol dnP = dn.getBPNamenode(POOL_ID);
+    verify(dnP).commitBlockSynchronization(
         block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY);
         block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY);
   }
   }
 
 
@@ -459,7 +466,8 @@ public class TestBlockRecovery {
     } catch (IOException e) {
     } catch (IOException e) {
       e.getMessage().startsWith("Cannot recover ");
       e.getMessage().startsWith("Cannot recover ");
     }
     }
-    verify(dn.namenodeTODO_FED, never()).commitBlockSynchronization(
+    DatanodeProtocol namenode = dn.getBPNamenode(POOL_ID);
+    verify(namenode, never()).commitBlockSynchronization(
         any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
         any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
         anyBoolean(), any(DatanodeID[].class));
         anyBoolean(), any(DatanodeID[].class));
   }
   }
@@ -486,7 +494,8 @@ public class TestBlockRecovery {
       } catch (IOException e) {
       } catch (IOException e) {
         e.getMessage().startsWith("Cannot recover ");
         e.getMessage().startsWith("Cannot recover ");
       }
       }
-      verify(dn.namenodeTODO_FED, never()).commitBlockSynchronization(
+      DatanodeProtocol namenode = dn.getBPNamenode(POOL_ID);
+      verify(namenode, never()).commitBlockSynchronization(
           any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
           any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
           anyBoolean(), any(DatanodeID[].class));
           anyBoolean(), any(DatanodeID[].class));
     } finally {
     } finally {