Bladeren bron

HDFS-16457.Make fs.getspaceused.classname reconfigurable (#4069)

singer-bin 3 jaren geleden
bovenliggende
commit
807a428b55

+ 30 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
@@ -87,6 +88,9 @@ import static org.apache.hadoop.util.Time.now;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.fs.WindowsGetSpaceUsed;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 
 import java.io.BufferedOutputStream;
@@ -349,7 +353,8 @@ public class DataNode extends ReconfigurableBase
               DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
               DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
               FS_DU_INTERVAL_KEY,
-              FS_GETSPACEUSED_JITTER_KEY));
+              FS_GETSPACEUSED_JITTER_KEY,
+              FS_GETSPACEUSED_CLASSNAME));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -683,6 +688,7 @@ public class DataNode extends ReconfigurableBase
       return reconfSlowDiskParameters(property, newVal);
     case FS_DU_INTERVAL_KEY:
     case FS_GETSPACEUSED_JITTER_KEY:
+    case FS_GETSPACEUSED_CLASSNAME:
       return reconfDfsUsageParameters(property, newVal);
     default:
       break;
@@ -879,7 +885,7 @@ public class DataNode extends ReconfigurableBase
         for (FsVolumeImpl fsVolume : volumeList) {
           Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
           for (BlockPoolSlice value : blockPoolSlices.values()) {
-            value.updateDfsUsageConfig(interval, null);
+            value.updateDfsUsageConfig(interval, null, null);
           }
         }
       } else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
@@ -891,13 +897,33 @@ public class DataNode extends ReconfigurableBase
         for (FsVolumeImpl fsVolume : volumeList) {
           Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
           for (BlockPoolSlice value : blockPoolSlices.values()) {
-            value.updateDfsUsageConfig(null, jitter);
+            value.updateDfsUsageConfig(null, jitter, null);
+          }
+        }
+      } else if (property.equals(FS_GETSPACEUSED_CLASSNAME)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        Class<? extends GetSpaceUsed> klass;
+        if (newVal == null) {
+          if (Shell.WINDOWS) {
+            klass = DU.class;
+          } else {
+            klass = WindowsGetSpaceUsed.class;
+          }
+        } else {
+          klass = Class.forName(newVal).asSubclass(GetSpaceUsed.class);
+        }
+        result = klass.getName();
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(null, null, klass);
           }
         }
       }
       LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
       return result;
-    } catch (IllegalArgumentException | IOException e) {
+    } catch (IllegalArgumentException | IOException | ClassNotFoundException e) {
       throw new ReconfigurationException(property, newVal, getConf().get(property), e);
     }
   }

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -80,6 +80,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
 
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.
@@ -240,7 +241,8 @@ public class BlockPoolSlice {
         SHUTDOWN_HOOK_PRIORITY);
   }
 
-  public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
+  public void updateDfsUsageConfig(Long interval, Long jitter, Class<? extends GetSpaceUsed> klass)
+          throws IOException {
     // Close the old dfsUsage if it is CachingGetSpaceUsed.
     if (dfsUsage instanceof CachingGetSpaceUsed) {
       ((CachingGetSpaceUsed) dfsUsage).close();
@@ -255,6 +257,10 @@ public class BlockPoolSlice {
           FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
       config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
     }
+
+    if (klass != null) {
+      config.setClass(FS_GETSPACEUSED_CLASSNAME, klass, CachingGetSpaceUsed.class);
+    }
     // Start new dfsUsage.
     this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
         .setVolume(volume)

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java

@@ -23,6 +23,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -86,6 +87,7 @@ public class TestDataNodeReconfiguration {
   private final int NUM_NAME_NODE = 1;
   private final int NUM_DATA_NODE = 10;
   private MiniDFSCluster cluster;
+  private static long counter = 0;
 
   @Before
   public void Setup() throws IOException {
@@ -756,4 +758,33 @@ public class TestDataNodeReconfiguration {
       }
     }
   }
+
+  public static class DummyCachingGetSpaceUsed extends CachingGetSpaceUsed {
+    public DummyCachingGetSpaceUsed(Builder builder) throws IOException {
+      super(builder.setInterval(1000).setJitter(0L));
+    }
+
+    @Override
+    protected void refresh() {
+      counter++;
+    }
+  }
+
+  @Test
+  public void testDfsUsageKlass() throws ReconfigurationException, InterruptedException {
+
+    long lastCounter = counter;
+    Thread.sleep(5000);
+    assertEquals(lastCounter, counter);
+
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+      dn.reconfigurePropertyImpl(FS_GETSPACEUSED_CLASSNAME,
+              DummyCachingGetSpaceUsed.class.getName());
+    }
+
+    lastCounter = counter;
+    Thread.sleep(5000);
+    assertTrue(counter > lastCounter);
+  }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -339,7 +339,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(18, outs.size());
+    assertEquals(19, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }