HDFS-8008. Support client-side back off when the datanodes are congested. Contributed by Haohui Mai.

Haohui Mai 10 years ago
parent
commit
cfcf795492

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -559,6 +559,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7742. Favoring decommissioning node for replication can cause a block 
     to stay underreplicated for long periods (Nathan Roberts via kihwal)
 
+    HDFS-8008. Support client-side back off when the datanodes are congested.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

@@ -218,6 +218,13 @@ class DataStreamer extends Daemon {
   private boolean failPacket = false;
   private final long dfsclientSlowLogThresholdMs;
   private long artificialSlowdown = 0;
+  // List of congested DataNodes. The stream backs off while the DataNodes
+  // in this list remain congested.
+  private final ArrayList<DatanodeInfo> congestedNodes = new ArrayList<>();
+  private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
+  private static final int CONGESTION_BACKOFF_MAX_TIME_IN_MS =
+      CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
+  private int lastCongestionBackoffTime;
 
   private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
 
@@ -386,6 +393,11 @@ class DataStreamer extends Daemon {
             one = createHeartbeatPacket();
             assert one != null;
           } else {
+            try {
+              backOffIfNecessary();
+            } catch (InterruptedException e) {
+              DFSClient.LOG.warn("Caught exception during congestion backoff", e);
+            }
             one = dataQueue.getFirst(); // regular data packet
             long parents[] = one.getTraceParents();
             if (parents.length > 0) {
@@ -815,9 +827,14 @@ class DataStreamer extends Daemon {
 
           long seqno = ack.getSeqno();
           // processes response status from datanodes.
+          ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
           for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
             final Status reply = PipelineAck.getStatusFromHeader(ack
                 .getHeaderFlag(i));
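+            // Remember every DataNode that set the congestion (ECN) flag in
+            // its ack so the streamer can back off before sending more data.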
+            if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
+                PipelineAck.ECN.CONGESTED) {
+              congestedNodesFromAck.add(targets[i]);
+            }
             // Restart will not be treated differently unless it is
             // the local node or the only one in the pipeline.
             if (PipelineAck.isRestartOOBStatus(reply) &&
@@ -839,6 +856,18 @@ class DataStreamer extends Daemon {
             }
           }
 
+          if (!congestedNodesFromAck.isEmpty()) {
+            synchronized (congestedNodes) {
+              congestedNodes.clear();
+              congestedNodes.addAll(congestedNodesFromAck);
+            }
+          } else {
+            synchronized (congestedNodes) {
+              congestedNodes.clear();
+              lastCongestionBackoffTime = 0;
+            }
+          }
+
           assert seqno != PipelineAck.UNKOWN_SEQNO :
               "Ack for unknown seqno should be a failed ack: " + ack;
           if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
@@ -1543,6 +1572,40 @@ class DataStreamer extends Daemon {
     }
   }
 
+  /**
+   * Sleeps for a period of time when the write pipeline is congested. The
+   * sleep time is computed with decorrelated jitter, which keeps the
+   * back-offs of concurrent writers from synchronizing with each other.
+   *
+   * @see
+   * <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html">
+   *   http://www.awsarchitectureblog.com/2015/03/backoff.html</a>
+   */
+  private void backOffIfNecessary() throws InterruptedException {
+    int t = 0;
+    synchronized (congestedNodes) {
+      if (!congestedNodes.isEmpty()) {
+        StringBuilder sb = new StringBuilder("DataNodes");
+        for (DatanodeInfo i : congestedNodes) {
+          sb.append(' ').append(i);
+        }
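+        // Decorrelated jitter: draw the next sleep uniformly between
+        // min(3 * last, mean) and max(3 * last, mean), then cap it.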
+        int range = Math.abs(lastCongestionBackoffTime * 3 -
+                                CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+        int base = Math.min(lastCongestionBackoffTime * 3,
+                            CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+        t = Math.min(CONGESTION_BACKOFF_MAX_TIME_IN_MS,
+                     (int)(base + Math.random() * range));
+        lastCongestionBackoffTime = t;
+        sb.append(" are congested. Backing off for ").append(t).append(" ms");
+        DFSClient.LOG.info(sb.toString());
+        congestedNodes.clear();
+      }
+    }
+    if (t != 0) {
+      Thread.sleep(t);
+    }
+  }
+
   /**
    * get the block this streamer is writing to
    *

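For reference, backOffIfNecessary() above implements the "decorrelated jitter"
scheme from the linked AWS article: each sleep is drawn uniformly from the
interval between the mean back-off and three times the previous sleep, capped
at ten times the mean. A minimal standalone sketch of the same calculation
(the class and method names here are illustrative, not part of the patch):

    class DecorrelatedJitterSketch {
      private static final int MEAN_MS = 5000;        // the mean back-off
      private static final int MAX_MS = MEAN_MS * 10; // the back-off cap
      private int lastMs = 0; // reset to 0 when an ack reports no congestion

      // Next sleep in ms: uniform between min(3 * last, mean) and
      // max(3 * last, mean), capped at MAX_MS.
      int nextBackoffMs() {
        int range = Math.abs(lastMs * 3 - MEAN_MS);
        int base = Math.min(lastMs * 3, MEAN_MS);
        lastMs = Math.min(MAX_MS, (int) (base + Math.random() * range));
        return lastMs;
      }
    }

Starting from zero, the first sleep is uniform in [0, 5000) ms; under
sustained congestion the window roughly triples per round until it saturates
at the 50-second cap.
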
+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java

@@ -257,6 +257,10 @@ public class PipelineAck {
     return StatusFormat.getStatus(header);
   }
 
+  public static ECN getECNFromHeader(int header) {
+    return StatusFormat.getECN(header);
+  }
+
   public static int setStatusForHeader(int old, Status status) {
     return StatusFormat.setStatus(old, status);
   }

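The new getECNFromHeader() accessor reads the ECN field that a DataNode packs
into each per-reply header flag next to the reply Status. The actual field
widths are defined by PipelineAck's StatusFormat helper, which this diff does
not show; purely to illustrate the packing idea, a flag field could be read
and written as below (the shift, mask, and enum values are assumptions, not
the real wire format):

    class EcnFlagSketch {
      // Illustrative values only; the real ones live in PipelineAck.ECN.
      enum ECN { DISABLED, SUPPORTED, RESERVED, CONGESTED }

      private static final int ECN_SHIFT = 30; // hypothetical field offset
      private static final int ECN_MASK = 0x3; // hypothetical 2-bit field

      static ECN getECN(int header) {
        return ECN.values()[(header >>> ECN_SHIFT) & ECN_MASK];
      }

      static int setECN(int header, ECN ecn) {
        return (header & ~(ECN_MASK << ECN_SHIFT))
            | (ecn.ordinal() << ECN_SHIFT);
      }
    }

Keeping ECN inside the existing header flag leaves the ack wire format
unchanged, so clients that do not understand the field can simply ignore it.
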
+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

@@ -17,20 +17,31 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
 public class TestDFSOutputStream {
   static MiniDFSCluster cluster;
 
@@ -100,6 +111,37 @@ public class TestDFSOutputStream {
     Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
   }
 
+  @Test
+  public void testCongestionBackoff() throws IOException {
+    DFSClient.Conf dfsClientConf = mock(DFSClient.Conf.class);
+    DFSClient client = mock(DFSClient.class);
+    when(client.getConf()).thenReturn(dfsClientConf);
+    client.clientRunning = true;
+    DataStreamer stream = new DataStreamer(
+        mock(HdfsFileStatus.class),
+        mock(ExtendedBlock.class),
+        client,
+        "foo", null, null, null, null);
+
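+    // Mock a block stream whose flush() always throws, so the streamer
+    // takes its error path and run() returns after the queued packet.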
+    DataOutputStream blockStream = mock(DataOutputStream.class);
+    doThrow(new IOException()).when(blockStream).flush();
+    Whitebox.setInternalState(stream, "blockStream", blockStream);
+    Whitebox.setInternalState(stream, "stage",
+                              BlockConstructionStage.PIPELINE_CLOSE);
+    @SuppressWarnings("unchecked")
+    LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
+        Whitebox.getInternalState(stream, "dataQueue");
+    @SuppressWarnings("unchecked")
+    ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
+        Whitebox.getInternalState(stream, "congestedNodes");
+    congestedNodes.add(mock(DatanodeInfo.class));
+    DFSPacket packet = mock(DFSPacket.class);
+    when(packet.getTraceParents()).thenReturn(new long[] {});
+    dataQueue.add(packet);
+    stream.run();
+    Assert.assertTrue(congestedNodes.isEmpty());
+  }
+
   @AfterClass
   public static void tearDown() {
     cluster.shutdown();