Преглед изворни кода

YARN-321. Forwarding YARN-321 branch to latest trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/YARN-321@1559250 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli пре 11 година
родитељ
комит
4e1884bb5e
20 измењених фајлова са 723 додато и 20 уклоњено
  1. 2 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 6 3
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  3. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
  4. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  5. 5 13
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
  6. 25 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  7. 9 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
  8. 244 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
  9. 7 1
      hadoop-project/pom.xml
  10. 3 0
      hadoop-yarn-project/CHANGES.txt
  11. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
  12. 69 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/MoveApplicationAcrossQueuesRequest.java
  13. 47 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/MoveApplicationAcrossQueuesResponse.java
  14. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto
  15. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  16. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java
  17. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java
  18. 158 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/MoveApplicationAcrossQueuesRequestPBImpl.java
  19. 68 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/MoveApplicationAcrossQueuesResponsePBImpl.java
  20. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -528,6 +528,8 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10125. no need to process RPC request if the client connection
     has been dropped (Ming Ma via brandonli)
 
+    HADOOP-10235. Hadoop tarball has 2 versions of stax-api JARs. (tucu)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -484,9 +484,8 @@ Trunk (Unreleased)
     HDFS-5775. Consolidate the code for serialization in CacheManager
     (Haohui Mai via brandonli)
 
-    HDFS-5704. Change OP_UPDATE_BLOCKS with a new OP_ADD_BLOCK. (jing9)
-
-    HDFS-5777. Update LayoutVersion for the new editlog op OP_ADD_BLOCK. (jing9)
+    HDFS-5794. Fix the inconsistency of layout version number of 
+    ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)
 
 Release 2.4.0 - UNRELEASED
 
@@ -694,6 +693,8 @@ Release 2.4.0 - UNRELEASED
     InvalidEncryptionKeyException in fetchBlockByteRange (Liang Xie via Colin
     Patrick McCabe)
 
+    HDFS-5704. Change OP_UPDATE_BLOCKS with a new OP_ADD_BLOCK. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -772,6 +773,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-5579. Under construction files make DataNode decommission take very long
     hours. (zhaoyunjiong via jing9)
 
+    HDFS-5777. Update LayoutVersion for the new editlog op OP_ADD_BLOCK. (jing9)
+
   BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
 
     HDFS-4985. Add storage type to the protocol and expose it in block report

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java

@@ -109,10 +109,10 @@ public class LayoutVersion {
         + "enable rebuilding retry cache in case of HA failover"),
     EDITLOG_ADD_BLOCK(-48, "Add new editlog that only records allocation of "
         + "the new block instead of the entire block list"),
