Forráskód Böngészése

YARN-9001. [Submarine] Use AppAdminClient instead of ServiceClient to sumbit jobs. (Zac Zhou via wangda)

Change-Id: I7e8d1c27ebd37e0907ca570c4f3d56fe7a859635
Wangda Tan 6 éve
szülő
commit
bee5bf867b

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java

@@ -1360,6 +1360,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       LOG.info("Service {} does not have an application ID", serviceName);
       return appSpec;
     }
+    appSpec.setId(currentAppId.toString());
     ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
     appSpec.setState(convertState(appReport.getYarnApplicationState()));
     ApplicationTimeout lifetime =

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java

@@ -47,6 +47,11 @@ public abstract class JobMonitor {
   public abstract JobStatus getTrainingJobStatus(String jobName)
       throws IOException, YarnException;
 
+  /**
+   * Cleanup AppAdminClient, etc.
+   */
+  public void cleanup() throws IOException {}
+
   /**
    * Continue wait and print status if job goes to ready or final state.
    * @param jobName
@@ -80,5 +85,6 @@ public abstract class JobMonitor {
         throw new IOException(e);
       }
     }
+    cleanup();
   }
 }

+ 19 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java

@@ -14,9 +14,10 @@
 
 package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
 
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.submarine.common.ClientContext;
 import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
 import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder;
@@ -25,22 +26,34 @@ import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
 import java.io.IOException;
 
 public class YarnServiceJobMonitor extends JobMonitor {
-  private ServiceClient serviceClient = null;
+  private volatile AppAdminClient serviceClient = null;
 
   public YarnServiceJobMonitor(ClientContext clientContext) {
     super(clientContext);
   }
 
   @Override
-  public synchronized JobStatus getTrainingJobStatus(String jobName)
+  public JobStatus getTrainingJobStatus(String jobName)
       throws IOException, YarnException {
     if (this.serviceClient == null) {
-      this.serviceClient = YarnServiceUtils.createServiceClient(
-          clientContext.getYarnConfig());
+      synchronized(this) {
+        if (this.serviceClient == null) {
+          this.serviceClient = YarnServiceUtils.createServiceClient(
+              clientContext.getYarnConfig());
+        }
+      }
     }
 
-    Service serviceSpec = this.serviceClient.getStatus(jobName);
+    String appStatus=serviceClient.getStatusString(jobName);
+    Service serviceSpec= ServiceApiUtil.jsonSerDeser.fromJson(appStatus);
     JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec);
     return jobStatus;
   }
+
+  @Override
+  public void cleanup() throws IOException{
+    if (this.serviceClient != null) {
+      this.serviceClient.close();
+    }
+  }
 }

+ 38 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java

@@ -19,6 +19,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
@@ -27,7 +28,7 @@ import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
 import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
 import org.apache.hadoop.yarn.submarine.common.ClientContext;
@@ -53,6 +54,8 @@ import java.util.Set;
 import java.util.StringTokenizer;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
 
 /**
  * Submit a job to cluster
@@ -530,6 +533,20 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
     return serviceSpec;
   }
 
+  private String generateServiceSpecFile(Service service) throws IOException {
+    File serviceSpecFile = File.createTempFile(service.getName(), ".json");
+    String buffer = jsonSerDeser.toJson(service);
+    Writer w = new OutputStreamWriter(new FileOutputStream(serviceSpecFile),
+        "UTF-8");
+    PrintWriter pw = new PrintWriter(w);
+    try {
+      pw.append(buffer);
+    } finally {
+      pw.close();
+    }
+    return serviceSpecFile.getAbsolutePath();
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -537,13 +554,30 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
   public ApplicationId submitJob(RunJobParameters parameters)
       throws IOException, YarnException {
     createServiceByParameters(parameters);
-    ServiceClient serviceClient = YarnServiceUtils.createServiceClient(
+    String serviceSpecFile = generateServiceSpecFile(serviceSpec);
+
+    AppAdminClient appAdminClient = YarnServiceUtils.createServiceClient(
         clientContext.getYarnConfig());
-    ApplicationId appid = serviceClient.actionCreate(serviceSpec);
-    serviceClient.stop();
+    int code = appAdminClient.actionLaunch(serviceSpecFile,
+        serviceSpec.getName(), null, null);
+    if(code != EXIT_SUCCESS) {
+      throw new YarnException("Fail to launch application with exit code:" +
+          code);
+    }
+
+    String appStatus=appAdminClient.getStatusString(serviceSpec.getName());
+    Service app=ServiceApiUtil.jsonSerDeser.fromJson(appStatus);
+    if(app.getId() == null) {
+      throw new YarnException("Can't get application id for Service " +
+          serviceSpec.getName());
+    }
+    ApplicationId appid = ApplicationId.fromString(app.getId());
+    appAdminClient.stop();
     return appid;
   }
 
+
+
   @VisibleForTesting
   public Service getServiceSpec() {
     return serviceSpec;

+ 8 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java

@@ -16,8 +16,8 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.submarine.common.Envs;
 import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
 import org.slf4j.Logger;
@@ -26,27 +26,28 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hadoop.yarn.client.api.AppAdminClient.DEFAULT_TYPE;
+
 public class YarnServiceUtils {
   private static final Logger LOG =
       LoggerFactory.getLogger(YarnServiceUtils.class);
 
   // This will be true only in UT.
-  private static ServiceClient stubServiceClient = null;
+  private static AppAdminClient stubServiceClient = null;
 
-  public static ServiceClient createServiceClient(
+  public static AppAdminClient createServiceClient(
       Configuration yarnConfiguration) {
     if (stubServiceClient != null) {
       return stubServiceClient;
     }
 
-    ServiceClient serviceClient = new ServiceClient();
-    serviceClient.init(yarnConfiguration);
-    serviceClient.start();
+    AppAdminClient serviceClient = AppAdminClient.createAppAdminClient(
+        DEFAULT_TYPE, yarnConfiguration);
     return serviceClient;
   }
 
   @VisibleForTesting
-  public static void setStubServiceClient(ServiceClient stubServiceClient) {
+  public static void setStubServiceClient(AppAdminClient stubServiceClient) {
     YarnServiceUtils.stubServiceClient = stubServiceClient;
   }
 

+ 7 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java

@@ -19,12 +19,11 @@
 package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
 import org.apache.hadoop.yarn.submarine.common.MockClientContext;
 import org.apache.hadoop.yarn.submarine.common.api.TaskType;
@@ -45,6 +44,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Map;
 
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -53,9 +53,11 @@ public class TestYarnServiceRunJobCli {
   @Before
   public void before() throws IOException, YarnException {
     SubmarineLogs.verboseOff();
-    ServiceClient serviceClient = mock(ServiceClient.class);
-    when(serviceClient.actionCreate(any(Service.class))).thenReturn(
-        ApplicationId.newInstance(1234L, 1));
+    AppAdminClient serviceClient = mock(AppAdminClient.class);
+    when(serviceClient.actionLaunch(any(String.class), any(String.class),
+        any(Long.class), any(String.class))).thenReturn(EXIT_SUCCESS);
+    when(serviceClient.getStatusString(any(String.class))).thenReturn(
+        "{\"id\": \"application_1234_1\"}");
     YarnServiceUtils.setStubServiceClient(serviceClient);
   }
 

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java

@@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.submarine.common.fs.MockRemoteDirectoryManager;
 import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import java.io.IOException;