
HDFS-6804. Add test for race condition between transferring block and appending block causes "Unexpected checksum mismatch exception". Contributed by Brahma Reddy Battula.

Wei-Chiu Chuang 7 years ago
parent
commit
a6c0c2ced5
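
In brief, the race under test: while a DataNode's BlockSender is transferring a finalized block replica to another node, a concurrent append can update the checksum of the block's last partial chunk in the meta file, so the data and checksum the receiving node sees no longer agree and the transfer fails with "Unexpected checksum mismatch exception". As the title indicates, this commit adds only a regression test plus the fault-injection hook needed to make the race reproducible; the hook is a no-op in production.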

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -327,7 +327,9 @@ class BlockSender implements java.io.Closeable {
                 metaIn.getLength() >= BlockMetadataHeader.getHeaderSize()) {
               checksumIn = new DataInputStream(new BufferedInputStream(
                   metaIn, IO_FILE_BUFFER_SIZE));
-  
+              // HDFS-11160/HDFS-11056
+              DataNodeFaultInjector.get()
+                  .waitForBlockSenderMetaInputStreamBeforeAppend();
               csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
               keepMetaInOpen = true;
             }
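
The hook sits after checksumIn is opened but before BlockMetadataHeader.readDataChecksum consumes the header, giving a test a deterministic window in which a concurrent append can modify the meta file while the sender holds it open.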

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

@@ -62,4 +62,8 @@ public class DataNodeFaultInjector {
 
   public void throwTooManyOpenFiles() throws FileNotFoundException {
   }
+
+  public void waitForBlockSenderMetaInputStreamBeforeAppend()
+      throws IOException {
+  }
 }
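
For readers unfamiliar with the pattern, a minimal sketch of how a test swaps in a custom injector and restores the default afterwards. The injector class and hook name match the production code above; the sleep-based body and the runWithPausedBlockSender helper are illustrative assumptions only (the actual test added below coordinates the pause with a CountDownLatch instead of a sleep).

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;

public class FaultInjectorUsageSketch {

  // Runs testBody while BlockSender pauses at the new hook.
  static void runWithPausedBlockSender(Runnable testBody) {
    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public void waitForBlockSenderMetaInputStreamBeforeAppend()
          throws IOException {
        // Park the transfer here so a concurrent append can run first.
        try {
          Thread.sleep(1000); // illustrative; the real test uses a latch
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    try {
      testBody.run();
    } finally {
      // Always restore the default, no-op injector.
      DataNodeFaultInjector.set(oldInjector);
    }
  }
}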

+ 71 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java

@@ -19,14 +19,23 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -130,7 +139,68 @@ public class TestAppendDifferentChecksum {
     AppendTestUtil.check(fsWithCrc32, p, len);
     AppendTestUtil.check(fsWithCrc32C, p, len);
   }
-  
+
+  @Test(timeout = 60000)
+  public void testChecksumErrorAppendWhileTransfer()
+      throws Exception {
+    DataNodeFaultInjector oldFi = DataNodeFaultInjector.get();
+    LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(DataNode.LOG);
+    try {
+      Path f = new Path("/f");
+      FSDataOutputStream o =
+          fs.create(f, false, 1024, (short) 1, 128 * 1024 * 1024);
+      try {
+        AppendTestUtil.write(o, 0, 64 * 1024 + 600);
+      } finally {
+        o.close();
+      }
+      final CountDownLatch latch = new CountDownLatch(2);
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        public void waitForBlockSenderMetaInputStreamBeforeAppend()
+            throws IOException {
+          latch.countDown();
+          try {
+            latch.await(20, TimeUnit.SECONDS);
+          } catch (InterruptedException ignored) {
+          }
+        }
+      });
+      cluster.startDataNodes(cluster.getConfiguration(0), 1, true, null, null);
+      fs.setReplication(f, (short) 2);
+      // STEP 1: Wait till the BlockSender creates the meta input stream and then
+      // append to same file.
+      while (latch.getCount() > 1) {
+        Thread.sleep(100);
+      }
+      o = fs.append(f);
+      try {
+        AppendTestUtil.write(o, 0, 1);
+        o.hflush();
+        latch.countDown();
+        // STEP 2: Wait till the transfer happens.
+        final ExtendedBlock b = cluster.getFileSystem().getClient()
+            .getLocatedBlocks(f.toString(), 0).get(0).getBlock();
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override public Boolean get() {
+            return cluster.getDataNodes().get(1).getFSDataset().contains(b);
+          }
+        }, 100, 600);
+
+      } finally {
+        o.close();
+      }
+    } finally {
+      DataNodeFaultInjector.set(oldFi);
+      String logs = logCapturer.getOutput();
+      logCapturer.stopCapturing();
+      Assert.assertFalse("There should not be any checksum exception thrown",
+          logs.contains("ChecksumException"));
+    }
+  }
+
+
+
   private FileSystem createFsWithChecksum(String type, int bytes)
       throws IOException {
     Configuration conf = new Configuration(fs.getConf());
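
A note on the test's choreography: the CountDownLatch starts at 2. The injector's first countDown() signals that the transfer has reached the meta input stream (STEP 1); the test then appends a byte, hflushes, and counts down again to release the paused sender; finally, GenericTestUtils.waitFor polls until the second DataNode's dataset contains the transferred block (STEP 2). Any checksum torn by the race would surface as a ChecksumException in the DataNode log, which the LogCapturer assertion in the finally block rules out.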