Browse Source

HDFS-17113. Reconfig transfer and write bandwidth for datanode. (#5869)

WangYuanben 1 year ago
parent
commit
04c34a2171

+ 41 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -36,6 +36,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
@@ -362,7 +366,9 @@ public class DataNode extends ReconfigurableBase
               FS_GETSPACEUSED_JITTER_KEY,
               FS_GETSPACEUSED_CLASSNAME,
               DFS_DISK_BALANCER_ENABLED,
-              DFS_DISK_BALANCER_PLAN_VALID_INTERVAL));
+              DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
+              DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
+              DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY));
 
   public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";
 
@@ -694,6 +700,8 @@ public class DataNode extends ReconfigurableBase
     case DFS_BLOCKREPORT_INITIAL_DELAY_KEY:
       return reconfBlockReportParameters(property, newVal);
     case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
+    case DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY:
+    case DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY:
       return reconfDataXceiverParameters(property, newVal);
     case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
       return reconfCacheReportParameters(property, newVal);
@@ -724,14 +732,40 @@ public class DataNode extends ReconfigurableBase
 
   private String reconfDataXceiverParameters(String property, String newVal)
       throws ReconfigurationException {
-    String result;
+    String result = null;
     try {
       LOG.info("Reconfiguring {} to {}", property, newVal);
-      Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
-      int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT :
-          Integer.parseInt(newVal));
-      result = Integer.toString(threads);
-      getXferServer().setMaxXceiverCount(threads);
+      if (property.equals(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)) {
+        Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+        int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT :
+            Integer.parseInt(newVal));
+        result = Integer.toString(threads);
+        getXferServer().setMaxXceiverCount(threads);
+      } else if (property.equals(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY)) {
+        Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+        long bandwidthPerSec = (newVal == null ?
+            DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT : Long.parseLong(newVal));
+        DataTransferThrottler transferThrottler = null;
+        if (bandwidthPerSec > 0) {
+          transferThrottler = new DataTransferThrottler(bandwidthPerSec);
+        } else {
+          bandwidthPerSec = 0;
+        }
+        result = Long.toString(bandwidthPerSec);
+        getXferServer().setTransferThrottler(transferThrottler);
+      } else if (property.equals(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY)) {
+        Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+        long bandwidthPerSec = (newVal == null ? DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT :
+            Long.parseLong(newVal));
+        DataTransferThrottler writeThrottler = null;
+        if (bandwidthPerSec > 0) {
+          writeThrottler = new DataTransferThrottler(bandwidthPerSec);
+        } else {
+          bandwidthPerSec = 0;
+        }
+        result = Long.toString(bandwidthPerSec);
+        getXferServer().setWriteThrottler(writeThrottler);
+      }
       LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
       return result;
     } catch (IllegalArgumentException e) {

+ 13 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -168,9 +168,9 @@ class DataXceiverServer implements Runnable {
 
   final BlockBalanceThrottler balanceThrottler;
 
-  private final DataTransferThrottler transferThrottler;
+  private volatile DataTransferThrottler transferThrottler;
 
-  private final DataTransferThrottler writeThrottler;
+  private volatile DataTransferThrottler writeThrottler;
 
   /**
    * Stores an estimate for block size to check if the disk partition has enough
@@ -200,7 +200,10 @@ class DataXceiverServer implements Runnable {
             DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
         conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
             DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
+    initBandwidthPerSec(conf);
+  }
 
+  private void initBandwidthPerSec(Configuration conf) {
     long bandwidthPerSec = conf.getLongBytes(
         DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
         DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT);
@@ -524,4 +527,12 @@ class DataXceiverServer implements Runnable {
   public int getMaxXceiverCount() {
     return maxXceiverCount;
   }
+
+  public void setTransferThrottler(DataTransferThrottler transferThrottler) {
+    this.transferThrottler = transferThrottler;
+  }
+
+  public void setWriteThrottler(DataTransferThrottler writeThrottler) {
+    this.writeThrottler = writeThrottler;
+  }
 }

+ 44 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java

@@ -31,6 +31,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT;
@@ -443,20 +445,61 @@ public class TestDataNodeReconfiguration {
         assertTrue("expecting IllegalArgumentException",
             expected.getCause() instanceof IllegalArgumentException);
       }
+      try {
+        dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, "text");
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting NumberFormatException",
+            expected.getCause() instanceof NumberFormatException);
+      }
+      try {
+        dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, "text");
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting NumberFormatException",
+            expected.getCause() instanceof NumberFormatException);
+      }
 
       // Change properties and verify change.
       dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123));
       assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
           123, dn.getXferServer().getMaxXceiverCount());
 
+      dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
+          String.valueOf(1000));
+      assertEquals(String.format("%s has wrong value",
+              DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY),
+          1000, dn.getXferServer().getTransferThrottler().getBandwidth());
+
+      dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
+          String.valueOf(1000));
+      assertEquals(String.format("%s has wrong value",
+              DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
+          1000, dn.getXferServer().getWriteThrottler().getBandwidth());
+
       // Revert to default.
       dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null);
       assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
           DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT, dn.getXferServer().getMaxXceiverCount());
-
       assertNull(String.format("expect %s is not configured",
           DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
           dn.getConf().get(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));
+
+      dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, null);
+      assertEquals(String.format("%s has wrong value",
+              DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY),
+          null, dn.getXferServer().getTransferThrottler());
+      assertNull(String.format("expect %s is not configured",
+              DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY),
+          dn.getConf().get(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY));
+
+      dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, null);
+      assertEquals(String.format("%s has wrong value",
+              DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
+          null, dn.getXferServer().getWriteThrottler());
+      assertNull(String.format("expect %s is not configured",
+              DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
+          dn.getConf().get(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY));
     }
   }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -346,7 +346,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(22, outs.size());
+    assertEquals(24, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }