Kaynağa Gözat

YARN-78. Changed UnManagedAM application to use YarnClient. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1383705 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 yıl önce
ebeveyn
işleme
11cf00b08a

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -25,6 +25,9 @@ Release 2.0.3-alpha - Unreleased
 
   IMPROVEMENTS
 
+    YARN-78. Changed UnManagedAM application to use YarnClient. (Bikas Saha via
+    vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml

@@ -56,6 +56,10 @@
       <artifactId>hadoop-yarn-server-common</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>

+ 78 - 106
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java

@@ -22,7 +22,6 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Map;
@@ -37,12 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -51,9 +45,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -73,11 +67,8 @@ public class UnmanagedAMLauncher {
 
   private Configuration conf;
 
-  // RPC to communicate to RM
-  private YarnRPC rpc;
-
   // Handle to talk to the Resource Manager/Applications Manager
-  private ClientRMProtocol rmClient;
+  private YarnClientImpl rmClient;
 
   // Application master specific info to register a new Application with RM/ASM
   private String appName = "";
@@ -114,7 +105,6 @@ public class UnmanagedAMLauncher {
   public UnmanagedAMLauncher(Configuration conf) throws Exception {
     // Set up RPC
     this.conf = conf;
-    rpc = YarnRPC.create(conf);
   }
 
   public UnmanagedAMLauncher() throws Exception {
@@ -163,25 +153,11 @@ public class UnmanagedAMLauncher {
           "No cmd specified for application master");
     }
 
-    return true;
-  }
-
-  private void connectToRM() throws IOException {
     YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress = yarnConf.getSocketAddr(
-        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_PORT);
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
-        rmAddress, conf));
-  }
+    rmClient = new YarnClientImpl();
+    rmClient.init(yarnConf);
 
-  private GetNewApplicationResponse getApplication() throws YarnRemoteException {
-    GetNewApplicationRequest request = Records
-        .newRecord(GetNewApplicationRequest.class);
-    GetNewApplicationResponse response = rmClient.getNewApplication(request);
-    LOG.info("Got new application id=" + response.getApplicationId());
-    return response;
+    return true;
   }
 
   public void launchAM(ApplicationAttemptId attemptId) throws IOException {
@@ -275,80 +251,81 @@ public class UnmanagedAMLauncher {
     }
     amProc.destroy();
   }
-
+  
   public boolean run() throws IOException {
     LOG.info("Starting Client");
-
+    
     // Connect to ResourceManager
-    connectToRM();
-    assert (rmClient != null);
-
-    // Get a new application id
-    GetNewApplicationResponse newApp = getApplication();
-    ApplicationId appId = newApp.getApplicationId();
-
-    // Create launch context for app master
-    LOG.info("Setting up application submission context for ASM");
-    ApplicationSubmissionContext appContext = Records
-        .newRecord(ApplicationSubmissionContext.class);
-
-    // set the application id
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName(appName);
-
-    // Set the priority for the application master
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(amPriority);
-    appContext.setPriority(pri);
-
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue(amQueue);
-
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
-    appContext.setAMContainerSpec(amContainer);
-
-    // unmanaged AM
-    appContext.setUnmanagedAM(true);
-    LOG.info("Setting unmanaged AM");
-
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-
-    // Submit the application to the applications manager
-    LOG.info("Submitting application to ASM");
-    rmClient.submitApplication(appRequest);
-
-    // Monitor the application to wait for launch state
-    ApplicationReport appReport = monitorApplication(appId,
-        EnumSet.of(YarnApplicationState.ACCEPTED));
-    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-    LOG.info("Launching application with id: " + attemptId);
-
-    // launch AM
-    launchAM(attemptId);
-
-    // Monitor the application for end state
-    appReport = monitorApplication(appId, EnumSet.of(
-        YarnApplicationState.KILLED, YarnApplicationState.FAILED,
-        YarnApplicationState.FINISHED));
-    YarnApplicationState appState = appReport.getYarnApplicationState();
-    FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
-
-    LOG.info("App ended with state: " + appReport.getYarnApplicationState()
-        + " and status: " + appStatus);
-    if (YarnApplicationState.FINISHED == appState
-        && FinalApplicationStatus.SUCCEEDED == appStatus) {
-      LOG.info("Application has completed successfully.");
-      return true;
-    } else {
-      LOG.info("Application did finished unsuccessfully." + " YarnState="
-          + appState.toString() + ", FinalStatus=" + appStatus.toString());
-      return false;
+    rmClient.start();
+    try {  
+      // Get a new application id
+      GetNewApplicationResponse newApp = rmClient.getNewApplication();
+      ApplicationId appId = newApp.getApplicationId();
+  
+      // Create launch context for app master
+      LOG.info("Setting up application submission context for ASM");
+      ApplicationSubmissionContext appContext = Records
+          .newRecord(ApplicationSubmissionContext.class);
+  
+      // set the application id
+      appContext.setApplicationId(appId);
+      // set the application name
+      appContext.setApplicationName(appName);
+  
+      // Set the priority for the application master
+      Priority pri = Records.newRecord(Priority.class);
+      pri.setPriority(amPriority);
+      appContext.setPriority(pri);
+  
+      // Set the queue to which this application is to be submitted in the RM
+      appContext.setQueue(amQueue);
+  
+      // Set up the container launch context for the application master
+      ContainerLaunchContext amContainer = Records
+          .newRecord(ContainerLaunchContext.class);
+      appContext.setAMContainerSpec(amContainer);
+  
+      // unmanaged AM
+      appContext.setUnmanagedAM(true);
+      LOG.info("Setting unmanaged AM");
+  
+      // Submit the application to the applications manager
+      LOG.info("Submitting application to ASM");
+      rmClient.submitApplication(appContext);
+  
+      // Monitor the application to wait for launch state
+      ApplicationReport appReport = monitorApplication(appId,
+          EnumSet.of(YarnApplicationState.ACCEPTED));
+      ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+      LOG.info("Launching application with id: " + attemptId);
+  
+      // launch AM
+      launchAM(attemptId);
+  
+      // Monitor the application for end state
+      appReport = monitorApplication(appId, EnumSet.of(
+          YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+          YarnApplicationState.FINISHED));
+      YarnApplicationState appState = appReport.getYarnApplicationState();
+      FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
+  
+      LOG.info("App ended with state: " + appReport.getYarnApplicationState()
+          + " and status: " + appStatus);
+      
+      boolean success;
+      if (YarnApplicationState.FINISHED == appState
+          && FinalApplicationStatus.SUCCEEDED == appStatus) {
+        LOG.info("Application has completed successfully.");
+        success = true;
+      } else {
+        LOG.info("Application did finished unsuccessfully." + " YarnState="
+            + appState.toString() + ", FinalStatus=" + appStatus.toString());
+        success = false;
+      }
+      
+      return success;
+    } finally {
+      rmClient.stop();
     }
   }
 
@@ -374,12 +351,7 @@ public class UnmanagedAMLauncher {
       }
 
       // Get application report for the appId we are interested in
-      GetApplicationReportRequest reportRequest = Records
-          .newRecord(GetApplicationReportRequest.class);
-      reportRequest.setApplicationId(appId);
-      GetApplicationReportResponse reportResponse = rmClient
-          .getApplicationReport(reportRequest);
-      ApplicationReport report = reportResponse.getApplicationReport();
+      ApplicationReport report = rmClient.getApplicationReport(appId);
 
       LOG.info("Got application report from ASM for" + ", appId="
           + appId.getId() + ", appAttemptId="