Browse Source

YARN-10552. Eliminate code duplication in SLSCapacityScheduler and SLSFairScheduler. Contributed by Szilard Nemeth.

9uapaw 3 years ago
parent
commit
526142447a

+ 28 - 300
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java

@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -30,119 +25,51 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.sls.SLSRunner;
-import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Timer;
 
 @Private
 @Unstable
 public class SLSCapacityScheduler extends CapacityScheduler implements
         SchedulerWrapper,Configurable {
-  private Configuration conf;
- 
-  private Map<ApplicationAttemptId, String> appQueueMap =
-          new ConcurrentHashMap<ApplicationAttemptId, String>();
-
-  private Map<ContainerId, Resource> preemptionContainerMap =
-          new ConcurrentHashMap<ContainerId, Resource>();
-
-  // metrics
-  private SchedulerMetrics schedulerMetrics;
-  private boolean metricsON;
-  private Tracker tracker;
-
-  // logger
-  private static final Logger LOG = LoggerFactory.getLogger(SLSCapacityScheduler.class);
 
-  public Tracker getTracker() {
-    return tracker;
-  }
+  private final SLSSchedulerCommons schedulerCommons;
+  private Configuration conf;
 
   public SLSCapacityScheduler() {
-    tracker = new Tracker();
+    schedulerCommons = new SLSSchedulerCommons(this);
   }
 
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
     super.setConf(conf);
-    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
-    if (metricsON) {
-      try {
-        schedulerMetrics = SchedulerMetrics.getInstance(conf,
-            CapacityScheduler.class);
-        schedulerMetrics.init(this, conf);
-      } catch (Exception e) {
-        LOG.error("Caught exception while initializing schedulerMetrics", e);
-      }
-    }
+    schedulerCommons.initMetrics(CapacityScheduler.class, conf);
   }
 
   @Override
   public Allocation allocate(ApplicationAttemptId attemptId,
       List<ResourceRequest> resourceRequests,
       List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
-      List<String> strings, List<String> strings2, ContainerUpdates updateRequests) {
-    if (metricsON) {
-      final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
-          .time();
-      Allocation allocation = null;
-      try {
-        allocation = super
-            .allocate(attemptId, resourceRequests, schedulingRequests,
-                containerIds, strings,
-                strings2, updateRequests);
-        return allocation;
-      } catch (Exception e) {
-        LOG.error("Caught exception from allocate", e);
-        throw e;
-      } finally {
-        context.stop();
-        schedulerMetrics.increaseSchedulerAllocationCounter();
-        try {
-          updateQueueWithAllocateRequest(allocation, attemptId,
-                  resourceRequests, containerIds);
-        } catch (IOException e) {
-          LOG.error("Caught exception while executing finally block", e);
-        }
-      }
-    } else {
-      return super.allocate(attemptId, resourceRequests, schedulingRequests,
-          containerIds, strings,
-          strings2, updateRequests);
-    }
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      ContainerUpdates updateRequests) {
+    return schedulerCommons.allocate(attemptId, resourceRequests, schedulingRequests,
+        containerIds, blacklistAdditions, blacklistRemovals, updateRequests);
   }
 
 
   @Override
   public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
       boolean updatePending) {
-    if (metricsON) {
+    if (schedulerCommons.isMetricsON()) {
       boolean isSuccess = false;
       long startTimeNs = System.nanoTime();
       try {
@@ -151,13 +78,13 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
       } finally {
         long elapsedNs = System.nanoTime() - startTimeNs;
         if (isSuccess) {
-          schedulerMetrics.getSchedulerCommitSuccessTimer()
+          getSchedulerMetrics().getSchedulerCommitSuccessTimer()
               .update(elapsedNs, TimeUnit.NANOSECONDS);
-          schedulerMetrics.increaseSchedulerCommitSuccessCounter();
+          getSchedulerMetrics().increaseSchedulerCommitSuccessCounter();
         } else {
-          schedulerMetrics.getSchedulerCommitFailureTimer()
+          getSchedulerMetrics().getSchedulerCommitFailureTimer()
               .update(elapsedNs, TimeUnit.NANOSECONDS);
-          schedulerMetrics.increaseSchedulerCommitFailureCounter();
+          getSchedulerMetrics().increaseSchedulerCommitFailureCounter();
         }
       }
     } else {
@@ -167,222 +94,26 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
 
   @Override
   public void handle(SchedulerEvent schedulerEvent) {
-    if (!metricsON) {
-      super.handle(schedulerEvent);
-      return;
-    }
-
-    if (!schedulerMetrics.isRunning()) {
-      schedulerMetrics.setRunning(true);
-    }
-
-    Timer.Context handlerTimer = null;
-    Timer.Context operationTimer = null;
-
-    NodeUpdateSchedulerEventWrapper eventWrapper;
-    try {
-      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-        eventWrapper = new NodeUpdateSchedulerEventWrapper(
-            (NodeUpdateSchedulerEvent)schedulerEvent);
-        schedulerEvent = eventWrapper;
-        updateQueueWithNodeUpdate(eventWrapper);
-      } else if (schedulerEvent.getType() ==
-          SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        // check if having AM Container, update resource usage information
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        ApplicationAttemptId appAttemptId =
-            appRemoveEvent.getApplicationAttemptID();
-        String queue = appQueueMap.get(appAttemptId);
-        SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
-        if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
-          // should have one container which is AM container
-          RMContainer rmc = app.getLiveContainers().iterator().next();
-          schedulerMetrics.updateQueueMetricsByRelease(
-              rmc.getContainer().getResource(), queue);
-        }
-      }
-
-      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
-      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
-          schedulerEvent.getType()).time();
-
-      super.handle(schedulerEvent);
-    } finally {
-      if (handlerTimer != null) {
-        handlerTimer.stop();
-      }
-      if (operationTimer != null) {
-        operationTimer.stop();
-      }
-      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
-
-      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        SLSRunner.decreaseRemainingApps();
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
-        if (SLSRunner.getRemainingApps() == 0) {
-          try {
-            getSchedulerMetrics().tearDown();
-            SLSRunner.exitSLSRunner();
-          } catch (Exception e) {
-            LOG.error("Scheduler Metrics failed to tear down.", e);
-          }
-        }
-      } else if (schedulerEvent.getType() ==
-          SchedulerEventType.APP_ATTEMPT_ADDED
-          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
-        AppAttemptAddedSchedulerEvent appAddEvent =
-            (AppAttemptAddedSchedulerEvent) schedulerEvent;
-        SchedulerApplication app =
-            applications.get(appAddEvent.getApplicationAttemptId()
-                .getApplicationId());
-        appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
-            .getQueueName());
-      }
-    }
-  }
-
-  private void updateQueueWithNodeUpdate(
-          NodeUpdateSchedulerEventWrapper eventWrapper) {
-    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
-    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
-    for (UpdatedContainerInfo info : containerList) {
-      for (ContainerStatus status : info.getCompletedContainers()) {
-        ContainerId containerId = status.getContainerId();
-        SchedulerAppReport app = super.getSchedulerAppInfo(
-                containerId.getApplicationAttemptId());
-
-        if (app == null) {
-          // this happens for the AM container
-          // The app have already removed when the NM sends the release
-          // information.
-          continue;
-        }
-
-        String queue = appQueueMap.get(containerId.getApplicationAttemptId());
-        int releasedMemory = 0, releasedVCores = 0;
-        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
-          for (RMContainer rmc : app.getLiveContainers()) {
-            if (rmc.getContainerId() == containerId) {
-              releasedMemory += rmc.getContainer().getResource().getMemorySize();
-              releasedVCores += rmc.getContainer()
-                      .getResource().getVirtualCores();
-              break;
-            }
-          }
-        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
-          if (preemptionContainerMap.containsKey(containerId)) {
-            Resource preResource = preemptionContainerMap.get(containerId);
-            releasedMemory += preResource.getMemorySize();
-            releasedVCores += preResource.getVirtualCores();
-            preemptionContainerMap.remove(containerId);
-          }
-        }
-        // update queue counters
-        schedulerMetrics.updateQueueMetricsByRelease(
-            Resource.newInstance(releasedMemory, releasedVCores), queue);
-      }
-    }
-  }
-
-  private void updateQueueWithAllocateRequest(Allocation allocation,
-                        ApplicationAttemptId attemptId,
-                        List<ResourceRequest> resourceRequests,
-                        List<ContainerId> containerIds) throws IOException {
-    // update queue information
-    Resource pendingResource = Resources.createResource(0, 0);
-    Resource allocatedResource = Resources.createResource(0, 0);
-    String queueName = appQueueMap.get(attemptId);
-    // container requested
-    for (ResourceRequest request : resourceRequests) {
-      if (request.getResourceName().equals(ResourceRequest.ANY)) {
-        Resources.addTo(pendingResource,
-                Resources.multiply(request.getCapability(),
-                        request.getNumContainers()));
-      }
-    }
-    // container allocated
-    for (Container container : allocation.getContainers()) {
-      Resources.addTo(allocatedResource, container.getResource());
-      Resources.subtractFrom(pendingResource, container.getResource());
-    }
-    // container released from AM
-    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
-    for (ContainerId containerId : containerIds) {
-      Container container = null;
-      for (RMContainer c : report.getLiveContainers()) {
-        if (c.getContainerId().equals(containerId)) {
-          container = c.getContainer();
-          break;
-        }
-      }
-      if (container != null) {
-        // released allocated containers
-        Resources.subtractFrom(allocatedResource, container.getResource());
-      } else {
-        for (RMContainer c : report.getReservedContainers()) {
-          if (c.getContainerId().equals(containerId)) {
-            container = c.getContainer();
-            break;
-          }
-        }
-        if (container != null) {
-          // released reserved containers
-          Resources.subtractFrom(pendingResource, container.getResource());
-        }
-      }
-    }
-    // containers released/preemption from scheduler
-    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
-    if (allocation.getContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getContainerPreemptions());
-    }
-    if (allocation.getStrictContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
-    }
-    if (! preemptionContainers.isEmpty()) {
-      for (ContainerId containerId : preemptionContainers) {
-        if (! preemptionContainerMap.containsKey(containerId)) {
-          Container container = null;
-          for (RMContainer c : report.getLiveContainers()) {
-            if (c.getContainerId().equals(containerId)) {
-              container = c.getContainer();
-              break;
-            }
-          }
-          if (container != null) {
-            preemptionContainerMap.put(containerId, container.getResource());
-          }
-        }
-
-      }
-    }
-
-    // update metrics
-    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
-        queueName);
+    schedulerCommons.handle(schedulerEvent);
   }
 
   @Override
   public void serviceStop() throws Exception {
-    try {
-      if (metricsON) {
-        schedulerMetrics.tearDown();
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception while stopping service", e);
-    }
+    schedulerCommons.stopMetrics();
     super.serviceStop();
   }
 
 
+  public String getRealQueueName(String queue) throws YarnException {
+    if (getQueue(queue) == null) {
+      throw new YarnException("Can't find the queue by the given name: " + queue
+          + "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueue(queue).getQueuePath();
+  }
+
   public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
+    return schedulerCommons.getSchedulerMetrics();
   }
 
   @Override
@@ -390,11 +121,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
     return conf;
   }
 
-  public String getRealQueueName(String queue) throws YarnException {
-    if (getQueue(queue) == null) {
-      throw new YarnException("Can't find the queue by the given name: " + queue
-          + "! Please check if queue " + queue + " is in the allocation file.");
-    }
-    return getQueue(queue).getQueuePath();
+  public Tracker getTracker() {
+    return schedulerCommons.getTracker();
   }
-}
+}
+

