Browse Source

YARN-10767. Yarn Logs Command retrying on Standby RM for 30 times. Contributed by D M Murali Krishna Reddy.

(cherry picked from commit 9a6a11c4522f34fa4245983d8719675036879d7a)
Jim Brennan 3 years ago
parent
commit
2636b630b4

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RMHAUtils.java

@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -35,7 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 @Unstable
 public class RMHAUtils {
 
-  public static String findActiveRMHAId(YarnConfiguration conf) {
+  public static String findActiveRMHAId(Configuration conf) {
     YarnConfiguration yarnConf = new YarnConfiguration(conf);
     Collection<String> rmIds =
         yarnConf.getStringCollection(YarnConfiguration.RM_HA_IDS);

+ 10 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.webapp.util;
 import static org.apache.hadoop.yarn.util.StringHelper.PATH_JOINER;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -97,24 +98,17 @@ public class WebAppUtils {
    */
   public static <T, R> R execOnActiveRM(Configuration conf,
       ThrowingBiFunction<String, T, R> func, T arg) throws Exception {
-    String rm1Address = getRMWebAppURLWithScheme(conf, 0);
-    try {
-      return func.apply(rm1Address, arg);
-    } catch (Exception e) {
-      if (HAUtil.isHAEnabled(conf)) {
-        int rms = HAUtil.getRMHAIds(conf).size();
-        for (int i=1; i<rms; i++) {
-          try {
-            rm1Address = getRMWebAppURLWithScheme(conf, i);
-            return func.apply(rm1Address, arg);
-          } catch (Exception e1) {
-            // ignore and try next one when RM is down
-            e = e1;
-          }
-        }
+    int haIndex = 0;
+    if (HAUtil.isHAEnabled(conf)) {
+      String activeRMId = RMHAUtils.findActiveRMHAId(conf);
+      if (activeRMId != null) {
+        haIndex = new ArrayList<>(HAUtil.getRMHAIds(conf)).indexOf(activeRMId);
+      } else {
+        throw new ConnectException("No Active RM available");
       }
-      throw e;
     }
+    String rm1Address = getRMWebAppURLWithScheme(conf, haIndex);
+    return func.apply(rm1Address, arg);
   }
 
   /** A BiFunction which throws on Exception. */