Sfoglia il codice sorgente

YARN-8186. [Router] Federation: routing getAppState REST invocations transparently to multiple RMs. Contributed by Giovanni Matteo Fumarola.

Inigo Goiri 7 anni fa
parent
commit
da5bcf5f7d

+ 46 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

@@ -987,6 +987,52 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     return metrics;
   }
 
+  /**
+   * The YARN Router will forward to the respective YARN RM in which the AM is
+   * running.
+   * <p>
+   * Possible failure:
+   * <p>
+   * Client: identical behavior as {@code RMWebServices}.
+   * <p>
+   * Router: the Client will timeout and resubmit the request.
+   * <p>
+   * ResourceManager: the Router will timeout and the call will fail.
+   * <p>
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
+  @Override
+  public AppState getAppState(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+
+    ApplicationId applicationId = null;
+    try {
+      applicationId = ApplicationId.fromString(appId);
+    } catch (IllegalArgumentException e) {
+      return null;
+    }
+
+    SubClusterInfo subClusterInfo = null;
+    SubClusterId subClusterId = null;
+    try {
+      subClusterId =
+          federationFacade.getApplicationHomeSubCluster(applicationId);
+      if (subClusterId == null) {
+        return null;
+      }
+      subClusterInfo = federationFacade.getSubCluster(subClusterId);
+    } catch (YarnException e) {
+      return null;
+    }
+
+    DefaultRequestInterceptorREST interceptor =
+        getOrCreateInterceptorForSubCluster(subClusterId,
+            subClusterInfo.getRMWebServiceAddress());
+    return interceptor.getAppState(hsr, appId);
+  }
+
   @Override
   public ClusterInfo get() {
     return getClusterInfo();
@@ -1025,12 +1071,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     throw new NotImplementedException();
   }
 
-  @Override
-  public AppState getAppState(HttpServletRequest hsr, String appId)
-      throws AuthorizationException {
-    throw new NotImplementedException();
-  }
-
   @Override
   public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
       throws IOException {

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java

@@ -61,6 +61,7 @@ public class MockDefaultRequestInterceptorREST
   // down e.g. network issue, failover.
   private boolean isRunning = true;
   private HashSet<ApplicationId> applicationMap = new HashSet<>();
+  public static final String APP_STATE_RUNNING = "RUNNING";
 
   private void validateRunning() throws ConnectException {
     if (!isRunning) {
@@ -192,6 +193,21 @@ public class MockDefaultRequestInterceptorREST
     return metrics;
   }
 
+  @Override
+  public AppState getAppState(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ApplicationId applicationId = ApplicationId.fromString(appId);
+    if (!applicationMap.contains(applicationId)) {
+      throw new NotFoundException("app with id: " + appId + " not found");
+    }
+
+    return new AppState(APP_STATE_RUNNING);
+  }
+
   public void setSubClusterId(int subClusterId) {
     setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
   }

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

@@ -443,4 +443,60 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     // The merge operations is tested in TestRouterWebServiceUtil
   }
 
+  /**
+   * This test validates the correctness of GetApplicationState in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testGetApplicationState()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Submit the application we want the report later
+    Response response = interceptor.submitApplication(context, null);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    AppState responseGet = interceptor.getAppState(null, appId.toString());
+
+    Assert.assertNotNull(responseGet);
+    Assert.assertEquals(MockDefaultRequestInterceptorREST.APP_STATE_RUNNING,
+        responseGet.getState());
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationState in case the
+   * application does not exist in StateStore.
+   */
+  @Test
+  public void testGetApplicationStateNotExists()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    AppState response = interceptor.getAppState(null, appId.toString());
+
+    Assert.assertNull(response);
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationState in case of
+   * application in wrong format.
+   */
+  @Test
+  public void testGetApplicationStateWrongFormat()
+      throws YarnException, IOException, InterruptedException {
+
+    AppState response = interceptor.getAppState(null, "Application_wrong_id");
+
+    Assert.assertNull(response);
+  }
+
 }