HDFS-7996. After swapping a volume, BlockReceiver reports ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)

(cherry picked from commit 023133cef9a7ca05364cefbcead57c921589eda7)
(cherry picked from commit 0c5069c4329305db876f0c0f5bd8e983d46f854e)
Colin Patrick McCabe, 10 years ago
parent
commit
520043b438
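
In short, this fixes a race between volume hot-swap and block finalization: BlockReceiver#close() used to release its volume reference before finalizeBlock() ran, so a concurrent volume removal could reclaim the volume (and drop its replicas) in between, and finalizeBlock() then failed with ReplicaNotFoundException. The fix below claims the ReplicaHandler and holds it in a try-with-resources block across close() and finalizeBlock(). A minimal sketch of the underlying reference-counting idea, with hypothetical names standing in for the actual ReplicaHandler/FsVolumeReference classes:

    import java.io.Closeable;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical stand-in for an FsVolumeReference-style pin on a volume.
    class VolumeRef implements Closeable {
      private final AtomicInteger refCount;

      VolumeRef(AtomicInteger refCount) {
        this.refCount = refCount;
        refCount.incrementAndGet();   // taking the reference pins the volume
      }

      @Override
      public void close() {
        refCount.decrementAndGet();   // removal may proceed once this reaches zero
      }
    }

    // The fixed ordering, as in the diff below (claimRef(), closeFiles() and
    // finalizeBlock() are hypothetical placeholders):
    //   try (VolumeRef ref = claimRef()) {
    //     closeFiles();      // previously released the reference here (too early)
    //     finalizeBlock();   // safe: the volume is still pinned
    //   }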

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -959,6 +959,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-8039. Fix TestDebugAdmin#testRecoverLease and
     testVerifyBlockChecksumCommand on Windows. (Xiaoyu Yao via cnauroth)
 
+    HDFS-7996. After swapping a volume, BlockReceiver reports
+    ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

+ 32 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -281,7 +281,7 @@ class BlockReceiver implements Closeable {
   }
 
   /**
-   * close files.
+   * close files and release the volume reference.
    */
   @Override
   public void close() throws IOException {
@@ -798,17 +798,20 @@ class BlockReceiver implements Closeable {
       // then finalize block or convert temporary to RBW.
       // For client-writes, the block is finalized in the PacketResponder.
       if (isDatanode || isTransfer) {
-        // close the block/crc files
-        close();
-        block.setNumBytes(replicaInfo.getNumBytes());
-
-        if (stage == BlockConstructionStage.TRANSFER_RBW) {
-          // for TRANSFER_RBW, convert temporary to RBW
-          datanode.data.convertTemporaryToRbw(block);
-        } else {
-          // for isDatnode or TRANSFER_FINALIZED
-          // Finalize the block.
-          datanode.data.finalizeBlock(block);
+        // Hold a volume reference to finalize block.
+        try (ReplicaHandler handler = claimReplicaHandler()) {
+          // close the block/crc files
+          close();
+          block.setNumBytes(replicaInfo.getNumBytes());
+
+          if (stage == BlockConstructionStage.TRANSFER_RBW) {
+            // for TRANSFER_RBW, convert temporary to RBW
+            datanode.data.convertTemporaryToRbw(block);
+          } else {
+            // for isDatanode or TRANSFER_FINALIZED
+            // Finalize the block.
+            datanode.data.finalizeBlock(block);
+          }
         }
         datanode.metrics.incrBlocksWritten();
       }
@@ -980,7 +983,14 @@ class BlockReceiver implements Closeable {
     }
     return partialCrc;
   }
-  
+
+  /** The caller claims ownership of the replica handler. */
+  private ReplicaHandler claimReplicaHandler() {
+    ReplicaHandler handler = replicaHandler;
+    replicaHandler = null;
+    return handler;
+  }
+
   private static enum PacketResponderType {
     NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
   }
@@ -1280,12 +1290,15 @@ class BlockReceiver implements Closeable {
      * @param startTime time when BlockReceiver started receiving the block
      */
     private void finalizeBlock(long startTime) throws IOException {
-      BlockReceiver.this.close();
-      final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
-          : 0;
-      block.setNumBytes(replicaInfo.getNumBytes());
-      datanode.data.finalizeBlock(block);
-      
+      long endTime = 0;
+      // Hold a volume reference to finalize block.
+      try (ReplicaHandler handler = BlockReceiver.this.claimReplicaHandler()) {
+        BlockReceiver.this.close();
+        endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+        block.setNumBytes(replicaInfo.getNumBytes());
+        datanode.data.finalizeBlock(block);
+      }
+
       if (pinning) {
         datanode.data.setPinning(block);
       }
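
The claimReplicaHandler() idiom added above is an ownership transfer: it nulls the field and returns the handler, so the handler is closed exactly once, either by the try-with-resources block that claimed it or by BlockReceiver#close() if nobody did. A self-contained sketch of the idiom (hypothetical Owner class, not HDFS code):

    // After claim(), the field is null, so the owner's own close() will not
    // close the resource a second time.
    class Owner implements AutoCloseable {
      private AutoCloseable resource;

      Owner(AutoCloseable resource) {
        this.resource = resource;
      }

      /** The caller takes over; this object will no longer close the resource. */
      AutoCloseable claim() {
        AutoCloseable r = resource;
        resource = null;
        return r;
      }

      @Override
      public void close() throws Exception {
        if (resource != null) {   // close only if nobody claimed it
          resource.close();
        }
      }
    }

    // Usage mirrors finalizeBlock() above:
    //   try (AutoCloseable r = owner.claim()) {
    //     // work that must finish while the resource is still held
    //   }
    //   owner.close();           // no-op for the resource: ownership moved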

+ 29 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -64,6 +65,8 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.hamcrest.CoreMatchers.anyOf;
@@ -77,6 +80,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.timeout;
 
 public class TestDataNodeHotSwapVolumes {
@@ -577,6 +581,7 @@ public class TestDataNodeHotSwapVolumes {
     final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
     final FileSystem fs = cluster.getFileSystem();
     final Path testFile = new Path("/test");
+    final long lastTimeDiskErrorCheck = dn.getLastDiskErrorCheck();
 
     FSDataOutputStream out = fs.create(testFile, REPLICATION);
 
@@ -586,6 +591,23 @@ public class TestDataNodeHotSwapVolumes {
     out.write(writeBuf);
     out.hflush();
 
+    // Make FsDatasetSpi#finalizeBlock a time-consuming operation, so that if
+    // BlockReceiver releases the volume reference before finalizeBlock(), the
+    // blocks on the volume will be removed and finalizeBlock() will throw an IOE.
+    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+    dn.data = Mockito.spy(data);
+    doAnswer(new Answer<Object>() {
+          public Object answer(InvocationOnMock invocation)
+              throws IOException, InterruptedException {
+            Thread.sleep(1000);
+            // Pass the argument through to FsDatasetImpl#finalizeBlock to
+            // verify that the block is not removed, since the volume
+            // reference should not be released at this point.
+            data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]);
+            return null;
+          }
+        }).when(dn.data).finalizeBlock(any(ExtendedBlock.class));
+
     final CyclicBarrier barrier = new CyclicBarrier(2);
 
     List<String> oldDirs = getDataDirs(dn);
@@ -612,13 +634,19 @@ public class TestDataNodeHotSwapVolumes {
     out.hflush();
     out.close();
 
+    reconfigThread.join();
+
     // Verify the file has sufficient replications.
     DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
     // Read the content back
     byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
     assertEquals(BLOCK_SIZE, content.length);
 
-    reconfigThread.join();
+    // If an IOException is thrown from BlockReceiver#run, it triggers
+    // DataNode#checkDiskError(). So we can test whether checkDiskError() was
+    // called to see whether an IOException occurred in BlockReceiver#run().
+    assertEquals(lastTimeDiskErrorCheck, dn.getLastDiskErrorCheck());
+
     if (!exceptions.isEmpty()) {
       throw new IOException(exceptions.get(0).getCause());
     }
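
Two details of the test above are worth noting. First, the Mockito spy injects a one-second delay into finalizeBlock() before delegating to the real dataset, widening the race window so a premature volume-reference release would reliably surface. Second, failure is detected indirectly: an IOException in BlockReceiver#run would trigger DataNode#checkDiskError(), so an unchanged getLastDiskErrorCheck() timestamp means no such exception occurred. A condensed sketch of the delay-injection pattern (hypothetical Service interface; Mockito 1.x-style matchers, matching the test's imports):

    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    interface Service {
      void process(String id) throws Exception;
    }

    class DelayInjection {
      static Service withDelay(final Service real) {
        Service spied = spy(real);
        doAnswer(new Answer<Object>() {
          public Object answer(InvocationOnMock invocation) throws Throwable {
            Thread.sleep(1000);                                   // widen the race window
            real.process((String) invocation.getArguments()[0]);  // delegate to the real call
            return null;
          }
        }).when(spied).process(anyString());
        return spied;
      }
    }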