+ 15 - 276
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java

@@ -17,84 +17,35 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import com.codahale.metrics.Timer;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.sls.SLSRunner;
-import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 @Private
 @Unstable
 public class SLSFairScheduler extends FairScheduler
     implements SchedulerWrapper, Configurable {
-  private SchedulerMetrics schedulerMetrics;
-  private boolean metricsON;
-  private Tracker tracker;
-
-  private Map<ContainerId, Resource> preemptionContainerMap =
-      new ConcurrentHashMap<>();
-
-  // logger
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SLSFairScheduler.class);
-
-  public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
-  }
-
-  public Tracker getTracker() {
-    return tracker;
-  }
+  private final SLSSchedulerCommons schedulerCommons;
 
   public SLSFairScheduler() {
-    tracker = new Tracker();
+    schedulerCommons = new SLSSchedulerCommons(this);
   }
 
   @Override
   public void setConf(Configuration conf) {
     super.setConfig(conf);
-
-    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
-    if (metricsON) {
-      try {
-        schedulerMetrics = SchedulerMetrics.getInstance(conf,
-            FairScheduler.class);
-        schedulerMetrics.init(this, conf);
-      } catch (Exception e) {
-        LOG.error("Caught exception while initializing schedulerMetrics", e);
-      }
-    }
+    schedulerCommons.initMetrics(FairScheduler.class, conf);
   }
 
   @Override
