
HDFS-11472. Fix inconsistent replica size after a data pipeline failure. Contributed by Erik Krogen and Wei-Chiu Chuang.

(cherry picked from commit 2a5a313539e211736fef12010918a60f9edad030)
Konstantin V Shvachko, 7 years ago
parent commit 1edefb52e6

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -388,6 +388,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-10921. TestDiskspaceQuotaUpdate doesn't wait for NN to get out of
     safe mode. (Eric Badger via Mingliang Liu)
 
+    HDFS-11472. Fix inconsistent replica size after a data pipeline failure.
+    (Erik Krogen and Wei-Chiu Chuang via shv)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

+ 18 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1377,14 +1377,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
 
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    long blockDataLength = rbw.getBlockFile().length();
+    if (bytesOnDisk != blockDataLength) {
+      LOG.info("Resetting bytesOnDisk to match blockDataLength (=" +
+          blockDataLength + ") for replica " + rbw);
+      bytesOnDisk = blockDataLength;
+      rbw.setLastChecksumAndDataLen(bytesOnDisk, null);
+    }
+
+    if (bytesOnDisk < bytesAcked) {
+      throw new ReplicaNotFoundException("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica " + rbw);
+    }
+
     FsVolumeReference ref = rbw.getVolume().obtainReference();
     try {
       // Truncate the potentially corrupt portion.
       // If the source was client and the last node in the pipeline was lost,
       // any corrupt data written after the acked length can go unnoticed.
-      if (numBytes > bytesAcked) {
+      if (bytesOnDisk > bytesAcked) {
         final File replicafile = rbw.getBlockFile();
-        truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+        truncateBlock(replicafile, rbw.getMetaFile(), bytesOnDisk, bytesAcked);
         rbw.setNumBytes(bytesAcked);
         rbw.setLastChecksumAndDataLen(bytesAcked, null);
       }
@@ -2370,8 +2384,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       //check replica bytes on disk.
       if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
-        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-            + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
+        throw new IOException("getBytesOnDisk() < getVisibleLength(), rip="
+            + rip);
       }
 
       //check the replica's files
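In brief, the bug allowed the in-memory bytesOnDisk counter of a ReplicaBeingWritten to fall behind the actual length of the block file after a pipeline failure, so recoverRbw could truncate the replica against a stale length. The hunk above reconciles the two values before truncating. The standalone sketch below mirrors that control flow; the class and method names are hypothetical, not part of the patch:

import java.io.File;
import java.io.IOException;

class RbwRecoverySketch {
  // Returns the reconciled on-disk length for an RBW replica, trusting the
  // block file over a stale in-memory counter. The names are invented for
  // illustration; only the control flow mirrors the patch above.
  static long reconcile(long bytesOnDisk, long bytesAcked, File blockFile)
      throws IOException {
    long blockDataLength = blockFile.length();
    if (bytesOnDisk != blockDataLength) {
      // The counter lagged behind the file (e.g. the writer died between
      // the disk write and the in-memory update): trust the file.
      bytesOnDisk = blockDataLength;
    }
    if (bytesOnDisk < bytesAcked) {
      // Bytes already acknowledged to the client are missing, so this
      // replica cannot be recovered.
      throw new IOException("Found fewer bytesOnDisk than bytesAcked");
    }
    // The caller then truncates anything past bytesAcked, since data
    // written after the acked length may be corrupt.
    return bytesOnDisk;
  }
}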

+ 55 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -21,7 +21,9 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
@@ -158,7 +161,7 @@ public class TestWriteToReplica {
     
     ExtendedBlock[] blocks = new ExtendedBlock[] {
         new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002), 
-        new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
+        new ExtendedBlock(bpid, 3, 2, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
         new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
     };
     
@@ -182,6 +185,12 @@ public class TestWriteToReplica {
     replicasMap.add(bpid, replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
     replicaInfo.getMetaFile().createNewFile();
+    try (RandomAccessFile blockRAF =
+        new RandomAccessFile(replicaInfo.getBlockFile(), "rw")) {
+      //extend blockFile
+      blockRAF.setLength(blocks[RBW].getNumBytes());
+    }
+    saveMetaFileHeader(replicaInfo.getMetaFile());
     
     replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
         blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
@@ -516,4 +525,49 @@ public class TestWriteToReplica {
           + "genstamp and replaced it with the newer one: " + blocks[NON_EXISTENT]);
     }
   }
+
+  /**
+   * Test that we can successfully recover a {@link ReplicaBeingWritten}
+   * which has inconsistent metadata (bytes were written to disk but bytesOnDisk
+   * was not updated) but that recovery fails when the block is actually
+   * corrupt (bytes are not present on disk).
+   */
+  @Test
+  public void testRecoverInconsistentRbw() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetImpl fsDataset = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
+
+    // set up replicasMap
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    ExtendedBlock[] blocks = setup(bpid, fsDataset);
+
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)fsDataset.
+        getReplicaInfo(blocks[RBW]);
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    // simulate an inconsistent replica length update by reducing in-memory
+    // value of on disk length
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+        rbw.getNumBytes());
+    // after the recovery, on disk length should equal acknowledged length.
+    Assert.assertTrue(rbw.getBytesOnDisk() == rbw.getBytesAcked());
+
+    // reduce on disk length again; this time actually truncate the file to
+    // simulate the data not being present
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    try (RandomAccessFile blockRAF =
+        new RandomAccessFile(rbw.getBlockFile(), "rw")) {
+      // truncate blockFile
+      blockRAF.setLength(bytesOnDisk - 1);
+      fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+          rbw.getNumBytes());
+      Assert.fail("recovery should have failed");
+    } catch (ReplicaNotFoundException rnfe) {
+      GenericTestUtils.assertExceptionContains("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica", rnfe);
+    }
+  }
 }
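
The test drives both failure modes with RandomAccessFile.setLength(), which extends or truncates a file in place. A minimal standalone demonstration of that technique, with an arbitrary temp file and lengths not taken from the test:

import java.io.File;
import java.io.RandomAccessFile;

public class SetLengthDemo {
  public static void main(String[] args) throws Exception {
    File f = File.createTempFile("blk_", ".data");
    f.deleteOnExit();
    try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
      // Extend: the new region holds unspecified (typically zero) bytes,
      // which suffices when only lengths are compared, as in the test.
      raf.setLength(2048);
      // Truncate: drop the last byte, as the test does to the block file.
      raf.setLength(2047);
    }
    System.out.println("length = " + f.length()); // prints 2047
  }
}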