瀏覽代碼

YARN-11230. [Federation] Add getContainer, signalToContainer REST APIs for Router. (#4689)

slfan1989 2 年之前
父節點
當前提交
c5ec727435

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

@@ -82,4 +82,23 @@ public final class RouterServerUtil {
     }
   }
 
+  /**
+   * Throws an RunTimeException due to an error.
+   *
+   * @param errMsg the error message
+   * @param t the throwable raised in the called class.
+   * @throws RuntimeException on failure
+   */
+  @Public
+  @Unstable
+  public static void logAndThrowRunTimeException(String errMsg, Throwable t)
+      throws RuntimeException {
+    if (t != null) {
+      LOG.error(errMsg, t);
+      throw new RuntimeException(errMsg, t);
+    } else {
+      LOG.error(errMsg);
+      throw new RuntimeException(errMsg);
+    }
+  }
 }

+ 89 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -1415,7 +1416,39 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   public ContainerInfo getContainer(HttpServletRequest req,
       HttpServletResponse res, String appId, String appAttemptId,
       String containerId) {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (appId == null || appId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
+    }
+    if (appAttemptId == null || appAttemptId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
+    }
+    if (containerId == null || containerId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
+    }
+
+    try {
+      ApplicationId applicationId = ApplicationId.fromString(appId);
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
+
+      if (subClusterInfo == null) {
+        RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
+            applicationId, null);
+      }
+
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      return interceptor.getContainer(req, res, appId, appAttemptId, containerId);
+    } catch (IllegalArgumentException e) {
+      String msg = String.format(
+          "Unable to get the AppAttempt appId: %s, appAttemptId: %s, containerId: %s.", appId,
+          appAttemptId, containerId);
+      RouterServerUtil.logAndThrowRunTimeException(msg, e);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
+    }
+
+    return null;
   }
 
   @Override
@@ -1442,7 +1475,37 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   @Override
   public Response signalToContainer(String containerId, String command,
       HttpServletRequest req) {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (containerId == null || containerId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
+    }
+
+    if (command == null || command.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the command is empty or null.");
+    }
+
+    try {
+      ContainerId containerIdObj = ContainerId.fromString(containerId);
+      ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
+
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
+
+      if (subClusterInfo == null) {
+        RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
+            applicationId, null);
+      }
+
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      return interceptor.signalToContainer(containerId, command, req);
+
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowRunTimeException("signalToContainer Failed.", e);
+    } catch (AuthorizationException e) {
+      RouterServerUtil.logAndThrowRunTimeException("signalToContainer Author Failed.", e);
+    }
+
+    return null;
   }
 
   @Override
@@ -1494,4 +1557,28 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     });
     return results;
   }
+
+  /**
+   * get the HomeSubCluster according to ApplicationId.
+   *
+   * @param applicationId applicationId
+   * @return HomeSubCluster
+   * @throws YarnException on failure
+   */
+  private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
+    SubClusterInfo subClusterInfo = null;
+    SubClusterId subClusterId = null;
+    try {
+      subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
+      if (subClusterId == null) {
+        RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId "
+            + applicationId, null);
+      }
+      subClusterInfo = federationFacade.getSubCluster(subClusterId);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId "
+          + applicationId + " failed.", e);
+    }
+    return subClusterInfo;
+  }
 }

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java

@@ -35,6 +35,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.commons.lang3.EnumUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.StringUtils;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -363,4 +365,51 @@ public class MockDefaultRequestInterceptorREST
       return null;
     }
   }
+
+  @Override
+  public ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
+      String appId, String appAttemptId, String containerId) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ContainerId newContainerId = ContainerId.newContainerId(
+        ApplicationAttemptId.fromString(appAttemptId), Integer.valueOf(containerId));
+
+    Resource allocatedResource = Resource.newInstance(1024, 2);
+
+    int subClusterId = Integer.valueOf(getSubClusterId().getId());
+    NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
+    Priority priority = Priority.newInstance(subClusterId);
+    long creationTime = subClusterId;
+    long finishTime = subClusterId;
+    String diagnosticInfo = "Diagnostic " + subClusterId;
+    String logUrl = "Log " + subClusterId;
+    int containerExitStatus = subClusterId;
+    ContainerState containerState = ContainerState.COMPLETE;
+    String nodeHttpAddress = "HttpAddress " + subClusterId;
+
+    ContainerReport containerReport = ContainerReport.newInstance(
+        newContainerId, allocatedResource, assignedNode, priority,
+        creationTime, finishTime, diagnosticInfo, logUrl,
+        containerExitStatus, containerState, nodeHttpAddress);
+
+    return new ContainerInfo(containerReport);
+  }
+
+  @Override
+  public Response signalToContainer(String containerId, String command,
+      HttpServletRequest req) throws AuthorizationException {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    if (!EnumUtils.isValidEnum(SignalContainerCommand.class, command.toUpperCase())) {
+      String errMsg = "Invalid command: " + command.toUpperCase() + ", valid commands are: "
+          + Arrays.asList(SignalContainerCommand.values());
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+    }
+
+    return Response.status(Status.OK).build();
+  }
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.junit.Assert;
@@ -698,4 +699,22 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNotNull(nodeLabelsInfo2);
     Assert.assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size());
   }
+
+  @Test
+  public void testGetContainer()
+      throws IOException, InterruptedException, YarnException {
+    // Submit application to multiSubCluster
+    ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+    ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Assert.assertNotNull(interceptor.submitApplication(context, null));
+
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    ContainerInfo containerInfo = interceptor.getContainer(null, null,
+        appId.toString(), appAttemptId.toString(), "0");
+    Assert.assertNotNull(containerInfo);
+  }
 }