@@ -103,237 +54,18 @@ public class SLSFairScheduler extends FairScheduler
       List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
       ContainerUpdates updateRequests) {
-    if (metricsON) {
-      final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
-          .time();
-      Allocation allocation = null;
-      try {
-        allocation = super.allocate(attemptId, resourceRequests,
-            schedulingRequests, containerIds,
-            blacklistAdditions, blacklistRemovals, updateRequests);
-        return allocation;
-      } catch (Exception e) {
-        LOG.error("Caught exception from allocate", e);
-        throw e;
-      } finally {
-        context.stop();
-        schedulerMetrics.increaseSchedulerAllocationCounter();
-        try {
-          updateQueueWithAllocateRequest(allocation, attemptId,
-              resourceRequests, containerIds);
-        } catch (IOException e) {
-          LOG.error("Caught exception while executing finally block", e);
-        }
-      }
-    } else {
-      return super.allocate(attemptId, resourceRequests, schedulingRequests,
-          containerIds,
-          blacklistAdditions, blacklistRemovals, updateRequests);
-    }
+    return schedulerCommons.allocate(attemptId, resourceRequests, schedulingRequests,
+        containerIds, blacklistAdditions, blacklistRemovals, updateRequests);
   }
 
   @Override
   public void handle(SchedulerEvent schedulerEvent) {
-    // metrics off
-    if (!metricsON) {
-      super.handle(schedulerEvent);
-      return;
-    }
-
-    // metrics on
-    if(!schedulerMetrics.isRunning()) {
-      schedulerMetrics.setRunning(true);
-    }
-
-    Timer.Context handlerTimer = null;
-    Timer.Context operationTimer = null;
-
-    NodeUpdateSchedulerEventWrapper eventWrapper;
-    try {
-      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-        eventWrapper = new NodeUpdateSchedulerEventWrapper(
-            (NodeUpdateSchedulerEvent)schedulerEvent);
-        schedulerEvent = eventWrapper;
-        updateQueueWithNodeUpdate(eventWrapper);
-      } else if (
-          schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        // check if having AM Container, update resource usage information
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        ApplicationAttemptId appAttemptId =
-            appRemoveEvent.getApplicationAttemptID();
-        String queueName = getSchedulerApp(appAttemptId).getQueue().getName();
-        SchedulerAppReport app = getSchedulerAppInfo(appAttemptId);
-        if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
-          // should have one container which is AM container
-          RMContainer rmc = app.getLiveContainers().iterator().next();
-          schedulerMetrics.updateQueueMetricsByRelease(
-              rmc.getContainer().getResource(), queueName);
-        }
-      }
-
-      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
-      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
-          schedulerEvent.getType()).time();
-
-      super.handle(schedulerEvent);
-    } finally {
-      if (handlerTimer != null) {
-        handlerTimer.stop();
-      }
-      if (operationTimer != null) {
-        operationTimer.stop();
-      }
-      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
-
-      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        SLSRunner.decreaseRemainingApps();
-        if (SLSRunner.getRemainingApps() == 0) {
-          try {
-            getSchedulerMetrics().tearDown();
-            SLSRunner.exitSLSRunner();
-          } catch (Exception e) {
-            LOG.error("Scheduler Metrics failed to tear down.", e);
-          }
-        }
-      }
-    }
-  }
-
-  private void updateQueueWithNodeUpdate(
-      NodeUpdateSchedulerEventWrapper eventWrapper) {
-    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
-    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
-    for (UpdatedContainerInfo info : containerList) {
-      for (ContainerStatus status : info.getCompletedContainers()) {
-        ContainerId containerId = status.getContainerId();
-        SchedulerAppReport app = super.getSchedulerAppInfo(
-            containerId.getApplicationAttemptId());
-
-        if (app == null) {
-          // this happens for the AM container
-          // The app have already removed when the NM sends the release
-          // information.
-          continue;
-        }
-
-        int releasedMemory = 0, releasedVCores = 0;
-        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
-          for (RMContainer rmc : app.getLiveContainers()) {
-            if (rmc.getContainerId() == containerId) {
-              Resource resource = rmc.getContainer().getResource();
-              releasedMemory += resource.getMemorySize();
-              releasedVCores += resource.getVirtualCores();
-              break;
-            }
-          }
-        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
-          if (preemptionContainerMap.containsKey(containerId)) {
-            Resource preResource = preemptionContainerMap.get(containerId);
-            releasedMemory += preResource.getMemorySize();
-            releasedVCores += preResource.getVirtualCores();
-            preemptionContainerMap.remove(containerId);
-          }
-        }
-        // update queue counters
-        String queue = getSchedulerApp(containerId.getApplicationAttemptId()).
-            getQueueName();
-        schedulerMetrics.updateQueueMetricsByRelease(
-            Resource.newInstance(releasedMemory, releasedVCores), queue);
-      }
-    }
-  }
-
-  private void updateQueueWithAllocateRequest(Allocation allocation,
-      ApplicationAttemptId attemptId,
-      List<ResourceRequest> resourceRequests,
-      List<ContainerId> containerIds) throws IOException {
-    // update queue information
-    Resource pendingResource = Resources.createResource(0, 0);
-    Resource allocatedResource = Resources.createResource(0, 0);
-    // container requested
-    for (ResourceRequest request : resourceRequests) {
-      if (request.getResourceName().equals(ResourceRequest.ANY)) {
-        Resources.addTo(pendingResource,
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers()));
-      }
-    }
-    // container allocated
-    for (Container container : allocation.getContainers()) {
-      Resources.addTo(allocatedResource, container.getResource());
-      Resources.subtractFrom(pendingResource, container.getResource());
-    }
-    // container released from AM
-    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
-    for (ContainerId containerId : containerIds) {
-      Container container = null;
-      for (RMContainer c : report.getLiveContainers()) {
-        if (c.getContainerId().equals(containerId)) {
-          container = c.getContainer();
-          break;
-        }
-      }
-      if (container != null) {
-        // released allocated containers
-        Resources.subtractFrom(allocatedResource, container.getResource());
-      } else {
-        for (RMContainer c : report.getReservedContainers()) {
-          if (c.getContainerId().equals(containerId)) {
-            container = c.getContainer();
-            break;
-          }
-        }
-        if (container != null) {
-          // released reserved containers
-          Resources.subtractFrom(pendingResource, container.getResource());
-        }
-      }
-    }
-    // containers released/preemption from scheduler
-    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
-    if (allocation.getContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getContainerPreemptions());
-    }
-    if (allocation.getStrictContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
-    }
-    if (!preemptionContainers.isEmpty()) {
-      for (ContainerId containerId : preemptionContainers) {
-        if (!preemptionContainerMap.containsKey(containerId)) {
-          Container container = null;
-          for (RMContainer c : report.getLiveContainers()) {
-            if (c.getContainerId().equals(containerId)) {
-              container = c.getContainer();
-              break;
-            }
-          }
-          if (container != null) {
-            preemptionContainerMap.put(containerId, container.getResource());
-          }
-        }
-
-      }
-    }
-
-    // update metrics
-    String queueName = getSchedulerApp(attemptId).getQueueName();
-    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
-        queueName);
+    schedulerCommons.handle(schedulerEvent);
   }
 
   @Override
   public void serviceStop() throws Exception {
-    try {
-      if (metricsON) {
-        schedulerMetrics.tearDown();
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception while stopping service", e);
-    }
+    schedulerCommons.stopMetrics();
     super.serviceStop();
   }
 
@@ -344,5 +76,12 @@ public class SLSFairScheduler extends FairScheduler
     }
     return getQueueManager().getQueue(queue).getQueueName();
   }
