Browse Source

HDFS-14061. Check if the cluster topology supports the EC policy before setting, enabling or adding it. Contributed by Kitti Nanasi.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 951cdd7e4cbe68284620f6805f85c51301150c58)
Kitti Nanasi 6 years ago
parent
commit
0dbcb81a41

+ 35 - 24
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/ECTopologyVerifier.java

@@ -18,13 +18,15 @@ package org.apache.hadoop.hdfs.server.common;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.namenode.ECTopologyVerifierResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Class for verifying whether the cluster setup can support
@@ -43,58 +45,57 @@ public final class ECTopologyVerifier {
   private ECTopologyVerifier() {}
 
   /**
-   * Verifies whether the cluster setup can support all enabled EC policies.
+   * Verifies whether the cluster setup can support the given EC policies.
    *
    * @param report list of data node descriptors for all data nodes
-   * @param policies all system and user defined erasure coding policies
+   * @param policies erasure coding policies to verify
    * @return the status of the verification
    */
   public static ECTopologyVerifierResult getECTopologyVerifierResult(
-      final DatanodeInfo[] report, final ErasureCodingPolicyInfo[] policies) {
+      final DatanodeInfo[] report, final ErasureCodingPolicy... policies) {
     final int numOfRacks = getNumberOfRacks(report);
-    return getECTopologyVerifierResult(policies, numOfRacks, report.length);
+    return getECTopologyVerifierResult(numOfRacks, report.length, policies);
   }
 
   /**
    * Verifies whether the cluster setup can support the given EC policies.
    *
-   * @param policies all system and user defined erasure coding policies
+   * @param policies erasure coding policies to verify
    * @param numOfRacks number of racks
    * @param numOfDataNodes number of data nodes
    * @return the status of the verification
    */
   public static ECTopologyVerifierResult getECTopologyVerifierResult(
-      final ErasureCodingPolicyInfo[] policies, final int numOfRacks,
-      final int numOfDataNodes) {
+      final int numOfRacks, final int numOfDataNodes,
+      final ErasureCodingPolicy... policies) {
     int minDN = 0;
     int minRack = 0;
-    for (ErasureCodingPolicyInfo policy: policies) {
-      if (policy.isEnabled()) {
-        final int policyDN =
-            policy.getPolicy().getNumDataUnits() + policy.getPolicy()
-                .getNumParityUnits();
-        minDN = Math.max(minDN, policyDN);
-        final int policyRack = (int) Math.ceil(
-            policyDN / (double) policy.getPolicy().getNumParityUnits());
-        minRack = Math.max(minRack, policyRack);
-      }
+    for (ErasureCodingPolicy policy: policies) {
+      final int policyDN =
+          policy.getNumDataUnits() + policy
+              .getNumParityUnits();
+      minDN = Math.max(minDN, policyDN);
+      final int policyRack = (int) Math.ceil(
+          policyDN / (double) policy.getNumParityUnits());
+      minRack = Math.max(minRack, policyRack);
     }
     if (minDN == 0 || minRack == 0) {
-      String resultMessage = "No erasure coding policy is enabled.";
+      String resultMessage = "No erasure coding policy is given.";
       LOG.trace(resultMessage);
       return new ECTopologyVerifierResult(true, resultMessage);
     }
-    return verifyECWithTopology(minDN, minRack, numOfRacks, numOfDataNodes);
+    return verifyECWithTopology(minDN, minRack, numOfRacks, numOfDataNodes,
+        getReadablePolicies(policies));
   }
 
   private static ECTopologyVerifierResult verifyECWithTopology(
       final int minDN, final int minRack,
-      final int numOfRacks, final int numOfDataNodes) {
+      final int numOfRacks, final int numOfDataNodes, String readablePolicies) {
     String resultMessage;
     if (numOfDataNodes < minDN) {
       resultMessage = "The number of DataNodes (" + numOfDataNodes
           + ") is less than the minimum required number of DataNodes ("
-          + minDN + ") for enabled erasure coding policy.";
+          + minDN + ") for the erasure coding policies: " + readablePolicies;
       LOG.debug(resultMessage);
       return new ECTopologyVerifierResult(false, resultMessage);
     }
@@ -102,12 +103,14 @@ public final class ECTopologyVerifier {
     if (numOfRacks < minRack) {
       resultMessage = "The number of racks (" + numOfRacks
           + ") is less than the minimum required number of racks ("
-          + minRack + ") for enabled erasure coding policy.";
+          + minRack + ") for the erasure coding policies: "
+          + readablePolicies;
       LOG.debug(resultMessage);
       return new ECTopologyVerifierResult(false, resultMessage);
     }
     return new ECTopologyVerifierResult(true,
-        "The cluster setup can support all enabled EC policies");
+        "The cluster setup can support EC policies: "
+            + readablePolicies);
   }
 
   private static int getNumberOfRacks(DatanodeInfo[] report) {
@@ -121,4 +124,12 @@ public final class ECTopologyVerifier {
     }
     return racks.size();
   }
+
+  private static String getReadablePolicies(
+      final ErasureCodingPolicy... policies) {
+    return Arrays.asList(policies)
+            .stream()
+            .map(policyInfo -> policyInfo.getName())
+            .collect(Collectors.joining(", "));
+  }
 }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java

@@ -213,10 +213,10 @@ public final class ErasureCodingPolicyManager {
         .toArray(new ErasureCodingPolicyInfo[0]);
   }
 
-  public ErasureCodingPolicyInfo[] getCopyOfPolicies() {
-    ErasureCodingPolicyInfo[] copy;
+  public ErasureCodingPolicy[] getCopyOfEnabledPolicies() {
+    ErasureCodingPolicy[] copy;
     synchronized (this) {
-      copy = Arrays.copyOf(allPolicies, allPolicies.length);
+      copy = Arrays.copyOf(enabledPolicies, enabledPolicies.length);
     }
     return copy;
   }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -8198,11 +8198,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         .getNumOfDataNodes();
     int numOfRacks = getBlockManager().getDatanodeManager()
         .getNetworkTopology().getNumOfRacks();
-    ErasureCodingPolicyInfo[] ecPolicies =
-        getErasureCodingPolicyManager().getCopyOfPolicies();
+    ErasureCodingPolicy[] enabledEcPolicies =
+        getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
     ECTopologyVerifierResult result =
-        ECTopologyVerifier.getECTopologyVerifierResult(ecPolicies,
-        numOfRacks, numOfDataNodes);
+        ECTopologyVerifier.getECTopologyVerifierResult(
+            numOfRacks, numOfDataNodes, enabledEcPolicies);
 
     Map<String, String> resultMap = new HashMap<String, String>();
     resultMap.put("isSupported", Boolean.toString(result.isSupported()));

+ 50 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/ECAdmin.java

@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hdfs.tools;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -536,6 +537,13 @@ public class ECAdmin extends Configured implements Tool {
         dfs.enableErasureCodingPolicy(ecPolicyName);
         System.out.println("Erasure coding policy " + ecPolicyName +
             " is enabled");
+        ECTopologyVerifierResult result =
+            getECTopologyVerifierResultForPolicy(dfs, ecPolicyName);
+        if (!result.isSupported()) {
+          System.err.println("Warning: The cluster setup does not support " +
+              "EC policy " + ecPolicyName + ". Reason: " +
+              result.getResultMessage());
+        }
       } catch (IOException e) {
         System.err.println(AdminHelper.prettifyException(e));
         return 2;
@@ -621,13 +629,7 @@ public class ECAdmin extends Configured implements Tool {
         return 1;
       }
       final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
-      final ErasureCodingPolicyInfo[] policies =
-          dfs.getClient().getNamenode().getErasureCodingPolicies();
-      final DatanodeInfo[] report = dfs.getClient().getNamenode()
-          .getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
-
-      ECTopologyVerifierResult result = ECTopologyVerifier
-          .getECTopologyVerifierResult(report, policies);
+      ECTopologyVerifierResult result = getECTopologyVerifierResult(dfs);
       System.out.println(result.getResultMessage());
       if (result.isSupported()) {
         return 0;
@@ -636,6 +638,47 @@ public class ECAdmin extends Configured implements Tool {
     }
   }
 
+  private static ECTopologyVerifierResult getECTopologyVerifierResult(
+      final DistributedFileSystem dfs) throws IOException {
+    final ErasureCodingPolicyInfo[] policies =
+        dfs.getClient().getNamenode().getErasureCodingPolicies();
+    final DatanodeInfo[] report = dfs.getClient().getNamenode()
+        .getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+
+    return ECTopologyVerifier.getECTopologyVerifierResult(report,
+        getEnabledPolicies(policies));
+  }
+
+  private static ECTopologyVerifierResult getECTopologyVerifierResultForPolicy(
+      final DistributedFileSystem dfs, final String policyName)
+      throws IOException {
+    final ErasureCodingPolicy policy =
+        getPolicy(dfs.getClient().getNamenode().getErasureCodingPolicies(),
+            policyName);
+    final DatanodeInfo[] report = dfs.getClient().getNamenode()
+        .getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    return ECTopologyVerifier.getECTopologyVerifierResult(report, policy);
+  }
+
+  private static ErasureCodingPolicy getPolicy(
+      final ErasureCodingPolicyInfo[] policies, final String policyName) {
+    for (ErasureCodingPolicyInfo policy : policies) {
+      if (policyName.equals(policy.getPolicy().getName())) {
+        return policy.getPolicy();
+      }
+    }
+    throw new HadoopIllegalArgumentException("The given erasure coding " +
+        "policy " + policyName + " does not exist.");
+  }
+
+  private static ErasureCodingPolicy[] getEnabledPolicies(
+      final ErasureCodingPolicyInfo[] policies) {
+    return Arrays.asList(policies).stream()
+        .filter(policyInfo -> policyInfo.isEnabled())
+        .map(ErasureCodingPolicyInfo::getPolicy)
+        .toArray(ErasureCodingPolicy[]::new);
+  }
+
   private static final AdminHelper.Command[] COMMANDS = {
       new ListECPoliciesCommand(),
       new AddECPoliciesCommand(),

+ 105 - 17
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestECAdmin.java

@@ -33,6 +33,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -46,7 +47,10 @@ public class TestECAdmin {
   private ECAdmin admin = new ECAdmin(conf);
 
   private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+  private final ByteArrayOutputStream err = new ByteArrayOutputStream();
+
   private static final PrintStream OLD_OUT = System.out;
+  private static final PrintStream OLD_ERR = System.err;
 
   @Rule
   public Timeout globalTimeout = new Timeout(300000);
@@ -54,6 +58,7 @@ public class TestECAdmin {
   @Before
   public void setup() throws Exception {
     System.setOut(new PrintStream(out));
+    System.setErr(new PrintStream(err));
   }
 
   @After
@@ -62,8 +67,10 @@ public class TestECAdmin {
       System.out.flush();
       System.err.flush();
       out.reset();
+      err.reset();
     } finally {
       System.setOut(OLD_OUT);
+      System.setErr(OLD_ERR);
     }
 
     if (cluster != null) {
@@ -77,10 +84,13 @@ public class TestECAdmin {
     cluster = DFSTestUtil.setupCluster(conf, 6, 3, 0);
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
         .contains("less than the minimum required number of DataNodes"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -91,10 +101,13 @@ public class TestECAdmin {
             .getByID(SystemErasureCodingPolicies.RS_10_4_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
         .contains("less than the minimum required number of racks"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -108,10 +121,13 @@ public class TestECAdmin {
             .getByID(SystemErasureCodingPolicies.XOR_2_1_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
         .contains("less than the minimum required number of racks"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -125,10 +141,13 @@ public class TestECAdmin {
             .getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
         .contains("less than the minimum required number of racks"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -136,8 +155,13 @@ public class TestECAdmin {
     cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0);
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(0, ret);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString().contains(
+        "The cluster setup can support EC policies: RS-6-3-1024k"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -148,8 +172,72 @@ public class TestECAdmin {
             .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(0, ret);
-    assertTrue(out.toString().contains("No erasure coding policy is enabled"));
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Result of cluster topology verify " +
+            "should be logged correctly",
+        out.toString().contains("No erasure coding policy is given"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
+  }
+
+  @Test
+  public void testUnsuccessfulEnablePolicyMessage() throws Exception {
+    cluster = DFSTestUtil.setupCluster(conf, 5, 2, 0);
+    cluster.getFileSystem().disableErasureCodingPolicy(
+        SystemErasureCodingPolicies
+            .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
+    String[] args = {"-enablePolicy", "-policy", "RS-3-2-1024k"};
+
+    final int ret = admin.run(args);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Enabling policy should be logged", out.toString()
+        .contains("Erasure coding policy RS-3-2-1024k is enabled"));
+    assertTrue("Warning about cluster topology should be printed",
+        err.toString().contains("Warning: The cluster setup does not support " +
+        "EC policy RS-3-2-1024k. Reason:"));
+    assertTrue("Warning about cluster topology should be printed",
+        err.toString()
+            .contains("less than the minimum required number of racks"));
+  }
+
+  @Test
+  public void testSuccessfulEnablePolicyMessage() throws Exception {
+    cluster = DFSTestUtil.setupCluster(conf, 5, 3, 0);
+    cluster.getFileSystem().disableErasureCodingPolicy(
+        SystemErasureCodingPolicies
+            .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
+    String[] args = {"-enablePolicy", "-policy", "RS-3-2-1024k"};
+
+    final int ret = admin.run(args);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Enabling policy should be logged", out.toString()
+        .contains("Erasure coding policy RS-3-2-1024k is enabled"));
+    assertFalse("Warning about cluster topology should not be printed",
+        out.toString().contains("Warning: The cluster setup does not support"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
+  }
+
+  @Test
+  public void testEnableNonExistentPolicyMessage() throws Exception {
+    cluster = DFSTestUtil.setupCluster(conf, 5, 3, 0);
+    cluster.getFileSystem().disableErasureCodingPolicy(
+        SystemErasureCodingPolicies
+            .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
+    String[] args = {"-enablePolicy", "-policy", "NonExistentPolicy"};
+
+    final int ret = admin.run(args);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is unsuccessful", 2, ret);
+    assertFalse("Enabling policy should not be logged when " +
+        "it was unsuccessful", out.toString().contains("is enabled"));
+    assertTrue("Error message should be printed",
+        err.toString().contains("RemoteException: The policy name " +
+            "NonExistentPolicy does not exist"));
   }
 }