Explorar o código

HDFS-15120. Refresh BlockPlacementPolicy at runtime. Contributed by Jinglun.

Ayush Saxena %!s(int64=5) %!d(string=hai) anos
pai
achega
209630472a

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -438,7 +438,7 @@ public class BlockManager implements BlockStatsMXBean {
   private double reconstructionQueuesInitProgress = 0.0;
 
   /** for block replicas placement */
-  private BlockPlacementPolicies placementPolicies;
+  private volatile BlockPlacementPolicies placementPolicies;
   private final BlockStoragePolicySuite storagePolicySuite;
 
   /** Check whether name system is running before terminating */
@@ -775,6 +775,14 @@ public class BlockManager implements BlockStatsMXBean {
     return placementPolicies.getPolicy(CONTIGUOUS);
   }
 
+  public void refreshBlockPlacementPolicy(Configuration conf) {
+    BlockPlacementPolicies bpp =
+        new BlockPlacementPolicies(conf, datanodeManager.getFSClusterStats(),
+            datanodeManager.getNetworkTopology(),
+            datanodeManager.getHost2DatanodeMap());
+    placementPolicies = bpp;
+  }
+
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
     assert namesystem.hasReadLock(); // TODO: block manager read lock and NS write lock

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -184,6 +184,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
 
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
@@ -322,7 +324,9 @@ public class NameNode extends ReconfigurableBase implements
           DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
           DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
           DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
-          DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION));
+          DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+          DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+          DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2179,6 +2183,10 @@ public class NameNode extends ReconfigurableBase implements
         || property.equals(
             DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
       return reconfReplicationParameters(newVal, property);
+    } else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
+        .equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
+      reconfBlockPlacementPolicy();
+      return newVal;
     } else {
       throw new ReconfigurationException(property, newVal, getConf().get(
           property));
@@ -2223,6 +2231,11 @@ public class NameNode extends ReconfigurableBase implements
     }
   }
 
+  private void reconfBlockPlacementPolicy() {
+    getNamesystem().getBlockManager()
+        .refreshBlockPlacementPolicy(getNewConf());
+  }
+
   private int adjustNewVal(int defaultVal, String newVal) {
     if (newVal == null) {
       return defaultVal;

+ 131 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshBlockPlacementPolicy.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.AddBlockFlag;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test refresh block placement policy.
+ */
+public class TestRefreshBlockPlacementPolicy {
+  private MiniDFSCluster cluster;
+  private Configuration config;
+  private static int counter = 0;
+  static class MockBlockPlacementPolicy extends BlockPlacementPolicyDefault {
+    @Override
+    public DatanodeStorageInfo[] chooseTarget(String srcPath,
+        int numOfReplicas,
+        Node writer,
+        List<DatanodeStorageInfo> chosen,
+        boolean returnChosenNodes,
+        Set<Node> excludedNodes,
+        long blocksize,
+        BlockStoragePolicy storagePolicy,
+        EnumSet<AddBlockFlag> flags) {
+      counter++;
+      return super.chooseTarget(srcPath, numOfReplicas, writer, chosen,
+          returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
+    }
+  }
+
+  @Before
+  public void setup() throws IOException {
+    config = new Configuration();
+    config.setClass(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        MockBlockPlacementPolicy.class, BlockPlacementPolicy.class);
+    config.setClass(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
+        MockBlockPlacementPolicy.class, BlockPlacementPolicy.class);
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(9).build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testRefreshReplicationPolicy() throws Exception {
+    Path file = new Path("/test-file");
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode()
+        .reconfigurePropertyImpl(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, null));
+  }
+
+  @Test
+  public void testRefreshEcPolicy() throws Exception {
+    Path ecDir = new Path("/ec");
+    Path file = new Path("/ec/test-file");
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    dfs.mkdir(ecDir, FsPermission.createImmutable((short)755));
+    dfs.setErasureCodingPolicy(ecDir, null);
+
+    verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode()
+        .reconfigurePropertyImpl(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, null));
+  }
+
+  @FunctionalInterface
+  private interface Refresh {
+    void refresh() throws ReconfigurationException;
+  }
+
+  private void verifyRefreshPolicy(DistributedFileSystem dfs, Path file,
+      Refresh func) throws IOException, ReconfigurationException {
+    // Choose datanode using the mock policy.
+    int lastCounter = counter;
+    OutputStream out = dfs.create(file, true);
+    out.write("test".getBytes());
+    out.close();
+    assert(counter > lastCounter);
+
+    // Refresh to the default policy.
+    func.refresh();
+
+    lastCounter = counter;
+    dfs.delete(file, true);
+    out = dfs.create(file, true);
+    out.write("test".getBytes());
+    out.close();
+    assertEquals(lastCounter, counter);
+  }
+}

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -419,9 +421,11 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(10, outs.size());
-    assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
-    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
+    assertEquals(12, outs.size());
+    assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
+    assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
+    assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
+    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(4));
     assertEquals(errs.size(), 0);
   }