Jelajahi Sumber

YARN-7269. Tracking URL in the app state does not get redirected to ApplicationMaster for Running applications. Contributed by Wangda Tan

Jian He 7 tahun lalu
induk
melakukan
4111e6c781

+ 33 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/amfilter/AmFilterInitializer.java

@@ -18,21 +18,26 @@
 
 package org.apache.hadoop.yarn.server.webproxy.amfilter;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.FilterContainer;
 import org.apache.hadoop.http.FilterInitializer;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class AmFilterInitializer extends FilterInitializer {
   private static final String FILTER_NAME = "AM_PROXY_FILTER";
   private static final String FILTER_CLASS = AmIpFilter.class.getCanonicalName();
+  public static final String RM_HA_URLS = "RM_HA_URLS";
   
   @Override
   public void initFilter(FilterContainer container, Configuration conf) {
@@ -55,6 +60,29 @@ public class AmFilterInitializer extends FilterInitializer {
     sb.setLength(sb.length() - 1);
     params.put(AmIpFilter.PROXY_URI_BASES, sb.toString());
     container.addFilter(FILTER_NAME, FILTER_CLASS, params);
+
+    // Handle RM HA urls
+    List<String> urls = new ArrayList<>();
+
+    // Include yarn-site.xml in the classpath
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    for (String rmId : getRmIds(yarnConf)) {
+      String url = getUrlByRmId(yarnConf, rmId);
+      urls.add(url);
+    }
+    params.put(RM_HA_URLS, StringUtils.join(",", urls));
+  }
+
+  private Collection<String> getRmIds(Configuration conf) {
+    return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
+  }
+
+  private String getUrlByRmId(Configuration conf, String rmId) {
+    String addressPropertyPrefix = YarnConfiguration.useHttps(conf) ?
+        YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS :
+        YarnConfiguration.RM_WEBAPP_ADDRESS;
+    String host = conf.get(HAUtil.addSuffix(addressPropertyPrefix, rmId));
+    return host;
   }
 
   @VisibleForTesting

+ 26 - 41
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/amfilter/AmIpFilter.java

@@ -18,17 +18,12 @@
 
 package org.apache.hadoop.yarn.server.webproxy.amfilter;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.net.HttpURLConnection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.Collection;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUtils;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
@@ -39,15 +34,16 @@ import javax.servlet.ServletResponse;
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.webproxy.ProxyUtils;
-import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 @Public
 public class AmIpFilter implements Filter {
@@ -70,6 +66,7 @@ public class AmIpFilter implements Filter {
   private long lastUpdate;
   @VisibleForTesting
   Map<String, String> proxyUriBases;
+  String rmUrls[] = null;
 
   @Override
   public void init(FilterConfig conf) throws ServletException {
@@ -95,6 +92,10 @@ public class AmIpFilter implements Filter {
         }
       }
     }
+
+    if (conf.getInitParameter(AmFilterInitializer.RM_HA_URLS) != null) {
+      rmUrls = conf.getInitParameter(AmFilterInitializer.RM_HA_URLS).split(",");
+    }
   }
 
   protected Set<String> getProxyAddresses() throws ServletException {
@@ -196,13 +197,11 @@ public class AmIpFilter implements Filter {
     if (proxyUriBases.size() == 1) {
       // external proxy or not RM HA
       addr = proxyUriBases.values().iterator().next();
-    } else {
-      // RM HA
-      YarnConfiguration conf = new YarnConfiguration();
-      for (String rmId : getRmIds(conf)) {
-        String url = getUrlByRmId(conf, rmId);
-        if (isValidUrl(url)) {
-          addr = url;
+    } else if (rmUrls != null) {
+      for (String url : rmUrls) {
+        String host = proxyUriBases.get(url);
+        if (isValidUrl(host)) {
+          addr = host;
           break;
         }
       }
@@ -215,20 +214,6 @@ public class AmIpFilter implements Filter {
     return addr;
   }
 
-  @VisibleForTesting
-  Collection<String> getRmIds(YarnConfiguration conf) {
-    return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
-  }
-
-  @VisibleForTesting
-  String getUrlByRmId(YarnConfiguration conf, String rmId) {
-    String addressPropertyPrefix = YarnConfiguration.useHttps(conf) ?
-        YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS :
-        YarnConfiguration.RM_WEBAPP_ADDRESS;
-    String host = conf.get(HAUtil.addSuffix(addressPropertyPrefix, rmId));
-    return proxyUriBases.get(host);
-  }
-
   private boolean isValidUrl(String url) {
     boolean isValid = false;
     try {

+ 1 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/amfilter/TestAmFilter.java

@@ -159,13 +159,7 @@ public class TestAmFilter {
     spy.proxyUriBases = new HashMap<>();
     spy.proxyUriBases.put(rm1, rm1Url);
     spy.proxyUriBases.put(rm2, rm2Url);
-
-    Collection<String> rmIds = new ArrayList<>(Arrays.asList(rm1, rm2));
-    Mockito.doReturn(rmIds).when(spy).getRmIds(Mockito.any());
-    Mockito.doReturn(rm1Url).when(spy)
-        .getUrlByRmId(Mockito.any(), Mockito.eq(rm2));
-    Mockito.doReturn(rm2Url).when(spy)
-        .getUrlByRmId(Mockito.any(), Mockito.eq(rm1));
+    spy.rmUrls = new String[] { rm1, rm2 };
 
     assertEquals(spy.findRedirectUrl(), rm1Url);
   }

+ 10 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/amfilter/TestAmFilterInitializer.java

@@ -56,10 +56,11 @@ public class TestAmFilterInitializer {
     AmFilterInitializer afi = new MockAmFilterInitializer();
     assertNull(con.givenParameters);
     afi.initFilter(con, conf);
-    assertEquals(2, con.givenParameters.size());
+    assertEquals(3, con.givenParameters.size());
     assertEquals("host1", con.givenParameters.get(AmIpFilter.PROXY_HOSTS));
     assertEquals("http://host1:1000/foo",
         con.givenParameters.get(AmIpFilter.PROXY_URI_BASES));
+    assertEquals("", con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
 
     // Check a single RM_WEBAPP_ADDRESS
     con = new MockFilterContainer();
@@ -68,10 +69,11 @@ public class TestAmFilterInitializer {
     afi = new MockAmFilterInitializer();
     assertNull(con.givenParameters);
     afi.initFilter(con, conf);
-    assertEquals(2, con.givenParameters.size());
+    assertEquals(3, con.givenParameters.size());
     assertEquals("host2", con.givenParameters.get(AmIpFilter.PROXY_HOSTS));
     assertEquals("http://host2:2000/foo",
         con.givenParameters.get(AmIpFilter.PROXY_URI_BASES));
+    assertEquals("", con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
 
     // Check multiple RM_WEBAPP_ADDRESSes (RM HA)
     con = new MockFilterContainer();
@@ -84,7 +86,7 @@ public class TestAmFilterInitializer {
     afi = new MockAmFilterInitializer();
     assertNull(con.givenParameters);
     afi.initFilter(con, conf);
-    assertEquals(2, con.givenParameters.size());
+    assertEquals(3, con.givenParameters.size());
     String[] proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS)
         .split(AmIpFilter.PROXY_HOSTS_DELIMITER);
     assertEquals(3, proxyHosts.length);
@@ -99,6 +101,8 @@ public class TestAmFilterInitializer {
     assertEquals("http://host2:2000/foo", proxyBases[0]);
     assertEquals("http://host3:3000/foo", proxyBases[1]);
     assertEquals("http://host4:4000/foo", proxyBases[2]);
+    assertEquals("host2:2000,host3:3000,host4:4000",
+        con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
 
     // Check multiple RM_WEBAPP_ADDRESSes (RM HA) with HTTPS
     con = new MockFilterContainer();
@@ -112,7 +116,7 @@ public class TestAmFilterInitializer {
     afi = new MockAmFilterInitializer();
     assertNull(con.givenParameters);
     afi.initFilter(con, conf);
-    assertEquals(2, con.givenParameters.size());
+    assertEquals(3, con.givenParameters.size());
     proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS)
         .split(AmIpFilter.PROXY_HOSTS_DELIMITER);
     assertEquals(2, proxyHosts.length);
@@ -125,6 +129,8 @@ public class TestAmFilterInitializer {
     Arrays.sort(proxyBases);
     assertEquals("https://host5:5000/foo", proxyBases[0]);
     assertEquals("https://host6:6000/foo", proxyBases[1]);
+    assertEquals("host5:5000,host6:6000",
+        con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
   }
 
   @Test