Преглед изворни кода

HDFS-11472. Fix inconsistent replica size after a data pipeline failure. Contributed by Erik Krogen and Wei-Chiu Chuang.

(cherry picked from commit 2a5a313539e211736fef12010918a60f9edad030)
Konstantin V Shvachko пре 7 година
родитељ
комит
f3bf1a6eb9

+ 18 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1526,14 +1526,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             minBytesRcvd + ", " + maxBytesRcvd + "].");
       }
 
+      long bytesOnDisk = rbw.getBytesOnDisk();
+      long blockDataLength = rbw.getBlockFile().length();
+      if (bytesOnDisk != blockDataLength) {
+        LOG.info("Resetting bytesOnDisk to match blockDataLength (=" +
+            blockDataLength + ") for replica " + rbw);
+        bytesOnDisk = blockDataLength;
+        rbw.setLastChecksumAndDataLen(bytesOnDisk, null);
+      }
+
+      if (bytesOnDisk < bytesAcked) {
+        throw new ReplicaNotFoundException("Found fewer bytesOnDisk than " +
+            "bytesAcked for replica " + rbw);
+      }
+
       FsVolumeReference ref = rbw.getVolume().obtainReference();
       try {
         // Truncate the potentially corrupt portion.
         // If the source was client and the last node in the pipeline was lost,
         // any corrupt data written after the acked length can go unnoticed.
-        if (numBytes > bytesAcked) {
+        if (bytesOnDisk > bytesAcked) {
           final File replicafile = rbw.getBlockFile();
-          truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+          truncateBlock(replicafile, rbw.getMetaFile(), bytesOnDisk, bytesAcked);
           rbw.setNumBytes(bytesAcked);
           rbw.setLastChecksumAndDataLen(bytesAcked, null);
         }
@@ -2570,8 +2584,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       //check replica bytes on disk.
       if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
-        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-            + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
+        throw new IOException("getBytesOnDisk() < getVisibleLength(), rip="
+            + rip);
       }
 
       //check the replica's files

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java

@@ -278,6 +278,13 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     rbw.getBlockFile().createNewFile();
     rbw.getMetaFile().createNewFile();
     dataset.volumeMap.add(bpid, rbw);
+
+    try (RandomAccessFile blockRAF =
+        new RandomAccessFile(rbw.getBlockFile(), "rw")) {
+      //extend blockFile
+      blockRAF.setLength(eb.getNumBytes());
+    }
+    saveMetaFileHeader(rbw.getMetaFile());
     return rbw;
   }
 

+ 50 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,12 +37,14 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
@@ -154,7 +157,7 @@ public class TestWriteToReplica {
     
     ExtendedBlock[] blocks = new ExtendedBlock[] {
         new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002), 
-        new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
+        new ExtendedBlock(bpid, 3, 2, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
         new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
     };
 
@@ -548,7 +551,52 @@ public class TestWriteToReplica {
       cluster.shutdown();
     }
   }
-  
+
+  /**
+   * Test that we can successfully recover a {@link ReplicaBeingWritten}
+   * which has inconsistent metadata (bytes were written to disk but bytesOnDisk
+   * was not updated) but that recovery fails when the block is actually
+   * corrupt (bytes are not present on disk).
+   */
+  @Test
+  public void testRecoverInconsistentRbw() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetImpl fsDataset = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
+
+    // set up replicasMap
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
+
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)fsDataset.
+        getReplicaInfo(blocks[RBW]);
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    // simulate an inconsistent replica length update by reducing in-memory
+    // value of on disk length
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+        rbw.getNumBytes());
+    // after the recovery, on disk length should equal acknowledged length.
+    Assert.assertTrue(rbw.getBytesOnDisk() == rbw.getBytesAcked());
+
+    // reduce on disk length again; this time actually truncate the file to
+    // simulate the data not being present
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    try (RandomAccessFile blockRAF =
+        new RandomAccessFile(rbw.getBlockFile(), "rw")) {
+      // truncate blockFile
+      blockRAF.setLength(bytesOnDisk - 1);
+      fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+          rbw.getNumBytes());
+      Assert.fail("recovery should have failed");
+    } catch (ReplicaNotFoundException rnfe) {
+      GenericTestUtils.assertExceptionContains("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica", rnfe);
+    }
+  }
+
   /**
    * Compare the replica map before and after the restart
    **/