浏览代码

YARN-1639. Modified RM HA configuration handling to have a way of not requiring separate configuration files for each RM. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1564032 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1564033 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 11 年之前
父节点
当前提交
bd84052ff6

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -80,6 +80,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1617. Remove ancient comment and surround LOG.debug in
     YARN-1617. Remove ancient comment and surround LOG.debug in
     AppSchedulingInfo.allocate (Sandy Ryza)
     AppSchedulingInfo.allocate (Sandy Ryza)
 
 
+    YARN-1639. Modified RM HA configuration handling to have a way of not
+    requiring separate configuration files for each RM. (Xuan Gong via vinodkv)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 43 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java

@@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.conf;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collection;
 
 
 @InterfaceAudience.Private
 @InterfaceAudience.Private
@@ -108,8 +111,7 @@ public class HAUtil {
           String errmsg = iae.getMessage();
           String errmsg = iae.getMessage();
           if (confKey == null) {
           if (confKey == null) {
             // Error at addSuffix
             // Error at addSuffix
-            errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
-              getRMHAId(conf));
+            errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID, id);
           }
           }
           throwBadConfigurationException(errmsg);
           throwBadConfigurationException(errmsg);
         }
         }
@@ -122,10 +124,18 @@ public class HAUtil {
   }
   }
 
 
   private static void verifyAndSetCurrentRMHAId(Configuration conf) {
   private static void verifyAndSetCurrentRMHAId(Configuration conf) {
-    String rmId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
+    String rmId = getRMHAId(conf);
     if (rmId == null) {
     if (rmId == null) {
-      throwBadConfigurationException(
-        getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID));
+      StringBuilder msg = new StringBuilder();
+      msg.append("Can not find valid RM_HA_ID. None of ");
+      for (String id : conf
+          .getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS)) {
+        msg.append(addSuffix(YarnConfiguration.RM_ADDRESS, id) + " ");
+      }
+      msg.append(" are matching" +
+          " the local address OR " + YarnConfiguration.RM_HA_ID + " is not" +
+          " specified in HA Configuration");
+      throwBadConfigurationException(msg.toString());
     } else {
     } else {
       Collection<String> ids = getRMHAIds(conf);
       Collection<String> ids = getRMHAIds(conf);
       if (!ids.contains(rmId)) {
       if (!ids.contains(rmId)) {
@@ -179,7 +189,34 @@ public class HAUtil {
    * @return RM Id on success
    * @return RM Id on success
    */
    */
   public static String getRMHAId(Configuration conf) {
   public static String getRMHAId(Configuration conf) {
-    return conf.get(YarnConfiguration.RM_HA_ID);
+    int found = 0;
+    String currentRMId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
+    if(currentRMId == null) {
+      for(String rmId : getRMHAIds(conf)) {
+        String key = addSuffix(YarnConfiguration.RM_ADDRESS, rmId);
+        String addr = conf.get(key);
+        if (addr == null) {
+          continue;
+        }
+        InetSocketAddress s;
+        try {
+          s = NetUtils.createSocketAddr(addr);
+        } catch (Exception e) {
+          LOG.warn("Exception in creating socket address " + addr, e);
+          continue;
+        }
+        if (!s.isUnresolved() && NetUtils.isLocalAddress(s.getAddress())) {
+          currentRMId = rmId.trim();
+          found++;
+        }
+      }
+    }
+    if (found > 1) { // Only one address must match the local address
+      String msg = "The HA Configuration has multiple addresses that match "
+          + "local node's address.";
+      throw new HadoopIllegalArgumentException(msg);
+    }
+    return currentRMId;
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -395,7 +395,9 @@
       the Active mode when prompted to.
       the Active mode when prompted to.
       (2) The nodes in the RM ensemble are listed in
       (2) The nodes in the RM ensemble are listed in
       yarn.resourcemanager.ha.rm-ids
       yarn.resourcemanager.ha.rm-ids
-      (3) The id of each RM comes from yarn.resourcemanager.ha.id
+      (3) The id of each RM either comes from yarn.resourcemanager.ha.id,
+      if yarn.resourcemanager.ha.id is explicitly specified, or can be
+      figured out by matching yarn.resourcemanager.address.{id} with the local address
       (4) The actual physical addresses come from the configs of the pattern
       (4) The actual physical addresses come from the configs of the pattern
       - {rpc-config}.{id}</description>
       - {rpc-config}.{id}</description>
     <name>yarn.resourcemanager.ha.enabled</name>
     <name>yarn.resourcemanager.ha.enabled</name>
@@ -442,7 +444,10 @@
 
 
   <property>
   <property>
     <description>The id (string) of the current RM. When HA is enabled, this
     <description>The id (string) of the current RM. When HA is enabled, this
-      is a required config. See description of yarn.resourcemanager.ha.enabled
+      is an optional config. The id of the current RM can be set by explicitly
+      specifying yarn.resourcemanager.ha.id, or figured out by matching
+      yarn.resourcemanager.address.{id} with the local address.
+      See description of yarn.resourcemanager.ha.enabled
       for full details on how this is used.</description>
       for full details on how this is used.</description>
     <name>yarn.resourcemanager.ha.id</name>
     <name>yarn.resourcemanager.ha.id</name>
     <!--value>rm1</value-->
     <!--value>rm1</value-->

+ 38 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java

@@ -36,6 +36,8 @@ import org.junit.Test;
 
 
 import java.io.IOException;
 import java.io.IOException;
 
 
+import junit.framework.Assert;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
@@ -48,12 +50,15 @@ public class TestRMHA {
   private static final String STATE_ERR =
   private static final String STATE_ERR =
       "ResourceManager is in wrong HA state";
       "ResourceManager is in wrong HA state";
 
 
-  private static final String RM1_ADDRESS = "0.0.0.0:0";
+  private static final String RM1_ADDRESS = "1.1.1.1:1";
   private static final String RM1_NODE_ID = "rm1";
   private static final String RM1_NODE_ID = "rm1";
 
 
-  private static final String RM2_ADDRESS = "1.1.1.1:1";
+  private static final String RM2_ADDRESS = "0.0.0.0:0";
   private static final String RM2_NODE_ID = "rm2";
   private static final String RM2_NODE_ID = "rm2";
 
 
+  private static final String RM3_ADDRESS = "2.2.2.2:2";
+  private static final String RM3_NODE_ID = "rm3";
+
   @Before
   @Before
   public void setUp() throws Exception {
   public void setUp() throws Exception {
     configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
     configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
@@ -61,8 +66,8 @@ public class TestRMHA {
     for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
     for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
       configuration.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
       configuration.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
       configuration.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
       configuration.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
+      configuration.set(HAUtil.addSuffix(confKey, RM3_NODE_ID), RM3_ADDRESS);
     }
     }
-    configuration.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
   }
   }
 
 
   private void checkMonitorHealth() throws IOException {
   private void checkMonitorHealth() throws IOException {
@@ -278,6 +283,36 @@ public class TestRMHA {
     rm.stop();
     rm.stop();
   }
   }
 
 
+  @Test
+  public void testHAIDLookup() {
+    //test implicitly lookup HA-ID
+    Configuration conf = new YarnConfiguration(configuration);
+    rm = new MockRM(conf);
+    rm.init(conf);
+
+    assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM2_NODE_ID);
+
+    //test explicitly lookup HA-ID
+    configuration.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
+    conf = new YarnConfiguration(configuration);
+    rm = new MockRM(conf);
+    rm.init(conf);
+    assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID);
+
+    //test if RM_HA_ID can not be found
+    configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID);
+    configuration.unset(YarnConfiguration.RM_HA_ID);
+    conf = new YarnConfiguration(configuration);
+    try {
+      rm = new MockRM(conf);
+      rm.init(conf);
+      fail("Should get an exception here.");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex.getMessage().contains(
+          "Invalid configuration! Can not find valid RM_HA_ID."));
+    }
+  }
+
   @SuppressWarnings("rawtypes")
   @SuppressWarnings("rawtypes")
   class MyCountingDispatcher extends AbstractService implements Dispatcher {
   class MyCountingDispatcher extends AbstractService implements Dispatcher {