HDFS-2878. Fix TestBlockRecovery and move it back into main test directory. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1242995 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 years ago
parent
commit
8be9441b9b

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -129,6 +129,9 @@ Trunk (unreleased changes)
     HDFS-2486. Remove unnecessary priority level checks in
     UnderReplicatedBlocks.  (Uma Maheswara Rao G via szetszwo)
 
+    HDFS-2878. Fix TestBlockRecovery and move it back into main test directory.
+    (todd)
+
   OPTIMIZATIONS
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -229,8 +229,7 @@ class BPOfferService implements Runnable {
 
   private void connectToNNAndHandshake() throws IOException {
     // get NN proxy
-    bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
-        dn.getConf());
+    bpNamenode = dn.connectToNN(nnAddr);
 
     // First phase of the handshake with NN - get the namespace
     // info.
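
The interesting change here: connectToNNAndHandshake() no longer constructs DatanodeProtocolClientSideTranslatorPB inline but asks the DataNode for it, the classic "extract a factory method" seam for testability. A minimal, self-contained sketch of the production side of that pattern, using hypothetical stand-in types (NameNodeProxy, MiniDataNode) rather than the real HDFS classes:

import java.io.IOException;
import java.net.InetSocketAddress;

// Hypothetical stand-in for the DatanodeProtocol PB translator.
interface NameNodeProxy {
    String versionRequest() throws IOException;
}

class MiniDataNode {
    // The extracted seam: callers go through this factory method
    // instead of "new"-ing the RPC client inline, so a subclass
    // can override it.
    NameNodeProxy connectToNN(InetSocketAddress nnAddr) throws IOException {
        throw new IOException("real RPC client construction omitted in sketch");
    }

    // Analogue of connectToNNAndHandshake(): depends only on the seam.
    void handshake(InetSocketAddress nnAddr) throws IOException {
        NameNodeProxy nn = connectToNN(nnAddr);
        System.out.println("namespace: " + nn.versionRequest());
    }
}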

+ 12 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1015,6 +1015,14 @@ public class DataNode extends Configured
           SocketChannel.open().socket() : new Socket();
   }
 
+  /**
+   * Connect to the NN. This is separated out for easier testing.
+   */
+  DatanodeProtocolClientSideTranslatorPB connectToNN(
+      InetSocketAddress nnAddr) throws IOException {
+    return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
+  }
+
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
       DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
     throws IOException {
@@ -1982,8 +1990,10 @@ public class DataNode extends Configured
   public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
       throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
-    if(bpos == null || bpos.bpNamenode == null) {
-      throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
+    if (bpos == null) {
+      throw new IOException("No block pool offer service for bpid=" + bpid);
+    } else if (bpos.bpNamenode == null) {
+      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
     }
     return bpos.bpNamenode;
   }
@@ -2325,5 +2335,4 @@ public class DataNode extends Configured
   boolean shouldRun() {
     return shouldRun;
   }
-
 }
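
A test then substitutes a canned proxy by overriding that seam in an anonymous subclass, which is exactly the shape the relocated TestBlockRecovery uses below. Continuing the stand-in types and imports from the sketch above:

public class SeamInTestDemo {
    public static void main(String[] args) throws IOException {
        // Override the factory method so no network or real NameNode
        // is needed; the lambda satisfies the one-method interface.
        MiniDataNode dn = new MiniDataNode() {
            @Override
            NameNodeProxy connectToNN(InetSocketAddress nnAddr) {
                return () -> "mock-namespace"; // canned handshake answer
            }
        };
        dn.handshake(new InetSocketAddress("localhost", 5020));
    }
}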

+ 56 - 12
hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java → hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
@@ -39,23 +41,30 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -72,6 +81,8 @@ public class TestBlockRecovery {
   private final static long RECOVERY_ID = 3000L;
   private final static String CLUSTER_ID = "testClusterID";
   private final static String POOL_ID = "BP-TEST";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
   private final static long BLOCK_ID = 1000L;
   private final static long GEN_STAMP = 2000L;
   private final static long BLOCK_LEN = 3000L;
@@ -80,9 +91,6 @@ public class TestBlockRecovery {
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
   
-  private final NamespaceInfo nsifno = 
-    new NamespaceInfo(1,CLUSTER_ID, POOL_ID, 2, 3);
-
   static {
     ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
@@ -99,21 +107,54 @@ public class TestBlockRecovery {
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    FileSystem.setDefaultUri(conf,
+        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
     ArrayList<File> dirs = new ArrayList<File>();
     File dataDir = new File(DATA_DIR);
     FileUtil.fullyDelete(dataDir);
     dataDir.mkdirs();
     dirs.add(dataDir);
-    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    final DatanodeProtocolClientSideTranslatorPB namenode =
+      mock(DatanodeProtocolClientSideTranslatorPB.class);
+
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        return (DatanodeRegistration) invocation.getArguments()[0];
+      }
+    }).when(namenode).registerDatanode(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(DatanodeStorage[].class));
+
     when(namenode.versionRequest()).thenReturn(new NamespaceInfo
         (1, CLUSTER_ID, POOL_ID, 1L, 1));
-    when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(), 
-        anyLong(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt()))
+
+    when(namenode.sendHeartbeat(
+            Mockito.any(DatanodeRegistration.class),
+            Mockito.any(StorageReport[].class),
+            Mockito.anyInt(),
+            Mockito.anyInt(),
+            Mockito.anyInt()))
         .thenReturn(new DatanodeCommand[0]);
-    dn = new DataNode(conf, dirs, null);
-    
-    DataNodeTestUtils.setBPNamenodeByIndex(dn, nsifno, POOL_ID, namenode);
+
+    dn = new DataNode(conf, dirs, null) {
+      @Override
+      DatanodeProtocolClientSideTranslatorPB connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(NN_ADDR, nnAddr);
+        return namenode;
+      }
+    };
+    dn.runDatanodeDaemon();
+    while (!dn.isDatanodeFullyStarted()) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        fail("Interrupted starting DN");
+      }
+    }
   }
 
   /**
@@ -355,9 +396,11 @@ public class TestBlockRecovery {
 
   private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
     Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
+    DatanodeInfo mockOtherDN = new DatanodeInfo(
+        new DatanodeID("127.0.0.1", "storage-1234", 0, 0));
     DatanodeInfo[] locs = new DatanodeInfo[] {
         new DatanodeInfo(dn.getDNRegistrationForBP(block.getBlockPoolId())),
-        mock(DatanodeInfo.class) };
+        mockOtherDN };
     RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
     blocks.add(rBlock);
     return blocks;
@@ -495,7 +538,8 @@ public class TestBlockRecovery {
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
     BlockWriteStreams streams = null;
     try {
-      streams = replicaInfo.createStreams(true, 0, 0);
+      streams = replicaInfo.createStreams(true,
+          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
       streams.checksumOut.write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
       try {
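
Two idioms in the new setUp() are worth noting: the mock now targets the PB translator directly (matching what connectToNN returns), and a Mockito Answer makes registerDatanode echo back whatever registration it is handed, mimicking the real NameNode. A standalone sketch of that echo idiom, assuming only Mockito on the classpath (Registry is a hypothetical interface, not an HDFS type):

import static org.mockito.Mockito.*;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

interface Registry {
    String register(String registration);
}

public class EchoAnswerDemo {
    public static void main(String[] args) {
        Registry registry = mock(Registry.class);
        doAnswer(new Answer<String>() {
            @Override
            public String answer(InvocationOnMock invocation) {
                // Hand the caller's own argument back, as the NN hands
                // back the (possibly updated) registration.
                return (String) invocation.getArguments()[0];
            }
        }).when(registry).register(anyString());

        System.out.println(registry.register("dn-1")); // prints "dn-1"
    }
}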