Browse Source

Merge 1558340 from trunk: YARN-1603. Remove two *.orig files which were unexpectedly committed. (Zhijie Shen via junping_du)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1558342 13f79535-47bb-0310-9956-ffa450edef68
Junping Du 11 years ago
parent
commit
534c8ce4c8

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -317,6 +317,9 @@ Release 2.4.0 - UNRELEASED
 
     YARN-1598. HA-related rmadmin commands don't work on a secure cluster (kasha)
 
+    YARN-1603. Remove two *.orig files which were unexpectedly committed. 
+    (Zhijie Shen via junping_du)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 0 - 1361
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig

@@ -1,1361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A scheduler that schedules resources between a set of queues. The scheduler
- * keeps track of the resources used by each queue, and attempts to maintain
- * fairness by scheduling tasks at queues whose allocations are farthest below
- * an ideal fair distribution.
- * 
- * The fair scheduler supports hierarchical queues. All queues descend from a
- * queue named "root". Available resources are distributed among the children
- * of the root queue in the typical fair scheduling fashion. Then, the children
- * distribute the resources assigned to them to their children in the same
- * fashion.  Applications may only be scheduled on leaf queues. Queues can be
- * specified as children of other queues by placing them as sub-elements of their
- * parents in the fair scheduler configuration file.
- * 
- * A queue's name starts with the names of its parents, with periods as
- * separators.  So a queue named "queue1" under the root named, would be 
- * referred to as "root.queue1", and a queue named "queue2" under a queue
- * named "parent1" would be referred to as "root.parent1.queue2".
- */
-@LimitedPrivate("yarn")
-@Unstable
-@SuppressWarnings("unchecked")
-public class FairScheduler implements ResourceScheduler {
-  private boolean initialized;
-  private FairSchedulerConfiguration conf;
-  private RMContext rmContext;
-  private Resource minimumAllocation;
-  private Resource maximumAllocation;
-  private Resource incrAllocation;
-  private QueueManager queueMgr;
-  private Clock clock;
-  private boolean usePortForNodeName;
-
-  private static final Log LOG = LogFactory.getLog(FairScheduler.class);
-  
-  private static final ResourceCalculator RESOURCE_CALCULATOR =
-      new DefaultResourceCalculator();
-  
-  // Value that container assignment methods return when a container is
-  // reserved
-  public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
-
-  // How often fair shares are re-calculated (ms)
-  protected long UPDATE_INTERVAL = 500;
-
-  private final static List<Container> EMPTY_CONTAINER_LIST =
-      new ArrayList<Container>();
-
-  private static final Allocation EMPTY_ALLOCATION =
-      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
-
-  // Aggregate metrics
-  FSQueueMetrics rootMetrics;
-
-  // Time when we last updated preemption vars
-  protected long lastPreemptionUpdateTime;
-  // Time we last ran preemptTasksIfNecessary
-  private long lastPreemptCheckTime;
-
-  // This stores per-application scheduling information,
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-
-  // Nodes in the cluster, indexed by NodeId
-  private Map<NodeId, FSSchedulerNode> nodes = 
-      new ConcurrentHashMap<NodeId, FSSchedulerNode>();
-
-  // Aggregate capacity of the cluster
-  private Resource clusterCapacity = 
-      RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-
-  // How often tasks are preempted 
-  protected long preemptionInterval; 
-  
-  // ms to wait before force killing stuff (must be longer than a couple
-  // of heartbeats to give task-kill commands a chance to act).
-  protected long waitTimeBeforeKill; 
-  
-  // Containers whose AMs have been warned that they will be preempted soon.
-  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-  
-  protected boolean preemptionEnabled;
-  protected boolean sizeBasedWeight; // Give larger weights to larger jobs
-  protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
-  protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
-  protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
-  private Comparator nodeAvailableResourceComparator =
-          new NodeAvailableResourceComparator(); // Node available resource comparator
-  protected double nodeLocalityThreshold; // Cluster threshold for node locality
-  protected double rackLocalityThreshold; // Cluster threshold for rack locality
-  protected long nodeLocalityDelayMs; // Delay for node locality
-  protected long rackLocalityDelayMs; // Delay for rack locality
-  private FairSchedulerEventLog eventLog; // Machine-readable event log
-  protected boolean assignMultiple; // Allocate multiple containers per
-                                    // heartbeat
-  protected int maxAssign; // Max containers to assign per heartbeat
-
-  @VisibleForTesting
-  final MaxRunningAppsEnforcer maxRunningEnforcer;
-
-  private AllocationFileLoaderService allocsLoader;
-  @VisibleForTesting
-  AllocationConfiguration allocConf;
-  
-  public FairScheduler() {
-    clock = new SystemClock();
-    allocsLoader = new AllocationFileLoaderService();
-    queueMgr = new QueueManager(this);
-    maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
-  }
-
-  private void validateConf(Configuration conf) {
-    // validate scheduler memory allocation setting
-    int minMem = conf.getInt(
-      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-    int maxMem = conf.getInt(
-      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-
-    if (minMem < 0 || minMem > maxMem) {
-      throw new YarnRuntimeException("Invalid resource scheduler memory"
-        + " allocation configuration"
-        + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
-        + "=" + minMem
-        + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
-        + "=" + maxMem + ", min should equal greater than 0"
-        + ", max should be no smaller than min.");
-    }
-
-    // validate scheduler vcores allocation setting
-    int minVcores = conf.getInt(
-      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
-    int maxVcores = conf.getInt(
-      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-
-    if (minVcores < 0 || minVcores > maxVcores) {
-      throw new YarnRuntimeException("Invalid resource scheduler vcores"
-        + " allocation configuration"
-        + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
-        + "=" + minVcores
-        + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
-        + "=" + maxVcores + ", min should equal greater than 0"
-        + ", max should be no smaller than min.");
-    }
-  }
-
-  public FairSchedulerConfiguration getConf() {
-    return conf;
-  }
-
-  public QueueManager getQueueManager() {
-    return queueMgr;
-  }
-
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  private FSSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
-  /**
-   * A runnable which calls {@link FairScheduler#update()} every
-   * <code>UPDATE_INTERVAL</code> milliseconds.
-   */
-  private class UpdateThread implements Runnable {
-    public void run() {
-      while (true) {
-        try {
-          Thread.sleep(UPDATE_INTERVAL);
-          update();
-          preemptTasksIfNecessary();
-        } catch (Exception e) {
-          LOG.error("Exception in fair scheduler UpdateThread", e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Recompute the internal variables used by the scheduler - per-job weights,
-   * fair shares, deficits, minimum slot allocations, and amount of used and
-   * required resources per job.
-   */
-  protected synchronized void update() {
-    updatePreemptionVariables(); // Determine if any queues merit preemption
-
-    FSQueue rootQueue = queueMgr.getRootQueue();
-
-    // Recursively update demands for all queues
-    rootQueue.updateDemand();
-
-    rootQueue.setFairShare(clusterCapacity);
-    // Recursively compute fair shares for all queues
-    // and update metrics
-    rootQueue.recomputeShares();
-  }
-
-  /**
-   * Update the preemption fields for all QueueScheduables, i.e. the times since
-   * each queue last was at its guaranteed share and at > 1/2 of its fair share
-   * for each type of task.
-   */
-  private void updatePreemptionVariables() {
-    long now = clock.getTime();
-    lastPreemptionUpdateTime = now;
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      if (!isStarvedForMinShare(sched)) {
-        sched.setLastTimeAtMinShare(now);
-      }
-      if (!isStarvedForFairShare(sched)) {
-        sched.setLastTimeAtHalfFairShare(now);
-      }
-    }
-  }
-
-  /**
-   * Is a queue below its min share for the given task type?
-   */
-  boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-      sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
-        sched.getResourceUsage(), desiredShare);
-  }
-
-  /**
-   * Is a queue being starved for fair share for the given task type? This is
-   * defined as being below half its fair share.
-   */
-  boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-        Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
-        sched.getResourceUsage(), desiredFairShare);
-  }
-
-  /**
-   * Check for queues that need tasks preempted, either because they have been
-   * below their guaranteed share for minSharePreemptionTimeout or they have
-   * been below half their fair share for the fairSharePreemptionTimeout. If
-   * such queues exist, compute how many tasks of each type need to be preempted
-   * and then select the right ones using preemptTasks.
-   */
-  protected synchronized void preemptTasksIfNecessary() {
-    if (!preemptionEnabled) {
-      return;
-    }
-
-    long curTime = clock.getTime();
-    if (curTime - lastPreemptCheckTime < preemptionInterval) {
-      return;
-    }
-    lastPreemptCheckTime = curTime;
-
-    Resource resToPreempt = Resources.none();
-
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
-    }
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
-        Resources.none())) {
-      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
-    }
-  }
-
-  /**
-   * Preempt a quantity of resources from a list of QueueSchedulables. The
-   * policy for this is to pick apps from queues that are over their fair share,
-   * but make sure that no queue is placed below its fair share in the process.
-   * We further prioritize preemption by choosing containers with lowest
-   * priority to preempt.
-   */
-  protected void preemptResources(Collection<FSLeafQueue> scheds,
-      Resource toPreempt) {
-    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
-      return;
-    }
-
-    Map<RMContainer, FSSchedulerApp> apps = 
-        new HashMap<RMContainer, FSSchedulerApp>();
-    Map<RMContainer, FSLeafQueue> queues = 
-        new HashMap<RMContainer, FSLeafQueue>();
-
-    // Collect running containers from over-scheduled queues
-    List<RMContainer> runningContainers = new ArrayList<RMContainer>();
-    for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
-          for (RMContainer c : as.getApp().getLiveContainers()) {
-            runningContainers.add(c);
-            apps.put(c, as.getApp());
-            queues.put(c, sched);
-          }
-        }
-      }
-    }
-
-    // Sort containers into reverse order of priority
-    Collections.sort(runningContainers, new Comparator<RMContainer>() {
-      public int compare(RMContainer c1, RMContainer c2) {
-        int ret = c1.getContainer().getPriority().compareTo(
-            c2.getContainer().getPriority());
-        if (ret == 0) {
-          return c2.getContainerId().compareTo(c1.getContainerId());
-        }
-        return ret;
-      }
-    });
-    
-    // Scan down the list of containers we've already warned and kill them
-    // if we need to.  Remove any containers from the list that we don't need
-    // or that are no longer running.
-    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
-    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
-    while (warnedIter.hasNext()) {
-      RMContainer container = warnedIter.next();
-      if (container.getState() == RMContainerState.RUNNING &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              toPreempt, Resources.none())) {
-        warnOrKillContainer(container, apps.get(container), queues.get(container));
-        preemptedThisRound.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      } else {
-        warnedIter.remove();
-      }
-    }
-
-    // Scan down the rest of the containers until we've preempted enough, making
-    // sure we don't preempt too many from any queue
-    Iterator<RMContainer> runningIter = runningContainers.iterator();
-    while (runningIter.hasNext() &&
-        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-      RMContainer container = runningIter.next();
-      FSLeafQueue sched = queues.get(container);
-      if (!preemptedThisRound.contains(container) &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              sched.getResourceUsage(), sched.getFairShare())) {
-        warnOrKillContainer(container, apps.get(container), sched);
-        
-        warnedContainers.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      }
-    }
-  }
-  
-  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
-      FSLeafQueue queue) {
-    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-        "res=" + container.getContainer().getResource() +
-        ") from queue " + queue.getName());
-    
-    Long time = app.getContainerPreemptionTime(container);
-
-    if (time != null) {
-      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
-      // proceed with kill
-      if (time + waitTimeBeforeKill < clock.getTime()) {
-        ContainerStatus status =
-          SchedulerUtils.createPreemptedContainerStatus(
-            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
-
-        // TODO: Not sure if this ever actually adds this to the list of cleanup
-        // containers on the RMNode (see SchedulerNode.releaseContainer()).
-        completedContainer(container, status, RMContainerEventType.KILL);
-        LOG.info("Killing container" + container +
-            " (after waiting for premption for " +
-            (clock.getTime() - time) + "ms)");
-      }
-    } else {
-      // track the request in the FSSchedulerApp itself
-      app.addPreemption(container, clock.getTime());
-    }
-  }
-
-  /**
-   * Return the resource amount that this queue is allowed to preempt, if any.
-   * If the queue has been below its min share for at least its preemption
-   * timeout, it should preempt the difference between its current share and
-   * this min share. If it has been below half its fair share for at least the
-   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
-   * full fair share. If both conditions hold, we preempt the max of the two
-   * amounts (this shouldn't happen unless someone sets the timeouts to be
-   * identical for some reason).
-   */
-  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
-    String queue = sched.getName();
-    long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
-    long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
-    Resource resDueToMinShare = Resources.none();
-    Resource resDueToFairShare = Resources.none();
-    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-        resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-        resToPreempt, Resources.none())) {
-      String message = "Should preempt " + resToPreempt + " res for queue "
-          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
-          + ", resDueToFairShare = " + resDueToFairShare;
-      LOG.info(message);
-    }
-    return resToPreempt;
-  }
-
-  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
-    return rmContext.getContainerTokenSecretManager();
-  }
-
-  // synchronized for sizeBasedWeight
-  public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
-    double weight = 1.0;
-    if (sizeBasedWeight) {
-      // Set weight based on current memory demand
-      weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
-    }
-    weight *= app.getPriority().getPriority();
-    if (weightAdjuster != null) {
-      // Run weight through the user-supplied weightAdjuster
-      weight = weightAdjuster.adjustWeight(app, weight);
-    }
-    return new ResourceWeights((float)weight);
-  }
-
-  @Override
-  public Resource getMinimumResourceCapability() {
-    return minimumAllocation;
-  }
-
-  public Resource getIncrementResourceCapability() {
-    return incrAllocation;
-  }
-
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
-  }
-
-  public double getNodeLocalityThreshold() {
-    return nodeLocalityThreshold;
-  }
-
-  public double getRackLocalityThreshold() {
-    return rackLocalityThreshold;
-  }
-
-  public long getNodeLocalityDelayMs() {
-    return nodeLocalityDelayMs;
-  }
-
-  public long getRackLocalityDelayMs() {
-    return rackLocalityDelayMs;
-  }
-
-  public boolean isContinuousSchedulingEnabled() {
-    return continuousSchedulingEnabled;
-  }
-
-  public synchronized int getContinuousSchedulingSleepMs() {
-    return continuousSchedulingSleepMs;
-  }
-
-  public Resource getClusterCapacity() {
-    return clusterCapacity;
-  }
-
-  public synchronized Clock getClock() {
-    return clock;
-  }
-
-  protected synchronized void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
-  public FairSchedulerEventLog getEventLog() {
-    return eventLog;
-  }
-
-  /**
-   * Add a new application to the scheduler, with a given id, queue name, and
-   * user. This will accept a new app even if the user or queue is above
-   * configured limits, but the app will not be marked as runnable.
-   */
-  protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
-    if (queueName == null || queueName.isEmpty()) {
-      String message = "Reject application " + applicationId +
-              " submitted by user " + user + " with an empty queue name.";
-      LOG.info(message);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, message));
-      return;
-    }
-
-    RMApp rmApp = rmContext.getRMApps().get(applicationId);
-    FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
-    if (queue == null) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppRejectedEvent(applicationId,
-              "Application rejected by queue placement policy"));
-      return;
-    }
-
-    // Enforce ACLs
-    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
-
-    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
-        && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
-      String msg = "User " + userUgi.getUserName() +
-              " cannot submit applications to queue " + queue.getName();
-      LOG.info(msg);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, msg));
-      return;
-    }
-  
-    SchedulerApplication application =
-        new SchedulerApplication(queue, user);
-    applications.put(applicationId, application);
-    queue.getMetrics().submitApp(user);
-
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName + ", currently num of applications: "
-        + applications.size());
-    rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
-  }
-
-  /**
-   * Add a new application attempt to the scheduler.
-   */
-  protected synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    String user = application.getUser();
-    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
-
-    FSSchedulerApp attempt =
-        new FSSchedulerApp(applicationAttemptId, user,
-            queue, new ActiveUsersManager(getRootQueueMetrics()),
-            rmContext);
-    if (transferStateFromPreviousAttempt) {
-      attempt.transferStateFromPreviousAttempt(application
-        .getCurrentAppAttempt());
-    }
-    application.setCurrentAppAttempt(attempt);
-
-    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
-    queue.addApp(attempt, runnable);
-    if (runnable) {
-      maxRunningEnforcer.trackRunnableApp(attempt);
-    } else {
-      maxRunningEnforcer.trackNonRunnableApp(attempt);
-    }
-    
-    queue.getMetrics().submitAppAttempt(user);
-
-    LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user: " + user);
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
-  }
-  
-  @VisibleForTesting
-  FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
-    FSLeafQueue queue = null;
-    try {
-      QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
-      queueName = placementPolicy.assignAppToQueue(queueName, user);
-      if (queueName == null) {
-        return null;
-      }
-      queue = queueMgr.getLeafQueue(queueName, true);
-    } catch (IOException ex) {
-      LOG.error("Error assigning app to queue, rejecting", ex);
-    }
-    
-    if (rmApp != null) {
-      rmApp.setQueue(queue.getName());
-    } else {
-      LOG.warn("Couldn't find RM app to set queue name on");
-    }
-    
-    return queue;
-  }
-
-  private synchronized void removeApplication(ApplicationId applicationId,
-      RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
-    if (application == null){
-      LOG.warn("Couldn't find application " + applicationId);
-      return;
-    }
-    application.stop(finalState);
-    applications.remove(applicationId);
-  }
-
-  private synchronized void removeApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
-    LOG.info("Application " + applicationAttemptId + " is done." +
-        " finalState=" + rmAppAttemptFinalState);
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
-
-    if (attempt == null || application == null) {
-      LOG.info("Unknown application " + applicationAttemptId + " has completed!");
-      return;
-    }
-
-    // Release all the running containers
-    for (RMContainer rmContainer : attempt.getLiveContainers()) {
-      if (keepContainers
-          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
-        // do not kill the running container in the case of work-preserving AM
-        // restart.
-        LOG.info("Skip killing " + rmContainer.getContainerId());
-        continue;
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(),
-              SchedulerUtils.COMPLETED_APPLICATION),
-              RMContainerEventType.KILL);
-    }
-
-    // Release all reserved containers
-    for (RMContainer rmContainer : attempt.getReservedContainers()) {
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(),
-              "Application Complete"),
-              RMContainerEventType.KILL);
-    }
-    // Clean up pending requests, metrics etc.
-    attempt.stop(rmAppAttemptFinalState);
-
-    // Inform the queue
-    FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
-        .getQueueName(), false);
-    boolean wasRunnable = queue.removeApp(attempt);
-
-    if (wasRunnable) {
-      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
-    } else {
-      maxRunningEnforcer.untrackNonRunnableApp(attempt);
-    }
-  }
-
-  /**
-   * Clean up a completed container.
-   */
-  private synchronized void completedContainer(RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event) {
-    if (rmContainer == null) {
-      LOG.info("Null container completed...");
-      return;
-    }
-
-    Container container = rmContainer.getContainer();
-
-    // Get the application for the finished container
-    FSSchedulerApp application =
-        getCurrentAttemptForContainer(container.getId());
-    ApplicationId appId =
-        container.getId().getApplicationAttemptId().getApplicationId();
-    if (application == null) {
-      LOG.info("Container " + container + " of" +
-          " unknown application attempt " + appId +
-          " completed with event " + event);
-      return;
-    }
-
-    // Get the node on which the container was allocated
-    FSSchedulerNode node = nodes.get(container.getNodeId());
-
-    if (rmContainer.getState() == RMContainerState.RESERVED) {
-      application.unreserve(node, rmContainer.getReservedPriority());
-      node.unreserveResource(application);
-    } else {
-      application.containerCompleted(rmContainer, containerStatus, event);
-      node.releaseContainer(container);
-      updateRootQueueMetrics();
-    }
-
-    LOG.info("Application attempt " + application.getApplicationAttemptId()
-        + " released container " + container.getId() + " on node: " + node
-        + " with event: " + event);
-  }
-
-  private synchronized void addNode(RMNode node) {
-    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
-    Resources.addTo(clusterCapacity, node.getTotalCapability());
-    updateRootQueueMetrics();
-
-    LOG.info("Added node " + node.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
-  }
-
-  private synchronized void removeNode(RMNode rmNode) {
-    FSSchedulerNode node = nodes.get(rmNode.getNodeID());
-    // This can occur when an UNHEALTHY node reconnects
-    if (node == null) {
-      return;
-    }
-    Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
-    updateRootQueueMetrics();
-
-    // Remove running containers
-    List<RMContainer> runningContainers = node.getRunningContainers();
-    for (RMContainer container : runningContainers) {
-      completedContainer(container,
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getContainerId(),
-              SchedulerUtils.LOST_CONTAINER),
-          RMContainerEventType.KILL);
-    }
-
-    // Remove reservations, if any
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      completedContainer(reservedContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              reservedContainer.getContainerId(),
-              SchedulerUtils.LOST_CONTAINER),
-          RMContainerEventType.KILL);
-    }
-
-    nodes.remove(rmNode.getNodeID());
-    LOG.info("Removed node " + rmNode.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
-  }
-
-  @Override
-  public Allocation allocate(ApplicationAttemptId appAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
-
-    // Make sure this application exists
-    FSSchedulerApp application = getSchedulerApp(appAttemptId);
-    if (application == null) {
-      LOG.info("Calling allocate on removed " +
-          "or non existant application " + appAttemptId);
-      return EMPTY_ALLOCATION;
-    }
-
-    // Sanity check
-    SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
-
-    // Release containers
-    for (ContainerId releasedContainerId : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainerId);
-      if (rmContainer == null) {
-        RMAuditLogger.logFailure(application.getUser(),
-            AuditConstants.RELEASE_CONTAINER,
-            "Unauthorized access or invalid container", "FairScheduler",
-            "Trying to release container not owned by app or with invalid id",
-            application.getApplicationId(), releasedContainerId);
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainerId,
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
-
-    synchronized (application) {
-      if (!ask.isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("allocate: pre-update" +
-              " applicationAttemptId=" + appAttemptId +
-              " application=" + application.getApplicationId());
-        }
-        application.showRequests();
-
-        // Update application requests
-        application.updateResourceRequests(ask);
-
-        LOG.debug("allocate: post-update");
-        application.showRequests();
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate:" +
-            " applicationAttemptId=" + appAttemptId +
-            " #ask=" + ask.size());
-
-        LOG.debug("Preempting " + application.getPreemptionContainers().size()
-            + " container(s)");
-      }
-      
-      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
-      for (RMContainer container : application.getPreemptionContainers()) {
-        preemptionContainerIds.add(container.getContainerId());
-      }
-
-      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
-      
-      return new Allocation(application.pullNewlyAllocatedContainers(),
-          application.getHeadroom(), preemptionContainerIds);
-    }
-  }
-
-  /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
-  /**
-   * Process a heartbeat update from a node.
-   */
-  private synchronized void nodeUpdate(RMNode nm) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
-    }
-    eventLog.log("HEARTBEAT", nm.getHostName());
-    FSSchedulerNode node = nodes.get(nm.getNodeID());
-
-    // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
-    
-    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
-    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
-    for(UpdatedContainerInfo containerInfo : containerInfoList) {
-      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
-      completedContainers.addAll(containerInfo.getCompletedContainers());
-    } 
-    // Processing the newly launched containers
-    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-    }
-
-    // Process completed containers
-    for (ContainerStatus completedContainer : completedContainers) {
-      ContainerId containerId = completedContainer.getContainerId();
-      LOG.debug("Container FINISHED: " + containerId);
-      completedContainer(getRMContainer(containerId),
-          completedContainer, RMContainerEventType.FINISHED);
-    }
-
-    if (continuousSchedulingEnabled) {
-      if (!completedContainers.isEmpty()) {
-        attemptScheduling(node);
-      }
-    } else {
-      attemptScheduling(node);
-    }
-  }
-
-  private void continuousScheduling() {
-    while (true) {
-      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-
-      // iterate all nodes
-      for (NodeId nodeId : nodeIdList) {
-        if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = nodes.get(nodeId);
-          try {
-            if (Resources.fitsIn(minimumAllocation,
-                    node.getAvailableResource())) {
-              attemptScheduling(node);
-            }
-          } catch (Throwable ex) {
-            LOG.warn("Error while attempting scheduling for node " + node +
-                    ": " + ex.toString(), ex);
-          }
-        }
-      }
-      try {
-        Thread.sleep(getContinuousSchedulingSleepMs());
-      } catch (InterruptedException e) {
-        LOG.warn("Error while doing sleep in continuous scheduling: " +
-                e.toString(), e);
-      }
-    }
-  }
-
-  /** Sort nodes by available resource */
-  private class NodeAvailableResourceComparator implements Comparator<NodeId> {
-
-    @Override
-    public int compare(NodeId n1, NodeId n2) {
-      return RESOURCE_CALCULATOR.compare(clusterCapacity,
-              nodes.get(n2).getAvailableResource(),
-              nodes.get(n1).getAvailableResource());
-    }
-  }
-  
-  private synchronized void attemptScheduling(FSSchedulerNode node) {
-    // Assign new containers...
-    // 1. Check for reserved applications
-    // 2. Schedule if there are no reservations
-
-    AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
-    if (reservedAppSchedulable != null) {
-      Priority reservedPriority = node.getReservedContainer().getReservedPriority();
-      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
-        // Don't hold the reservation if app can no longer use it
-        LOG.info("Releasing reservation that cannot be satisfied for application "
-            + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node " + node);
-        reservedAppSchedulable.unreserve(reservedPriority, node);
-        reservedAppSchedulable = null;
-      } else {
-        // Reservation exists; try to fulfill the reservation
-        LOG.info("Trying to fulfill reservation for application "
-            + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node: " + node);
-
-        node.getReservedAppSchedulable().assignReservedContainer(node);
-      }
-    }
-    if (reservedAppSchedulable == null) {
-      // No reservation, schedule at queue which is farthest below fair share
-      int assignedContainers = 0;
-      while (node.getReservedContainer() == null) {
-        boolean assignedContainer = false;
-        if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              queueMgr.getRootQueue().assignContainer(node),
-              Resources.none())) {
-          assignedContainers++;
-          assignedContainer = true;
-        }
-        if (!assignedContainer) { break; }
-        if (!assignMultiple) { break; }
-        if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
-      }
-    }
-    updateRootQueueMetrics();
-  }
-
-  @Override
-  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    FSSchedulerNode node = nodes.get(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
-  }
-  
-  public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    SchedulerApplication app =
-        applications.get(appAttemptId.getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-  
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
-      return null;
-    }
-    return new SchedulerAppReport(attempt);
-  }
-  
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
-      return null;
-    }
-    return attempt.getResourceUsageReport();
-  }
-  
-  /**
-   * Subqueue metrics might be a little out of date because fair shares are
-   * recalculated at the update interval, but the root queue metrics needs to
-   * be updated synchronously with allocations and completions so that cluster
-   * metrics will be consistent.
-   */
-  private void updateRootQueueMetrics() {
-    rootMetrics.setAvailableResourcesToQueue(
-        Resources.subtract(
-            clusterCapacity, rootMetrics.getAllocatedResources()));
-  }
-
-  @Override
-  public QueueMetrics getRootQueueMetrics() {
-    return rootMetrics;
-  }
-
-  @Override
-  public void handle(SchedulerEvent event) {
-    switch (event.getType()) {
-    case NODE_ADDED:
-      if (!(event instanceof NodeAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
-      addNode(nodeAddedEvent.getAddedRMNode());
-      break;
-    case NODE_REMOVED:
-      if (!(event instanceof NodeRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
-      removeNode(nodeRemovedEvent.getRemovedRMNode());
-      break;
-    case NODE_UPDATE:
-      if (!(event instanceof NodeUpdateSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode());
-      break;
-    case APP_ADDED:
-      if (!(event instanceof AppAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
-      break;
-    case APP_REMOVED:
-      if (!(event instanceof AppRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      removeApplication(appRemovedEvent.getApplicationID(),
-        appRemovedEvent.getFinalState());
-      break;
-    case APP_ATTEMPT_ADDED:
-      if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
-          (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
-      break;
-    case APP_ATTEMPT_REMOVED:
-      if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
-          (AppAttemptRemovedSchedulerEvent) event;
-      removeApplicationAttempt(
-          appAttemptRemovedEvent.getApplicationAttemptID(),
-          appAttemptRemovedEvent.getFinalAttemptState(),
-          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
-      break;
-    case CONTAINER_EXPIRED:
-      if (!(event instanceof ContainerExpiredSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      ContainerExpiredSchedulerEvent containerExpiredEvent =
-          (ContainerExpiredSchedulerEvent)event;
-      ContainerId containerId = containerExpiredEvent.getContainerId();
-      completedContainer(getRMContainer(containerId),
-          SchedulerUtils.createAbnormalContainerStatus(
-              containerId,
-              SchedulerUtils.EXPIRED_CONTAINER),
-          RMContainerEventType.EXPIRE);
-      break;
-    default:
-      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
-    }
-  }
-
-  @Override
-  public void recover(RMState state) throws Exception {
-    // NOT IMPLEMENTED
-  }
-
-  @Override
-  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
-      throws IOException {
-    if (!initialized) {
-      this.conf = new FairSchedulerConfiguration(conf);
-      validateConf(this.conf);
-      minimumAllocation = this.conf.getMinimumAllocation();
-      maximumAllocation = this.conf.getMaximumAllocation();
-      incrAllocation = this.conf.getIncrementAllocation();
-      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-      continuousSchedulingSleepMs =
-              this.conf.getContinuousSchedulingSleepMs();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      assignMultiple = this.conf.getAssignMultiple();
-      maxAssign = this.conf.getMaxAssign();
-      sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-      usePortForNodeName = this.conf.getUsePortForNodeName();
-      
-      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
-      this.rmContext = rmContext;
-      this.eventLog = new FairSchedulerEventLog();
-      eventLog.init(this.conf);
-
-      initialized = true;
-
-      allocConf = new AllocationConfiguration(conf);
-      try {
-        queueMgr.initialize(conf);
-      } catch (Exception e) {
-        throw new IOException("Failed to start FairScheduler", e);
-      }
-
-      Thread updateThread = new Thread(new UpdateThread());
-      updateThread.setName("FairSchedulerUpdateThread");
-      updateThread.setDaemon(true);
-      updateThread.start();
-
-      if (continuousSchedulingEnabled) {
-        // start continuous scheduling thread
-        Thread schedulingThread = new Thread(
-          new Runnable() {
-            @Override
-            public void run() {
-              continuousScheduling();
-            }
-          }
-        );
-        schedulingThread.setName("ContinuousScheduling");
-        schedulingThread.setDaemon(true);
-        schedulingThread.start();
-      }
-      
-      allocsLoader.init(conf);
-      allocsLoader.setReloadListener(new AllocationReloadListener());
-      // If we fail to load allocations file on initialize, we want to fail
-      // immediately.  After a successful load, exceptions on future reloads
-      // will just result in leaving things as they are.
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize FairScheduler", e);
-      }
-      allocsLoader.start();
-    } else {
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        LOG.error("Failed to reload allocations file", e);
-      }
-    }
-  }
-
-  @Override
-  public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
-      boolean recursive) throws IOException {
-    if (!queueMgr.exists(queueName)) {
-      throw new IOException("queue " + queueName + " does not exist");
-    }
-    return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
-        recursive);
-  }
-
-  @Override
-  public List<QueueUserACLInfo> getQueueUserAclInfo() {
-    UserGroupInformation user = null;
-    try {
-      user = UserGroupInformation.getCurrentUser();
-    } catch (IOException ioe) {
-      return new ArrayList<QueueUserACLInfo>();
-    }
-
-    return queueMgr.getRootQueue().getQueueUserAclInfo(user);
-  }
-
-  @Override
-  public int getNumClusterNodes() {
-    return nodes.size();
-  }
-
-  @Override
-  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
-      QueueACL acl, String queueName) {
-    FSQueue queue = getQueueManager().getQueue(queueName);
-    if (queue == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("ACL not found for queue access-type " + acl
-            + " for queue " + queueName);
-      }
-      return false;
-    }
-    return queue.hasAccess(acl, callerUGI);
-  }
-  
-  public AllocationConfiguration getAllocationConfiguration() {
-    return allocConf;
-  }
-  
-  private class AllocationReloadListener implements
-      AllocationFileLoaderService.Listener {
-
-    @Override
-    public void onReload(AllocationConfiguration queueInfo) {
-      // Commit the reload; also create any queue defined in the alloc file
-      // if it does not already exist, so it can be displayed on the web UI.
-      synchronized (FairScheduler.this) {
-        allocConf = queueInfo;
-        allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
-        queueMgr.updateAllocationConfiguration(allocConf);
-      }
-    }
-  }
-
-  @Override
-  public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
-    FSQueue queue = queueMgr.getQueue(queueName);
-    if (queue == null) {
-      return null;
-    }
-    List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
-    queue.collectSchedulerApplications(apps);
-    return apps;
-  }
-
-}

+ 0 - 615
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig

@@ -1,615 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.Assert;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.Task;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestFifoScheduler {
-  private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
-  private final int GB = 1024;
-
-  private ResourceManager resourceManager = null;
-  
-  private static final RecordFactory recordFactory = 
-      RecordFactoryProvider.getRecordFactory(null);
-  
-  @Before
-  public void setUp() throws Exception {
-    resourceManager = new ResourceManager();
-    Configuration conf = new Configuration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, 
-        FifoScheduler.class, ResourceScheduler.class);
-    resourceManager.init(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    resourceManager.stop();
-  }
-  
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
-      registerNode(String hostName, int containerManagerPort, int nmHttpPort,
-          String rackName, Resource capability) throws IOException,
-          YarnException {
-    return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
-        hostName, containerManagerPort, nmHttpPort, rackName, capability,
-        resourceManager);
-  }
-  
-  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
-    ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
-    ApplicationAttemptId attId =
-        ApplicationAttemptId.newInstance(appIdImpl, attemptId);
-    return attId;
-  }
-
-  private ResourceRequest createResourceRequest(int memory, String host,
-      int priority, int numContainers) {
-    ResourceRequest request = recordFactory
-        .newRecordInstance(ResourceRequest.class);
-    request.setCapability(Resources.createResource(memory));
-    request.setResourceName(host);
-    request.setNumContainers(numContainers);
-    Priority prio = recordFactory.newRecordInstance(Priority.class);
-    prio.setPriority(priority);
-    request.setPriority(prio);
-    return request;
-  }
-
-  @Test(timeout=5000)
-  public void testFifoSchedulerCapacityWhenNoNMs() {
-    FifoScheduler scheduler = new FifoScheduler();
-    QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
-    Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
-  }
-  
-  @Test(timeout=5000)
-  public void testAppAttemptMetrics() throws Exception {
-    AsyncDispatcher dispatcher = new InlineDispatcher();
-    RMContext rmContext = new RMContextImpl(dispatcher, null,
-        null, null, null, null, null, null, null);
-
-    FifoScheduler schedular = new FifoScheduler();
-    schedular.reinitialize(new Configuration(), rmContext);
-    QueueMetrics metrics = schedular.getRootQueueMetrics();
-    int beforeAppsSubmitted = metrics.getAppsSubmitted();
-
-    ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
-    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
-        appId, 1);
-
-    SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
-    schedular.handle(appEvent);
-    SchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    schedular.handle(attemptEvent);
-
-    appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
-    SchedulerEvent attemptEvent2 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    schedular.handle(attemptEvent2);
-
-    int afterAppsSubmitted = metrics.getAppsSubmitted();
-    Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
-  }
-
-  @Test(timeout=2000)
-  public void testNodeLocalAssignment() throws Exception {
-    AsyncDispatcher dispatcher = new InlineDispatcher();
-    Configuration conf = new Configuration();
-    RMContainerTokenSecretManager containerTokenSecretManager =
-        new RMContainerTokenSecretManager(conf);
-    containerTokenSecretManager.rollMasterKey();
-    NMTokenSecretManagerInRM nmTokenSecretManager =
-        new NMTokenSecretManagerInRM(conf);
-    nmTokenSecretManager.rollMasterKey();
-    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
-        null, containerTokenSecretManager, nmTokenSecretManager, null);
-
-    FifoScheduler scheduler = new FifoScheduler();
-    scheduler.reinitialize(new Configuration(), rmContext);
-
-    RMNode node0 = MockNodes.newNodeInfo(1,
-        Resources.createResource(1024 * 64), 1, "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
-    scheduler.handle(nodeEvent1);
-
-    int _appId = 1;
-    int _appAttemptId = 1;
-    ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
-        _appAttemptId);
-    AppAddedSchedulerEvent appEvent =
-        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
-          "user1");
-    scheduler.handle(appEvent);
-    AppAttemptAddedSchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    scheduler.handle(attemptEvent);
-
-    int memory = 64;
-    int nConts = 3;
-    int priority = 20;
-
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest nodeLocal = createResourceRequest(memory,
-        node0.getHostName(), priority, nConts);
-    ResourceRequest rackLocal = createResourceRequest(memory,
-        node0.getRackName(), priority, nConts);
-    ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
-        nConts);
-    ask.add(nodeLocal);
-    ask.add(rackLocal);
-    ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
-
-    NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
-
-    // Before the node update event, there are 3 local requests outstanding
-    Assert.assertEquals(3, nodeLocal.getNumContainers());
-
-    scheduler.handle(node0Update);
-
-    // After the node update event, check that there are no more local requests
-    // outstanding
-    Assert.assertEquals(0, nodeLocal.getNumContainers());
-    //Also check that the containers were scheduled
-    SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
-    Assert.assertEquals(3, info.getLiveContainers().size());
-  }
-  
-  @Test(timeout=2000)
-  public void testUpdateResourceOnNode() throws Exception {
-    AsyncDispatcher dispatcher = new InlineDispatcher();
-    Configuration conf = new Configuration();
-    RMContainerTokenSecretManager containerTokenSecretManager =
-        new RMContainerTokenSecretManager(conf);
-    containerTokenSecretManager.rollMasterKey();
-    NMTokenSecretManagerInRM nmTokenSecretManager =
-        new NMTokenSecretManagerInRM(conf);
-    nmTokenSecretManager.rollMasterKey();
-    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
-        null, containerTokenSecretManager, nmTokenSecretManager, null);
-
-    FifoScheduler scheduler = new FifoScheduler(){
-      @SuppressWarnings("unused")
-      public Map<NodeId, FiCaSchedulerNode> getNodes(){
-        return nodes;
-      }
-    };
-    scheduler.reinitialize(new Configuration(), rmContext);
-    RMNode node0 = MockNodes.newNodeInfo(1,
-        Resources.createResource(2048, 4), 1, "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
-    scheduler.handle(nodeEvent1);
-    
-    Method method = scheduler.getClass().getDeclaredMethod("getNodes");
-    @SuppressWarnings("unchecked")
-    Map<NodeId, FiCaSchedulerNode> schedulerNodes = 
-        (Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
-    assertEquals(schedulerNodes.values().size(), 1);
-    
-    // set resource of RMNode to 1024 and verify it works.
-    node0.setResourceOption(ResourceOption.newInstance(
-        Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
-    assertEquals(node0.getTotalCapability().getMemory(), 1024);
-    // verify that SchedulerNode's resource hasn't been changed.
-    assertEquals(schedulerNodes.get(node0.getNodeID()).
-        getAvailableResource().getMemory(), 2048);
-    // now, NM heartbeat comes.
-    NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
-    scheduler.handle(node0Update);
-    // SchedulerNode's available resource is changed.
-    assertEquals(schedulerNodes.get(node0.getNodeID()).
-        getAvailableResource().getMemory(), 1024);
-    QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
-    Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
-    
-    int _appId = 1;
-    int _appAttemptId = 1;
-    ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
-        _appAttemptId);
-    AppAddedSchedulerEvent appEvent =
-        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
-          "user1");
-    scheduler.handle(appEvent);
-    AppAttemptAddedSchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    scheduler.handle(attemptEvent);
-
-    int memory = 1024;
-    int priority = 1;
-
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest nodeLocal = createResourceRequest(memory,
-        node0.getHostName(), priority, 1);
-    ResourceRequest rackLocal = createResourceRequest(memory,
-        node0.getRackName(), priority, 1);
-    ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
-        1);
-    ask.add(nodeLocal);
-    ask.add(rackLocal);
-    ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
-
-    // Before the node update event, there are one local request
-    Assert.assertEquals(1, nodeLocal.getNumContainers());
-
-    // Now schedule.
-    scheduler.handle(node0Update);
-
-    // After the node update event, check no local request
-    Assert.assertEquals(0, nodeLocal.getNumContainers());
-    // Also check that one container was scheduled
-    SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
-    Assert.assertEquals(1, info.getLiveContainers().size());
-    // And check the default Queue now is full.
-    queueInfo = scheduler.getQueueInfo(null, false, false);
-    Assert.assertEquals(1.0f, queueInfo.getCurrentCapacity());
-  }
-  
-//  @Test
-  public void testFifoScheduler() throws Exception {
-
-    LOG.info("--- START: testFifoScheduler ---");
-        
-    final int GB = 1024;
-    
-    // Register node1
-    String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = 
-      registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(4 * GB, 1));
-    nm_0.heartbeat();
-    
-    // Register node2
-    String host_1 = "host_1";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = 
-      registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(2 * GB, 1));
-    nm_1.heartbeat();
-
-    // ResourceRequest priorities
-    Priority priority_0 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); 
-    Priority priority_1 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1);
-    
-    // Submit an application
-    Application application_0 = new Application("user_0", resourceManager);
-    application_0.submit();
-    
-    application_0.addNodeManager(host_0, 1234, nm_0);
-    application_0.addNodeManager(host_1, 1234, nm_1);
-
-    Resource capability_0_0 = Resources.createResource(GB);
-    application_0.addResourceRequestSpec(priority_1, capability_0_0);
-    
-    Resource capability_0_1 = Resources.createResource(2 * GB);
-    application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
-    Task task_0_0 = new Task(application_0, priority_1, 
-        new String[] {host_0, host_1});
-    application_0.addTask(task_0_0);
-       
-    // Submit another application
-    Application application_1 = new Application("user_1", resourceManager);
-    application_1.submit();
-    
-    application_1.addNodeManager(host_0, 1234, nm_0);
-    application_1.addNodeManager(host_1, 1234, nm_1);
-    
-    Resource capability_1_0 = Resources.createResource(3 * GB);
-    application_1.addResourceRequestSpec(priority_1, capability_1_0);
-    
-    Resource capability_1_1 = Resources.createResource(4 * GB);
-    application_1.addResourceRequestSpec(priority_0, capability_1_1);
-
-    Task task_1_0 = new Task(application_1, priority_1, 
-        new String[] {host_0, host_1});
-    application_1.addTask(task_1_0);
-        
-    // Send resource requests to the scheduler
-    LOG.info("Send resource requests to the scheduler");
-    application_0.schedule();
-    application_1.schedule();
-    
-    // Send a heartbeat to kick the tires on the Scheduler
-    LOG.info("Send a heartbeat to kick the tires on the Scheduler... " +
-    		"nm0 -> task_0_0 and task_1_0 allocated, used=4G " +
-    		"nm1 -> nothing allocated");
-    nm_0.heartbeat();             // task_0_0 and task_1_0 allocated, used=4G
-    nm_1.heartbeat();             // nothing allocated
-    
-    // Get allocations from the scheduler
-    application_0.schedule();     // task_0_0 
-    checkApplicationResourceUsage(GB, application_0);
-
-    application_1.schedule();     // task_1_0
-    checkApplicationResourceUsage(3 * GB, application_1);
-    
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    
-    checkNodeResourceUsage(4*GB, nm_0);  // task_0_0 (1G) and task_1_0 (3G)
-    checkNodeResourceUsage(0*GB, nm_1);  // no tasks, 2G available
-    
-    LOG.info("Adding new tasks...");
-    
-    Task task_1_1 = new Task(application_1, priority_1, 
-        new String[] {ResourceRequest.ANY});
-    application_1.addTask(task_1_1);
-
-    Task task_1_2 = new Task(application_1, priority_1, 
-        new String[] {ResourceRequest.ANY});
-    application_1.addTask(task_1_2);
-
-    Task task_1_3 = new Task(application_1, priority_0, 
-        new String[] {ResourceRequest.ANY});
-    application_1.addTask(task_1_3);
-    
-    application_1.schedule();
-    
-    Task task_0_1 = new Task(application_0, priority_1, 
-        new String[] {host_0, host_1});
-    application_0.addTask(task_0_1);
-
-    Task task_0_2 = new Task(application_0, priority_1, 
-        new String[] {host_0, host_1});
-    application_0.addTask(task_0_2);
-    
-    Task task_0_3 = new Task(application_0, priority_0, 
-        new String[] {ResourceRequest.ANY});
-    application_0.addTask(task_0_3);
-
-    application_0.schedule();
-
-    // Send a heartbeat to kick the tires on the Scheduler
-    LOG.info("Sending hb from " + nm_0.getHostName());
-    nm_0.heartbeat();                   // nothing new, used=4G
-    
-    LOG.info("Sending hb from " + nm_1.getHostName());
-    nm_1.heartbeat();                   // task_0_3, used=2G
-    
-    // Get allocations from the scheduler
-    LOG.info("Trying to allocate...");
-    application_0.schedule();
-    checkApplicationResourceUsage(3 * GB, application_0);
-    application_1.schedule();
-    checkApplicationResourceUsage(3 * GB, application_1);
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    checkNodeResourceUsage(4*GB, nm_0);
-    checkNodeResourceUsage(2*GB, nm_1);
-    
-    // Complete tasks
-    LOG.info("Finishing up task_0_0");
-    application_0.finishTask(task_0_0); // Now task_0_1
-    application_0.schedule();
-    application_1.schedule();
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    checkApplicationResourceUsage(3 * GB, application_0);
-    checkApplicationResourceUsage(3 * GB, application_1);
-    checkNodeResourceUsage(4*GB, nm_0);
-    checkNodeResourceUsage(2*GB, nm_1);
-
-    LOG.info("Finishing up task_1_0");
-    application_1.finishTask(task_1_0);  // Now task_0_2
-    application_0.schedule(); // final overcommit for app0 caused here
-    application_1.schedule();
-    nm_0.heartbeat(); // final overcommit for app0 occurs here
-    nm_1.heartbeat();
-    checkApplicationResourceUsage(4 * GB, application_0);
-    checkApplicationResourceUsage(0 * GB, application_1);
-    //checkNodeResourceUsage(1*GB, nm_0);  // final over-commit -> rm.node->1G, test.node=2G
-    checkNodeResourceUsage(2*GB, nm_1);
-
-    LOG.info("Finishing up task_0_3");
-    application_0.finishTask(task_0_3); // No more
-    application_0.schedule();
-    application_1.schedule();
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    checkApplicationResourceUsage(2 * GB, application_0);
-    checkApplicationResourceUsage(0 * GB, application_1);
-    //checkNodeResourceUsage(2*GB, nm_0);  // final over-commit, rm.node->1G, test.node->2G
-    checkNodeResourceUsage(0*GB, nm_1);
-    
-    LOG.info("Finishing up task_0_1");
-    application_0.finishTask(task_0_1);
-    application_0.schedule();
-    application_1.schedule();
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    checkApplicationResourceUsage(1 * GB, application_0);
-    checkApplicationResourceUsage(0 * GB, application_1);
-    
-    LOG.info("Finishing up task_0_2");
-    application_0.finishTask(task_0_2); // now task_1_3 can go!
-    application_0.schedule();
-    application_1.schedule();
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    checkApplicationResourceUsage(0 * GB, application_0);
-    checkApplicationResourceUsage(4 * GB, application_1);
-    
-    LOG.info("Finishing up task_1_3");
-    application_1.finishTask(task_1_3); // now task_1_1
-    application_0.schedule();
-    application_1.schedule();
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    checkApplicationResourceUsage(0 * GB, application_0);
-    checkApplicationResourceUsage(3 * GB, application_1);
-    
-    LOG.info("Finishing up task_1_1");
-    application_1.finishTask(task_1_1);
-    application_0.schedule();
-    application_1.schedule();
-    nm_0.heartbeat();
-    nm_1.heartbeat();
-    checkApplicationResourceUsage(0 * GB, application_0);
-    checkApplicationResourceUsage(3 * GB, application_1);
-    
-    LOG.info("--- END: testFifoScheduler ---");
-  }
-
-  @SuppressWarnings("resource")
-  @Test
-  public void testBlackListNodes() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(conf);
-    rm.start();
-    FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();
-
-    String host = "127.0.0.1";
-    RMNode node =
-        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
-    fs.handle(new NodeAddedSchedulerEvent(node));
-
-    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
-    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
-        appId, 1);
-    SchedulerEvent appEvent =
-        new AppAddedSchedulerEvent(appId, "default",
-          "user");
-    fs.handle(appEvent);
-    SchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    fs.handle(attemptEvent);
-
-    // Verify the blacklist can be updated independent of requesting containers
-    fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
-        Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null);
-    Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
-    fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
-        Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host));
-    Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
-    rm.stop();
-  }
-  
-  @Test
-  public void testGetAppsInQueue() throws Exception {
-    Application application_0 = new Application("user_0", resourceManager);
-    application_0.submit();
-    
-    Application application_1 = new Application("user_0", resourceManager);
-    application_1.submit();
-    
-    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-    
-    List<ApplicationAttemptId> appsInDefault = scheduler.getAppsInQueue("default");
-    assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId()));
-    assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId()));
-    assertEquals(2, appsInDefault.size());
-    
-    Assert.assertNull(scheduler.getAppsInQueue("someotherqueue"));
-  }
-
-  @Test
-  public void testAddAndRemoveAppFromFiFoScheduler() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(conf);
-    FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler();
-    TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(fs.applications,
-      fs, "queue");
-  }
-
-  private void checkApplicationResourceUsage(int expected, 
-      Application application) {
-    Assert.assertEquals(expected, application.getUsedResources().getMemory());
-  }
-  
-  private void checkNodeResourceUsage(int expected,
-      org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) {
-    Assert.assertEquals(expected, node.getUsed().getMemory());
-    node.checkResourceUsage();
-  }
-
-  public static void main(String[] arg) throws Exception {
-    TestFifoScheduler t = new TestFifoScheduler();
-    t.setUp();
-    t.testFifoScheduler();
-    t.tearDown();
-  }
-}