
HDFS-16348. Mark slownode as badnode to recover pipeline (#3704)

Symious 3 years ago
parent
commit
b225287913

+ 60 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -525,11 +526,13 @@ class DataStreamer extends Daemon {
   // List of congested data nodes. The stream will back off if the DataNodes
   // are congested
   private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
+  private final Map<DatanodeInfo, Integer> slowNodeMap = new HashMap<>();
   private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
   private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
       CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
   private int lastCongestionBackoffTime;
   private int maxPipelineRecoveryRetries;
+  private int markSlowNodeAsBadNodeThreshold;
 
   protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
   private final String[] favoredNodes;
@@ -559,6 +562,7 @@ class DataStreamer extends Daemon {
     this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
     this.addBlockFlags = flags;
     this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
+    this.markSlowNodeAsBadNodeThreshold = conf.getMarkSlowNodeAsBadNodeThreshold();
   }
 
   /**
@@ -1155,6 +1159,7 @@ class DataStreamer extends Daemon {
           long seqno = ack.getSeqno();
           // processes response status from datanodes.
           ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
+          ArrayList<DatanodeInfo> slownodesFromAck = new ArrayList<>();
           for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
             final Status reply = PipelineAck.getStatusFromHeader(ack
                 .getHeaderFlag(i));
@@ -1162,6 +1167,10 @@ class DataStreamer extends Daemon {
                 PipelineAck.ECN.CONGESTED) {
               congestedNodesFromAck.add(targets[i]);
             }
+            if (PipelineAck.getSLOWFromHeader(ack.getHeaderFlag(i)) ==
+                PipelineAck.SLOW.SLOW) {
+              slownodesFromAck.add(targets[i]);
+            }
             // Restart will not be treated differently unless it is
             // the local node or the only one in the pipeline.
             if (PipelineAck.isRestartOOBStatus(reply)) {
@@ -1191,6 +1200,16 @@ class DataStreamer extends Daemon {
             }
           }
 
+          if (slownodesFromAck.isEmpty()) {
+            if (!slowNodeMap.isEmpty()) {
+              slowNodeMap.clear();
+            }
+          } else {
+            markSlowNode(slownodesFromAck);
+            LOG.debug("SlowNodeMap content: {}.", slowNodeMap);
+          }
+
+
           assert seqno != PipelineAck.UNKOWN_SEQNO :
               "Ack for unknown seqno should be a failed ack: " + ack;
           if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
@@ -1257,10 +1276,51 @@ class DataStreamer extends Daemon {
       }
     }
 
+    void markSlowNode(List<DatanodeInfo> slownodesFromAck) throws IOException {
+      Set<DatanodeInfo> discontinuousNodes = new HashSet<>(slowNodeMap.keySet());
+      for (DatanodeInfo slowNode : slownodesFromAck) {
+        if (!slowNodeMap.containsKey(slowNode)) {
+          slowNodeMap.put(slowNode, 1);
+        } else {
+          int oldCount = slowNodeMap.get(slowNode);
+          slowNodeMap.put(slowNode, ++oldCount);
+        }
+        discontinuousNodes.remove(slowNode);
+      }
+      for (DatanodeInfo discontinuousNode : discontinuousNodes) {
+        slowNodeMap.remove(discontinuousNode);
+      }
+
+      if (!slowNodeMap.isEmpty()) {
+        for (Map.Entry<DatanodeInfo, Integer> entry : slowNodeMap.entrySet()) {
+          if (entry.getValue() >= markSlowNodeAsBadNodeThreshold) {
+            DatanodeInfo slowNode = entry.getKey();
+            int index = getDatanodeIndex(slowNode);
+            if (index >= 0) {
+              errorState.setBadNodeIndex(index);
+              throw new IOException("Receive reply from slowNode " + slowNode +
+                  " for continuous " + markSlowNodeAsBadNodeThreshold +
+                  " times, treating it as badNode");
+            }
+            slowNodeMap.remove(entry.getKey());
+          }
+        }
+      }
+    }
+
     void close() {
       responderClosed = true;
       this.interrupt();
     }
+
+    int getDatanodeIndex(DatanodeInfo datanodeInfo) {
+      for (int i = 0; i < targets.length; i++) {
+        if (targets[i].equals(datanodeInfo)) {
+          return i;
+        }
+      }
+      return -1;
+    }
   }
 
   private boolean shouldHandleExternalError(){
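
For reference, a minimal, standalone sketch of the counting policy the ResponseProcessor now applies: a node's counter grows while every consecutive ack flags it SLOW, the counter is dropped as soon as an ack omits the node, and reaching the threshold flags the node for eviction. The class and method names here (SlowNodeTracker, onAck) are illustrative only, not Hadoop APIs; the real code additionally maps the node back to its pipeline index, sets the bad-node index in ErrorState, and throws an IOException to trigger pipeline recovery.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SlowNodeTracker<T> {
  private final Map<T, Integer> counts = new HashMap<>();
  private final int threshold;

  SlowNodeTracker(int threshold) {
    this.threshold = threshold;
  }

  /**
   * Feed the nodes one ack reported as SLOW.
   * Returns the nodes whose consecutive-SLOW count reached the threshold.
   */
  Set<T> onAck(List<T> slowNodesFromAck) {
    // A node missing from this ack loses its streak (clears everything
    // when no node was reported slow at all).
    counts.keySet().retainAll(new HashSet<>(slowNodesFromAck));
    Set<T> overThreshold = new HashSet<>();
    for (T node : slowNodesFromAck) {
      if (counts.merge(node, 1, Integer::sum) >= threshold) {
        overThreshold.add(node);
      }
    }
    return overThreshold;
  }
}
```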

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

@@ -154,6 +154,9 @@ public interface HdfsClientConfigKeys {
   String  DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.client.slow.io.warning.threshold.ms";
   long    DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
+  String  DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY =
+      "dfs.client.mark.slownode.as.badnode.threshold";
+  int DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT = 10;
   String  DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS =
           "dfs.client.key.provider.cache.expiry";
   long    DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java

@@ -60,6 +60,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMA
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
@@ -142,6 +144,7 @@ public class DfsClientConf {
   private final int retryIntervalForGetLastBlockLength;
   private final long datanodeRestartTimeout;
   private final long slowIoWarningThresholdMs;
+  private final int markSlowNodeAsBadNodeThreshold;
 
   /** wait time window before refreshing blocklocation for inputstream. */
   private final long refreshReadBlockLocationsMS;
@@ -261,6 +264,9 @@ public class DfsClientConf {
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
     readUseCachePriority = conf.getBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY,
         DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT);
+    markSlowNodeAsBadNodeThreshold = conf.getInt(
+        DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY,
+        DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT);
 
     refreshReadBlockLocationsMS = conf.getLong(
         HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
@@ -644,6 +650,13 @@ public class DfsClientConf {
     return slowIoWarningThresholdMs;
   }
 
+  /**
+   * @return the number of consecutive slowNode replies required to mark a slowNode as a badNode.
+   */
+  public int getMarkSlowNodeAsBadNodeThreshold() {
+    return markSlowNodeAsBadNodeThreshold;
+  }
+
   /*
    * @return the clientShortCircuitNum
    */

+ 47 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java

@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.thirdparty.protobuf.TextFormat;
@@ -42,6 +43,27 @@ public class PipelineAck {
   final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
   final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
 
+  public enum SLOW {
+    DISABLED(0),
+    NORMAL(1),
+    SLOW(2),
+    RESERVED(3);
+
+    private final int value;
+    private static final SLOW[] VALUES = values();
+    static SLOW valueOf(int value) {
+      return VALUES[value];
+    }
+
+    SLOW(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+
   public enum ECN {
     DISABLED(0),
     SUPPORTED(1),
@@ -66,7 +88,8 @@ public class PipelineAck {
   private enum StatusFormat {
     STATUS(null, 4),
     RESERVED(STATUS.BITS, 1),
-    ECN_BITS(RESERVED.BITS, 2);
+    ECN_BITS(RESERVED.BITS, 2),
+    SLOW_BITS(ECN_BITS.BITS, 2);
 
     private final LongBitFormat BITS;
 
@@ -82,6 +105,10 @@ public class PipelineAck {
       return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
     }
 
+    static SLOW getSLOW(int header) {
+      return SLOW.valueOf((int) SLOW_BITS.BITS.retrieve(header));
+    }
+
     public static int setStatus(int old, Status status) {
       return (int) STATUS.BITS.combine(status.getNumber(), old);
     }
@@ -89,6 +116,10 @@ public class PipelineAck {
     public static int setECN(int old, ECN ecn) {
       return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
     }
+
+    public static int setSLOW(int old, SLOW slow) {
+      return (int) SLOW_BITS.BITS.combine(slow.getValue(), old);
+    }
   }
 
   /** default constructor **/
@@ -149,7 +180,7 @@ public class PipelineAck {
     if (proto.getFlagCount() > 0) {
       return proto.getFlag(i);
     } else {
-      return combineHeader(ECN.DISABLED, proto.getReply(i));
+      return combineHeader(ECN.DISABLED, proto.getReply(i), SLOW.DISABLED);
     }
   }
 
@@ -230,14 +261,28 @@ public class PipelineAck {
     return StatusFormat.getECN(header);
   }
 
+  public static SLOW getSLOWFromHeader(int header) {
+    return StatusFormat.getSLOW(header);
+  }
+
   public static int setStatusForHeader(int old, Status status) {
     return StatusFormat.setStatus(old, status);
   }
 
+  @VisibleForTesting
+  public static int setSLOWForHeader(int old, SLOW slow) {
+    return StatusFormat.setSLOW(old, slow);
+  }
+
   public static int combineHeader(ECN ecn, Status status) {
+    return combineHeader(ecn, status, SLOW.DISABLED);
+  }
+
+  public static int combineHeader(ECN ecn, Status status, SLOW slow) {
     int header = 0;
     header = StatusFormat.setStatus(header, status);
     header = StatusFormat.setECN(header, ecn);
+    header = StatusFormat.setSLOW(header, slow);
     return header;
   }
 }
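
For reference, a minimal sketch of the ack header layout after this change, written with plain bit arithmetic instead of Hadoop's LongBitFormat. Per the StatusFormat enum above, bits 0-3 carry the status, bit 4 is reserved, bits 5-6 carry ECN, and bits 7-8 carry the new SLOW field; a legacy header that never set those bits therefore decodes as SLOW.DISABLED (0), which is the compatibility behavior the test below verifies. The class name and constants here are illustrative assumptions.

```java
public class AckHeaderLayoutSketch {
  private static final int STATUS_MASK = 0xF;   // bits 0-3: Status
  private static final int ECN_SHIFT = 5;       // bits 5-6: ECN
  private static final int SLOW_SHIFT = 7;      // bits 7-8: SLOW
  private static final int TWO_BIT_MASK = 0x3;

  static int combineHeader(int status, int ecn, int slow) {
    return (status & STATUS_MASK)
        | (ecn & TWO_BIT_MASK) << ECN_SHIFT
        | (slow & TWO_BIT_MASK) << SLOW_SHIFT;
  }

  static int slowFromHeader(int header) {
    return (header >>> SLOW_SHIFT) & TWO_BIT_MASK;
  }

  public static void main(String[] args) {
    // 0 = Status.SUCCESS, 1 = ECN.SUPPORTED, 2 = SLOW.SLOW (values from the enums above).
    int header = combineHeader(0, 1, 2);
    System.out.println(slowFromHeader(header));  // prints 2
  }
}
```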

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -1526,6 +1526,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1", "5", "25"};
   public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
   public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
+  public static final String DFS_PIPELINE_SLOWNODE_ENABLED = "dfs.pipeline.slownode";
+  public static final boolean DFS_PIPELINE_SLOWNODE_ENABLED_DEFAULT = false;
 
   // Key Provider Cache Expiry
   public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =

+ 11 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -1327,7 +1327,8 @@ class BlockReceiver implements Closeable {
       LOG.info("Sending an out of band ack of type " + ackStatus);
       try {
         sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
-            PipelineAck.combineHeader(datanode.getECN(), ackStatus));
+            PipelineAck.combineHeader(datanode.getECN(), ackStatus,
+                datanode.getSLOWByBlockPoolId(block.getBlockPoolId())));
       } finally {
        // Let others send ack. Unless there are multiple OOB send
         // calls, there can be only one waiter, the responder thread.
@@ -1409,7 +1410,8 @@ class BlockReceiver implements Closeable {
                 LOG.info("Relaying an out of band ack of type " + oobStatus);
                 sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
                     PipelineAck.combineHeader(datanode.getECN(),
-                      Status.SUCCESS));
+                      Status.SUCCESS,
+                      datanode.getSLOWByBlockPoolId(block.getBlockPoolId())));
                 continue;
               }
               seqno = ack.getSeqno();
@@ -1499,7 +1501,8 @@ class BlockReceiver implements Closeable {
           Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
           sendAckUpstream(ack, expected, totalAckTimeNanos,
             (pkt != null ? pkt.offsetInBlock : 0),
-            PipelineAck.combineHeader(datanode.getECN(), myStatus));
+              PipelineAck.combineHeader(datanode.getECN(), myStatus,
+                  datanode.getSLOWByBlockPoolId(block.getBlockPoolId())));
           if (pkt != null) {
             // remove the packet from the ack queue
             removeAckHead();
@@ -1620,8 +1623,10 @@ class BlockReceiver implements Closeable {
         // downstream nodes, reply should contain one reply.
         replies = new int[] { myHeader };
       } else if (mirrorError) { // ack read error
-        int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
-        int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
+        int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS,
+            datanode.getSLOWByBlockPoolId(block.getBlockPoolId()));
+        int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR,
+            datanode.getSLOWByBlockPoolId(block.getBlockPoolId()));
         replies = new int[] {h, h1};
       } else {
         short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
@@ -1631,6 +1636,7 @@ class BlockReceiver implements Closeable {
         for (int i = 0; i < ackLen; ++i) {
           replies[i + 1] = ack.getHeaderFlag(i);
         }
+        DataNodeFaultInjector.get().markSlow(mirrorAddr, replies);
         // If the mirror has reported that it received a corrupt packet,
         // do self-destruct to mark myself bad, instead of making the
         // mirror node bad. The mirror is guaranteed to be good without

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -376,6 +376,7 @@ public class DataNode extends ReconfigurableBase
   private final String confVersion;
   private final long maxNumberOfBlocksToLog;
   private final boolean pipelineSupportECN;
+  private final boolean pipelineSupportSlownode;
 
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
@@ -433,6 +434,7 @@ public class DataNode extends ReconfigurableBase
     this.connectToDnViaHostname = false;
     this.blockScanner = new BlockScanner(this, this.getConf());
     this.pipelineSupportECN = false;
+    this.pipelineSupportSlownode = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.dnConf = new DNConf(this);
     initOOBTimeout();
@@ -471,6 +473,9 @@ public class DataNode extends ReconfigurableBase
     this.pipelineSupportECN = conf.getBoolean(
         DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
         DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
+    this.pipelineSupportSlownode = conf.getBoolean(
+        DFSConfigKeys.DFS_PIPELINE_SLOWNODE_ENABLED,
+        DFSConfigKeys.DFS_PIPELINE_SLOWNODE_ENABLED_DEFAULT);
 
     confVersion = "core-" +
         conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@@ -675,6 +680,23 @@ public class DataNode extends ReconfigurableBase
         PipelineAck.ECN.SUPPORTED;
   }
 
+  /**
+   * The SLOW bit for the DataNode of the specific BlockPool.
+   * The DataNode should return:
+   * <ul>
+   *   <li>SLOW.DISABLED when SLOW is disabled.</li>
+   *   <li>SLOW.NORMAL when SLOW is enabled and DN is not slownode.</li>
+   *   <li>SLOW.SLOW when SLOW is enabled and DN is slownode.</li>
+   * </ul>
+   */
+  public PipelineAck.SLOW getSLOWByBlockPoolId(String bpId) {
+    if (!pipelineSupportSlownode) {
+      return PipelineAck.SLOW.DISABLED;
+    }
+    return isSlownodeByBlockPoolId(bpId) ? PipelineAck.SLOW.SLOW :
+        PipelineAck.SLOW.NORMAL;
+  }
+
   public FileIoProvider getFileIoProvider() {
     return fileIoProvider;
   }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

@@ -155,4 +155,6 @@ public class DataNodeFaultInjector {
    * into an erasure coding reconstruction.
    */
   public void badDecoding(ByteBuffer[] outputs) {}
+
+  public void markSlow(String dnAddr, int[] replies) {}
 }

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -5521,6 +5521,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.pipeline.slownode</name>
+  <value>false</value>
+  <description>
+    If true, the DataNode reports slownode status to the client via PipelineAck.
+  </description>
+</property>
+
 <property>
   <name>dfs.qjournal.accept-recovery.timeout.ms</name>
   <value>120000</value>
@@ -6310,4 +6318,14 @@
       Effective with dfs.nameservices.resolution-enabled on.
     </description>
   </property>
+
+  <property>
+    <name>dfs.client.mark.slownode.as.badnode.threshold</name>
+    <value>10</value>
+    <description>
+      The threshold for marking a slownode as a badnode. If the client receives
+      PipelineAck from a slownode continuously for
+      ${dfs.client.mark.slownode.as.badnode.threshold} times, it marks that node
+      as a badnode.
+    </description>
+  </property>
 </configuration>
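
As a usage note, a hedged sketch of wiring the two knobs this change introduces, shown only through a plain Configuration object (cluster deployment and restart mechanics are out of scope; the class name is illustrative). The server-side switch must be on before DataNodes put anything other than SLOW.DISABLED in their acks, and the client-side threshold controls how many consecutive SLOW acks evict a node.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class SlowNodeKnobsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // DataNode side: opt in, otherwise every ack carries SLOW.DISABLED.
    conf.setBoolean(DFSConfigKeys.DFS_PIPELINE_SLOWNODE_ENABLED, true);
    // Client side: evict a node after this many consecutive SLOW acks
    // (default 10, as added to hdfs-default.xml above).
    conf.setInt(
        HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY, 5);
    System.out.println(conf.getInt(
        HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY,
        HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT));
  }
}
```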

+ 53 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -893,4 +894,56 @@ public class TestClientProtocolForPipelineRecovery {
       DataNodeFaultInjector.set(oldDnInjector);
     }
   }
+
+  @Test
+  public void testPipelineRecoveryWithSlowNode() throws Exception {
+    final int oneWriteSize = 5000;
+
+    final int threshold = 3;
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY, threshold);
+
+    // Need 4 datanodes to verify the replaceDatanode during pipeline recovery
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/f");
+      FSDataOutputStream o = fs.create(fileName);
+      // Flush to get the pipeline created.
+      o.writeBytes("hello");
+      o.hflush();
+      DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
+      final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
+      final String lastDn = pipeline[2].getXferAddr(false);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void markSlow(String mirrorAddr, int[] replies) {
+          if (!lastDn.equals(mirrorAddr)) {
+            // Only fail for last DN
+            return;
+          }
+          assert(replies.length == 2);
+          replies[1] = PipelineAck.setSLOWForHeader(replies[1], PipelineAck.SLOW.SLOW);
+        }
+      });
+
+      int count = 0;
+      Random r = new Random();
+      byte[] b = new byte[oneWriteSize];
+      while (count < threshold) {
+        r.nextBytes(b);
+        o.write(b);
+        count++;
+        o.hflush();
+      }
+      Assert.assertNotEquals(lastDn, dfsO.getStreamer().getNodes()[2].getXferAddr(false));
+    } finally {
+      DataNodeFaultInjector.set(old);
+      cluster.shutdown();
+    }
+  }
 }

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -551,6 +551,39 @@ public class TestDataTransferProtocol {
         .CHECKSUM_OK), newAck.getHeaderFlag(0));
   }
 
+  @Test
+  public void testPipeLineAckCompatibilityWithSLOW() throws IOException {
+    DataTransferProtos.PipelineAckProto proto = DataTransferProtos
+        .PipelineAckProto.newBuilder()
+        .setSeqno(0)
+        .addReply(Status.CHECKSUM_OK)
+        .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+            Status.CHECKSUM_OK))
+        .build();
+
+    DataTransferProtos.PipelineAckProto newProto = DataTransferProtos
+        .PipelineAckProto.newBuilder()
+        .setSeqno(0)
+        .addReply(Status.CHECKSUM_OK)
+        .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+            Status.CHECKSUM_OK, PipelineAck.SLOW.SLOW))
+        .build();
+
+    ByteArrayOutputStream oldAckBytes = new ByteArrayOutputStream();
+    proto.writeDelimitedTo(oldAckBytes);
+    PipelineAck oldAck = new PipelineAck();
+    oldAck.readFields(new ByteArrayInputStream(oldAckBytes.toByteArray()));
+    assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status
+        .CHECKSUM_OK, PipelineAck.SLOW.DISABLED), oldAck.getHeaderFlag(0));
+
+    PipelineAck newAck = new PipelineAck();
+    ByteArrayOutputStream newAckBytes = new ByteArrayOutputStream();
+    newProto.writeDelimitedTo(newAckBytes);
+    newAck.readFields(new ByteArrayInputStream(newAckBytes.toByteArray()));
+    assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status
+        .CHECKSUM_OK, PipelineAck.SLOW.SLOW), newAck.getHeaderFlag(0));
+  }
+
   void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
     writeBlock(new ExtendedBlock(poolId, blockId),
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);