Browse Source

MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a ContainerLaunchContext (Arun Murthy via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170459 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 13 years ago
parent
commit
88b82a0f66
22 changed files with 373 additions and 1080 deletions
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 4 115
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  3. 129 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
  4. 53 170
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  5. 15 203
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
  6. 2 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
  7. 43 469
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
  8. 16 19
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
  9. 6 12
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  10. 6 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  11. 3 3
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  12. 7 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  13. 23 11
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  14. 0 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
  15. 23 25
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
  16. 3 3
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  17. 2 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  18. 2 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
  19. 7 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  20. 15 25
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
  21. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
  22. 10 10
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -289,6 +289,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via 
     MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via 
     mahadev)
     mahadev)
 
 
+    MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a 
+    ContainerLaunchContext (Arun Murthy via mahadev)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

+ 4 - 115
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -579,13 +579,12 @@ public abstract class TaskAttemptImpl implements
           + remoteJobConfPath.toUri().toASCIIString());
           + remoteJobConfPath.toUri().toASCIIString());
       // //////////// End of JobConf setup
       // //////////// End of JobConf setup
 
 
-      
       // Setup DistributedCache
       // Setup DistributedCache
-      setupDistributedCache(remoteFS, conf, localResources, environment);
+      MRApps.setupDistributedCache(conf, localResources, environment);
 
 
       // Set local-resources and environment
       // Set local-resources and environment
       container.setLocalResources(localResources);
       container.setLocalResources(localResources);
-      container.setEnv(environment);
+      container.setEnvironment(environment);
       
       
       // Setup up tokens
       // Setup up tokens
       Credentials taskCredentials = new Credentials();
       Credentials taskCredentials = new Credentials();
@@ -618,7 +617,7 @@ public abstract class TaskAttemptImpl implements
           ShuffleHandler.serializeServiceData(jobToken));
           ShuffleHandler.serializeServiceData(jobToken));
       container.setServiceData(serviceData);
       container.setServiceData(serviceData);
 
 
-      MRApps.addToClassPath(container.getEnv(), getInitialClasspath());
+      MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath());
     } catch (IOException e) {
     } catch (IOException e) {
       throw new YarnException(e);
       throw new YarnException(e);
     }
     }
@@ -645,7 +644,7 @@ public abstract class TaskAttemptImpl implements
         taskAttemptListener.getAddress(), remoteTask, javaHome,
         taskAttemptListener.getAddress(), remoteTask, javaHome,
         workDir.toString(), containerLogDir, childTmpDir, jvmID));
         workDir.toString(), containerLogDir, childTmpDir, jvmID));
 
 
-    MapReduceChildJVM.setVMEnv(container.getEnv(), classPaths,
+    MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths,
         workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
         workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
         localizedApplicationTokensFile);
         localizedApplicationTokensFile);
 
 
@@ -656,116 +655,6 @@ public abstract class TaskAttemptImpl implements
     return container;
     return container;
   }
   }
 
 
-  private static long[] parseTimeStamps(String[] strs) {
-    if (null == strs) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
-
-  private void setupDistributedCache(FileSystem remoteFS, 
-      Configuration conf, 
-      Map<String, LocalResource> localResources,
-      Map<String, String> env) 
-  throws IOException {
-    
-    // Cache archives
-    parseDistributedCacheArtifacts(remoteFS, localResources, env, 
-        LocalResourceType.ARCHIVE, 
-        DistributedCache.getCacheArchives(conf), 
-        parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), 
-        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
-        DistributedCache.getArchiveVisibilities(conf), 
-        DistributedCache.getArchiveClassPaths(conf));
-    
-    // Cache files
-    parseDistributedCacheArtifacts(remoteFS, 
-        localResources, env, 
-        LocalResourceType.FILE, 
-        DistributedCache.getCacheFiles(conf),
-        parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
-        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
-        DistributedCache.getFileVisibilities(conf),
-        DistributedCache.getFileClassPaths(conf));
-  }
-
-  // TODO - Move this to MR!
-  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
-  // long[], boolean[], Path[], FileType)
-  private void parseDistributedCacheArtifacts(
-      FileSystem remoteFS, 
-      Map<String, LocalResource> localResources,
-      Map<String, String> env,
-      LocalResourceType type,
-      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
-      Path[] pathsToPutOnClasspath) throws IOException {
-
-    if (uris != null) {
-      // Sanity check
-      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
-          (uris.length != visibilities.length)) {
-        throw new IllegalArgumentException("Invalid specification for " +
-        		"distributed-cache artifacts of type " + type + " :" +
-        		" #uris=" + uris.length +
-        		" #timestamps=" + timestamps.length +
-        		" #visibilities=" + visibilities.length
-        		);
-      }
-      
-      Map<String, Path> classPaths = new HashMap<String, Path>();
-      if (pathsToPutOnClasspath != null) {
-        for (Path p : pathsToPutOnClasspath) {
-          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-              remoteFS.getWorkingDirectory()));
-          classPaths.put(p.toUri().getPath().toString(), p);
-        }
-      }
-      for (int i = 0; i < uris.length; ++i) {
-        URI u = uris[i];
-        Path p = new Path(u);
-        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-            remoteFS.getWorkingDirectory()));
-        // Add URI fragment or just the filename
-        Path name = new Path((null == u.getFragment())
-          ? p.getName()
-          : u.getFragment());
-        if (name.isAbsolute()) {
-          throw new IllegalArgumentException("Resource name must be relative");
-        }
-        String linkName = name.toUri().getPath();
-        localResources.put(
-            linkName,
-            BuilderUtils.newLocalResource(
-                p.toUri(), type, 
-                visibilities[i]
-                  ? LocalResourceVisibility.PUBLIC
-                  : LocalResourceVisibility.PRIVATE,
-                sizes[i], timestamps[i])
-        );
-        if (classPaths.containsKey(u.getPath())) {
-          MRApps.addToClassPath(env, linkName);
-        }
-      }
-    }
-  }
-  
-  // TODO - Move this to MR!
-  private static long[] getFileSizes(Configuration conf, String key) {
-    String[] strs = conf.getStrings(key);
-    if (strs == null) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
-  
   @Override
   @Override
   public ContainerId getAssignedContainerID() {
   public ContainerId getAssignedContainerID() {
     readLock.lock();
     readLock.lock();

+ 129 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -25,14 +25,20 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.InputStreamReader;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -42,12 +48,18 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 
 /**
 /**
  * Helper class for MR applications
  * Helper class for MR applications
  */
  */
+@Private
+@Unstable
 public class MRApps extends Apps {
 public class MRApps extends Apps {
   public static final String JOB = "job";
   public static final String JOB = "job";
   public static final String TASK = "task";
   public static final String TASK = "task";
@@ -232,4 +244,121 @@ public class MRApps extends Apps {
         jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
         jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
     return jobFile.toString();
     return jobFile.toString();
   }
   }
+  
+
+
+  private static long[] parseTimeStamps(String[] strs) {
+    if (null == strs) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+
+  public static void setupDistributedCache( 
+      Configuration conf, 
+      Map<String, LocalResource> localResources,
+      Map<String, String> env) 
+  throws IOException {
+    
+    // Cache archives
+    parseDistributedCacheArtifacts(conf, localResources, env, 
+        LocalResourceType.ARCHIVE, 
+        DistributedCache.getCacheArchives(conf), 
+        parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), 
+        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
+        DistributedCache.getArchiveVisibilities(conf), 
+        DistributedCache.getArchiveClassPaths(conf));
+    
+    // Cache files
+    parseDistributedCacheArtifacts(conf, 
+        localResources, env, 
+        LocalResourceType.FILE, 
+        DistributedCache.getCacheFiles(conf),
+        parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
+        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+        DistributedCache.getFileVisibilities(conf),
+        DistributedCache.getFileClassPaths(conf));
+  }
+
+  // TODO - Move this to MR!
+  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
+  // long[], boolean[], Path[], FileType)
+  private static void parseDistributedCacheArtifacts(
+      Configuration conf,
+      Map<String, LocalResource> localResources,
+      Map<String, String> env,
+      LocalResourceType type,
+      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
+      Path[] pathsToPutOnClasspath) throws IOException {
+
+    if (uris != null) {
+      // Sanity check
+      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+          (uris.length != visibilities.length)) {
+        throw new IllegalArgumentException("Invalid specification for " +
+            "distributed-cache artifacts of type " + type + " :" +
+            " #uris=" + uris.length +
+            " #timestamps=" + timestamps.length +
+            " #visibilities=" + visibilities.length
+            );
+      }
+      
+      Map<String, Path> classPaths = new HashMap<String, Path>();
+      if (pathsToPutOnClasspath != null) {
+        for (Path p : pathsToPutOnClasspath) {
+          FileSystem remoteFS = p.getFileSystem(conf);
+          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+              remoteFS.getWorkingDirectory()));
+          classPaths.put(p.toUri().getPath().toString(), p);
+        }
+      }
+      for (int i = 0; i < uris.length; ++i) {
+        URI u = uris[i];
+        Path p = new Path(u);
+        FileSystem remoteFS = p.getFileSystem(conf);
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+        // Add URI fragment or just the filename
+        Path name = new Path((null == u.getFragment())
+          ? p.getName()
+          : u.getFragment());
+        if (name.isAbsolute()) {
+          throw new IllegalArgumentException("Resource name must be relative");
+        }
+        String linkName = name.toUri().getPath();
+        localResources.put(
+            linkName,
+            BuilderUtils.newLocalResource(
+                p.toUri(), type, 
+                visibilities[i]
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.PRIVATE,
+                sizes[i], timestamps[i])
+        );
+        if (classPaths.containsKey(u.getPath())) {
+          MRApps.addToClassPath(env, linkName);
+        }
+      }
+    }
+  }
+  
+  // TODO - Move this to MR!
+  private static long[] getFileSizes(Configuration conf, String key) {
+    String[] strs = conf.getStrings(key);
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+  
+
+
 }
 }

