
HDFS-12567. BlockPlacementPolicyRackFaultTolerant fails with racks with very few nodes.

(cherry picked from commit 644c2f6924f341f51d809c91dccfff88fc82f6f0)
Andrew Wang 7 years ago
commit e159272003

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java

@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 public abstract class BlockPlacementPolicy {
-  static final Logger LOG = LoggerFactory.getLogger(
+  public static final Logger LOG = LoggerFactory.getLogger(
       BlockPlacementPolicy.class);
 
   @InterfaceAudience.Private

+ 38 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java

@@ -46,9 +46,12 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
     if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
       return new int[] {numOfReplicas, totalNumOfReplicas};
     }
-    if(totalNumOfReplicas<numOfRacks){
+    // If more racks than replicas, put one replica per rack.
+    if (totalNumOfReplicas < numOfRacks) {
       return new int[] {numOfReplicas, 1};
     }
+    // If more replicas than racks, evenly spread the replicas.
+    // This calculation rounds up.
     int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1;
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
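
The even-spread case above relies on integer ceiling division. As a minimal standalone sketch (not part of the patch, with a hypothetical class name and example replica/rack counts):

// Hypothetical illustration of the rounding-up calculation above; not HDFS code.
public class MaxNodesPerRackExample {
  static int maxNodesPerRack(int totalNumOfReplicas, int numOfRacks) {
    // Equivalent to ceil(totalNumOfReplicas / numOfRacks) for positive inputs.
    return (totalNumOfReplicas - 1) / numOfRacks + 1;
  }

  public static void main(String[] args) {
    System.out.println(maxNodesPerRack(9, 2)); // 5: nine replicas over two racks
    System.out.println(maxNodesPerRack(9, 4)); // 3: rounds up from 2.25
    System.out.println(maxNodesPerRack(9, 9)); // 1: one replica per rack
  }
}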
@@ -109,18 +112,42 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
     numOfReplicas = Math.min(totalReplicaExpected - results.size(),
         (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
 
-    // Fill each rack exactly (maxNodesPerRack-1) replicas.
-    writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
-        blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
+    try {
+      // Try to spread the replicas as evenly as possible across racks.
+      // This is done by first placing with (maxNodesPerRack-1), then spreading
+      // the remainder by calling again with maxNodesPerRack.
+      writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
+          blocksize, maxNodesPerRack - 1, results, avoidStaleNodes,
+          storageTypes);
 
-    for (DatanodeStorageInfo resultStorage : results) {
-      addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
-    }
+      // Exclude the chosen nodes
+      for (DatanodeStorageInfo resultStorage : results) {
+        addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
+            excludedNodes);
+      }
+      LOG.trace("Chosen nodes: {}", results);
+      LOG.trace("Excluded nodes: {}", excludedNodes);
 
-    // For some racks, place one more replica to each one of them.
-    numOfReplicas = totalReplicaExpected - results.size();
-    chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
-        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+      numOfReplicas = totalReplicaExpected - results.size();
+      chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    } catch (NotEnoughReplicasException e) {
+      LOG.debug("Only able to place {} of {} (maxNodesPerRack={}) nodes " +
+              "evenly across racks, falling back to uneven placement.",
+          results.size(), numOfReplicas, maxNodesPerRack);
+      LOG.debug("Caught exception was:", e);
+      // Exclude the chosen nodes
+      for (DatanodeStorageInfo resultStorage : results) {
+        addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
+            excludedNodes);
+      }
+
+      LOG.trace("Chosen nodes: {}", results);
+      LOG.trace("Excluded nodes: {}", excludedNodes);
+      numOfReplicas = totalReplicaExpected - results.size();
+      chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+          totalReplicaExpected, results, avoidStaleNodes, storageTypes);
+    }
 
     return writer;
   }
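
To see why the fallback is needed, consider a toy, single-process model of the placement passes (hypothetical names throughout; this is not the BlockPlacementPolicy API and it ignores excluded nodes and storage types). The first pass caps each rack at maxNodesPerRack - 1, the second pass at maxNodesPerRack, and the fallback relaxes the cap when the topology is too skewed for an even spread:

import java.util.LinkedHashMap;
import java.util.Map;

public class RackPlacementSketch {
  /** Place up to 'cap' replicas per rack until 'needed' are placed. */
  static int place(Map<String, Integer> nodesPerRack, Map<String, Integer> placed,
                   int needed, int cap) {
    for (Map.Entry<String, Integer> rack : nodesPerRack.entrySet()) {
      while (needed > 0
          && placed.getOrDefault(rack.getKey(), 0) < Math.min(cap, rack.getValue())) {
        placed.merge(rack.getKey(), 1, Integer::sum);
        needed--;
      }
    }
    return needed;
  }

  public static void main(String[] args) {
    // Skewed topology like the new test below: 1 node on /rack0, 8 on /rack1.
    Map<String, Integer> nodesPerRack = new LinkedHashMap<>();
    nodesPerRack.put("/rack0", 1);
    nodesPerRack.put("/rack1", 8);

    int totalReplicaExpected = 9;  // e.g. a 9-target striped block group
    int maxNodesPerRack = 5;       // (9 - 1) / 2 + 1

    Map<String, Integer> placed = new LinkedHashMap<>();
    // First pass: at most (maxNodesPerRack - 1) per rack -> places 1 + 4 = 5.
    int remaining = place(nodesPerRack, placed, totalReplicaExpected, maxNodesPerRack - 1);
    // Second pass: up to maxNodesPerRack per rack -> places 1 more on /rack1.
    remaining = place(nodesPerRack, placed, remaining, maxNodesPerRack);
    // Fallback added by this change: relax the per-rack cap so the rest fit.
    if (remaining > 0) {
      remaining = place(nodesPerRack, placed, remaining, totalReplicaExpected);
    }
    System.out.println(placed + ", unplaced=" + remaining);
    // Prints {/rack0=1, /rack1=8}, unplaced=0
  }
}

Without the relaxed final pass, three of the nine targets in this example would remain unplaced, which is the failure mode the patch addresses; in the real code the fallback is triggered by catching NotEnoughReplicasException rather than by counting.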

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * Test erasure coding block placement with skewed # nodes per rack.
+ */
+public class TestErasureCodingMultipleRacks {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestErasureCodingMultipleRacks.class);
+
+  static {
+    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicyRackFaultTolerant.LOG,
+        Level.DEBUG);
+    GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.DEBUG);
+  }
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
+  public ErasureCodingPolicy getPolicy() {
+    return StripedFileTestUtil.getDefaultECPolicy();
+  }
+
+  private MiniDFSCluster cluster;
+  private ErasureCodingPolicy ecPolicy;
+  private Configuration conf;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setup() throws Exception {
+    ecPolicy = getPolicy();
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    final int numDatanodes = dataUnits + parityUnits;
+    final int numRacks = 2;
+    final String[] racks = new String[numDatanodes];
+    for (int i = 0; i < numRacks; i++) {
+      racks[i] = "/rack" + i;
+    }
+    for (int i = numRacks; i < numDatanodes; i++) {
+      racks[i] = "/rack" + (numRacks - 1);
+    }
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .racks(racks)
+        .build();
+    dfs = cluster.getFileSystem();
+    cluster.waitActive();
+    dfs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testSkewedRack() throws Exception {
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy
+        .getCellSize();
+    byte[] contents = new byte[filesize];
+
+    for (int i = 0; i < 10; i++) {
+      final Path path = new Path("/testfile" + i);
+      LOG.info("Writing file " + path);
+      DFSTestUtil.writeFile(dfs, path, contents);
+    }
+  }
+}
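
For the default policy returned by StripedFileTestUtil.getDefaultECPolicy() (typically RS(6,3), i.e. nine storage targets), the setup above places one datanode on /rack0 and the remaining eight on /rack1, so a block group cannot be spread evenly across racks. Before this change such writes could come up short of targets; the test simply verifies that the ten striped files can now be written on this skewed topology.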