-}
 
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerCommons.getSchedulerMetrics();
+  }
+
+  public Tracker getTracker() {
+    return schedulerCommons.getTracker();
+  }
+}

+ 343 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java

@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SLSSchedulerCommons {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SLSSchedulerCommons.class);
+
+  // Scheduler that allocate calls and scheduler events are forwarded to.
+  private final AbstractYarnScheduler scheduler;
+  // Value of SLSConfiguration.METRICS_SWITCH, read in initMetrics.
+  private boolean metricsON;
+  // Created in initMetrics when metrics are switched on.
+  private SchedulerMetrics schedulerMetrics;
+  // Resource of each container reported as preempted by an allocation;
+  // entries are consumed when the container later completes as ABORTED.
+  private final Map<ContainerId, Resource> preemptionContainerMap =
+      new ConcurrentHashMap<>();
+
+  // Queue name per application attempt; filled on APP_ATTEMPT_ADDED and
+  // cleared on APP_ATTEMPT_REMOVED.
+  private final Map<ApplicationAttemptId, String> appQueueMap =
+      new ConcurrentHashMap<>();
+  private final Tracker tracker;
+  
+  /**
+   * Creates the shared helper for the given scheduler and sets up a new
+   * {@link Tracker}.
+   *
+   * @param scheduler the scheduler that allocate calls and events are
+   *                  forwarded to
+   */
+  public SLSSchedulerCommons(AbstractYarnScheduler scheduler) {
+    this.tracker = new Tracker();
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Reads the metrics switch from the configuration and, when metrics are
+   * enabled, obtains and initializes the {@link SchedulerMetrics} instance.
+   * Any exception raised while doing so is logged and not rethrown.
+   *
+   * NOTE(review): after such a failure metricsON remains true, so
+   * schedulerMetrics may be null or only partly initialized.
+   *
+   * @param schedulerClass scheduler class passed to
+   *                       {@code SchedulerMetrics.getInstance}
+   * @param conf configuration holding the metrics switch
+   */
+  public void initMetrics(
+      Class<? extends AbstractYarnScheduler> schedulerClass,
+      Configuration conf) {
+    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
+    if (!metricsON) {
+      return;
+    }
+    try {
+      schedulerMetrics = SchedulerMetrics.getInstance(conf, schedulerClass);
+      schedulerMetrics.init(scheduler, conf);
+    } catch (Exception e) {
+      LOG.error("Caught exception while initializing schedulerMetrics", e);
+    }
+  }
+
+  /**
+   * Tears down the scheduler metrics when metrics are switched on. Any
+   * exception raised by the teardown is logged and not rethrown.
+   */
+  void stopMetrics() {
+    if (!metricsON) {
+      return;
+    }
+    try {
+      schedulerMetrics.tearDown();
+    } catch (Exception e) {
+      LOG.error("Caught exception while stopping service", e);
+    }
+  }
+  
+  /**
+   * Forwards an allocate call to the wrapped scheduler. When metrics are
+   * switched on, the call is timed, the allocation counter is increased and
+   * the queue metrics are updated from the returned allocation.
+   *
+   * @param attemptId application attempt issuing the request
+   * @param resourceRequests resource requests of the attempt
+   * @param schedulingRequests scheduling requests of the attempt
+   * @param containerIds containers released by the application master
+   * @param blacklistAdditions passed through to the scheduler
+   * @param blacklistRemovals passed through to the scheduler
+   * @param updateRequests passed through to the scheduler
+   * @return the allocation returned by the wrapped scheduler
+   */
+  public Allocation allocate(ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests,
+      List<SchedulingRequest> schedulingRequests,
+      List<ContainerId> containerIds,
+      List<String> blacklistAdditions,
+      List<String> blacklistRemovals,
+      ContainerUpdates updateRequests) {
+    if (!metricsON) {
+      return scheduler.allocate(attemptId, resourceRequests,
+          schedulingRequests, containerIds,
+          blacklistAdditions, blacklistRemovals, updateRequests);
+    }
+
+    final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
+        .time();
+    Allocation allocation = null;
+    try {
+      allocation = scheduler.allocate(attemptId, resourceRequests,
+          schedulingRequests, containerIds,
+          blacklistAdditions, blacklistRemovals, updateRequests);
+      return allocation;
+    } catch (Exception e) {
+      LOG.error("Caught exception from allocate", e);
+      throw e;
+    } finally {
+      context.stop();
+      schedulerMetrics.increaseSchedulerAllocationCounter();
+      // allocation is still null when the scheduler threw. Updating the
+      // queue metrics would then fail with a NullPointerException raised
+      // from this finally block, hiding the original exception.
+      if (allocation != null) {
+        try {
+          updateQueueWithAllocateRequest(allocation, attemptId,
+              resourceRequests, containerIds);
+        } catch (IOException e) {
+          LOG.error("Caught exception while executing finally block", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates the queue metrics of the attempt's queue from one allocate call:
+   * requested resources count as pending, allocated containers move from
+   * pending to allocated, and containers released by the application master
+   * are subtracted again. Containers the allocation reports as preempted
+   * are remembered in preemptionContainerMap together with their resource.
+   *
+   * @param allocation allocation returned by the scheduler
+   * @param attemptId application attempt that issued the request
+   * @param resourceRequests resource requests sent with the call
+   * @param containerIds containers released by the application master
+   * @throws IOException declared for callers; kept for compatibility
+   */
+  private void updateQueueWithAllocateRequest(Allocation allocation,
+      ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests,
+      List<ContainerId> containerIds) throws IOException {
+    // update queue information
+    Resource pendingResource = Resources.createResource(0, 0);
+    Resource allocatedResource = Resources.createResource(0, 0);
+    String queueName = appQueueMap.get(attemptId);
+    // container requested
+    for (ResourceRequest request : resourceRequests) {
+      if (request.getResourceName().equals(ResourceRequest.ANY)) {
+        Resources.addTo(pendingResource,
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+      }
+    }
+    // container allocated
+    for (Container container : allocation.getContainers()) {
+      Resources.addTo(allocatedResource, container.getResource());
+      Resources.subtractFrom(pendingResource, container.getResource());
+    }
+
+    // The report is null when the scheduler no longer knows the attempt;
+    // there is then nothing to look released or preempted containers up in.
+    SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId);
+    if (report != null) {
+      // container released from AM
+      for (ContainerId containerId : containerIds) {
+        Container live =
+            findContainer(report.getLiveContainers(), containerId);
+        if (live != null) {
+          // released allocated containers
+          Resources.subtractFrom(allocatedResource, live.getResource());
+          continue;
+        }
+        Container reserved =
+            findContainer(report.getReservedContainers(), containerId);
+        if (reserved != null) {
+          // released reserved containers
+          Resources.subtractFrom(pendingResource, reserved.getResource());
+        }
+      }
+
+      // containers released/preemption from scheduler
+      Set<ContainerId> preemptionContainers = new HashSet<>();
+      if (allocation.getContainerPreemptions() != null) {
+        preemptionContainers.addAll(allocation.getContainerPreemptions());
+      }
+      if (allocation.getStrictContainerPreemptions() != null) {
+        preemptionContainers.addAll(
+            allocation.getStrictContainerPreemptions());
+      }
+      for (ContainerId containerId : preemptionContainers) {
+        if (preemptionContainerMap.containsKey(containerId)) {
+          continue;
+        }
+        Container preempted =
+            findContainer(report.getLiveContainers(), containerId);
+        if (preempted != null) {
+          preemptionContainerMap.put(containerId, preempted.getResource());
+        }
+      }
+    }
+
+    // update metrics
+    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
+        queueName);
+  }
+
+  /**
+   * Returns the container of the first candidate whose id equals the given
+   * container id, or null when no candidate matches.
+   */
+  private static Container findContainer(Iterable<RMContainer> candidates,
+      ContainerId containerId) {
+    for (RMContainer candidate : candidates) {
+      if (candidate.getContainerId().equals(containerId)) {
+        return candidate.getContainer();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Forwards a scheduler event to the wrapped scheduler. When metrics are
+   * switched on, the call is timed (overall and per event type), the handle
+   * counter is increased, and queue bookkeeping is done for node updates
+   * and for application attempts being added or removed. Once the last
+   * remaining application attempt is removed, the metrics are torn down
+   * and the SLS runner is asked to exit.
+   *
+   * @param schedulerEvent the event to handle
+   */
+  public void handle(SchedulerEvent schedulerEvent) {
+    if (!metricsON) {
+      scheduler.handle(schedulerEvent);
+      return;
+    }
+
+    if (!schedulerMetrics.isRunning()) {
+      schedulerMetrics.setRunning(true);
+    }
+
+    Timer.Context handlerTimer = null;
+    Timer.Context operationTimer = null;
+
+    try {
+      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+        // From here on the wrapped event is what gets timed and handled.
+        NodeUpdateSchedulerEventWrapper eventWrapper =
+            new NodeUpdateSchedulerEventWrapper(
+                (NodeUpdateSchedulerEvent) schedulerEvent);
+        schedulerEvent = eventWrapper;
+        updateQueueWithNodeUpdate(eventWrapper);
+      } else if (
+          schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+              && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        // check if having AM Container, update resource usage information
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        ApplicationAttemptId appAttemptId =
+            appRemoveEvent.getApplicationAttemptID();
+        String queue = appQueueMap.get(appAttemptId);
+        SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
+        // The report is null when the scheduler does not know the attempt;
+        // the event must still reach the scheduler in that case.
+        if (app != null && !app.getLiveContainers().isEmpty()) { // 0 or 1
+          // should have one container which is AM container
+          RMContainer rmc = app.getLiveContainers().iterator().next();
+          schedulerMetrics.updateQueueMetricsByRelease(
+              rmc.getContainer().getResource(), queue);
+        }
+      }
+
+      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
+      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
+          schedulerEvent.getType()).time();
+
+      scheduler.handle(schedulerEvent);
+    } finally {
+      if (handlerTimer != null) {
+        handlerTimer.stop();
+      }
+      if (operationTimer != null) {
+        operationTimer.stop();
+      }
+      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
+
+      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        SLSRunner.decreaseRemainingApps();
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
+        if (SLSRunner.getRemainingApps() == 0) {
+          try {
+            schedulerMetrics.tearDown();
+            SLSRunner.exitSLSRunner();
+          } catch (Exception e) {
+            LOG.error("Scheduler Metrics failed to tear down.", e);
+          }
+        }
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_ADDED
+          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
+        AppAttemptAddedSchedulerEvent appAddEvent =
+            (AppAttemptAddedSchedulerEvent) schedulerEvent;
+        SchedulerApplication app =
+            (SchedulerApplication) scheduler.getSchedulerApplications()
+                .get(appAddEvent.getApplicationAttemptId().getApplicationId());
+        // No application entry means there is no queue to record. Guarding
+        // here also keeps this finally block from throwing and hiding an
+        // exception raised while handling the event.
+        if (app != null) {
+          appQueueMap.put(appAddEvent.getApplicationAttemptId(),
+              app.getQueue().getQueueName());
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates the queue release metrics for every completed container
+   * reported by a node update. Containers that exited with SUCCESS are
+   * looked up among the application's live containers; containers that
+   * exited with ABORTED are looked up in, and removed from,
+   * preemptionContainerMap. Other exit statuses release nothing.
+   *
+   * @param eventWrapper wrapped node update event carrying the container
+   *                     updates of the node
+   */
+  private void updateQueueWithNodeUpdate(
+      NodeUpdateSchedulerEventWrapper eventWrapper) {
+    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
+    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
+    for (UpdatedContainerInfo info : containerList) {
+      for (ContainerStatus status : info.getCompletedContainers()) {
+        ContainerId containerId = status.getContainerId();
+        SchedulerAppReport app = scheduler.getSchedulerAppInfo(
+            containerId.getApplicationAttemptId());
+
+        if (app == null) {
+          // this happens for the AM container
+          // The app have already removed when the NM sends the release
+          // information.
+          continue;
+        }
+
+        // Memory is kept as long to match Resource#getMemorySize().
+        long releasedMemory = 0;
+        int releasedVCores = 0;
+        int exitStatus = status.getExitStatus();
+        if (exitStatus == ContainerExitStatus.SUCCESS) {
+          for (RMContainer rmc : app.getLiveContainers()) {
+            // Compare ids by value; two ids for the same container are
+            // not guaranteed to be the same object.
+            if (rmc.getContainerId().equals(containerId)) {
+              Resource resource = rmc.getContainer().getResource();
+              releasedMemory = resource.getMemorySize();
+              releasedVCores = resource.getVirtualCores();
+              break;
+            }
+          }
+        } else if (exitStatus == ContainerExitStatus.ABORTED) {
+          // Single remove instead of containsKey/get/remove, so the entry
+          // is read and dropped in one step.
+          Resource preResource = preemptionContainerMap.remove(containerId);
+          if (preResource != null) {
+            releasedMemory = preResource.getMemorySize();
+            releasedVCores = preResource.getVirtualCores();
+          }
+        }
+        // update queue counters
+        String queue = appQueueMap.get(containerId.getApplicationAttemptId());
+        schedulerMetrics.updateQueueMetricsByRelease(
+            Resource.newInstance(releasedMemory, releasedVCores), queue);
+      }
+    }
+  }
+  
+  /**
+   * Returns the scheduler metrics instance.
+   *
+   * @return the current value of the schedulerMetrics field, which is only
+   *         assigned by {@code initMetrics} when metrics are switched on
+   */
+  public SchedulerMetrics getSchedulerMetrics() {
+    return this.schedulerMetrics;
+  }
+
+  /**
+   * Tells whether metrics collection is switched on.
+   *
+   * @return the value read from the configuration by {@code initMetrics}
+   */
+  public boolean isMetricsON() {
+    return this.metricsON;
+  }
+
+  /**
+   * Returns the tracker created together with this helper.
+   *
+   * @return the tracker instance
+   */
+  public Tracker getTracker() {
+    return this.tracker;
+  }
+}