浏览代码

HDFS-16398. Reconfig block report parameters for datanode (#3831)

litao 3 年之前
父节点
当前提交
c2ff39006f

+ 30 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -107,12 +107,12 @@ public class DNConf {
   final long heartBeatInterval;
   private final long lifelineIntervalMs;
   volatile long blockReportInterval;
-  final long blockReportSplitThreshold;
+  volatile long blockReportSplitThreshold;
   final boolean peerStatsEnabled;
   final boolean diskStatsEnabled;
   final long outliersReportIntervalMs;
   final long ibrInterval;
-  final long initialBlockReportDelayMs;
+  volatile long initialBlockReportDelayMs;
   volatile long cacheReportInterval;
   final long datanodeSlowIoWarningThresholdMs;
 
@@ -215,19 +215,7 @@ public class DNConf {
     this.datanodeSlowIoWarningThresholdMs = getConf().getLong(
         DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
-
-    long initBRDelay = getConf().getTimeDuration(
-        DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
-        DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT,
-        TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
-    if (initBRDelay >= blockReportInterval) {
-      initBRDelay = 0;
-      DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY + " is "
-          + "greater than or equal to" + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY
-          + ".  Setting initial delay to 0 msec:");
-    }
-    initialBlockReportDelayMs = initBRDelay;
-    
+    initBlockReportDelay();
     heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS,
         TimeUnit.MILLISECONDS);
@@ -311,6 +299,19 @@ public class DNConf {
     );
   }
 
+  private void initBlockReportDelay() {
+    long initBRDelay = getConf().getTimeDuration(
+        DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
+        DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+    if (initBRDelay >= blockReportInterval || initBRDelay < 0) {
+      initBRDelay = 0;
+      DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY +
+          " is greater than or equal to " + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY +
+          ". Setting initial delay to 0 msec.");
+    }
+    initialBlockReportDelayMs = initBRDelay;
+  }
+
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
@@ -477,7 +478,8 @@ public class DNConf {
   }
 
   void setBlockReportInterval(long intervalMs) {
-    Preconditions.checkArgument(intervalMs > 0);
+    Preconditions.checkArgument(intervalMs > 0,
+        DFS_BLOCKREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
     blockReportInterval = intervalMs;
   }
 
@@ -487,11 +489,22 @@ public class DNConf {
 
   void setCacheReportInterval(long intervalMs) {
     Preconditions.checkArgument(intervalMs > 0,
-        "dfs.cachereport.intervalMsec should be larger than 0");
+        DFS_CACHEREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
     cacheReportInterval = intervalMs;
   }
 
   public long getCacheReportInterval() {
     return cacheReportInterval;
   }
+
+  void setBlockReportSplitThreshold(long threshold) {
+    Preconditions.checkArgument(threshold >= 0,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY + " should be larger than or equal to 0");
+    blockReportSplitThreshold = threshold;
+  }
+
+  void setInitBRDelayMs(String delayMs) {
+    dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
+    initBlockReportDelay();
+  }
 }

+ 50 - 34
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
@@ -308,6 +312,8 @@ public class DataNode extends ReconfigurableBase
               DFS_DATANODE_DATA_DIR_KEY,
               DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
               DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+              DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+              DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
               DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
               DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
 
@@ -620,39 +626,10 @@ public class DataNode extends ReconfigurableBase
       }
       break;
     }
-    case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
-      ReconfigurationException rootException = null;
-      try {
-        LOG.info("Reconfiguring {} to {}", property, newVal);
-        long intervalMs;
-        if (newVal == null) {
-          // Set to default.
-          intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
-        } else {
-          intervalMs = Long.parseLong(newVal);
-        }
-        dnConf.setBlockReportInterval(intervalMs);
-        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-          if (bpos != null) {
-            for (BPServiceActor actor : bpos.getBPServiceActors()) {
-              actor.getScheduler().setBlockReportIntervalMs(intervalMs);
-            }
-          }
-        }
-        return Long.toString(intervalMs);
-      } catch (IllegalArgumentException e) {
-        rootException = new ReconfigurationException(
-            property, newVal, getConf().get(property), e);
-      } finally {
-        if (rootException != null) {
-          LOG.warn(String.format(
-              "Exception in updating block report interval %s to %s",
-              property, newVal), rootException);
-          throw rootException;
-        }
-      }
-      break;
-    }
+    case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY:
+    case DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY:
+    case DFS_BLOCKREPORT_INITIAL_DELAY_KEY:
+      return reconfBlockReportParameters(property, newVal);
     case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
       return reconfDataXceiverParameters(property, newVal);
     case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