-    CACHING(-49, "Support for cache pools and path-based caching"),
-    ADD_DATANODE_AND_STORAGE_UUIDS(-50, "Replace StorageID with DatanodeUuid."
+    ADD_DATANODE_AND_STORAGE_UUIDS(-49, "Replace StorageID with DatanodeUuid."
         + " Use distinct StorageUuid per storage directory."),
-    ADD_LAYOUT_FLAGS(-51, "Add support for layout flags.");
+    ADD_LAYOUT_FLAGS(-50, "Add support for layout flags."),
+    CACHING(-51, "Support for cache pools and path-based caching");
 
     final int lv;
     final int ancestorLV;

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -334,6 +334,9 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
     (Chuan Liu via cnauroth)
 
+    MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params
+    is specified (Gera Shegalov via Sandy Ryza)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

+ 5 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java

@@ -212,19 +212,11 @@ public class MapReduceChildJVM {
     if (conf.getProfileEnabled()) {
       if (conf.getProfileTaskRange(task.isMapTask()
                                    ).isIncluded(task.getPartition())) {
-        vargs.add(
-            String.format(
-                conf.getProfileParams(), 
-                getTaskLogFile(TaskLog.LogName.PROFILE)
-                )
-            );
-        if (task.isMapTask()) {
-          vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
-        }
-        else {
-          vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
-        }
-        
+        final String profileParams = conf.get(task.isMapTask()
+            ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
+            : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
+        vargs.add(String.format(profileParams,
+            getTaskLogFile(TaskLog.LogName.PROFILE)));
       }
     }
 

+ 25 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -602,6 +602,31 @@
     </description>
   </property>
 
+  <property>
+    <name>mapreduce.task.profile.params</name>
+    <value></value>
+    <description>JVM profiler parameters used to profile map and reduce task
+      attempts. This string may contain a single format specifier %s that will
+      be replaced by the path to profile.out in the task attempt log directory.
+      To specify different profiling options for map tasks and reduce tasks,
+      more specific parameters mapreduce.task.profile.map.params and
+      mapreduce.task.profile.reduce.params should be used.</description>
+  </property>
+
+  <property>
+    <name>mapreduce.task.profile.map.params</name>
+    <value>${mapreduce.task.profile.params}</value>
+    <description>Map-task-specific JVM profiler parameters. See
+      mapreduce.task.profile.params</description>
+  </property>
+
+  <property>
+    <name>mapreduce.task.profile.reduce.params</name>
+    <value>${mapreduce.task.profile.params}</value>
+    <description>Reduce-task-specific JVM profiler parameters. See
+      mapreduce.task.profile.params</description>
+  </property>
+
   <property>
     <name>mapreduce.task.skip.start.attempts</name>
     <value>2</value>

+ 9 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java

@@ -90,6 +90,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -99,6 +101,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -354,6 +357,12 @@ public class TestClientRedirect {
         CancelDelegationTokenRequest request) throws IOException {
       return null;
     }
+
+    @Override
+    public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+        MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 
   class HistoryService extends AMService implements HSClientProtocol {

+ 244 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java

@@ -0,0 +1,244 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.*;
+import java.util.*;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMRJobsWithProfiler {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestMRJobsWithProfiler.class);
+
+  private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
+    EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
+
+  private static MiniMRYarnCluster mrCluster;
+
+  private static final Configuration CONF = new Configuration();
+  private static final FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(CONF);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static final Path TEST_ROOT_DIR =
+    new Path("target",  TestMRJobs.class.getName() + "-tmpDir").
+      makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+
+  private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
+  @Before
+  public void setup() throws InterruptedException, IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster.init(CONF);
+      mrCluster.start();
+    }
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // workaround the absent public discache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
+  }
+
+  @After
+  public void tearDown() {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster != null) {
+      mrCluster.stop();
+    }
+  }
+
+
+  @Test (timeout = 120000)
+  public void testProfiler() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+        + " not found. Not running test.");
+      return;
+    }
+
+    final SleepJob sleepJob = new SleepJob();
+    final JobConf sleepConf = new JobConf(mrCluster.getConfig());
+
+    sleepConf.setProfileEnabled(true);
+    // profile map split 1
+    sleepConf.setProfileTaskRange(true, "1");
+    // profile reduce of map output partitions 1
+    sleepConf.setProfileTaskRange(false, "1");
+
+    // use hprof for map to profile.out
+    sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+        "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+      + "file=%s");
+
+    // use Xprof for reduce to stdout
+    sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+    sleepJob.setConf(sleepConf);
+
+    // 2-map-2-reduce SleepJob
+    final Job job = sleepJob.createJob(2, 2, 500, 1, 500, 1);
+    job.setJarByClass(SleepJob.class);
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.waitForCompletion(true);
+    final JobId jobId = TypeConverter.toYarn(job.getJobID());
+    final ApplicationId appID = jobId.getAppId();
+    int pollElapsed = 0;
+    while (true) {
+      Thread.sleep(1000);
+      pollElapsed += 1000;
+
+      if (TERMINAL_RM_APP_STATES.contains(
+        mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
+          .getState())) {
+        break;
+      }
+
+      if (pollElapsed >= 60000) {
+        LOG.warn("application did not reach terminal state within 60 seconds");
+        break;
+      }
+    }
+    Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
+      .getRMContext().getRMApps().get(appID).getState());
+
+    // Job finished, verify logs
+    //
+    final Configuration nmConf = mrCluster.getNodeManager(0).getConfig();
+
+    final String appIdStr = appID.toString();
+    final String appIdSuffix = appIdStr.substring(
+      "application_".length(), appIdStr.length());
+    final String containerGlob = "container_" + appIdSuffix + "_*_*";
+
+    final Map<TaskAttemptID,Path> taLogDirs = new HashMap<TaskAttemptID,Path>();
+    final Pattern taskPattern = Pattern.compile(
+        ".*Task:(attempt_"
+      + appIdSuffix + "_[rm]_" + "[0-9]+_[0-9]+).*");
+    for (String logDir :
+         nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))
+    {
+      // filter out MRAppMaster and create attemptId->logDir map
+      //
+      for (FileStatus fileStatus :
+          localFs.globStatus(new Path(logDir
+            + Path.SEPARATOR + appIdStr
+            + Path.SEPARATOR + containerGlob
+            + Path.SEPARATOR + TaskLog.LogName.SYSLOG)))
+      {
+        final BufferedReader br = new BufferedReader(
+          new InputStreamReader(localFs.open(fileStatus.getPath())));
+        String line;
+        while ((line = br.readLine()) != null) {
+          final Matcher m = taskPattern.matcher(line);
+          if (m.matches()) {
+            // found Task done message
+            taLogDirs.put(TaskAttemptID.forName(m.group(1)),
+              fileStatus.getPath().getParent());
+            break;
+          }
+        }
+        br.close();
+      }
+    }
+
+    Assert.assertEquals(4, taLogDirs.size());  // all 4 attempts found
+
+    for (Map.Entry<TaskAttemptID,Path> dirEntry : taLogDirs.entrySet()) {
+      final TaskAttemptID tid = dirEntry.getKey();
+      final Path profilePath = new Path(dirEntry.getValue(),
+        TaskLog.LogName.PROFILE.toString());
+      final Path stdoutPath = new Path(dirEntry.getValue(),
+        TaskLog.LogName.STDOUT.toString());
+      if (tid.getTaskType() == TaskType.MAP) {
+        if (tid.getTaskID().getId() == 1) {
+          // verify profile.out
+          final BufferedReader br = new BufferedReader(new InputStreamReader(
+            localFs.open(profilePath)));
+          final String line = br.readLine();
+          Assert.assertTrue("No hprof content found!",
+            line !=null && line.startsWith("JAVA PROFILE"));
+          br.close();
+          Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+        } else {
+          Assert.assertFalse("hprof file should not exist",
+            localFs.exists(profilePath));
+        }
+      } else {
+        Assert.assertFalse("hprof file should not exist",
+          localFs.exists(profilePath));
+        if (tid.getTaskID().getId() == 1) {
+          final BufferedReader br = new BufferedReader(new InputStreamReader(
+            localFs.open(stdoutPath)));
+          boolean flatProfFound = false;
+          String line;
+          while ((line = br.readLine()) != null) {
+            if (line.startsWith("Flat profile")) {
+              flatProfFound = true;
+              break;
+            }
+          }
+          br.close();
+          Assert.assertTrue("Xprof flat profile not found!", flatProfFound);
+        } else {
+          Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+        }
+      }
+    }
+  }
+}

+ 7 - 1
hadoop-project/pom.xml

@@ -422,7 +422,7 @@
         <version>${jersey.version}</version>
         <exclusions>
           <exclusion>
-            <groupId>javax.xml.stream</groupId>
+            <groupId>stax</groupId>
             <artifactId>stax-api</artifactId>
           </exclusion>
         </exclusions>
@@ -770,6 +770,12 @@
         <groupId>org.codehaus.jettison</groupId>
         <artifactId>jettison</artifactId>
         <version>1.1</version>
+        <exclusions>
+          <exclusion>
+            <groupId>stax</groupId>
+            <artifactId>stax-api</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>com.sun.jersey</groupId>

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -96,6 +96,9 @@ Trunk - Unreleased
 
   NEW FEATURES
 
+    YARN-1496. Protocol additions to allow moving apps between queues (Sandy
+    Ryza)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -339,4 +341,17 @@ public interface ApplicationClientProtocol {
   public CancelDelegationTokenResponse cancelDelegationToken(
       CancelDelegationTokenRequest request) throws YarnException,
       IOException;
+  
+  /**
+   * Move an application to a new queue.
+   * 
+   * @param request the application ID and the target queue
+   * @return an empty response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Public
+  @Unstable
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
 }

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/MoveApplicationAcrossQueuesRequest.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>The request sent by the client to the <code>ResourceManager</code>
+ * to move a submitted application to a different queue.</p>
+ * 
+ * <p>The request includes the {@link ApplicationId} of the application to be
+ * moved and the queue to place it in.</p>
+ * 
+ * @see ApplicationClientProtocol#moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest)
+ */
+@Public
+@Unstable
+public abstract class MoveApplicationAcrossQueuesRequest {
+  public static MoveApplicationAcrossQueuesRequest newInstance(ApplicationId appId, String queue) {
+    MoveApplicationAcrossQueuesRequest request =
+        Records.newRecord(MoveApplicationAcrossQueuesRequest.class);
+    request.setApplicationId(appId);
+    request.setTargetQueue(queue);
+    return request;
+  }
+  
+  /**
+   * Get the <code>ApplicationId</code> of the application to be moved.
+   * @return <code>ApplicationId</code> of the application to be moved
+   */
+  public abstract ApplicationId getApplicationId();
+  
+  /**
+   * Set the <code>ApplicationId</code> of the application to be moved.
+   * @param appId <code>ApplicationId</code> of the application to be moved
+   */
+  public abstract void setApplicationId(ApplicationId appId);
+  
+  /**
+   * Get the queue to place the application in.
+   * @return the name of the queue to place the application in
+   */
+  public abstract String getTargetQueue();
+
+  /**
+   * Get the queue to place the application in.
+   * @param queue the name of the queue to place the application in
+   */
+  public abstract void setTargetQueue(String queue);
+}

+ 47 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/MoveApplicationAcrossQueuesResponse.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * The response sent by the <code>ResourceManager</code> to the client moving
+ * a submitted application to a different queue.
+ * </p>
+ * <p>
+ * A response without exception means that the move has completed successfully.
+ * </p>
+ * 
+ * @see ApplicationClientProtocol#moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest)
+ */
+@Public
+@Unstable
+public class MoveApplicationAcrossQueuesResponse {
+  @Private
+  @Unstable
+  public MoveApplicationAcrossQueuesResponse newInstance() {
+    MoveApplicationAcrossQueuesResponse response =
+        Records.newRecord(MoveApplicationAcrossQueuesResponse.class);
+    return response;
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto

@@ -44,5 +44,6 @@ service ApplicationClientProtocolService {
   rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto);
   rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto);
   rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto);
+  rpc moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequestProto) returns (MoveApplicationAcrossQueuesResponseProto);
 }
 

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -127,6 +127,14 @@ message GetClusterMetricsResponseProto {
   optional YarnClusterMetricsProto cluster_metrics = 1;
 }
 
+message MoveApplicationAcrossQueuesRequestProto {
+  required ApplicationIdProto application_id = 1;
+  required string target_queue = 2;
+}
+
+message MoveApplicationAcrossQueuesResponseProto {
+}
+
 message GetApplicationsRequestProto {
   repeated string application_types = 1;
   repeated YarnApplicationStateProto application_states = 2;

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java

@@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -75,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRe
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
@@ -89,6 +93,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestPr
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
 
 import com.google.protobuf.ServiceException;
@@ -291,4 +296,20 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
       return null;
     }
   }
+  
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request) throws YarnException,
+      IOException {
+    MoveApplicationAcrossQueuesRequestProto requestProto =
+        ((MoveApplicationAcrossQueuesRequestPBImpl) request).getProto();
+    try {
+      return new MoveApplicationAcrossQueuesResponsePBImpl(
+          proxy.moveApplicationAcrossQueues(null, requestProto));
+
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
 }

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
@@ -61,6 +62,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRe
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
@@ -82,6 +85,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoReques
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
 
@@ -278,4 +283,20 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
         throw new ServiceException(e);
       }
   }
+  
+  @Override
+  public MoveApplicationAcrossQueuesResponseProto moveApplicationAcrossQueues(
+      RpcController controller, MoveApplicationAcrossQueuesRequestProto proto)
+      throws ServiceException {
+    MoveApplicationAcrossQueuesRequestPBImpl request =
+        new MoveApplicationAcrossQueuesRequestPBImpl(proto);
+    try {
+      MoveApplicationAcrossQueuesResponse response = real.moveApplicationAcrossQueues(request);
+      return ((MoveApplicationAcrossQueuesResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 158 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/MoveApplicationAcrossQueuesRequestPBImpl.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class MoveApplicationAcrossQueuesRequestPBImpl extends MoveApplicationAcrossQueuesRequest {
+  MoveApplicationAcrossQueuesRequestProto proto = MoveApplicationAcrossQueuesRequestProto.getDefaultInstance();
+  MoveApplicationAcrossQueuesRequestProto.Builder builder = null;
+  boolean viaProto = false;
+  
+  private ApplicationId applicationId;
+  private String targetQueue;
+  
+  public MoveApplicationAcrossQueuesRequestPBImpl() {
+    builder = MoveApplicationAcrossQueuesRequestProto.newBuilder();
+  }
+
+  public MoveApplicationAcrossQueuesRequestPBImpl(MoveApplicationAcrossQueuesRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  
+  public MoveApplicationAcrossQueuesRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+  
+  @Override
+  public ApplicationId getApplicationId() {
+    if (this.applicationId != null) {
+      return this.applicationId;
+    }
+    
+    MoveApplicationAcrossQueuesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasApplicationId()) {
+      return null;
+    }
+    
+    this.applicationId = convertFromProtoFormat(p.getApplicationId());
+    return this.applicationId;
+  }
+  
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (applicationId == null) {
+      builder.clearApplicationId();
+    }
+    applicationId = appId;
+  }
+  
+  @Override
+  public String getTargetQueue() {
+    if (this.targetQueue != null) {
+      return this.targetQueue;
+    }
+    
+    MoveApplicationAcrossQueuesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasApplicationId()) {
+      return null;
+    }
+    
+    this.targetQueue = p.getTargetQueue();
+    return this.targetQueue;
+  }
+  
+  @Override
+  public void setTargetQueue(String queue) {
+    maybeInitBuilder();
+    if (applicationId == null) {
+      builder.clearTargetQueue();
+    }
+    targetQueue = queue;
+  }
+  
+  private void mergeLocalToBuilder() {
+    if (applicationId != null) {
+      builder.setApplicationId(convertToProtoFormat(this.applicationId));
+    }
+    if (targetQueue != null) {
+      builder.setTargetQueue(this.targetQueue);
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = MoveApplicationAcrossQueuesRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+  
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+  
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl)t).getProto();
+  }
+}

+ 68 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/MoveApplicationAcrossQueuesResponsePBImpl.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesResponseProto;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class MoveApplicationAcrossQueuesResponsePBImpl extends MoveApplicationAcrossQueuesResponse {
+  MoveApplicationAcrossQueuesResponseProto proto = MoveApplicationAcrossQueuesResponseProto.getDefaultInstance();
+  MoveApplicationAcrossQueuesResponseProto.Builder builder = null;
+  boolean viaProto = false;
+  
+  public MoveApplicationAcrossQueuesResponsePBImpl() {
+    builder = MoveApplicationAcrossQueuesResponseProto.newBuilder();
+  }
+
+  public MoveApplicationAcrossQueuesResponsePBImpl(MoveApplicationAcrossQueuesResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  
+  public MoveApplicationAcrossQueuesResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -64,6 +64,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -683,6 +685,12 @@ public class ClientRMService extends AbstractService implements
       throw RPCUtil.getRemoteException(e);
     }
   }
+  
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request) throws YarnException {
+    throw new UnsupportedOperationException("Move not yet supported");
+  }
 
   private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
       throws IOException {