
HDFS-11030. TestDataNodeVolumeFailure#testVolumeFailure is flaky (though passing). Contributed by Mingliang Liu

Mingliang Liu
commit 0c49f73a6c
+ 33 - 31
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -66,14 +66,16 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import com.google.common.base.Supplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -108,6 +110,7 @@ public class TestDataNodeVolumeFailure {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size);
     // Allow a single volume failure (there are two volumes)
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -135,7 +138,7 @@ public class TestDataNodeVolumeFailure {
    * and that we can replicate to both datanodes even after a single volume
    * failure if the configuration parameter allows this.
    */
-  @Test
+  @Test(timeout = 120000)
   public void testVolumeFailure() throws Exception {
     System.out.println("Data dir: is " +  dataDir.getPath());
    
@@ -155,7 +158,7 @@ public class TestDataNodeVolumeFailure {
     // fail the volume
     // delete/make non-writable one of the directories (failed volume)
     data_fail = new File(dataDir, "data3");
-    failedDir = MiniDFSCluster.getFinalizedDir(dataDir, 
+    failedDir = MiniDFSCluster.getFinalizedDir(data_fail,
         cluster.getNamesystem().getBlockPoolId());
     if (failedDir.exists() &&
         //!FileUtil.fullyDelete(failedDir)
@@ -171,29 +174,26 @@ public class TestDataNodeVolumeFailure {
     // we need to make sure that the "failed" volume is being accessed - 
     // and that will cause failure, blocks removal, "emergency" block report
     triggerFailure(filename, filesize);
-    
-    // make sure a block report is sent 
-    DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
-    String bpid = cluster.getNamesystem().getBlockPoolId();
-    DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
-    
-    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-        dn.getFSDataset().getBlockReports(bpid);
-
-    // Send block report
-    StorageBlockReport[] reports =
-        new StorageBlockReport[perVolumeBlockLists.size()];
-
-    int reportIndex = 0;
-    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
-        DatanodeStorage dnStorage = kvPair.getKey();
-        BlockListAsLongs blockList = kvPair.getValue();
-        reports[reportIndex++] =
-            new StorageBlockReport(dnStorage, blockList);
-    }
-    
-    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports,
-        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
+    // DN eventually has the latest volume failure information for the next heartbeat
+    final DataNode dn = cluster.getDataNodes().get(1);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final VolumeFailureSummary summary =
+            dn.getFSDataset().getVolumeFailureSummary();
+        return summary != null &&
+            summary.getFailedStorageLocations() != null &&
+            summary.getFailedStorageLocations().length == 1;
+      }
+    }, 10, 30 * 1000);
+
+    // trigger DN to send heartbeat
+    DataNodeTestUtils.triggerHeartbeat(dn);
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    // trigger NN to handle the heartbeat
+    BlockManagerTestUtil.checkHeartbeat(bm);
+    // NN should now have the latest volume failure
+    assertEquals(1, cluster.getNamesystem().getVolumeFailuresTotal());

     // verify number of blocks and files...
     verify(filename, filesize);
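
The hunk above drops the hand-built block report and instead waits for the DataNode to notice the failed volume, then pushes that state to the NameNode with an explicit heartbeat. A minimal standalone sketch of that idiom follows; the class name VolumeFailureSyncSketch, the helper waitForVolumeFailureAndHeartbeat, and the expectedFailures parameter are illustrative only, while GenericTestUtils.waitFor, DataNodeTestUtils.triggerHeartbeat and BlockManagerTestUtil.checkHeartbeat are the same test utilities the patch itself calls.

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Supplier;

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;

public class VolumeFailureSyncSketch {
  /** Wait until the DN has recorded the expected volume failures, then
   *  heartbeat so the NN sees them before the test asserts on NN state. */
  static void waitForVolumeFailureAndHeartbeat(
      final MiniDFSCluster cluster, final DataNode dn,
      final int expectedFailures)
      throws IOException, InterruptedException, TimeoutException {
    // Poll every 10 ms, time out after 30 s -- the same bounds the test uses.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        VolumeFailureSummary summary =
            dn.getFSDataset().getVolumeFailureSummary();
        return summary != null
            && summary.getFailedStorageLocations() != null
            && summary.getFailedStorageLocations().length == expectedFailures;
      }
    }, 10, 30 * 1000);

    // The failure summary only reaches the NN inside a heartbeat, so send one
    // explicitly and make the NN process it right away.
    DataNodeTestUtils.triggerHeartbeat(dn);
    BlockManagerTestUtil.checkHeartbeat(
        cluster.getNamesystem().getBlockManager());
  }
}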
@@ -492,9 +492,11 @@ public class TestDataNodeVolumeFailure {
    * @throws IOException
    */
   private boolean deteteBlocks(File dir) {
-    File [] fileList = dir.listFiles();
+    Collection<File> fileList = FileUtils.listFiles(dir,
+        TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
     for(File f : fileList) {
       if(f.getName().startsWith(Block.BLOCK_FILE_PREFIX)) {
+        System.out.println("Deleting file " + f);
         if(!f.delete())
           return false;
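
For the deteteBlocks() change just above, the key difference is recursion: File#listFiles() only returns the immediate children of the directory, while block files generally sit in nested subdir* directories below the finalized directory, so a flat listing can miss them. A small sketch of the two listings follows; the class and method names are illustrative, not part of the patch, and only the commons-io calls already shown in the diff are assumed.

import java.io.File;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;

public class RecursiveListingSketch {
  /** Recursive listing: every regular file under dir, at any depth.
   *  The first filter accepts all files, the second tells commons-io to
   *  descend into every subdirectory. */
  static Collection<File> listAllFiles(File dir) {
    return FileUtils.listFiles(dir, TrueFileFilter.INSTANCE,
        TrueFileFilter.INSTANCE);
  }

  /** Non-recursive listing, as the old test code used: only the immediate
   *  children of dir are returned (null if dir is not a directory). */
  static File[] listTopLevel(File dir) {
    return dir.listFiles();
  }
}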