Pārlūkot izejas kodu

HDFS-10652. Add a unit test for HDFS-4660. Contributed by Vinayakumar B., Wei-Chiu Chuang, Yongjun Zhang.

Yongjun Zhang 8 gadi atpakaļ
vecāks
revīzija
c25817159a

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -1306,6 +1306,7 @@ class BlockReceiver implements Closeable {
           long ackRecvNanoTime = 0;
           try {
             if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
+              DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr);
               // read an ack from downstream datanode
               ack.readFields(downstreamIn);
               ackRecvNanoTime = System.nanoTime();

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -2403,7 +2403,8 @@ public class DataNode extends ReconfigurableBase
         blockSender.sendBlock(out, unbufOut, null);
 
         // no response necessary
-        LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+        LOG.info(getClass().getSimpleName() + ", at "
+            + DataNode.this.getDisplayName() + ": Transmitted " + b
             + " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
 
         // read ack

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

@@ -55,4 +55,7 @@ public class DataNodeFaultInjector {
   public void noRegistration() throws IOException { }
 
   public void failMirrorConnection() throws IOException { }
+
+  public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+      String mirrorAddr) throws IOException { }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1505,7 +1505,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
             throw new MustStopExistingWriter(rbw);
           }
-          LOG.info("Recovering " + rbw);
+          LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw);
           return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
         }
       } catch (MustStopExistingWriter e) {

+ 137 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -17,9 +17,16 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -31,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -39,12 +48,15 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This tests pipeline recovery related client protocol works correct or not.
  */
 public class TestClientProtocolForPipelineRecovery {
-  
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class);
   @Test public void testGetNewStamp() throws IOException {
     int numDataNodes = 1;
     Configuration conf = new HdfsConfiguration();
@@ -477,4 +489,128 @@ public class TestClientProtocolForPipelineRecovery {
       DataNodeFaultInjector.set(oldDnInjector);
     }
   }
+
+  // Test to verify that blocks are no longer corrupted after HDFS-4660.
+  // Revert HDFS-4660 and the other related ones (HDFS-9220, HDFS-8722), this
+  // test would fail.
+  // Scenario: Prior to the fix, block get corrupted when the transferBlock
+  // happens during pipeline recovery with extra bytes to make up the end of
+  // chunk.
+  // For verification, Need to fail the pipeline for last datanode when the
+  // second datanode have more bytes on disk than already acked bytes.
+  // This will enable to transfer extra bytes to the newNode to makeup
+  // end-of-chunk during pipeline recovery. This is achieved by the customized
+  // DataNodeFaultInjector class in this test.
+  // For detailed info, please refer to HDFS-4660 and HDFS-10587. HDFS-9220
+  // fixes an issue in HDFS-4660 patch, and HDFS-8722 is an optimization.
+  @Test
+  public void testPipelineRecoveryWithTransferBlock() throws Exception {
+    final int chunkSize = 512;
+    final int oneWriteSize = 5000;
+    final int totalSize = 1024 * 1024;
+    final int errorInjectionPos = 512;
+    Configuration conf = new HdfsConfiguration();
+    // Need 4 datanodes to verify the replaceDatanode during pipeline recovery
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/f");
+      FSDataOutputStream o = fs.create(fileName);
+      int count = 0;
+      // Flush to get the pipeline created.
+      o.writeBytes("hello");
+      o.hflush();
+      DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
+      final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
+      final String lastDn = pipeline[2].getXferAddr(false);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+            String mirror) throws IOException {
+          if (!lastDn.equals(mirror)) {
+            // Only fail for second DN
+            return;
+          }
+          if (!failed.get() &&
+              (replicaInfo.getBytesAcked() > errorInjectionPos) &&
+              (replicaInfo.getBytesAcked() % chunkSize != 0)) {
+            int count = 0;
+            while (count < 10) {
+              // Fail the pipeline (Throw exception) when:
+              //   1. bytsAcked is not at chunk boundary (checked in the if
+              //      statement above)
+              //   2. bytesOnDisk is bigger than bytesAcked and at least
+              //      reaches (or go beyond) the end of the chunk that
+              //      bytesAcked is in (checked in the if statement below).
+              // At this condition, transferBlock that happens during
+              // pipeline recovery would transfer extra bytes to make up to the
+              // end of the chunk. And this is when the block corruption
+              // described in HDFS-4660 would occur.
+              if ((replicaInfo.getBytesOnDisk() / chunkSize) -
+                  (replicaInfo.getBytesAcked() / chunkSize) >= 1) {
+                failed.set(true);
+                throw new IOException(
+                    "Failing Pipeline " + replicaInfo.getBytesAcked() + " : "
+                        + replicaInfo.getBytesOnDisk());
+              }
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException e) {
+              }
+              count++;
+            }
+          }
+        }
+      });
+
+      Random r = new Random();
+      byte[] b = new byte[oneWriteSize];
+      while (count < totalSize) {
+        r.nextBytes(b);
+        o.write(b);
+        count += oneWriteSize;
+        o.hflush();
+      }
+
+      assertTrue("Expected a failure in the pipeline", failed.get());
+      DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
+      o.close();
+      // Trigger block report to NN
+      for (DataNode d: cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(d);
+      }
+      // Read from the replaced datanode to verify the corruption. So shutdown
+      // all other nodes in the pipeline.
+      List<DatanodeInfo> pipelineList = Arrays.asList(pipeline);
+      DatanodeInfo newNode = null;
+      for (DatanodeInfo node : newNodes) {
+        if (!pipelineList.contains(node)) {
+          newNode = node;
+          break;
+        }
+      }
+      LOG.info("Number of nodes in pipeline: {} newNode {}",
+          newNodes.length, newNode.getName());
+      // shutdown old 2 nodes
+      for (int i = 0; i < newNodes.length; i++) {
+        if (newNodes[i].getName().equals(newNode.getName())) {
+          continue;
+        }
+        LOG.info("shutdown {}", newNodes[i].getName());
+        cluster.stopDataNode(newNodes[i].getName());
+      }
+
+      // Read should be successfull from only the newNode. There should not be
+      // any corruption reported.
+      DFSTestUtil.readFile(fs, fileName);
+    } finally {
+      DataNodeFaultInjector.set(old);
+      cluster.shutdown();
+    }
+  }
 }