
HDFS-9752. Permanent write failures may happen to slow writers during datanode rolling upgrades. (Contributed by Walter Su)

Arpit Agarwal, 9 years ago
parent commit d833823daf

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -70,6 +70,9 @@ Release 2.6.4 - UNRELEASED
     HDFS-9220. Reading small file (< 512 bytes) that is open for append fails
     due to incorrect checksum (Jing Zhao via kihwal)
 
+    HDFS-9752. Permanent write failures may happen to slow writers during
+    datanode rolling upgrades. (Walter Su via kihwal)
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES

+ 23 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -401,9 +401,8 @@ public class DFSOutputStream extends FSOutputSummer
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
-    /** The last ack sequence number before pipeline failure. */
-    private long lastAckedSeqnoBeforeFailure = -1;
-    private int pipelineRecoveryCount = 0;
+    /** The times have retried to recover pipeline, for the same packet. */
+    private volatile int pipelineRecoveryCount = 0;
     /** Has the current block been hflushed? */
     private boolean isHflushed = false;
     /** Append on an existing block? */
@@ -933,6 +932,7 @@ public class DFSOutputStream extends FSOutputSummer
 
             synchronized (dataQueue) {
               lastAckedSeqno = seqno;
+              pipelineRecoveryCount = 0;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
 
@@ -984,23 +984,18 @@ public class DFSOutputStream extends FSOutputSummer
         ackQueue.clear();
       }
 
-      // Record the new pipeline failure recovery.
-      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-         lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-         pipelineRecoveryCount = 1;
-      } else {
-        // If we had to recover the pipeline five times in a row for the
-        // same packet, this client likely has corrupt data or corrupting
-        // during transmission.
-        if (++pipelineRecoveryCount > 5) {
-          DFSClient.LOG.warn("Error recovering pipeline for writing " +
-              block + ". Already retried 5 times for the same packet.");
-          lastException.set(new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success."));
-          streamerClosed = true;
-          return false;
-        }
+      // If we had to recover the pipeline five times in a row for the
+      // same packet, this client likely has corrupt data or corrupting
+      // during transmission.
+      if (restartingNodeIndex == -1 && ++pipelineRecoveryCount > 5) {
+        DFSClient.LOG.warn("Error recovering pipeline for writing " +
+            block + ". Already retried 5 times for the same packet.");
+        lastException.set(new IOException("Failing write. Tried pipeline " +
+            "recovery 5 times without success."));
+        streamerClosed = true;
+        return false;
       }
+
       boolean doSleep = setupPipelineForAppendOrRecovery();
       
       if (!streamerClosed && dfsClient.clientRunning) {
@@ -1019,6 +1014,7 @@ public class DFSOutputStream extends FSOutputSummer
             assert endOfBlockPacket.lastPacketInBlock;
             assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
             lastAckedSeqno = endOfBlockPacket.seqno;
+            pipelineRecoveryCount = 0;
             dataQueue.notifyAll();
           }
           endBlock();
@@ -2345,4 +2341,12 @@ public class DFSOutputStream extends FSOutputSummer
     System.arraycopy(srcs, 0, dsts, 0, skipIndex);
     System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
   }
+
+  /**
+   * @return The times have retried to recover pipeline, for the same packet.
+   */
+  @VisibleForTesting
+  int getPipelineRecoveryCount() {
+    return streamer.pipelineRecoveryCount;
+  }
 }
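
The net effect of the DFSOutputStream change: pipelineRecoveryCount is now reset to zero every time a packet (including the end-of-block packet) is acknowledged, and a recovery attempt only counts toward the 5-attempt limit when the failure is not caused by a datanode that is restarting (restartingNodeIndex == -1). The following is a minimal, simplified sketch of that retry-counting policy with a hypothetical class and method names; it is an illustration only, not the actual DataStreamer code.

  /**
   * Simplified sketch (hypothetical class) of the retry-counting policy
   * introduced by this change: reset on every ack, count a recovery only
   * when no pipeline node is restarting for an upgrade, and give up after
   * five consecutive recoveries for the same packet.
   */
  class PipelineRecoveryPolicy {
    private static final int MAX_RECOVERIES_PER_PACKET = 5;
    private volatile int pipelineRecoveryCount = 0;

    /** Called whenever an ack for a packet is received. */
    void onPacketAcked() {
      pipelineRecoveryCount = 0;
    }

    /**
     * Called when a pipeline error is being handled.
     * @param nodeRestartingForUpgrade true if a datanode in the pipeline is
     *        restarting (e.g. shut down for a rolling upgrade)
     * @return true if the writer should fail the write
     */
    boolean shouldFailWrite(boolean nodeRestartingForUpgrade) {
      // Heartbeat-driven recoveries during a datanode restart no longer
      // count against the limit, so slow writers survive rolling upgrades.
      return !nodeRestartingForUpgrade
          && ++pipelineRecoveryCount > MAX_RECOVERIES_PER_PACKET;
    }
  }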

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -2942,7 +2942,7 @@ public class DataNode extends ReconfigurableBase
 
     // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
-    Thread shutdownThread = new Thread() {
+    Thread shutdownThread = new Thread("Async datanode shutdown thread") {
       @Override public void run() {
         if (!shutdownForUpgrade) {
           // Delay the shutdown a bit if not doing for restart.
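
The DataNode change simply gives the asynchronous shutdown thread a name. With a named thread, tests can wait for the shutdown triggered by "-shutdownDatanode ... upgrade" to actually finish instead of sleeping for a fixed interval, which is what the test changes below do. A sketch of the waiting call as used in those tests (the second and third arguments are, per that usage, presumably the poll interval and overall timeout in milliseconds):

  // Requires: import org.apache.hadoop.test.GenericTestUtils;
  // Block until the named shutdown thread has exited (poll every 100 ms,
  // time out after 10 s) before asserting on datanode state or restarting it.
  GenericTestUtils.waitForThreadTermination(
      "Async datanode shutdown thread", 100, 10000);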

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1958,6 +1958,28 @@ public class MiniDFSCluster {
     return stopDataNode(i);
   }
 
+  /*
+   * Shutdown a particular datanode
+   * @param i node index
+   * @return null if the node index is out of range, else the properties of the
+   * removed node
+   */
+  public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+      throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return null;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    LOG.info("MiniDFSCluster Stopping DataNode " +
+        dn.getDisplayName() +
+        " from a total of " + (dataNodes.size() + 1) +
+        " datanodes.");
+    dn.shutdownDatanode(true);
+    numDataNodes--;
+    return dnprop;
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property

+ 57 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
       // Wait long enough to receive an OOB ack before closing the file.
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // Retart the datanode 
       cluster.restartDataNode(0, true);
       // The following forces a data packet and end of block packets to be sent. 
@@ -293,7 +296,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // This should succeed without restarting the node. The restart will
       // expire and regular pipeline recovery will kick in. 
       out.close();
@@ -309,7 +313,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args2));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       try {
         // close should fail
         out.close();
@@ -321,4 +326,53 @@ public class TestClientProtocolForPipelineRecovery {
       }
     }
   }
+
+  /**
+   *  HDFS-9752. The client keeps sending heartbeat packets during datanode
+   *  rolling upgrades. The client should be able to retry pipeline recovery
+   *  more times than the default.
+   *  (in a row for the same packet, including the heartbeat packet)
+   *  (See{@link DataStreamer#pipelineRecoveryCount})
+   */
+  @Test(timeout = 60000)
+  public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+      final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      final long oldGs = out.getBlock().getGenerationStamp();
+      MiniDFSCluster.DataNodeProperties dnProps =
+          cluster.stopDataNodeForUpgrade(0);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
+      cluster.restartDataNode(dnProps, true);
+      cluster.waitActive();
+
+      // wait pipeline to be recovered
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return out.getBlock().getGenerationStamp() > oldGs;
+        }
+      }, 100, 10000);
+      Assert.assertEquals("The pipeline recovery count shouldn't increase",
+          0, out.getPipelineRecoveryCount());
+      out.write(1);
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -340,7 +341,8 @@ public class TestRollingUpgrade {
       Assert.assertEquals(0, dfsadmin.run(args2));
 
       // the datanode should be down.
-      Thread.sleep(2000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
 
       // ping should fail.