
[HDFS-15813] DataStreamer: keep sending heartbeat packets during flush. Contributed by Daryn Sharp and Jim Brennan

(cherry picked from commit 62389a5a041b02efb0f310891e258fb047e4ca2a)
Jim Brennan, 4 years ago
Commit d74e9453d9

+ 64 - 45
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

@@ -483,6 +483,7 @@ class DataStreamer extends Daemon {
   private volatile BlockConstructionStage stage;  // block construction stage
   protected long bytesSent = 0; // number of bytes that've been sent
   private final boolean isLazyPersistFile;
+  private long lastPacket;
 
   /** Nodes have been used in the pipeline before and have failed. */
   private final List<DatanodeInfo> failed = new ArrayList<>();
@@ -632,6 +633,7 @@ class DataStreamer extends Daemon {
     response = new ResponseProcessor(nodes);
     response.start();
     stage = BlockConstructionStage.DATA_STREAMING;
+    lastPacket = Time.monotonicNow();
   }
 
   protected void endBlock() {
@@ -653,7 +655,6 @@ class DataStreamer extends Daemon {
    */
   @Override
   public void run() {
-    long lastPacket = Time.monotonicNow();
     TraceScope scope = null;
     while (!streamerClosed && dfsClient.clientRunning) {
       // if the Responder encountered an error, shutdown Responder
@@ -666,44 +667,35 @@ class DataStreamer extends Daemon {
         // process datanode IO errors if any
         boolean doSleep = processDatanodeOrExternalError();
 
-        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
         synchronized (dataQueue) {
           // wait for a packet to be sent.
-          long now = Time.monotonicNow();
-          while ((!shouldStop() && dataQueue.size() == 0 &&
-              (stage != BlockConstructionStage.DATA_STREAMING ||
-                  now - lastPacket < halfSocketTimeout)) || doSleep) {
-            long timeout = halfSocketTimeout - (now-lastPacket);
-            timeout = timeout <= 0 ? 1000 : timeout;
-            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
-                timeout : 1000;
+          while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
+            long timeout = 1000;
+            if (stage == BlockConstructionStage.DATA_STREAMING) {
+              timeout = sendHeartbeat();
+            }
             try {
               dataQueue.wait(timeout);
             } catch (InterruptedException  e) {
               LOG.debug("Thread interrupted", e);
             }
             doSleep = false;
-            now = Time.monotonicNow();
           }
           if (shouldStop()) {
             continue;
           }
           // get packet to be sent.
-          if (dataQueue.isEmpty()) {
-            one = createHeartbeatPacket();
-          } else {
-            try {
-              backOffIfNecessary();
-            } catch (InterruptedException e) {
-              LOG.debug("Thread interrupted", e);
-            }
-            one = dataQueue.getFirst(); // regular data packet
-            SpanId[] parents = one.getTraceParents();
-            if (parents.length > 0) {
-              scope = dfsClient.getTracer().
-                  newScope("dataStreamer", parents[0]);
-              scope.getSpan().setParents(parents);
-            }
+          try {
+            backOffIfNecessary();
+          } catch (InterruptedException e) {
+            LOG.debug("Thread interrupted", e);
+          }
+          one = dataQueue.getFirst(); // regular data packet
+          SpanId[] parents = one.getTraceParents();
+          if (parents.length > 0) {
+            scope = dfsClient.getTracer().
+                newScope("dataStreamer", parents[0]);
+            scope.getSpan().setParents(parents);
           }
         }
 
@@ -731,17 +723,8 @@ class DataStreamer extends Daemon {
 
         if (one.isLastPacketInBlock()) {
          // wait until all data packets have been successfully acked
-          synchronized (dataQueue) {
-            while (!shouldStop() && ackQueue.size() != 0) {
-              try {
-                // wait for acks to arrive from datanodes
-                dataQueue.wait(1000);
-              } catch (InterruptedException  e) {
-                LOG.debug("Thread interrupted", e);
-              }
-            }
-          }
-          if (shouldStop()) {
+          waitForAllAcks();
+          if (shouldStop()) {
             continue;
           }
           stage = BlockConstructionStage.PIPELINE_CLOSE;
@@ -770,8 +753,7 @@ class DataStreamer extends Daemon {
         // write out data to remote datanode
         try (TraceScope ignored = dfsClient.getTracer().
             newScope("DataStreamer#writeTo", spanId)) {
-          one.writeTo(blockStream);
-          blockStream.flush();
+          sendPacket(one);
         } catch (IOException e) {
           // HDFS-3398 treat primary DN is down since client is unable to
           // write to primary DN. If a failed or restarting node has already
@@ -782,7 +764,6 @@ class DataStreamer extends Daemon {
           errorState.markFirstNodeIfNotMarked();
           throw e;
         }
-        lastPacket = Time.monotonicNow();
 
         // update bytesSent
         long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -797,11 +778,7 @@ class DataStreamer extends Daemon {
         // Is this block full?
         if (one.isLastPacketInBlock()) {
          // wait until the close packet has been acked
-          synchronized (dataQueue) {
-            while (!shouldStop() && ackQueue.size() != 0) {
-              dataQueue.wait(1000);// wait for acks to arrive from datanodes
-            }
-          }
+          waitForAllAcks();
           if (shouldStop()) {
             continue;
           }
@@ -842,6 +819,48 @@ class DataStreamer extends Daemon {
     closeInternal();
   }
 
+  private void waitForAllAcks() throws IOException {
+    // wait until all data packets have been successfully acked
+    synchronized (dataQueue) {
+      while (!shouldStop() && !ackQueue.isEmpty()) {
+        try {
+          // wait for acks to arrive from datanodes
+          dataQueue.wait(sendHeartbeat());
+        } catch (InterruptedException  e) {
+          LOG.debug("Thread interrupted ", e);
+        }
+      }
+    }
+  }
+
+  private void sendPacket(DFSPacket packet) throws IOException {
+    // write out data to remote datanode
+    try {
+      packet.writeTo(blockStream);
+      blockStream.flush();
+    } catch (IOException e) {
+      // HDFS-3398 treat primary DN is down since client is unable to
+      // write to primary DN. If a failed or restarting node has already
+      // been recorded by the responder, the following call will have no
+      // effect. Pipeline recovery can handle only one node error at a
+      // time. If the primary node fails again during the recovery, it
+      // will be taken out then.
+      errorState.markFirstNodeIfNotMarked();
+      throw e;
+    }
+    lastPacket = Time.monotonicNow();
+  }
+
+  private long sendHeartbeat() throws IOException {
+    final long heartbeatInterval = dfsClient.getConf().getSocketTimeout()/2;
+    long timeout = heartbeatInterval - (Time.monotonicNow() - lastPacket);
+    if (timeout <= 0) {
+      sendPacket(createHeartbeatPacket());
+      timeout = heartbeatInterval;
+    }
+    return timeout;
+  }
+
   private void closeInternal() {
     closeResponder();       // close and join
     closeStream();

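Taken together, the DataStreamer changes route every wait through the heartbeat schedule: the main loop now asks sendHeartbeat() for its wait timeout while streaming, and both flush waits go through waitForAllAcks(). The behavioral difference in the flush path is easiest to see with the two loops side by side; the contrast below is a condensed sketch (shouldStop, ackQueue, dataQueue, and sendHeartbeat as in the patch), not a verbatim excerpt:

    // Before HDFS-15813: the flush wait wakes every second but writes nothing
    // to the socket, so a long ack delay lets the datanode's read timeout
    // expire and tear down the pipeline.
    while (!shouldStop() && !ackQueue.isEmpty()) {
      dataQueue.wait(1000);
    }

    // After: each wakeup may emit a heartbeat packet, which resets the
    // datanode's read timer; the wait length is whatever time remains until
    // the next heartbeat is due (socketTimeout / 2 after the last packet).
    while (!shouldStop() && !ackQueue.isEmpty()) {
      dataQueue.wait(sendHeartbeat());
    }
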
+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

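The test below exercises exactly this schedule: with dfs.client.socket-timeout at 3000 ms the heartbeat interval is 3000 / 2 = 1500 ms, so the injected 3500 ms ack delay spans two heartbeats and the connection never goes silent long enough to trip the read timeout. As a worked illustration, here is a minimal standalone sketch of that arithmetic (the HeartbeatTimingSketch class is hypothetical, not part of the patch; it mirrors sendHeartbeat() against a plain monotonic clock):

    // Simulates the streamer idling in a flush wait: a heartbeat fires
    // whenever socketTimeout/2 ms have passed since the last packet sent.
    public class HeartbeatTimingSketch {
      public static void main(String[] args) throws InterruptedException {
        final long socketTimeout = 3000;            // ms, matching the test
        final long heartbeatInterval = socketTimeout / 2;
        final long start = System.nanoTime() / 1_000_000L;
        long lastPacket = start;                    // stand-in for Time.monotonicNow()
        final long ackArrives = start + 3500;       // models the 3.5 s ack delay

        while (System.nanoTime() / 1_000_000L < ackArrives) {
          long now = System.nanoTime() / 1_000_000L;
          long timeout = heartbeatInterval - (now - lastPacket);
          if (timeout <= 0) {                       // a heartbeat is due
            System.out.println("heartbeat at +" + (now - start) + " ms");
            lastPacket = now;
            timeout = heartbeatInterval;
          }
          Thread.sleep(timeout);                    // models dataQueue.wait(timeout)
        }
        System.out.println("ack arrived; max silent gap was ~1.5 s");
      }
    }

Expected output is a heartbeat at roughly +1500 ms and +3000 ms, which is why the out.close() in the test completes instead of failing with a socket timeout.
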
@@ -245,6 +245,51 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  /**
+   * Test to ensure heartbeats continue during a flush in case of
+   * delayed acks.
+   */
+  @Test
+  public void testHeartbeatDuringFlush() throws Exception {
+    // Delay sending acks
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public void delaySendingAckToUpstream(final String upstreamAddr)
+          throws IOException {
+        try {
+          Thread.sleep(3500); // delay longer than socket timeout
+        } catch (InterruptedException ie) {
+          throw new IOException("Interrupted while sleeping");
+        }
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+
+    // Set the client socket timeout to 3 seconds. A heartbeat packet
+    // should then be sent every 1.5 seconds if there is no data traffic.
+    Configuration conf = new HdfsConfiguration();
+    conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000");
+    MiniDFSCluster cluster = null;
+
+    try {
+      int numDataNodes = 1;
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(new Path("delayedack.dat"), (short)2);
+      out.write(0x31);
+      out.hflush();
+      DataNodeFaultInjector.set(dnFaultInjector); // cause ack delay
+      out.close();
+    } finally {
+      DataNodeFaultInjector.set(oldDnInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Test recovery on restart OOB message. It also tests the delivery of 
    * OOB ack originating from the primary datanode. Since there is only