+ 53 - 170
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapred;
 package org.apache.hadoop.mapred;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -33,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -55,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
@@ -72,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -237,7 +235,6 @@ public class YARNRunner implements ClientProtocol {
     // Construct necessary information to start the MR AM
     // Construct necessary information to start the MR AM
     ApplicationSubmissionContext appContext = 
     ApplicationSubmissionContext appContext = 
       createApplicationSubmissionContext(conf, jobSubmitDir, ts);
       createApplicationSubmissionContext(conf, jobSubmitDir, ts);
-    setupDistributedCache(conf, appContext);
     
     
     // XXX Remove
     // XXX Remove
     in.close();
     in.close();
@@ -273,16 +270,18 @@ public class YARNRunner implements ClientProtocol {
   public ApplicationSubmissionContext createApplicationSubmissionContext(
   public ApplicationSubmissionContext createApplicationSubmissionContext(
       Configuration jobConf,
       Configuration jobConf,
       String jobSubmitDir, Credentials ts) throws IOException {
       String jobSubmitDir, Credentials ts) throws IOException {
-    ApplicationSubmissionContext appContext =
-        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
     ApplicationId applicationId = resMgrDelegate.getApplicationId();
     ApplicationId applicationId = resMgrDelegate.getApplicationId();
-    appContext.setApplicationId(applicationId);
+    
+    // Setup resource requirements
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
     capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
         MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
         MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
     LOG.info("AppMaster capability = " + capability);
     LOG.info("AppMaster capability = " + capability);
-    appContext.setMasterCapability(capability);
 
 
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    
     Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
     Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
     
     
     URL yarnUrlForJobSubmitDir = ConverterUtils
     URL yarnUrlForJobSubmitDir = ConverterUtils
@@ -292,14 +291,11 @@ public class YARNRunner implements ClientProtocol {
     LOG.debug("Creating setup context, jobSubmitDir url is "
     LOG.debug("Creating setup context, jobSubmitDir url is "
         + yarnUrlForJobSubmitDir);
         + yarnUrlForJobSubmitDir);
 
 
-    appContext.setResource(MRConstants.JOB_SUBMIT_DIR,
-        yarnUrlForJobSubmitDir);
-
-    appContext.setResourceTodo(MRConstants.JOB_CONF_FILE,
+    localResources.put(MRConstants.JOB_CONF_FILE,
         createApplicationResource(defaultFileContext,
         createApplicationResource(defaultFileContext,
             jobConfPath));
             jobConfPath));
     if (jobConf.get(MRJobConfig.JAR) != null) {
     if (jobConf.get(MRJobConfig.JAR) != null) {
-      appContext.setResourceTodo(MRConstants.JOB_JAR,
+      localResources.put(MRConstants.JOB_JAR,
           createApplicationResource(defaultFileContext,
           createApplicationResource(defaultFileContext,
               new Path(jobSubmitDir, MRConstants.JOB_JAR)));
               new Path(jobSubmitDir, MRConstants.JOB_JAR)));
     } else {
     } else {
@@ -312,30 +308,21 @@ public class YARNRunner implements ClientProtocol {
     // TODO gross hack
     // TODO gross hack
     for (String s : new String[] { "job.split", "job.splitmetainfo",
     for (String s : new String[] { "job.split", "job.splitmetainfo",
         MRConstants.APPLICATION_TOKENS_FILE }) {
         MRConstants.APPLICATION_TOKENS_FILE }) {
-      appContext.setResourceTodo(
+      localResources.put(
           MRConstants.JOB_SUBMIT_DIR + "/" + s,
           MRConstants.JOB_SUBMIT_DIR + "/" + s,
-          createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s)));
+          createApplicationResource(defaultFileContext, 
+              new Path(jobSubmitDir, s)));
     }
     }
-
-    // TODO: Only if security is on.
-    List<String> fsTokens = new ArrayList<String>();
-    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-      fsTokens.add(token.encodeToUrlString());
-    }
-    
-    // TODO - Remove this!
-    appContext.addAllFsTokens(fsTokens);
-    DataOutputBuffer dob = new DataOutputBuffer();
-    ts.writeTokenStorageToStream(dob);
-    appContext.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
-
-    // Add queue information
-    appContext.setQueue(jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME));
-    
-    // Add job name
-    appContext.setApplicationName(jobConf.get(JobContext.JOB_NAME, "N/A"));
     
     
-    // Add the command line
+    // Setup security tokens
+    ByteBuffer securityTokens = null;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      ts.writeTokenStorageToStream(dob);
+      securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    }
+
+    // Setup the command to run the AM
     String javaHome = "$JAVA_HOME";
     String javaHome = "$JAVA_HOME";
     Vector<CharSequence> vargs = new Vector<CharSequence>(8);
     Vector<CharSequence> vargs = new Vector<CharSequence>(8);
     vargs.add(javaHome + "/bin/java");
     vargs.add(javaHome + "/bin/java");
@@ -346,13 +333,6 @@ public class YARNRunner implements ClientProtocol {
     vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
     vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
         MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
         MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
 
 
-    // Add { job jar, MR app jar } to classpath.
-    Map<String, String> environment = new HashMap<String, String>();
-    MRApps.setInitialClasspath(environment);
-    MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
-    MRApps.addToClassPath(environment,
-        MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
-    appContext.addAllEnvironment(environment);
     vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
     vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
     vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
     vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
     vargs.add(String.valueOf(applicationId.getId()));
     vargs.add(String.valueOf(applicationId.getId()));
@@ -370,140 +350,43 @@ public class YARNRunner implements ClientProtocol {
 
 
     LOG.info("Command to launch container for ApplicationMaster is : "
     LOG.info("Command to launch container for ApplicationMaster is : "
         + mergedCommand);
         + mergedCommand);
+    
+    // Setup the environment - Add { job jar, MR app jar } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+    MRApps.setInitialClasspath(environment);
+    MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
+    MRApps.addToClassPath(environment,
+        MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
 
 
-    appContext.addAllCommands(vargsFinal);
-    // TODO: RM should get this from RPC.
-    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-    return appContext;
-  }
+    // Parse distributed cache
+    MRApps.setupDistributedCache(jobConf, localResources, environment);
 
 
-  /**
-   *    * TODO: Copied for now from TaskAttemptImpl.java ... fixme
-   * @param strs
-   * @return
-   */
-  private static long[] parseTimeStamps(String[] strs) {
-    if (null == strs) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
+    // Setup ContainerLaunchContext for AM container
+    ContainerLaunchContext amContainer =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    amContainer.setResource(capability);             // Resource (mem) required
+    amContainer.setLocalResources(localResources);   // Local resources
+    amContainer.setEnvironment(environment);         // Environment
+    amContainer.setCommands(vargsFinal);             // Command for AM
+    amContainer.setContainerTokens(securityTokens);  // Security tokens
 
 
-  /**
-   * TODO: Copied for now from TaskAttemptImpl.java ... fixme
-   * 
-   * TODO: This is currently needed in YarnRunner as user code like setupJob,
-   * cleanupJob may need access to dist-cache. Once we separate distcache for
-   * maps, reduces, setup etc, this can include only a subset of artificats.
-   * This is also needed for uberAM case where we run everything inside AM.
-   */
-  private void setupDistributedCache(Configuration conf, 
-      ApplicationSubmissionContext container) throws IOException {
-    
-    // Cache archives
-    parseDistributedCacheArtifacts(conf, container, LocalResourceType.ARCHIVE, 
-        DistributedCache.getCacheArchives(conf), 
-        parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), 
-        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
-        DistributedCache.getArchiveVisibilities(conf), 
-        DistributedCache.getArchiveClassPaths(conf));
-    
-    // Cache files
-    parseDistributedCacheArtifacts(conf, container, LocalResourceType.FILE, 
-        DistributedCache.getCacheFiles(conf),
-        parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
-        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
-        DistributedCache.getFileVisibilities(conf),
-        DistributedCache.getFileClassPaths(conf));
-  }
-
-  // TODO - Move this to MR!
-  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
-  private void parseDistributedCacheArtifacts(Configuration conf,
-      ApplicationSubmissionContext container, LocalResourceType type,
-      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
-      Path[] pathsToPutOnClasspath) throws IOException {
-
-    if (uris != null) {
-      // Sanity check
-      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
-          (uris.length != visibilities.length)) {
-        throw new IllegalArgumentException("Invalid specification for " +
-            "distributed-cache artifacts of type " + type + " :" +
-            " #uris=" + uris.length +
-            " #timestamps=" + timestamps.length +
-            " #visibilities=" + visibilities.length
-            );
-      }
-      
-      Map<String, Path> classPaths = new HashMap<String, Path>();
-      if (pathsToPutOnClasspath != null) {
-        for (Path p : pathsToPutOnClasspath) {
-          FileSystem fs = p.getFileSystem(conf);
-          p = p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-          classPaths.put(p.toUri().getPath().toString(), p);
-        }
-      }
-      for (int i = 0; i < uris.length; ++i) {
-        URI u = uris[i];
-        Path p = new Path(u);
-        FileSystem fs = p.getFileSystem(conf);
-        p = fs.resolvePath(
-            p.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-        // Add URI fragment or just the filename
-        Path name = new Path((null == u.getFragment())
-          ? p.getName()
-          : u.getFragment());
-        if (name.isAbsolute()) {
-          throw new IllegalArgumentException("Resource name must be relative");
-        }
-        String linkName = name.toUri().getPath();
-        container.setResourceTodo(
-            linkName,
-            createLocalResource(
-                p.toUri(), type, 
-                visibilities[i]
-                  ? LocalResourceVisibility.PUBLIC
-                  : LocalResourceVisibility.PRIVATE,
-                sizes[i], timestamps[i])
-        );
-        if (classPaths.containsKey(u.getPath())) {
-          Map<String, String> environment = container.getAllEnvironment();
-          MRApps.addToClassPath(environment, linkName);
-        }
-      }
-    }
-  }
+    // Set up the ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext =
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    appContext.setApplicationId(applicationId);                // ApplicationId
+    appContext.setUser(                                        // User name
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    appContext.setQueue(                                       // Queue name
+        jobConf.get(JobContext.QUEUE_NAME,     
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    appContext.setApplicationName(                             // Job name
+        jobConf.get(JobContext.JOB_NAME, 
+        YarnConfiguration.DEFAULT_APPLICATION_NAME));              
+    appContext.setAMContainerSpec(amContainer);         // AM Container 
 
 
-  // TODO - Move this to MR!
-  private static long[] getFileSizes(Configuration conf, String key) {
-    String[] strs = conf.getStrings(key);
-    if (strs == null) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
-  
-  private LocalResource createLocalResource(URI uri, 
-      LocalResourceType type, LocalResourceVisibility visibility, 
-      long size, long timestamp) throws IOException {
-    LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
-    resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
-    resource.setType(type);
-    resource.setVisibility(visibility);
-    resource.setSize(size);
-    resource.setTimestamp(timestamp);
-    return resource;
+    return appContext;
   }
   }
-  
+
   @Override
   @Override
   public void setJobPriority(JobID arg0, String arg1) throws IOException,
   public void setJobPriority(JobID arg0, String arg1) throws IOException,
       InterruptedException {
       InterruptedException {

+ 15 - 203
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java

@@ -18,14 +18,8 @@
 
 
 package org.apache.hadoop.yarn.api.records;
 package org.apache.hadoop.yarn.api.records;
 
 
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 
 
 /**
 /**
@@ -36,26 +30,17 @@ import org.apache.hadoop.yarn.api.ClientRMProtocol;
  * <p>It includes details such as:
  * <p>It includes details such as:
  *   <ul>
  *   <ul>
  *     <li>{@link ApplicationId} of the application.</li>
  *     <li>{@link ApplicationId} of the application.</li>
- *     <li>
- *       {@link Resource} necessary to run the <code>ApplicationMaster</code>.
- *     </li>
  *     <li>Application user.</li>
  *     <li>Application user.</li>
  *     <li>Application name.</li>
  *     <li>Application name.</li>
  *     <li>{@link Priority} of the application.</li>
  *     <li>{@link Priority} of the application.</li>
- *     <li>Security tokens (if security is enabled).</li>
- *     <li>
- *       {@link LocalResource} necessary for running the 
- *       <code>ApplicationMaster</code> container such
- *       as binaries, jar, shared-objects, side-files etc. 
- *     </li>
  *     <li>
  *     <li>
- *       Environment variables for the launched <code>ApplicationMaster</code> 
- *       process.
+ *       {@link ContainerLaunchContext} of the container in which the 
+ *       <code>ApplicationMaster</code> is executed.
  *     </li>
  *     </li>
- *     <li>Command to launch the <code>ApplicationMaster</code>.</li>
  *   </ul>
  *   </ul>
  * </p>
  * </p>
  * 
  * 
+ * @see ContainerLaunchContext
  * @see ClientRMProtocol#submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest)
  * @see ClientRMProtocol#submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest)
  */
  */
 @Public
 @Public
@@ -143,198 +128,25 @@ public interface ApplicationSubmissionContext {
   public void setUser(String user);
   public void setUser(String user);
   
   
   /**
   /**
-   * Get the <code>Resource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @return <code>Resource</code> required to run the 
-   *         <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public Resource getMasterCapability();
-  
-  /**
-   * Set <code>Resource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @param masterCapability <code>Resource</code> required to run the 
-   *                         <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public void setMasterCapability(Resource masterCapability);
-  
-  @Private
-  @Unstable
-  public Map<String, URL> getAllResources();
-  
-  @Private
-  @Unstable
-  public URL getResource(String key);
-  
-  @Private
-  @Unstable
-  public void addAllResources(Map<String, URL> resources);
-
-  @Private
-  @Unstable
-  public void setResource(String key, URL url);
-
-  @Private
-  @Unstable
-  public void removeResource(String key);
-
-  @Private
-  @Unstable
-  public void clearResources();
-
-  /**
-   * Get all the <code>LocalResource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @return <code>LocalResource</code> required to run the 
-   *         <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public Map<String, LocalResource> getAllResourcesTodo();
-  
-  @Private
-  @Unstable
-  public LocalResource getResourceTodo(String key);
-  
-  /**
-   * Add all the <code>LocalResource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @param resources all <code>LocalResource</code> required to run the 
-   *                      <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public void addAllResourcesTodo(Map<String, LocalResource> resources);
-
-  @Private
-  @Unstable
-  public void setResourceTodo(String key, LocalResource localResource);
-
-  @Private
-  @Unstable
-  public void removeResourceTodo(String key);
-
-  @Private
-  @Unstable
-  public void clearResourcesTodo();
-
-  @Private
-  @Unstable
-  public List<String> getFsTokenList();
-  
-  @Private
-  @Unstable
-  public String getFsToken(int index);
-  
-  @Private
-  @Unstable
-  public int getFsTokenCount();
-  
-  @Private
-  @Unstable
-  public void addAllFsTokens(List<String> fsTokens);
-
-  @Private
-  @Unstable
-  public void addFsToken(String fsToken);
-
-  @Private
-  @Unstable
-  public void removeFsToken(int index);
-
-  @Private
-  @Unstable
-  public void clearFsTokens();
-
-  /**
-   * Get <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
-   * @return file-system tokens for the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public ByteBuffer getFsTokensTodo();
-  
-  /**
-   * Set <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
-   * @param fsTokens file-system tokens for the <code>ApplicationMaster</code>
+   * Get the <code>ContainerLaunchContext</code> to describe the 
+   * <code>Container</code> with which the <code>ApplicationMaster</code> is
+   * launched.
+   * @return <code>ContainerLaunchContext</code> for the 
+   *         <code>ApplicationMaster</code> container
    */
    */
   @Public
   @Public
   @Stable
   @Stable
-  public void setFsTokensTodo(ByteBuffer fsTokens);
-
-  /**
-   * Get the <em>environment variables</em> for the 
-   * <code>ApplicationMaster</code>.
-   * @return environment variables for the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public Map<String, String> getAllEnvironment();
-  
-  @Private
-  @Unstable
-  public String getEnvironment(String key);
+  public ContainerLaunchContext getAMContainerSpec();
   
   
   /**
   /**
-   * Add all of the <em>environment variables</em> for the 
-   * <code>ApplicationMaster</code>.
-   * @param environment environment variables for the 
-   *                    <code>ApplicationMaster</code>
+   * Set the <code>ContainerLaunchContext</code> to describe the 
+   * <code>Container</code> with which the <code>ApplicationMaster</code> is
+   * launched.
+   * @param amContainer <code>ContainerLaunchContext</code> for the 
+   *                    <code>ApplicationMaster</code> container
    */
    */
   @Public
   @Public
   @Stable
   @Stable
-  public void addAllEnvironment(Map<String, String> environment);
+  public void setAMContainerSpec(ContainerLaunchContext amContainer);
 
 
-  @Private
-  @Unstable
-  public void setEnvironment(String key, String env);
-
-  @Private
-  @Unstable
-  public void removeEnvironment(String key);
-
-  @Private
-  @Unstable
-  public void clearEnvironment();
-
-  /**
-   * Get the <em>commands</em> to launch the <code>ApplicationMaster</code>.
-   * @return commands to launch the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public List<String> getCommandList();
-  
-  @Private
-  @Unstable
-  public String getCommand(int index);
-  
-  @Private
-  @Unstable
-  public int getCommandCount();
-  
-  /**
-   * Add all of the <em>commands</em> to launch the 
-   * <code>ApplicationMaster</code>.
-   * @param commands commands to launch the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public void addAllCommands(List<String> commands);
-  
-  @Private
-  @Unstable
-  public void addCommand(String command);
-  
-  @Private
-  @Unstable
-  public void removeCommand(int index);
-  
-  @Private
-  @Unstable
-  public void clearCommands();
 }
 }

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java

@@ -156,7 +156,7 @@ public interface ContainerLaunchContext {
    */
    */
   @Public
   @Public
   @Stable
   @Stable
-  Map<String, String> getEnv();
+  Map<String, String> getEnvironment();
     
     
   /**
   /**
    * Add <em>environment variables</em> for the container.
    * Add <em>environment variables</em> for the container.
@@ -164,7 +164,7 @@ public interface ContainerLaunchContext {
    */
    */
   @Public
   @Public
   @Stable
   @Stable
-  void setEnv(Map<String, String> environment);
+  void setEnvironment(Map<String, String> environment);
 
 
   /**
   /**
    * Get the list of <em>commands</em> for launching the container.
    * Get the list of <em>commands</em> for launching the container.

+ 43 - 469
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java

@@ -18,56 +18,35 @@
 
 
 package org.apache.hadoop.yarn.api.records.impl.pb;
 package org.apache.hadoop.yarn.api.records.impl.pb;
 
 
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringURLMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.URLProto;
-
-
     
     
-public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSubmissionContextProto> implements ApplicationSubmissionContext {
-  ApplicationSubmissionContextProto proto = ApplicationSubmissionContextProto.getDefaultInstance();
+public class ApplicationSubmissionContextPBImpl 
+extends ProtoBase<ApplicationSubmissionContextProto> 
+implements ApplicationSubmissionContext {
+  ApplicationSubmissionContextProto proto = 
+      ApplicationSubmissionContextProto.getDefaultInstance();
   ApplicationSubmissionContextProto.Builder builder = null;
   ApplicationSubmissionContextProto.Builder builder = null;
   boolean viaProto = false;
   boolean viaProto = false;
   
   
   private ApplicationId applicationId = null;
   private ApplicationId applicationId = null;
-  private Resource masterCapability = null;
-  private Map<String, URL> resources = null;
-  private Map<String, LocalResource> resourcesTodo = null;
-  private List<String> fsTokenList = null;
-  private ByteBuffer fsTokenTodo = null;
-  private Map<String, String> environment = null;
-  private List<String> commandList = null;
   private Priority priority = null;
   private Priority priority = null;
-  
-  
+  private ContainerLaunchContext amContainer = null;
   
   
   public ApplicationSubmissionContextPBImpl() {
   public ApplicationSubmissionContextPBImpl() {
     builder = ApplicationSubmissionContextProto.newBuilder();
     builder = ApplicationSubmissionContextProto.newBuilder();
   }
   }
 
 
-  public ApplicationSubmissionContextPBImpl(ApplicationSubmissionContextProto proto) {
+  public ApplicationSubmissionContextPBImpl(
+      ApplicationSubmissionContextProto proto) {
     this.proto = proto;
     this.proto = proto;
     viaProto = true;
     viaProto = true;
   }
   }
@@ -83,30 +62,12 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
     if (this.applicationId != null) {
     if (this.applicationId != null) {
       builder.setApplicationId(convertToProtoFormat(this.applicationId));
       builder.setApplicationId(convertToProtoFormat(this.applicationId));
     }
     }
-    if (this.masterCapability != null) {
-      builder.setMasterCapability(convertToProtoFormat(this.masterCapability));
-    }
-    if (this.resources != null) {
-      addResourcesToProto();
-    }
-    if (this.resourcesTodo != null) {
-      addResourcesTodoToProto();
-    }
-    if (this.fsTokenList != null) {
-      addFsTokenListToProto();
-    }
-    if (this.fsTokenTodo != null) {
-      builder.setFsTokensTodo(convertToProtoFormat(this.fsTokenTodo));
-    }
-    if (this.environment != null) {
-      addEnvironmentToProto();
-    }
-    if (this.commandList != null) {
-      addCommandsToProto();
-    }
     if (this.priority != null) {
     if (this.priority != null) {
       builder.setPriority(convertToProtoFormat(this.priority));
       builder.setPriority(convertToProtoFormat(this.priority));
     }
     }
+    if (this.amContainer != null) {
+      builder.setAmContainerSpec(convertToProtoFormat(this.amContainer));
+    }
   }
   }
 
 
   private void mergeLocalToProto() {
   private void mergeLocalToProto() {
@@ -145,6 +106,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
       builder.clearPriority();
       builder.clearPriority();
     this.priority = priority;
     this.priority = priority;
   }
   }
+  
   @Override
   @Override
   public ApplicationId getApplicationId() {
   public ApplicationId getApplicationId() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -165,6 +127,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
       builder.clearApplicationId();
       builder.clearApplicationId();
     this.applicationId = applicationId;
     this.applicationId = applicationId;
   }
   }
+  
   @Override
   @Override
   public String getApplicationName() {
   public String getApplicationName() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -183,403 +146,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
     }
     }
     builder.setApplicationName((applicationName));
     builder.setApplicationName((applicationName));
   }
   }
-  @Override
-  public Resource getMasterCapability() {
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.masterCapability != null) {
-      return masterCapability;
-    } // Else via proto
-    if (!p.hasMasterCapability()) {
-      return null;
-    }
-    masterCapability = convertFromProtoFormat(p.getMasterCapability());
-    return this.masterCapability;
-  }
-
-  @Override
-  public void setMasterCapability(Resource masterCapability) {
-    maybeInitBuilder();
-    if (masterCapability == null)
-      builder.clearMasterCapability();
-    this.masterCapability = masterCapability;
-  }
-  @Override
-  public Map<String, URL> getAllResources() {
-    initResources();
-    return this.resources;
-  }
-  @Override
-  public URL getResource(String key) {
-    initResources();
-    return this.resources.get(key);
-  }
-  
-  private void initResources() {
-    if (this.resources != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringURLMapProto> mapAsList = p.getResourcesList();
-    this.resources = new HashMap<String, URL>();
-    
-    for (StringURLMapProto c : mapAsList) {
-      this.resources.put(c.getKey(), convertFromProtoFormat(c.getValue()));
-    }
-  }
-  
-  @Override
-  public void addAllResources(final Map<String, URL> resources) {
-    if (resources == null)
-      return;
-    initResources();
-    this.resources.putAll(resources);
-  }
-  
-  private void addResourcesToProto() {
-    maybeInitBuilder();
-    builder.clearResources();
-    if (this.resources == null)
-      return;
-    Iterable<StringURLMapProto> iterable = new Iterable<StringURLMapProto>() {
-      
-      @Override
-      public Iterator<StringURLMapProto> iterator() {
-        return new Iterator<StringURLMapProto>() {
-          
-          Iterator<String> keyIter = resources.keySet().iterator();
-          
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-          
-          @Override
-          public StringURLMapProto next() {
-            String key = keyIter.next();
-            return StringURLMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resources.get(key))).build();
-          }
-          
-          @Override
-          public boolean hasNext() {
-            return keyIter.hasNext();
-          }
-        };
-      }
-    };
-    builder.addAllResources(iterable);
-  }
-  @Override
-  public void setResource(String key, URL val) {
-    initResources();
-    this.resources.put(key, val);
-  }
-  @Override
-  public void removeResource(String key) {
-    initResources();
-    this.resources.remove(key);
-  }
-  @Override
-  public void clearResources() {
-    initResources();
-    this.resources.clear();
-  }
-  @Override
-  public Map<String, LocalResource> getAllResourcesTodo() {
-    initResourcesTodo();
-    return this.resourcesTodo;
-  }
-  @Override
-  public LocalResource getResourceTodo(String key) {
-    initResourcesTodo();
-    return this.resourcesTodo.get(key);
-  }
-  
-  private void initResourcesTodo() {
-    if (this.resourcesTodo != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringLocalResourceMapProto> mapAsList = p.getResourcesTodoList();
-    this.resourcesTodo = new HashMap<String, LocalResource>();
-    
-    for (StringLocalResourceMapProto c : mapAsList) {
-      this.resourcesTodo.put(c.getKey(), convertFromProtoFormat(c.getValue()));
-    }
-  }
-  
-  @Override
-  public void addAllResourcesTodo(final Map<String, LocalResource> resourcesTodo) {
-    if (resourcesTodo == null) 
-      return;
-    initResourcesTodo();
-    this.resourcesTodo.putAll(resourcesTodo);
-  }
-  
-  private void addResourcesTodoToProto() {
-    maybeInitBuilder();
-    builder.clearResourcesTodo();
-    if (resourcesTodo == null)
-      return;
-    Iterable<StringLocalResourceMapProto> iterable = new Iterable<StringLocalResourceMapProto>() {
-      
-      @Override
-      public Iterator<StringLocalResourceMapProto> iterator() {
-        return new Iterator<StringLocalResourceMapProto>() {
-          
-          Iterator<String> keyIter = resourcesTodo.keySet().iterator();
-          
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-          
-          @Override
-          public StringLocalResourceMapProto next() {
-            String key = keyIter.next();
-            return StringLocalResourceMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resourcesTodo.get(key))).build();
-          }
-          
-          @Override
-          public boolean hasNext() {
-            return keyIter.hasNext();
-          }
-        };
-      }
-    };
-    builder.addAllResourcesTodo(iterable);
-  }
-  @Override
-  public void setResourceTodo(String key, LocalResource val) {
-    initResourcesTodo();
-    this.resourcesTodo.put(key, val);
-  }
-  @Override
-  public void removeResourceTodo(String key) {
-    initResourcesTodo();
-    this.resourcesTodo.remove(key);
-  }
-  @Override
-  public void clearResourcesTodo() {
-    initResourcesTodo();
-    this.resourcesTodo.clear();
-  }
-  @Override
-  public List<String> getFsTokenList() {
-    initFsTokenList();
-    return this.fsTokenList;
-  }
-  @Override
-  public String getFsToken(int index) {
-    initFsTokenList();
-    return this.fsTokenList.get(index);
-  }
-  @Override
-  public int getFsTokenCount() {
-    initFsTokenList();
-    return this.fsTokenList.size();
-  }
-  
-  private void initFsTokenList() {
-    if (this.fsTokenList != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<String> list = p.getFsTokensList();
-    this.fsTokenList = new ArrayList<String>();
-
-    for (String c : list) {
-      this.fsTokenList.add(c);
-    }
-  }
-  
-  @Override
-  public void addAllFsTokens(final List<String> fsTokens) {
-    if (fsTokens == null) 
-      return;
-    initFsTokenList();
-    this.fsTokenList.addAll(fsTokens);
-  }
-  
-  private void addFsTokenListToProto() {
-    maybeInitBuilder();
-    builder.clearFsTokens();
-    builder.addAllFsTokens(this.fsTokenList);
-  }
-
-  @Override
-  public void addFsToken(String fsTokens) {
-    initFsTokenList();
-    this.fsTokenList.add(fsTokens);
-  }
-  @Override
-  public void removeFsToken(int index) {
-    initFsTokenList();
-    this.fsTokenList.remove(index);
-  }
-  @Override
-  public void clearFsTokens() {
-    initFsTokenList();
-    this.fsTokenList.clear();
-  }
-  @Override
-  public ByteBuffer getFsTokensTodo() {
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.fsTokenTodo != null) {
-      return this.fsTokenTodo;
-    }
-    if (!p.hasFsTokensTodo()) {
-      return null;
-    }
-    this.fsTokenTodo = convertFromProtoFormat(p.getFsTokensTodo());
-    return this.fsTokenTodo;
-  }
 
 
-  @Override
-  public void setFsTokensTodo(ByteBuffer fsTokensTodo) {
-    maybeInitBuilder();
-    if (fsTokensTodo == null) 
-      builder.clearFsTokensTodo();
-    this.fsTokenTodo = fsTokensTodo;
-  }
-  @Override
-  public Map<String, String> getAllEnvironment() {
-    initEnvironment();
-    return this.environment;
-  }
-  @Override
-  public String getEnvironment(String key) {
-    initEnvironment();
-    return this.environment.get(key);
-  }
-  
-  private void initEnvironment() {
-    if (this.environment != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringStringMapProto> mapAsList = p.getEnvironmentList();
-    this.environment = new HashMap<String, String>();
-    
-    for (StringStringMapProto c : mapAsList) {
-      this.environment.put(c.getKey(), c.getValue());
-    }
-  }
-  
-  @Override
-  public void addAllEnvironment(Map<String, String> environment) {
-    if (environment == null)
-      return;
-    initEnvironment();
-    this.environment.putAll(environment);
-  }
-  
-  private void addEnvironmentToProto() {
-    maybeInitBuilder();
-    builder.clearEnvironment();
-    if (environment == null)
-      return;
-    Iterable<StringStringMapProto> iterable = new Iterable<StringStringMapProto>() {
-      
-      @Override
-      public Iterator<StringStringMapProto> iterator() {
-        return new Iterator<StringStringMapProto>() {
-          
-          Iterator<String> keyIter = environment.keySet().iterator();
-          
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-          
-          @Override
-          public StringStringMapProto next() {
-            String key = keyIter.next();
-            return StringStringMapProto.newBuilder().setKey(key).setValue((environment.get(key))).build();
-          }
-          
-          @Override
-          public boolean hasNext() {
-            return keyIter.hasNext();
-          }
-        };
-      }
-    };
-    builder.addAllEnvironment(iterable);
-  }
-  @Override
-  public void setEnvironment(String key, String val) {
-    initEnvironment();
-    this.environment.put(key, val);
-  }
-  @Override
-  public void removeEnvironment(String key) {
-    initEnvironment();
-    this.environment.remove(key);
-  }
-  @Override
-  public void clearEnvironment() {
-    initEnvironment();
-    this.environment.clear();
-  }
-  @Override
-  public List<String> getCommandList() {
-    initCommandList();
-    return this.commandList;
-  }
-  @Override
-  public String getCommand(int index) {
-    initCommandList();
-    return this.commandList.get(index);
-  }
-  @Override
-  public int getCommandCount() {
-    initCommandList();
-    return this.commandList.size();
-  }
-  
-  private void initCommandList() {
-    if (this.commandList != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<String> list = p.getCommandList();
-    this.commandList = new ArrayList<String>();
-
-    for (String c : list) {
-      this.commandList.add(c);
-    }
-  }
-  
-  @Override
-  public void addAllCommands(final List<String> command) {
-    if (command == null)
-      return;
-    initCommandList();
-    this.commandList.addAll(command);
-  }
-  
-  private void addCommandsToProto() {
-    maybeInitBuilder();
-    builder.clearCommand();
-    if (this.commandList == null) 
-      return;
-    builder.addAllCommand(this.commandList);
-  }
-  @Override
-  public void addCommand(String command) {
-    initCommandList();
-    this.commandList.add(command);
-  }
-  @Override
-  public void removeCommand(int index) {
-    initCommandList();
-    this.commandList.remove(index);
-  }
-  @Override
-  public void clearCommands() {
-    initCommandList();
-    this.commandList.clear();
-  }
   @Override
   @Override
   public String getQueue() {
   public String getQueue() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -598,6 +165,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
     }
     }
     builder.setQueue((queue));
     builder.setQueue((queue));
   }
   }
+  
   @Override
   @Override
   public String getUser() {
   public String getUser() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -617,6 +185,28 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
     builder.setUser((user));
     builder.setUser((user));
   }
   }
 
 
+  @Override
+  public ContainerLaunchContext getAMContainerSpec() {
+    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.amContainer != null) {
+      return amContainer;
+    } // Else via proto
+    if (!p.hasAmContainerSpec()) {
+      return null;
+    }
+    amContainer = convertFromProtoFormat(p.getAmContainerSpec());
+    return amContainer;
+  }
+
+  @Override
+  public void setAMContainerSpec(ContainerLaunchContext amContainer) {
+    maybeInitBuilder();
+    if (amContainer == null) {
+      builder.clearAmContainerSpec();
+    }
+    this.amContainer = amContainer;
+  }
+
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
     return new PriorityPBImpl(p);
     return new PriorityPBImpl(p);
   }
   }
@@ -633,28 +223,12 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
     return ((ApplicationIdPBImpl)t).getProto();
     return ((ApplicationIdPBImpl)t).getProto();
   }
   }
 
 
-  private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
-    return new ResourcePBImpl(p);
-  }
-
-  private ResourceProto convertToProtoFormat(Resource t) {
-    return ((ResourcePBImpl)t).getProto();
-  }
-
-  private URLPBImpl convertFromProtoFormat(URLProto p) {
-    return new URLPBImpl(p);
+  private ContainerLaunchContextPBImpl convertFromProtoFormat(
+      ContainerLaunchContextProto p) {
+    return new ContainerLaunchContextPBImpl(p);
   }
   }
 
 
-  private URLProto convertToProtoFormat(URL t) {
-    return ((URLPBImpl)t).getProto();
+  private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) {
+    return ((ContainerLaunchContextPBImpl)t).getProto();
   }
   }
-
-  private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
-    return new LocalResourcePBImpl(p);
-  }
-
-  private LocalResourceProto convertToProtoFormat(LocalResource t) {
-    return ((LocalResourcePBImpl)t).getProto();
-  }
-
 }  
 }  

+ 16 - 19
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java

@@ -39,8 +39,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
 
 
-
-    
 public class ContainerLaunchContextPBImpl 
 public class ContainerLaunchContextPBImpl 
 extends ProtoBase<ContainerLaunchContextProto> 
 extends ProtoBase<ContainerLaunchContextProto> 
 implements ContainerLaunchContext {
 implements ContainerLaunchContext {
@@ -54,10 +52,9 @@ implements ContainerLaunchContext {
   private Map<String, LocalResource> localResources = null;
   private Map<String, LocalResource> localResources = null;
   private ByteBuffer containerTokens = null;
   private ByteBuffer containerTokens = null;
   private Map<String, ByteBuffer> serviceData = null;
   private Map<String, ByteBuffer> serviceData = null;
-  private Map<String, String> env = null;
+  private Map<String, String> environment = null;
   private List<String> commands = null;
   private List<String> commands = null;
   
   
-  
   public ContainerLaunchContextPBImpl() {
   public ContainerLaunchContextPBImpl() {
     builder = ContainerLaunchContextProto.newBuilder();
     builder = ContainerLaunchContextProto.newBuilder();
   }
   }
@@ -94,7 +91,7 @@ implements ContainerLaunchContext {
     if (this.serviceData != null) {
     if (this.serviceData != null) {
       addServiceDataToProto();
       addServiceDataToProto();
     }
     }
-    if (this.env != null) {
+    if (this.environment != null) {
       addEnvToProto();
       addEnvToProto();
     }
     }
     if (this.commands != null) {
     if (this.commands != null) {
@@ -364,37 +361,37 @@ implements ContainerLaunchContext {
   }
   }
   
   
   @Override
   @Override
-  public Map<String, String> getEnv() {
+  public Map<String, String> getEnvironment() {
     initEnv();
     initEnv();
-    return this.env;
+    return this.environment;
   }
   }
   
   
   private void initEnv() {
   private void initEnv() {
-    if (this.env != null) {
+    if (this.environment != null) {
       return;
       return;
     }
     }
     ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
     ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringStringMapProto> list = p.getEnvList();
-    this.env = new HashMap<String, String>();
+    List<StringStringMapProto> list = p.getEnvironmentList();
+    this.environment = new HashMap<String, String>();
 
 
     for (StringStringMapProto c : list) {
     for (StringStringMapProto c : list) {
-      this.env.put(c.getKey(), c.getValue());
+      this.environment.put(c.getKey(), c.getValue());
     }
     }
   }
   }
   
   
   @Override
   @Override
-  public void setEnv(final Map<String, String> env) {
+  public void setEnvironment(final Map<String, String> env) {
     if (env == null)
     if (env == null)
       return;
       return;
     initEnv();
     initEnv();
-    this.env.clear();
-    this.env.putAll(env);
+    this.environment.clear();
+    this.environment.putAll(env);
   }
   }
   
   
   private void addEnvToProto() {
   private void addEnvToProto() {
     maybeInitBuilder();
     maybeInitBuilder();
-    builder.clearEnv();
-    if (env == null)
+    builder.clearEnvironment();
+    if (environment == null)
       return;
       return;
     Iterable<StringStringMapProto> iterable = 
     Iterable<StringStringMapProto> iterable = 
         new Iterable<StringStringMapProto>() {
         new Iterable<StringStringMapProto>() {
@@ -403,7 +400,7 @@ implements ContainerLaunchContext {
       public Iterator<StringStringMapProto> iterator() {
       public Iterator<StringStringMapProto> iterator() {
         return new Iterator<StringStringMapProto>() {
         return new Iterator<StringStringMapProto>() {
           
           
-          Iterator<String> keyIter = env.keySet().iterator();
+          Iterator<String> keyIter = environment.keySet().iterator();
           
           
           @Override
           @Override
           public void remove() {
           public void remove() {
@@ -414,7 +411,7 @@ implements ContainerLaunchContext {
           public StringStringMapProto next() {
           public StringStringMapProto next() {
             String key = keyIter.next();
             String key = keyIter.next();
             return StringStringMapProto.newBuilder().setKey(key).setValue(
             return StringStringMapProto.newBuilder().setKey(key).setValue(
-                (env.get(key))).build();
+                (environment.get(key))).build();
           }
           }
           
           
           @Override
           @Override
@@ -424,7 +421,7 @@ implements ContainerLaunchContext {
         };
         };
       }
       }
     };
     };
-    builder.addAllEnv(iterable);
+    builder.addAllEnvironment(iterable);
   }
   }
 
 
   private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
   private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {

+ 6 - 12
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -188,17 +188,11 @@ message AMResponseProto {
 ////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
 message ApplicationSubmissionContextProto {
 message ApplicationSubmissionContextProto {
   optional ApplicationIdProto application_id = 1;
   optional ApplicationIdProto application_id = 1;
-  optional string application_name = 2;
-  optional ResourceProto master_capability = 3;
-  repeated StringURLMapProto resources = 4;
-  repeated StringLocalResourceMapProto resources_todo = 5;
-  repeated string fs_tokens = 6;
-  optional bytes fs_tokens_todo = 7;
-  repeated StringStringMapProto environment = 8;
-  repeated string command = 9;
-  optional string queue = 10;
-  optional PriorityProto priority = 11;
-  optional string user = 12;
+  optional string application_name = 2 [default = "N/A"];
+  optional string user = 3; 
+  optional string queue = 4 [default = "default"];
+  optional PriorityProto priority = 5;
+  optional ContainerLaunchContextProto am_container_spec = 6;
 }
 }
 
 
 message YarnClusterMetricsProto {
 message YarnClusterMetricsProto {
@@ -242,7 +236,7 @@ message ContainerLaunchContextProto {
   repeated StringLocalResourceMapProto localResources = 4;
   repeated StringLocalResourceMapProto localResources = 4;
   optional bytes container_tokens = 5;
   optional bytes container_tokens = 5;
   repeated StringBytesMapProto service_data = 6;
   repeated StringBytesMapProto service_data = 6;
-  repeated StringStringMapProto env = 7;
+  repeated StringStringMapProto environment = 7;
   repeated string command = 8;
   repeated string command = 8;
 }
 }
 
 

+ 6 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -219,6 +219,12 @@ public class YarnConfiguration extends Configuration {
     RM_PREFIX + "max-completed-applications";
     RM_PREFIX + "max-completed-applications";
   public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
   public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
   
   
+  /** Default application name */
+  public static final String DEFAULT_APPLICATION_NAME = "N/A";
+
+  /** Default queue name */
+  public static final String DEFAULT_QUEUE_NAME = "default";
+  
   ////////////////////////////////
   ////////////////////////////////
   // Node Manager Configs
   // Node Manager Configs
   ////////////////////////////////
   ////////////////////////////////

+ 3 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -89,7 +89,7 @@ public class ContainerLaunch implements Callable<Integer> {
     final Map<Path,String> localResources = container.getLocalizedResources();
     final Map<Path,String> localResources = container.getLocalizedResources();
     String containerIdStr = ConverterUtils.toString(container.getContainerID());
     String containerIdStr = ConverterUtils.toString(container.getContainerID());
     final String user = launchContext.getUser();
     final String user = launchContext.getUser();
-    final Map<String,String> env = launchContext.getEnv();
+    final Map<String,String> env = launchContext.getEnvironment();
     final List<String> command = launchContext.getCommands();
     final List<String> command = launchContext.getCommands();
     int ret = -1;
     int ret = -1;
 
 
@@ -109,7 +109,7 @@ public class ContainerLaunch implements Callable<Integer> {
       }
       }
       launchContext.setCommands(newCmds);
       launchContext.setCommands(newCmds);
 
 
-      Map<String, String> envs = launchContext.getEnv();
+      Map<String, String> envs = launchContext.getEnvironment();
       Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
       Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
       for (Entry<String, String> entry : envs.entrySet()) {
       for (Entry<String, String> entry : envs.entrySet()) {
         newEnvs.put(
         newEnvs.put(
@@ -118,7 +118,7 @@ public class ContainerLaunch implements Callable<Integer> {
                 ApplicationConstants.LOG_DIR_EXPANSION_VAR,
                 ApplicationConstants.LOG_DIR_EXPANSION_VAR,
                 containerLogDir.toUri().getPath()));
                 containerLogDir.toUri().getPath()));
       }
       }
-      launchContext.setEnv(newEnvs);
+      launchContext.setEnvironment(newEnvs);
       // /////////////////////////// End of variable expansion
       // /////////////////////////// End of variable expansion
 
 
       FileContext lfs = FileContext.getLocalFSFileContext();
       FileContext lfs = FileContext.getLocalFSFileContext();

+ 7 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -90,7 +89,6 @@ public class ClientRMService extends AbstractService implements
   final private AtomicInteger applicationCounter = new AtomicInteger(0);
   final private AtomicInteger applicationCounter = new AtomicInteger(0);
   final private YarnScheduler scheduler;
   final private YarnScheduler scheduler;
   final private RMContext rmContext;
   final private RMContext rmContext;
-  private final AMLivelinessMonitor amLivelinessMonitor;
   private final RMAppManager rmAppManager;
   private final RMAppManager rmAppManager;
 
 
   private String clientServiceBindAddress;
   private String clientServiceBindAddress;
@@ -106,7 +104,6 @@ public class ClientRMService extends AbstractService implements
     super(ClientRMService.class.getName());
     super(ClientRMService.class.getName());
     this.scheduler = scheduler;
     this.scheduler = scheduler;
     this.rmContext = rmContext;
     this.rmContext = rmContext;
-    this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rmAppManager = rmAppManager;
     this.rmAppManager = rmAppManager;
   }
   }
   
   
@@ -195,15 +192,18 @@ public class ClientRMService extends AbstractService implements
       SubmitApplicationRequest request) throws YarnRemoteException {
       SubmitApplicationRequest request) throws YarnRemoteException {
     ApplicationSubmissionContext submissionContext = request
     ApplicationSubmissionContext submissionContext = request
         .getApplicationSubmissionContext();
         .getApplicationSubmissionContext();
-    ApplicationId applicationId = null;
-    String user = null;
+    ApplicationId applicationId = submissionContext.getApplicationId();
+    String user = submissionContext.getUser();
     try {
     try {
       user = UserGroupInformation.getCurrentUser().getShortUserName();
       user = UserGroupInformation.getCurrentUser().getShortUserName();
-      applicationId = submissionContext.getApplicationId();
       if (rmContext.getRMApps().get(applicationId) != null) {
       if (rmContext.getRMApps().get(applicationId) != null) {
         throw new IOException("Application with id " + applicationId
         throw new IOException("Application with id " + applicationId
             + " is already present! Cannot add a duplicate!");
             + " is already present! Cannot add a duplicate!");
       }
       }
+      
+      // Safety 
+      submissionContext.setUser(user);
+      
       // This needs to be synchronous as the client can query 
       // This needs to be synchronous as the client can query 
       // immediately following the submission to get the application status.
       // immediately following the submission to get the application status.
       // So call handle directly and do not send an event.
       // So call handle directly and do not send an event.
@@ -226,6 +226,7 @@ public class ClientRMService extends AbstractService implements
     return response;
     return response;
   }
   }
 
 
+  @SuppressWarnings("unchecked")
   @Override
   @Override
   public FinishApplicationResponse finishApplication(
   public FinishApplicationResponse finishApplication(
       FinishApplicationRequest request) throws YarnRemoteException {
       FinishApplicationRequest request) throws YarnRemoteException {

+ 23 - 11
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -210,7 +210,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
     }
     }
   }
   }
 
 
-  protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) {
+  @SuppressWarnings("unchecked")
+  protected synchronized void submitApplication(
+      ApplicationSubmissionContext submissionContext) {
     ApplicationId applicationId = submissionContext.getApplicationId();
     ApplicationId applicationId = submissionContext.getApplicationId();
     RMApp application = null;
     RMApp application = null;
     try {
     try {
@@ -224,27 +226,37 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
         clientTokenStr = clientToken.encodeToUrlString();
         clientTokenStr = clientToken.encodeToUrlString();
         LOG.debug("Sending client token as " + clientTokenStr);
         LOG.debug("Sending client token as " + clientTokenStr);
       }
       }
-      submissionContext.setQueue(submissionContext.getQueue() == null
-          ? "default" : submissionContext.getQueue());
-      submissionContext.setApplicationName(submissionContext
-          .getApplicationName() == null ? "N/A" : submissionContext
-          .getApplicationName());
+      
+      // Sanity checks
+      if (submissionContext.getQueue() == null) {
+        submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+      }
+      if (submissionContext.getApplicationName() == null) {
+        submissionContext.setApplicationName(
+            YarnConfiguration.DEFAULT_APPLICATION_NAME);
+      }
+
+      // Store application for recovery
       ApplicationStore appStore = rmContext.getApplicationsStore()
       ApplicationStore appStore = rmContext.getApplicationsStore()
           .createApplicationStore(submissionContext.getApplicationId(),
           .createApplicationStore(submissionContext.getApplicationId(),
           submissionContext);
           submissionContext);
+      
+      // Create RMApp
       application = new RMAppImpl(applicationId, rmContext,
       application = new RMAppImpl(applicationId, rmContext,
           this.conf, submissionContext.getApplicationName(), user,
           this.conf, submissionContext.getApplicationName(), user,
           submissionContext.getQueue(), submissionContext, clientTokenStr,
           submissionContext.getQueue(), submissionContext, clientTokenStr,
-          appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
+          appStore, this.scheduler,
           this.masterService);
           this.masterService);
 
 
-      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 
+          null) {
         LOG.info("Application with id " + applicationId + 
         LOG.info("Application with id " + applicationId + 
             " is already present! Cannot add a duplicate!");
             " is already present! Cannot add a duplicate!");
-        // don't send event through dispatcher as it will be handled by app already
-        // present with this id.
+        // don't send event through dispatcher as it will be handled by app 
+        // already present with this id.
         application.handle(new RMAppRejectedEvent(applicationId,
         application.handle(new RMAppRejectedEvent(applicationId,
-            "Application with this id is already present! Cannot add a duplicate!"));
+            "Application with this id is already present! " +
+            "Cannot add a duplicate!"));
       } else {
       } else {
         this.rmContext.getDispatcher().getEventHandler().handle(
         this.rmContext.getDispatcher().getEventHandler().handle(
             new RMAppEvent(applicationId, RMAppEventType.START));
             new RMAppEvent(applicationId, RMAppEventType.START));

+ 0 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java

@@ -18,7 +18,6 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 
 
 public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
 public class RMAppManagerSubmitEvent extends RMAppManagerEvent {

+ 23 - 25
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java

@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -120,7 +119,8 @@ public class AMLauncher implements Runnable {
         + " for AM " + application.getAppAttemptId());  
         + " for AM " + application.getAppAttemptId());  
     ContainerLaunchContext launchContext =
     ContainerLaunchContext launchContext =
         createAMContainerLaunchContext(applicationContext, masterContainerID);
         createAMContainerLaunchContext(applicationContext, masterContainerID);
-    StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
+    StartContainerRequest request = 
+        recordFactory.newRecordInstance(StartContainerRequest.class);
     request.setContainerLaunchContext(launchContext);
     request.setContainerLaunchContext(launchContext);
     containerMgrProxy.startContainer(request);
     containerMgrProxy.startContainer(request);
     LOG.info("Done launching container " + application.getMasterContainer() 
     LOG.info("Done launching container " + application.getMasterContainer() 
@@ -130,7 +130,8 @@ public class AMLauncher implements Runnable {
   private void cleanup() throws IOException {
   private void cleanup() throws IOException {
     connect();
     connect();
     ContainerId containerId = application.getMasterContainer().getId();
     ContainerId containerId = application.getMasterContainer().getId();
-    StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+    StopContainerRequest stopRequest = 
+        recordFactory.newRecordInstance(StopContainerRequest.class);
     stopRequest.setContainerId(containerId);
     stopRequest.setContainerId(containerId);
     containerMgrProxy.stopContainer(stopRequest);
     containerMgrProxy.stopContainer(stopRequest);
   }
   }
@@ -145,7 +146,7 @@ public class AMLauncher implements Runnable {
     final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
     final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
 
 
     UserGroupInformation currentUser =
     UserGroupInformation currentUser =
-        UserGroupInformation.createRemoteUser("TODO"); // TODO
+        UserGroupInformation.createRemoteUser("yarn"); // TODO
     if (UserGroupInformation.isSecurityEnabled()) {
     if (UserGroupInformation.isSecurityEnabled()) {
       ContainerToken containerToken = container.getContainerToken();
       ContainerToken containerToken = container.getContainerToken();
       Token<ContainerTokenIdentifier> token =
       Token<ContainerTokenIdentifier> token =
@@ -170,8 +171,8 @@ public class AMLauncher implements Runnable {
       ContainerId containerID) throws IOException {
       ContainerId containerID) throws IOException {
 
 
     // Construct the actual Container
     // Construct the actual Container
-    ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    container.setCommands(applicationMasterContext.getCommandList());
+    ContainerLaunchContext container = 
+        applicationMasterContext.getAMContainerSpec();
     StringBuilder mergedCommand = new StringBuilder();
     StringBuilder mergedCommand = new StringBuilder();
     String failCount = Integer.toString(application.getAppAttemptId()
     String failCount = Integer.toString(application.getAppAttemptId()
         .getAttemptId());
         .getAttemptId());
@@ -189,34 +190,28 @@ public class AMLauncher implements Runnable {
    
    
     LOG.info("Command to launch container " + 
     LOG.info("Command to launch container " + 
         containerID + " : " + mergedCommand);
         containerID + " : " + mergedCommand);
-    Map<String, String> environment = 
-        applicationMasterContext.getAllEnvironment();
-    environment.putAll(setupTokensInEnv(applicationMasterContext));
-    container.setEnv(environment);
-
-    // Construct the actual Container
+    
+    // Finalize the container
     container.setContainerId(containerID);
     container.setContainerId(containerID);
     container.setUser(applicationMasterContext.getUser());
     container.setUser(applicationMasterContext.getUser());
-    container.setResource(applicationMasterContext.getMasterCapability());
-    container.setLocalResources(applicationMasterContext.getAllResourcesTodo());
-    container.setContainerTokens(applicationMasterContext.getFsTokensTodo());
+    setupTokensAndEnv(container);
+    
     return container;
     return container;
   }
   }
 
 
-  private Map<String, String> setupTokensInEnv(
-      ApplicationSubmissionContext asc)
+  private void setupTokensAndEnv(
+      ContainerLaunchContext container)
       throws IOException {
       throws IOException {
-    Map<String, String> env =
-      new HashMap<String, String>();
+    Map<String, String> environment = container.getEnvironment();
     if (UserGroupInformation.isSecurityEnabled()) {
     if (UserGroupInformation.isSecurityEnabled()) {
       // TODO: Security enabled/disabled info should come from RM.
       // TODO: Security enabled/disabled info should come from RM.
 
 
       Credentials credentials = new Credentials();
       Credentials credentials = new Credentials();
 
 
       DataInputByteBuffer dibb = new DataInputByteBuffer();
       DataInputByteBuffer dibb = new DataInputByteBuffer();
-      if (asc.getFsTokensTodo() != null) {
+      if (container.getContainerTokens() != null) {
         // TODO: Don't do this kind of checks everywhere.
         // TODO: Don't do this kind of checks everywhere.
-        dibb.reset(asc.getFsTokensTodo());
+        dibb.reset(container.getContainerTokens());
         credentials.readTokenStorageStream(dibb);
         credentials.readTokenStorageStream(dibb);
       }
       }
 
 
@@ -236,14 +231,16 @@ public class AMLauncher implements Runnable {
       token.setService(new Text(resolvedAddr));
       token.setService(new Text(resolvedAddr));
       String appMasterTokenEncoded = token.encodeToUrlString();
       String appMasterTokenEncoded = token.encodeToUrlString();
       LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
       LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
-      env.put(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
+      environment.put(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
           appMasterTokenEncoded);
           appMasterTokenEncoded);
 
 
       // Add the RM token
       // Add the RM token
       credentials.addToken(new Text(resolvedAddr), token);
       credentials.addToken(new Text(resolvedAddr), token);
       DataOutputBuffer dob = new DataOutputBuffer();
       DataOutputBuffer dob = new DataOutputBuffer();
       credentials.writeTokenStorageToStream(dob);
       credentials.writeTokenStorageToStream(dob);
-      asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+      container.setContainerTokens(
+          ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
 
 
       ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
       ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
           application.getAppAttemptId().getApplicationId());
           application.getAppAttemptId().getApplicationId());
@@ -252,9 +249,10 @@ public class AMLauncher implements Runnable {
       String encoded =
       String encoded =
           Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
           Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
       LOG.debug("The encoded client secret-key to be put in env : " + encoded);
       LOG.debug("The encoded client secret-key to be put in env : " + encoded);
-      env.put(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded);
+      environment.put(
+          ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, 
+          encoded);
     }
     }
-    return env;
   }
   }
   
   
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")

+ 3 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -86,7 +86,6 @@ public class RMAppImpl implements RMApp {
   // Mutable fields
   // Mutable fields
   private long startTime;
   private long startTime;
   private long finishTime;
   private long finishTime;
-  private AMLivelinessMonitor amLivelinessMonitor;
   private RMAppAttempt currentAttempt;
   private RMAppAttempt currentAttempt;
 
 
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
@@ -163,7 +162,7 @@ public class RMAppImpl implements RMApp {
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
       Configuration config, String name, String user, String queue,
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, String clientTokenStr,
       ApplicationSubmissionContext submissionContext, String clientTokenStr,
-      ApplicationStore appStore, AMLivelinessMonitor amLivelinessMonitor,
+      ApplicationStore appStore, 
       YarnScheduler scheduler, ApplicationMasterService masterService) {
       YarnScheduler scheduler, ApplicationMasterService masterService) {
 
 
     this.applicationId = applicationId;
     this.applicationId = applicationId;
@@ -176,7 +175,6 @@ public class RMAppImpl implements RMApp {
     this.submissionContext = submissionContext;
     this.submissionContext = submissionContext;
     this.clientTokenStr = clientTokenStr;
     this.clientTokenStr = clientTokenStr;
     this.appStore = appStore;
     this.appStore = appStore;
-    this.amLivelinessMonitor = amLivelinessMonitor;
     this.scheduler = scheduler;
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.masterService = masterService;
     this.startTime = System.currentTimeMillis();
     this.startTime = System.currentTimeMillis();
@@ -380,6 +378,7 @@ public class RMAppImpl implements RMApp {
     }
     }
   }
   }
 
 
+  @SuppressWarnings("unchecked")
   private void createNewAttempt() {
   private void createNewAttempt() {
     ApplicationAttemptId appAttemptId = Records
     ApplicationAttemptId appAttemptId = Records
         .newRecord(ApplicationAttemptId.class);
         .newRecord(ApplicationAttemptId.class);
@@ -434,6 +433,7 @@ public class RMAppImpl implements RMApp {
       return nodes;
       return nodes;
     }
     }
 
 
+    @SuppressWarnings("unchecked")
     public void transition(RMAppImpl app, RMAppEvent event) {
     public void transition(RMAppImpl app, RMAppEvent event) {
       Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
       Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
       for (NodeId nodeId : nodes) {
       for (NodeId nodeId : nodes) {

+ 2 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -84,6 +84,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
                              RMAppAttemptEvent> stateMachine;
                              RMAppAttemptEvent> stateMachine;
 
 
   private final RMContext rmContext;
   private final RMContext rmContext;
+  @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private final EventHandler eventHandler;
   private final YarnScheduler scheduler;
   private final YarnScheduler scheduler;
   private final ApplicationMasterService masterService;
   private final ApplicationMasterService masterService;
@@ -459,7 +460,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
       // Request a container for the AM.
       // Request a container for the AM.
       ResourceRequest request = BuilderUtils.newResourceRequest(
       ResourceRequest request = BuilderUtils.newResourceRequest(
           AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
           AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
-              .getMasterCapability(), 1);
+              .getAMContainerSpec().getResource(), 1);
       LOG.debug("About to request resources for AM of "
       LOG.debug("About to request resources for AM of "
           + appAttempt.applicationAttemptId + " required " + request);
           + appAttempt.applicationAttemptId + " required " + request);
 
 

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java

@@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
 
 
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
@@ -59,7 +58,8 @@ class AppsBlock extends HtmlBlock {
       String appId = app.getApplicationId().toString();
       String appId = app.getApplicationId().toString();
       String trackingUrl = app.getTrackingUrl();
       String trackingUrl = app.getTrackingUrl();
       String ui = trackingUrl == null || trackingUrl.isEmpty() ? "UNASSIGNED" :
       String ui = trackingUrl == null || trackingUrl.isEmpty() ? "UNASSIGNED" :
-          (app.getFinishTime() == 0 ? "ApplicationMaster" : "JobHistory");
+          (app.getFinishTime() == 0 ? 
+              "ApplicationMaster URL" : "JobHistory URL");
       String percent = String.format("%.1f", app.getProgress() * 100);
       String percent = String.format("%.1f", app.getProgress() * 100);
       tbody.
       tbody.
         tr().
         tr().

+ 7 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -81,13 +82,17 @@ public class MockRM extends ResourceManager {
     ApplicationId appId = resp.getApplicationId();
     ApplicationId appId = resp.getApplicationId();
     
     
     SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class);
     SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class);
-    ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class);
+    ApplicationSubmissionContext sub = 
+        Records.newRecord(ApplicationSubmissionContext.class);
     sub.setApplicationId(appId);
     sub.setApplicationId(appId);
     sub.setApplicationName("");
     sub.setApplicationName("");
     sub.setUser("");
     sub.setUser("");
+    ContainerLaunchContext clc = 
+        Records.newRecord(ContainerLaunchContext.class);
     Resource capability = Records.newRecord(Resource.class);
     Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(masterMemory);
     capability.setMemory(masterMemory);
-    sub.setMasterCapability(capability);
+    clc.setResource(capability);
+    sub.setAMContainerSpec(clc);
     req.setApplicationSubmissionContext(sub);
     req.setApplicationSubmissionContext(sub);
     
     
     client.submitApplication(req);
     client.submitApplication(req);

+ 15 - 25
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

@@ -18,19 +18,12 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
-import static org.mockito.Mockito.*;
 
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 
 
-
 import junit.framework.Assert;
 import junit.framework.Assert;
 
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -44,7 +37,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
@@ -63,8 +55,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.service.Service;
 
 
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Lists;
@@ -75,7 +65,6 @@ import com.google.common.collect.Lists;
  */
  */
 
 
 public class TestAppManager{
 public class TestAppManager{
-  private static final Log LOG = LogFactory.getLog(TestAppManager.class);
   private static RMAppEventType appEventType = RMAppEventType.KILL; 
   private static RMAppEventType appEventType = RMAppEventType.KILL; 
 
 
   public synchronized RMAppEventType getAppEventType() {
   public synchronized RMAppEventType getAppEventType() {
@@ -117,10 +106,8 @@ public class TestAppManager{
   public class TestAppManagerDispatcher implements
   public class TestAppManagerDispatcher implements
       EventHandler<RMAppManagerEvent> {
       EventHandler<RMAppManagerEvent> {
 
 
-    private final RMContext rmContext;
 
 
-    public TestAppManagerDispatcher(RMContext rmContext) {
-      this.rmContext = rmContext;
+    public TestAppManagerDispatcher() {
     }
     }
 
 
     @Override
     @Override
@@ -132,15 +119,11 @@ public class TestAppManager{
   public class TestDispatcher implements
   public class TestDispatcher implements
       EventHandler<RMAppEvent> {
       EventHandler<RMAppEvent> {
 
 
-    private final RMContext rmContext;
-
-    public TestDispatcher(RMContext rmContext) {
-      this.rmContext = rmContext;
+    public TestDispatcher() {
     }
     }
 
 
     @Override
     @Override
     public void handle(RMAppEvent event) {
     public void handle(RMAppEvent event) {
-      ApplicationId appID = event.getApplicationId();
       //RMApp rmApp = this.rmContext.getRMApps().get(appID);
       //RMApp rmApp = this.rmContext.getRMApps().get(appID);
       setAppEventType(event.getType());
       setAppEventType(event.getType());
       System.out.println("in handle routine " + getAppEventType().toString());
       System.out.println("in handle routine " + getAppEventType().toString());
@@ -178,7 +161,8 @@ public class TestAppManager{
     public void setCompletedAppsMax(int max) {
     public void setCompletedAppsMax(int max) {
       super.setCompletedAppsMax(max);
       super.setCompletedAppsMax(max);
     }
     }
-    public void submitApplication(ApplicationSubmissionContext submissionContext) {
+    public void submitApplication(
+        ApplicationSubmissionContext submissionContext) {
       super.submitApplication(submissionContext);
       super.submitApplication(submissionContext);
     }
     }
   }
   }
@@ -336,8 +320,9 @@ public class TestAppManager{
   }
   }
 
 
   protected void setupDispatcher(RMContext rmContext, Configuration conf) {
   protected void setupDispatcher(RMContext rmContext, Configuration conf) {
-    TestDispatcher testDispatcher = new TestDispatcher(rmContext);
-    TestAppManagerDispatcher testAppManagerDispatcher = new TestAppManagerDispatcher(rmContext);
+    TestDispatcher testDispatcher = new TestDispatcher();
+    TestAppManagerDispatcher testAppManagerDispatcher = 
+        new TestAppManagerDispatcher();
     rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher);
     rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher);
     rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher);
     rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher);
     ((Service)rmContext.getDispatcher()).init(conf);
     ((Service)rmContext.getDispatcher()).init(conf);
@@ -359,7 +344,8 @@ public class TestAppManager{
 
 
     ApplicationId appID = MockApps.newAppID(1);
     ApplicationId appID = MockApps.newAppID(1);
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    ApplicationSubmissionContext context = 
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
     context.setApplicationId(appID);
     context.setApplicationId(appID);
     setupDispatcher(rmContext, conf);
     setupDispatcher(rmContext, conf);
 
 
@@ -367,8 +353,12 @@ public class TestAppManager{
     RMApp app = rmContext.getRMApps().get(appID);
     RMApp app = rmContext.getRMApps().get(appID);
     Assert.assertNotNull("app is null", app);
     Assert.assertNotNull("app is null", app);
     Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
     Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
-    Assert.assertEquals("app name doesn't match", "N/A", app.getName());
-    Assert.assertEquals("app queue doesn't match", "default", app.getQueue());
+    Assert.assertEquals("app name doesn't match", 
+        YarnConfiguration.DEFAULT_APPLICATION_NAME, 
+        app.getName());
+    Assert.assertEquals("app queue doesn't match", 
+        YarnConfiguration.DEFAULT_QUEUE_NAME, 
+        app.getQueue());
     Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
     Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
     Assert.assertNotNull("app store is null", app.getApplicationStore());
     Assert.assertNotNull("app store is null", app.getApplicationStore());
 
 

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -128,7 +128,7 @@ public class TestRMAppTransitions {
     RMApp application = new RMAppImpl(applicationId, rmContext,
     RMApp application = new RMAppImpl(applicationId, rmContext,
           conf, name, user,
           conf, name, user,
           queue, submissionContext, clientTokenStr,
           queue, submissionContext, clientTokenStr,
-          appStore, rmContext.getAMLivelinessMonitor(), scheduler,
+          appStore, scheduler,
           masterService);
           masterService);
 
 
     testAppStartState(applicationId, user, name, queue, application);
     testAppStartState(applicationId, user, name, queue, application);

+ 10 - 10
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.List;
 
 
 import junit.framework.Assert;
 import junit.framework.Assert;
@@ -54,10 +56,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -137,15 +140,11 @@ public class TestContainerTokenSecretManager {
     ApplicationSubmissionContext appSubmissionContext =
     ApplicationSubmissionContext appSubmissionContext =
         recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
         recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
     appSubmissionContext.setApplicationId(appID);
     appSubmissionContext.setApplicationId(appID);
-    appSubmissionContext.setMasterCapability(recordFactory
-        .newRecordInstance(Resource.class));
-    appSubmissionContext.getMasterCapability().setMemory(1024);
-//    appSubmissionContext.resources = new HashMap<String, URL>();
+    ContainerLaunchContext amContainer =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    amContainer.setResource(Resources.createResource(1024));
+    amContainer.setCommands(Arrays.asList("sleep", "100"));
     appSubmissionContext.setUser("testUser");
     appSubmissionContext.setUser("testUser");
-//    appSubmissionContext.environment = new HashMap<String, String>();
-//    appSubmissionContext.command = new ArrayList<String>();
-    appSubmissionContext.addCommand("sleep");
-    appSubmissionContext.addCommand("100");
 
 
     // TODO: Use a resource to work around bugs. Today NM doesn't create local
     // TODO: Use a resource to work around bugs. Today NM doesn't create local
     // app-dirs if there are no file to download!!
     // app-dirs if there are no file to download!!
@@ -162,10 +161,11 @@ public class TestContainerTokenSecretManager {
     rsrc.setTimestamp(file.lastModified());
     rsrc.setTimestamp(file.lastModified());
     rsrc.setType(LocalResourceType.FILE);
     rsrc.setType(LocalResourceType.FILE);
     rsrc.setVisibility(LocalResourceVisibility.PRIVATE);
     rsrc.setVisibility(LocalResourceVisibility.PRIVATE);
-    appSubmissionContext.setResourceTodo("testFile", rsrc);
+    amContainer.setLocalResources(Collections.singletonMap("testFile", rsrc));
     SubmitApplicationRequest submitRequest = recordFactory
     SubmitApplicationRequest submitRequest = recordFactory
         .newRecordInstance(SubmitApplicationRequest.class);
         .newRecordInstance(SubmitApplicationRequest.class);
     submitRequest.setApplicationSubmissionContext(appSubmissionContext);
     submitRequest.setApplicationSubmissionContext(appSubmissionContext);
+    appSubmissionContext.setAMContainerSpec(amContainer);
     resourceManager.getClientRMService().submitApplication(submitRequest);
     resourceManager.getClientRMService().submitApplication(submitRequest);
 
 
     // Wait till container gets allocated for AM
     // Wait till container gets allocated for AM