@@ -698,6 +675,44 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  private String reconfBlockReportParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result = null;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      if (property.equals(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)) {
+        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        long intervalMs = newVal == null ? DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT :
+            Long.parseLong(newVal);
+        result = Long.toString(intervalMs);
+        dnConf.setBlockReportInterval(intervalMs);
+        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+          if (bpos != null) {
+            for (BPServiceActor actor : bpos.getBPServiceActors()) {
+              actor.getScheduler().setBlockReportIntervalMs(intervalMs);
+            }
+          }
+        }
+      } else if (property.equals(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY)) {
+        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        long threshold = newVal == null ? DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT :
+            Long.parseLong(newVal);
+        result = Long.toString(threshold);
+        dnConf.setBlockReportSplitThreshold(threshold);
+      } else if (property.equals(DFS_BLOCKREPORT_INITIAL_DELAY_KEY)) {
+        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        int initialDelay = newVal == null ? DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT :
+            Integer.parseInt(newVal);
+        result = Integer.toString(initialDelay);
+        dnConf.setInitBRDelayMs(result);
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */
@@ -3944,7 +3959,8 @@ public class DataNode extends ReconfigurableBase
     return blockPoolManager.isSlownode();
   }
 
-  BlockPoolManager getBlockPoolManager() {
+  @VisibleForTesting
+  public BlockPoolManager getBlockPoolManager() {
     return blockPoolManager;
   }
 }

+ 46 - 28
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java

@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
@@ -303,40 +305,49 @@ public class TestDataNodeReconfiguration {
 
   @Test
   public void testBlockReportIntervalReconfiguration()
-      throws ReconfigurationException, IOException {
+      throws ReconfigurationException {
     int blockReportInterval = 300 * 1000;
+    String[] blockReportParameters = {
+        DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+        DFS_BLOCKREPORT_INITIAL_DELAY_KEY};
+
     for (int i = 0; i < NUM_DATA_NODE; i++) {
       DataNode dn = cluster.getDataNodes().get(i);
+      BlockPoolManager blockPoolManager = dn.getBlockPoolManager();
 
       // Try invalid values.
+      for (String blockReportParameter : blockReportParameters) {
+        try {
+          dn.reconfigureProperty(blockReportParameter, "text");
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting NumberFormatException",
+              expected.getCause() instanceof NumberFormatException);
+        }
+      }
+
       try {
-        dn.reconfigureProperty(
-            DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text");
+        dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(-1));
         fail("ReconfigurationException expected");
       } catch (ReconfigurationException expected) {
-        assertTrue("expecting NumberFormatException",
-            expected.getCause() instanceof NumberFormatException);
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
       }
       try {
-        dn.reconfigureProperty(
-            DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-            String.valueOf(-1));
+        dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(-1));
         fail("ReconfigurationException expected");
       } catch (ReconfigurationException expected) {
         assertTrue("expecting IllegalArgumentException",
             expected.getCause() instanceof IllegalArgumentException);
       }
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, String.valueOf(-1));
+      assertEquals(0, dn.getDnConf().initialBlockReportDelayMs);
 
-      // Change properties.
+      // Change properties and verify the change.
       dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
           String.valueOf(blockReportInterval));
-
-      // Verify change.
-      assertEquals(String.format("%s has wrong value",
-          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
-          blockReportInterval,
-          dn.getDnConf().getBlockReportInterval());
-      for (BPOfferService bpos : dn.getAllBpOs()) {
+      for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
         if (bpos != null) {
           for (BPServiceActor actor : bpos.getBPServiceActors()) {
             assertEquals(String.format("%s has wrong value",
@@ -347,15 +358,15 @@ public class TestDataNodeReconfiguration {
         }
       }
 
-      // Revert to default.
-      dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-          null);
-      assertEquals(String.format("%s has wrong value",
-          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
-          DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
-          dn.getDnConf().getBlockReportInterval());
-      // Verify default.
-      for (BPOfferService bpos : dn.getAllBpOs()) {
+      dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(123));
+      assertEquals(123, dn.getDnConf().blockReportSplitThreshold);
+
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "123");
+      assertEquals(123000, dn.getDnConf().initialBlockReportDelayMs);
+
+      // Revert to default and verify default.
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, null);
+      for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
         if (bpos != null) {
           for (BPServiceActor actor : bpos.getBPServiceActors()) {
             assertEquals(String.format("%s has wrong value",
@@ -365,9 +376,16 @@ public class TestDataNodeReconfiguration {
           }
         }
       }
-      assertEquals(String.format("expect %s is not configured",
-          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn
-          .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
+      assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
+          dn.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
+
+      dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, null);
+      assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY),
+          dn.getConf().get(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY));
+
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, null);
+      assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INITIAL_DELAY_KEY),
+          dn.getConf().get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY));
     }
   }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -338,7 +338,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(6, outs.size());
+    assertEquals(8, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }