
YARN-9760. Support configuring application priorities on a workflow level. Contributed by Varun Saxena

Jonathan Hung, 5 years ago
parent
commit
5462d2176f
10 changed files with 562 additions and 30 deletions
  1. + 26 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
  2. + 52 - 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
  3. + 7 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. + 11 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  5. + 24 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
  6. + 23 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  7. + 29 - 27
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
  8. + 230 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WorkflowPriorityMappingsManager.java
  9. + 9 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  10. + 151 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java

+ 26 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java

@@ -424,6 +424,32 @@ public class StringUtils {
     return values;
   }
 
+  /**
+   * Returns a collection of strings, trimming leading and trailing whitespace
+   * on each value. Duplicates are not removed.
+   *
+   * @param str
+   *          String separated by delim.
+   * @param delim
+   *          Delimiter to separate the values in str.
+   * @return Collection of string values.
+   */
+  public static Collection<String> getTrimmedStringCollection(String str,
+      String delim) {
+    List<String> values = new ArrayList<String>();
+    if (str == null)
+      return values;
+    StringTokenizer tokenizer = new StringTokenizer(str, delim);
+    while (tokenizer.hasMoreTokens()) {
+      String next = tokenizer.nextToken();
+      if (next == null || next.trim().isEmpty()) {
+        continue;
+      }
+      values.add(next.trim());
+    }
+    return values;
+  }
+
   /**
    * Splits a comma separated value <code>String</code>, trimming leading and
    * trailing whitespace on each value. Duplicate and empty values are removed.
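For orientation only (not part of the patch): a minimal sketch of how the new StringUtils.getTrimmedStringCollection(String, String) helper behaves on a colon-delimited mapping string.

    import java.util.Collection;
    import org.apache.hadoop.util.StringUtils;

    public class TrimmedSplitExample {
      public static void main(String[] args) {
        // Tokens are trimmed, empty tokens are skipped, duplicates are kept:
        // prints [wf1, root.default, 1]
        Collection<String> parts =
            StringUtils.getTrimmedStringCollection(" wf1 : root.default : 1 ", ":");
        System.out.println(parts);
      }
    }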

+ 52 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -91,10 +91,14 @@ import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -452,6 +456,53 @@ public class TestMRJobs {
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
   }
 
+  @Test(timeout = 300000)
+  public void testJobWithWorkflowPriority() throws Exception {
+    Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+    CapacityScheduler scheduler = (CapacityScheduler) mrCluster
+        .getResourceManager().getResourceScheduler();
+    CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
+    csConf.set(CapacitySchedulerConfiguration.WORKFLOW_PRIORITY_MAPPINGS,
+        WorkflowPriorityMappingsManager.getWorkflowPriorityMappingStr(
+        Arrays.asList(new WorkflowPriorityMapping(
+            "wf1", "root.default", Priority.newInstance(1)))));
+    csConf.setBoolean(CapacitySchedulerConfiguration.
+        ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true);
+    scheduler.reinitialize(csConf, scheduler.getRMContext());
+
+    // set master address to local to test that local mode applied if framework
+    // equals local
+    sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
+    sleepConf
+        .setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5);
+    sleepConf.set(MRJobConfig.JOB_TAGS,
+        YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + "wf1");
+
+    SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(sleepConf);
+    Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1);
+
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.setJarByClass(SleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    // VERY_HIGH priority should get overwritten by workflow priority mapping
+    job.setPriority(JobPriority.VERY_HIGH);
+    job.submit();
+
+    waitForPriorityToUpdate(job, JobPriority.VERY_LOW);
+    // Verify the priority from job itself
+    Assert.assertEquals(JobPriority.VERY_LOW, job.getPriority());
+
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+  }
+
   private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus)
       throws IOException, InterruptedException {
     // Max wait time to get the priority update can be kept as 20sec (100 *

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4137,6 +4137,13 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_CONTAINERS_LAUNCHER_CLASS =
       NM_PREFIX + "containers-launcher.class";
 
+  // Prefix of the application tag which carries the workflow ID; whatever
+  // follows the prefix is treated as the workflow ID.
+  public static final String YARN_WORKFLOW_ID_TAG_PREFIX =
+      YARN_PREFIX + "workflow-id.tag-prefix";
+  public static final String DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX =
+      "workflowid:";
+
   public YarnConfiguration() {
     super();
   }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -4310,4 +4310,15 @@
     <name>yarn.node-labels.exclusive-enforced-partitions</name>
     <value></value>
   </property>
+
+  <property>
+    <description>
+      Prefix used to identify the YARN application tag which contains the workflow ID.
+      If a tag in the application submission context starts with this prefix, whatever
+      follows the prefix is treated as the workflow ID associated with the application.
+      Features such as workflow priority use it to identify an application's workflow.
+    </description>
+    <name>yarn.workflow-id.tag-prefix</name>
+    <value>workflowid:</value>
+  </property>
 </configuration>
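A hedged usage sketch (identifiers reused from the TestMRJobs change above): with the default prefix "workflowid:", tagging a MapReduce job as below associates it with workflow "wf1" on the ResourceManager side.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class WorkflowTagExample {
      public static void main(String[] args) {
        // Sketch only: the job tag carries the workflow ID for the RM to pick up.
        Configuration jobConf = new Configuration();
        jobConf.set(MRJobConfig.JOB_TAGS,
            YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + "wf1"); // "workflowid:wf1"
      }
    }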

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml

@@ -217,4 +217,28 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.scheduler.capacity.workflow-priority-mappings</name>
+    <value></value>
+    <description>
+      A list of mappings used to override application priority.
+      The syntax for this list is
+      [workflowId]:[full_queue_name]:[priority][,next mapping]*
+      where an application submitted to (or mapped to) queue "full_queue_name"
+      with workflow ID "workflowId" (as specified in the application submission
+      context) will be given priority "priority".
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.workflow-priority-mappings-override.enable</name>
+    <value>false</value>
+    <description>
+      If a priority mapping is present, will it override the value specified
+      by the user? This can be used by administrators to give applications a
+      priority that is different than the one specified by the user.
+      The default is false.
+    </description>
+  </property>
+
 </configuration>
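A hedged example of the mapping value syntax described above, set programmatically the way the tests in this change do (the queue names and priorities here are illustrative):

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

    public class WorkflowMappingConfigExample {
      public static void main(String[] args) {
        // Sketch: "workflow1" on root.b gets priority 2, "workflow2" on
        // root.a.a1 gets priority 3; mapping overrides are enabled.
        CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
        csConf.set(CapacitySchedulerConfiguration.WORKFLOW_PRIORITY_MAPPINGS,
            "workflow1:root.b:2,workflow2:root.a.a1:3");
        csConf.setBoolean(CapacitySchedulerConfiguration
            .ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true);
      }
    }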

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -183,6 +183,8 @@ public class CapacityScheduler extends
 
   private CapacitySchedulerQueueManager queueManager;
 
+  private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr;
+
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
@@ -364,6 +366,8 @@ public class CapacityScheduler extends
           this.labelManager, this.appPriorityACLManager);
       this.queueManager.setCapacitySchedulerContext(this);
 
+      this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
+
       this.activitiesManager = new ActivitiesManager(rmContext);
       activitiesManager.init(conf);
       initializeQueues(this.conf);
@@ -770,6 +774,8 @@ public class CapacityScheduler extends
 
     updatePlacementRules();
 
+    this.workflowPriorityMappingsMgr.initialize(this);
+
     // Notify Preemption Manager
     preemptionManager.refreshQueues(null, this.getRootQueue());
   }
@@ -780,6 +786,8 @@ public class CapacityScheduler extends
     this.queueManager.reinitializeQueues(newConf);
     updatePlacementRules();
 
+    this.workflowPriorityMappingsMgr.initialize(this);
+
     // Notify Preemption Manager
     preemptionManager.refreshQueues(null, this.getRootQueue());
   }
@@ -987,6 +995,17 @@ public class CapacityScheduler extends
         }
       }
 
+      try {
+        priority = workflowPriorityMappingsMgr.mapWorkflowPriorityForApp(
+            applicationId, queue, user, priority);
+      } catch (YarnException e) {
+        String message = "Failed to submit application " + applicationId +
+            " submitted by user " + user + " reason: " + e.getMessage();
+        this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
+            applicationId, RMAppEventType.APP_REJECTED, message));
+        return;
+      }
+
       // Submit to the queue
       try {
         queue.submitApplication(applicationId, user, queueName);
@@ -3045,6 +3064,10 @@ public class CapacityScheduler extends
     return this.queueManager;
   }
 
+  public WorkflowPriorityMappingsManager getWorkflowPriorityMappingsManager() {
+    return this.workflowPriorityMappingsMgr;
+  }
+
   /**
    * Try to move a reserved container to a targetNode.
    * If the targetNode is reserved by another application (other than this one).

+ 29 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingP
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy;
@@ -71,7 +72,6 @@ import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Set;
-import java.util.StringTokenizer;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
 
@@ -280,6 +280,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
 
+  @Private
+  public static final String WORKFLOW_PRIORITY_MAPPINGS =
+      PREFIX + "workflow-priority-mappings";
+
+  @Private
+  public static final String ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE =
+      WORKFLOW_PRIORITY_MAPPINGS + "-override.enable";
+
+  @Private
+  public static final boolean DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE = false;
+
   @Private
   public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";
 
@@ -1022,7 +1033,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         getTrimmedStringCollection(queueMappingName);
     for (String mappingValue : mappingsString) {
       String[] mapping =
-          getTrimmedStringCollection(mappingValue, ":")
+          StringUtils.getTrimmedStringCollection(mappingValue, ":")
               .toArray(new String[] {});
       if (mapping.length != 2 || mapping[1].length() == 0) {
         throw new IllegalArgumentException(
@@ -1060,30 +1071,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     setStrings(mappingRuleProp, StringUtils.join(",", queueMappingStrs));
   }
 
-  /**
-   * Returns a collection of strings, trimming leading and trailing whitespeace
-   * on each value
-   *
-   * @param str
-   *          String to parse
-   * @param delim
-   *          delimiter to separate the values
-   * @return Collection of parsed elements.
-   */
-  private static Collection<String> getTrimmedStringCollection(String str,
-      String delim) {
-    List<String> values = new ArrayList<String>();
-    if (str == null)
-      return values;
-    StringTokenizer tokenizer = new StringTokenizer(str, delim);
-    while (tokenizer.hasMoreTokens()) {
-      String next = tokenizer.nextToken();
-      if (next == null || next.trim().isEmpty()) {
-        continue;
-      }
-      values.add(next.trim());
-    }
-    return values;
+  public boolean getOverrideWithWorkflowPriorityMappings() {
+    return getBoolean(ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE,
+        DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE);
+  }
+
+  public Collection<String> getWorkflowPriorityMappings() {
+    return getTrimmedStringCollection(WORKFLOW_PRIORITY_MAPPINGS);
   }
 
   /**
@@ -1098,7 +1092,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         getTrimmedStringCollection(QUEUE_MAPPING);
     for (String mappingValue : mappingsString) {
       String[] mapping =
-          getTrimmedStringCollection(mappingValue, ":")
+          StringUtils.getTrimmedStringCollection(mappingValue, ":")
               .toArray(new String[] {});
       if (mapping.length != 3 || mapping[1].length() == 0
           || mapping[2].length() == 0) {
@@ -1159,6 +1153,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
   }
 
+  @Private
+  @VisibleForTesting
+  void setWorkflowPriorityMappings(
+      List<WorkflowPriorityMapping> workflowPriorityMappings) {
+    setStrings(WORKFLOW_PRIORITY_MAPPINGS, WorkflowPriorityMappingsManager
+        .getWorkflowPriorityMappingStr(workflowPriorityMappings));
+  }
+
   public boolean isReservable(String queue) {
     boolean isReservable =
         getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);

+ 230 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WorkflowPriorityMappingsManager.java

@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@VisibleForTesting
+public class WorkflowPriorityMappingsManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WorkflowPriorityMappingsManager.class);
+
+  private static final String WORKFLOW_PART_SEPARATOR = ":";
+
+  private static final String WORKFLOW_SEPARATOR = ",";
+
+  private CapacityScheduler scheduler;
+
+  private CapacitySchedulerConfiguration conf;
+
+  private boolean overrideWithPriorityMappings = false;
+  // Map of queue to a map of workflow ID to priority
+  private Map<String, Map<String, WorkflowPriorityMapping>> priorityMappings =
+      new HashMap<String, Map<String, WorkflowPriorityMapping>>();
+
+  public static class WorkflowPriorityMapping {
+    String workflowID;
+    String queue;
+    Priority priority;
+
+    public WorkflowPriorityMapping(String workflowID, String queue,
+        Priority priority) {
+      this.workflowID = workflowID;
+      this.queue = queue;
+      this.priority = priority;
+    }
+
+    public Priority getPriority() {
+      return this.priority;
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof WorkflowPriorityMapping) {
+        WorkflowPriorityMapping other = (WorkflowPriorityMapping) obj;
+        return (other.workflowID.equals(workflowID) &&
+            other.queue.equals(queue) &&
+            other.priority.equals(priority));
+      } else {
+        return false;
+      }
+    }
+
+    public String toString() {
+      return workflowID + WORKFLOW_PART_SEPARATOR + queue
+          + WORKFLOW_PART_SEPARATOR + priority.getPriority();
+    }
+  }
+
+  @VisibleForTesting
+  public void initialize(CapacityScheduler scheduler) throws IOException {
+    this.scheduler = scheduler;
+    this.conf = scheduler.getConfiguration();
+    boolean overrideWithWorkflowPriorityMappings =
+        conf.getOverrideWithWorkflowPriorityMappings();
+    LOG.info("Initialized workflow priority mappings, override: "
+        + overrideWithWorkflowPriorityMappings);
+    this.overrideWithPriorityMappings = overrideWithWorkflowPriorityMappings;
+    this.priorityMappings = getWorkflowPriorityMappings();
+  }
+
+  /**
+   * Get workflow ID to priority mappings for a queue.
+   *
+   * @return workflowID to priority mappings for a queue
+   */
+  public Map<String, Map<String, WorkflowPriorityMapping>>
+      getWorkflowPriorityMappings() {
+    Map<String, Map<String, WorkflowPriorityMapping>> mappings =
+        new HashMap<String, Map<String, WorkflowPriorityMapping>>();
+
+    Collection<String> workflowMappings = conf.getWorkflowPriorityMappings();
+    for (String workflowMapping : workflowMappings) {
+      WorkflowPriorityMapping mapping =
+          getWorkflowMappingFromString(workflowMapping);
+      if (mapping != null) {
+        if (!mappings.containsKey(mapping.queue)) {
+          mappings.put(mapping.queue,
+              new HashMap<String, WorkflowPriorityMapping>());
+        }
+        mappings.get(mapping.queue).put(mapping.workflowID, mapping);
+      }
+    }
+    return mappings;
+  }
+
+  private WorkflowPriorityMapping getWorkflowMappingFromString(
+      String mappingString) {
+    if (mappingString == null) {
+      return null;
+    }
+    String[] mappingArray = StringUtils
+        .getTrimmedStringCollection(mappingString, WORKFLOW_PART_SEPARATOR)
+            .toArray(new String[] {});
+    if (mappingArray.length != 3 || mappingArray[0].length() == 0
+        || mappingArray[1].length() == 0 || mappingArray[2].length() == 0) {
+      throw new IllegalArgumentException(
+          "Illegal workflow priority mapping " + mappingString);
+    }
+    WorkflowPriorityMapping mapping;
+    try {
+      mapping = new WorkflowPriorityMapping(mappingArray[0], mappingArray[1],
+          Priority.newInstance(Integer.parseInt(mappingArray[2])));
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          "Illegal workflow priority for mapping " + mappingString);
+    }
+    return mapping;
+  }
+
+  public Priority getMappedPriority(String workflowID, CSQueue queue) {
+    // Recursively fetch the priority mapping for the given workflow tracing
+    // up the queue hierarchy until the first match.
+    if (queue.equals(scheduler.getRootQueue())) {
+      return null;
+    }
+    String queuePath = queue.getQueuePath();
+    if (priorityMappings.containsKey(queuePath)
+        && priorityMappings.get(queuePath).containsKey(workflowID)) {
+      return priorityMappings.get(queuePath).get(workflowID).priority;
+    } else {
+      queue = queue.getParent();
+      return getMappedPriority(workflowID, queue);
+    }
+  }
+
+  public Priority mapWorkflowPriorityForApp(ApplicationId applicationId,
+      CSQueue queue, String user, Priority priority) throws YarnException {
+    if (overrideWithPriorityMappings) {
+      // Set the correct workflow priority
+      RMApp rmApp = scheduler.getRMContext().getRMApps().get(applicationId);
+      if (rmApp != null && rmApp.getApplicationTags() != null
+          && rmApp.getApplicationSubmissionContext() != null) {
+        String workflowTagPrefix = scheduler.getConf().get(
+            YarnConfiguration.YARN_WORKFLOW_ID_TAG_PREFIX,
+            YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX);
+        String workflowID = null;
+        for(String tag : rmApp.getApplicationTags()) {
+          if (tag.trim().startsWith(workflowTagPrefix)) {
+            workflowID = tag.trim().substring(workflowTagPrefix.length());
+          }
+        }
+        if (workflowID != null && !workflowID.isEmpty()
+            && priorityMappings != null && priorityMappings.size() > 0) {
+          Priority mappedPriority = getMappedPriority(workflowID, queue);
+          if (mappedPriority != null) {
+            LOG.info("Application " + applicationId + " user " + user
+                + " workflow " + workflowID + " queue " + queue.getQueueName()
+                + " mapping [" + priority + "] to [" + mappedPriority
+                + "] override " + overrideWithPriorityMappings);
+
+            // If workflow ID exists in workflow mapping, change this
+            // application's priority to mapped value. Else, use queue
+            // default priority.
+            priority = mappedPriority;
+            priority = scheduler.checkAndGetApplicationPriority(
+                priority, UserGroupInformation.createRemoteUser(user),
+                queue.getQueueName(), applicationId);
+            rmApp.getApplicationSubmissionContext().setPriority(priority);
+            ((RMAppImpl)rmApp).setApplicationPriority(priority);
+          }
+        }
+      }
+    }
+    return priority;
+  }
+
+  public static String getWorkflowPriorityMappingStr(
+      List<WorkflowPriorityMapping> workflowPriorityMappings) {
+    if (workflowPriorityMappings == null) {
+      return "";
+    }
+    List<String> workflowPriorityMappingStrs = new ArrayList<>();
+    for (WorkflowPriorityMapping mapping : workflowPriorityMappings) {
+      workflowPriorityMappingStrs.add(mapping.toString());
+    }
+    return StringUtils.join(WORKFLOW_SEPARATOR, workflowPriorityMappingStrs);
+  }
+}
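For orientation (a sketch, not part of the patch): building the serialized mapping string with the helper above, the same way the MapReduce test in this change does.

    import java.util.Arrays;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;

    public class WorkflowMappingStringExample {
      public static void main(String[] args) {
        // Serializes to "wf1:root.default:1", the value format expected by
        // yarn.scheduler.capacity.workflow-priority-mappings.
        String mappings = WorkflowPriorityMappingsManager.getWorkflowPriorityMappingStr(
            Arrays.asList(new WorkflowPriorityMapping(
                "wf1", "root.default", Priority.newInstance(1))));
        System.out.println(mappings);
      }
    }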

+ 9 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -528,14 +528,21 @@ public class MockRM extends ResourceManager {
 
   public RMApp submitApp(int masterMemory, Set<String> appTags)
       throws Exception {
+    return submitApp(masterMemory, null, false, null, Priority.newInstance(0),
+        appTags);
+  }
+
+  public RMApp submitApp(int masterMemory, String queue,
+      boolean isAppIdProvided, ApplicationId appId, Priority priority,
+      Set<String> appTags) throws Exception {
     Resource resource = Resource.newInstance(masterMemory, 0);
     ResourceRequest amResourceRequest = ResourceRequest.newInstance(
         Priority.newInstance(0), ResourceRequest.ANY, resource, 1);
     return submitApp(Collections.singletonList(amResourceRequest), "",
         UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
-        null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
-        false, false, null, 0, null, true, Priority.newInstance(0), null,
+        false, isAppIdProvided, appId, 0, null, true, priority, null,
         null, null, appTags);
   }
 

+ 151 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java

@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class TestCapacitySchedulerWorkflowPriorityMapping
+    extends CapacitySchedulerTestBase {
+  private MockRM mockRM = null;
+
+  private static void setWorkFlowPriorityMappings(
+      CapacitySchedulerConfiguration conf) {
+    // Define top-level queues
+    conf.setQueues(
+        CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
+
+    // Define 2nd-level queues
+    conf.setQueues(A, new String[] {"a1", "a2"});
+    conf.setCapacity(A1, A1_CAPACITY);
+    conf.setCapacity(A2, A2_CAPACITY);
+
+    conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+    conf.setCapacity(B1, B1_CAPACITY);
+    conf.setCapacity(B2, B2_CAPACITY);
+    conf.setCapacity(B3, B3_CAPACITY);
+
+    List<WorkflowPriorityMapping> mappings = Arrays.asList(
+        new WorkflowPriorityMapping("workflow1", B, Priority.newInstance(2)),
+        new WorkflowPriorityMapping("workflow2", A1, Priority.newInstance(3)),
+        new WorkflowPriorityMapping("workflow3", A, Priority.newInstance(4)));
+    conf.setWorkflowPriorityMappings(mappings);
+  }
+
+  @Test
+  public void testWorkflowPriorityMappings() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(CapacitySchedulerConfiguration
+        .ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+
+    // Initialize workflow priority mappings.
+    setWorkFlowPriorityMappings(conf);
+
+    mockRM = new MockRM(conf);
+    CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
+    mockRM.start();
+    cs.start();
+
+    Map<String, Map<String, Object>> expected = ImmutableMap.of(
+        A, ImmutableMap.of("workflow3",
+        new WorkflowPriorityMapping(
+            "workflow3", A, Priority.newInstance(4))),
+        B, ImmutableMap.of("workflow1",
+        new WorkflowPriorityMapping(
+            "workflow1", B, Priority.newInstance(2))),
+        A1, ImmutableMap.of("workflow2",
+        new WorkflowPriorityMapping(
+            "workflow2", A1, Priority.newInstance(3))));
+    assertEquals(expected, cs.getWorkflowPriorityMappingsManager()
+        .getWorkflowPriorityMappings());
+
+    // Maps to rule corresponding to parent queue "a" for workflow3.
+    mockRM.submitApp(1, "a2", true, ApplicationId.newInstance(0,1),
+        Priority.newInstance(0), ImmutableSet.of(
+            YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+            + "workflow3"));
+    RMApp app =
+        mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,1));
+    assertEquals(4, app.getApplicationSubmissionContext().getPriority()
+        .getPriority());
+
+    // Does not match any rule as rule for queue + workflow does not exist.
+    // Priority passed in the app is taken up.
+    mockRM.submitApp(1, "a1", true, ApplicationId.newInstance(0,2),
+        Priority.newInstance(6), ImmutableSet.of(
+            YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+            + "workflow1"));
+    app =
+        mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,2));
+    assertEquals(6, app.getApplicationSubmissionContext().getPriority()
+        .getPriority());
+
+    // Maps to rule corresponding to parent queue "a1" for workflow2.
+    mockRM.submitApp(1, "a1", true, ApplicationId.newInstance(0,3),
+        Priority.newInstance(0), ImmutableSet.of(
+            YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+            + "workflow2"));
+    app =
+        mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,3));
+    assertEquals(3, app.getApplicationSubmissionContext().getPriority()
+        .getPriority());
+
+    // Maps to rule corresponding to parent queue "b" for workflow1.
+    mockRM.submitApp(1, "b3", true, ApplicationId.newInstance(0,4),
+        Priority.newInstance(0), ImmutableSet.of(
+            YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+            + "workflow1"));
+    app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,4));
+    assertEquals(2, app.getApplicationSubmissionContext().getPriority()
+        .getPriority());
+
+    // Disable workflow priority mappings override and reinitialize scheduler.
+    conf.setBoolean(CapacitySchedulerConfiguration
+        .ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, false);
+    cs.reinitialize(conf, mockRM.getRMContext());
+    mockRM.submitApp(1, "a2", true, ApplicationId.newInstance(0,5),
+        Priority.newInstance(0), ImmutableSet.of(
+            YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+            + "workflow3"));
+    app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,5));
+    assertEquals(0, app.getApplicationSubmissionContext().getPriority()
+        .getPriority());
+  }
+}