
HDFS-14795. Add Throttler for writing block. Contributed by Lisheng Sun.

Inigo Goiri committed 5 years ago · commit f580a87079

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -119,6 +119,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.data.transfer.bandwidthPerSec";
   public static final long DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT =
       0; // A value of zero indicates no limit
+  public static final String DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.data.write.bandwidthPerSec";
+  // A value of zero indicates no limit
+  public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
   @Deprecated
   public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY =
       HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;

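The two new keys mirror the existing dfs.datanode.data.transfer.bandwidthPerSec pair. A minimal sketch of enabling the write throttle programmatically before starting a DataNode; the 10 MB/s figure is illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Cap block writes during pipeline recovery at 10 MB/s per DataNode;
    // the default of 0 leaves writes unthrottled.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
        10L * 1024 * 1024);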
+ 38 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -46,6 +46,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEF
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -2500,8 +2503,8 @@ public class DataNode extends ReconfigurableBase
     final String clientname;
     final CachingStrategy cachingStrategy;
 
-    /** Throttle to block replication when data transfers. */
-    private DataTransferThrottler transferThrottler;
+    /** Throttler for block replication transfers and pipeline-recovery writes. */
+    private DataTransferThrottler throttler;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
@@ -2529,14 +2532,10 @@ public class DataNode extends ReconfigurableBase
       this.clientname = clientname;
       this.cachingStrategy =
           new CachingStrategy(true, getDnConf().readaheadLength);
-      // 1. the stage is PIPELINE_SETUP_CREATE,that is moving blocks, set
-      // throttler.
-      // 2. the stage is PIPELINE_SETUP_APPEND_RECOVERY or
-      // PIPELINE_SETUP_STREAMING_RECOVERY,
-      // that is writing and recovering pipeline, don't set throttle.
-      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
-          && clientname.isEmpty()) {
-        this.transferThrottler = xserver.getTransferThrottler();
+      if (isTransfer(stage, clientname)) {
+        this.throttler = xserver.getTransferThrottler();
+      } else if (isWrite(stage)) {
+        this.throttler = xserver.getWriteThrottler();
       }
     }
 
@@ -2596,7 +2595,7 @@ public class DataNode extends ReconfigurableBase
             targetStorageIds);
 
         // send data & checksum
-        blockSender.sendBlock(out, unbufOut, transferThrottler);
+        blockSender.sendBlock(out, unbufOut, throttler);
 
         // no response necessary
         LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
@@ -3739,4 +3738,32 @@ public class DataNode extends ReconfigurableBase
     }
     return this.diskBalancer;
   }
+
+  /**
+   * Returns true for the DataTransfer constructed in
+   * {@link DataNode#transferBlock}: the BlockConstructionStage is
+   * PIPELINE_SETUP_CREATE and the clientName is "".
+   */
+  private static boolean isTransfer(BlockConstructionStage stage,
+      String clientName) {
+    if (stage == PIPELINE_SETUP_CREATE && clientName.isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Returns true for the DataTransfer constructed in
+   * {@link DataNode#transferReplicaForPipelineRecovery}.
+   *
+   * During pipeline recovery the BlockConstructionStage is one of
+   * PIPELINE_SETUP_APPEND_RECOVERY, PIPELINE_SETUP_STREAMING_RECOVERY or
+   * PIPELINE_CLOSE_RECOVERY. No replica needs to be transferred for
+   * PIPELINE_CLOSE_RECOVERY, so only PIPELINE_SETUP_APPEND_RECOVERY and
+   * PIPELINE_SETUP_STREAMING_RECOVERY count as writes here.
+   */
+  private static boolean isWrite(BlockConstructionStage stage) {
+    return (stage == PIPELINE_SETUP_STREAMING_RECOVERY
+        || stage == PIPELINE_SETUP_APPEND_RECOVERY);
+  }
 }
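Both transferThrottler and writeThrottler are plain DataTransferThrottler instances, which callers feed with the number of bytes they have just moved. A minimal sketch of that pattern, assuming an ordinary stream copy loop (names are illustrative, not the BlockSender/BlockReceiver internals):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.hdfs.util.DataTransferThrottler;

    static void copyThrottled(InputStream in, OutputStream out,
        DataTransferThrottler throttler) throws IOException {
      byte[] buf = new byte[64 * 1024];
      int n;
      while ((n = in.read(buf)) > 0) {
        out.write(buf, 0, n);
        if (throttler != null) {
          // Sleeps once the bytes moved in the current period exceed the
          // configured bandwidth, which is how the cap is enforced.
          throttler.throttle(n);
        }
      }
    }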

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -905,8 +905,8 @@ class DataXceiver extends Receiver implements Runnable {
       // receive the block and mirror to the next target
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
-        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets, false);
+        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
+            dataXceiverServer.getWriteThrottler(), targets, false);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
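getWriteThrottler() returns null when dfs.datanode.data.write.bandwidthPerSec is 0, and the old call already passed null, so the default behaviour of receiveBlock is unchanged. Since every xceiver receives the same DataXceiverServer-owned instance, the configured value acts as an aggregate per-DataNode cap rather than a per-stream one. A hedged sketch of the null-tolerant guard a caller relies on (illustrative helper, not the actual BlockReceiver code):

    import org.apache.hadoop.hdfs.util.DataTransferThrottler;

    static void maybeThrottle(DataTransferThrottler throttler, long bytes) {
      if (throttler != null) { // null means write throttling is disabled
        throttler.throttle(bytes);
      }
    }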

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -171,6 +171,8 @@ class DataXceiverServer implements Runnable {
 
   private final DataTransferThrottler transferThrottler;
 
+  private final DataTransferThrottler writeThrottler;
+
   /**
    * Stores an estimate for block size to check if the disk partition has enough
    * space. Newer clients pass the expected block size to the DataNode. For
@@ -205,6 +207,15 @@ class DataXceiverServer implements Runnable {
     } else {
       this.transferThrottler = null;
     }
+
+    bandwidthPerSec = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT);
+    if (bandwidthPerSec > 0) {
+      this.writeThrottler = new DataTransferThrottler(bandwidthPerSec);
+    } else {
+      this.writeThrottler = null;
+    }
   }
 
   @Override
@@ -458,6 +469,10 @@ class DataXceiverServer implements Runnable {
     return transferThrottler;
   }
 
+  public DataTransferThrottler getWriteThrottler() {
+    return writeThrottler;
+  }
+
   /**
    * Release a peer.
    *
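The write throttler follows the same null-when-disabled convention as the transfer throttler above. Since the value is read with getLongBytes, it should also accept a binary size suffix; that is generic Configuration behaviour (as with dfs.blocksize), not something this patch adds, so treat the sketch below as an assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, "32m");
    // getLongBytes expands the "32m" suffix to 32 * 1024 * 1024 bytes per second.
    long cap = conf.getLongBytes(
        DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
        DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT);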

+ 15 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4145,9 +4145,21 @@
     <name>dfs.datanode.data.transfer.bandwidthPerSec</name>
     <value>0</value>
     <description>
-      Specifies the maximum amount of bandwidth that each datanode can utilize for the data transfering purpose in term
-      of the number of bytes per second.
-      when the bandwidth value is zero, there is no limit.
+      Specifies the maximum amount of bandwidth, in bytes per second, that each datanode can utilize for
+      transferring blocks, i.e. when the BlockConstructionStage is PIPELINE_SETUP_CREATE and the clientName
+      is empty.
+      When the bandwidth value is zero, there is no limit.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.datanode.data.write.bandwidthPerSec</name>
+    <value>0</value>
+    <description>
+      Specifies the maximum amount of bandwidth, in bytes per second, that each datanode can utilize for
+      writing blocks during pipeline recovery, i.e. when the BlockConstructionStage is
+      PIPELINE_SETUP_APPEND_RECOVERY or PIPELINE_SETUP_STREAMING_RECOVERY.
+      When the bandwidth value is zero, there is no limit.
     </description>
   </property>
 

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java

@@ -42,6 +42,8 @@ import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
+
 /** Test transferring RBW between datanodes */
 public class TestTransferRbw {
   private static final Logger LOG =
@@ -102,13 +104,22 @@ public class TestTransferRbw {
       final String bpid = cluster.getNamesystem().getBlockPoolId();
       {
         final DataNode oldnode = cluster.getDataNodes().get(0);
+        // DataXceiverServer#writeThrottler is null when
+        // dfs.datanode.data.write.bandwidthPerSec is left at its default of 0.
+        Assert.assertNull(oldnode.xserver.getWriteThrottler());
         oldrbw = getRbw(oldnode, bpid);
         LOG.info("oldrbw = " + oldrbw);
         
         //add a datanode
+        conf.setLong(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
+            1024 * 1024 * 8);
         cluster.startDataNodes(conf, 1, true, null, null);
         newnode = cluster.getDataNodes().get(REPLICATION);
-        
+        // DataXceiverServer#writeThrottler's bandwidth equals the configured
+        // dfs.datanode.data.write.bandwidthPerSec value when it is non-zero.
+        Assert.assertEquals(1024 * 1024 * 8,
+            newnode.xserver.getWriteThrottler().getBandwidth());
         final DatanodeInfo oldnodeinfo;
         {
           final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc(