@@ -18,10 +18,12 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 class JobQueueTaskScheduler extends TaskScheduler {
   
   private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
+  public static final Log LOG = LogFactory.getLog(JobQueueTaskScheduler.class);
   
   protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
   protected EagerTaskInitializationListener eagerTaskInitializationListener;
@@ -78,7 +81,9 @@ class JobQueueTaskScheduler extends TaskScheduler {
       throws IOException {
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
-    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
+    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
 
     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
@@ -86,97 +91,131 @@ class JobQueueTaskScheduler extends TaskScheduler {
     //
     // Get map + reduce counts for the current tracker.
     //
-    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
-    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
-    int numMaps = taskTracker.countMapTasks();
-    int numReduces = taskTracker.countReduceTasks();
+    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
+    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
+    final int trackerRunningMaps = taskTracker.countMapTasks();
+    final int trackerRunningReduces = taskTracker.countReduceTasks();
+
+    // Assigned tasks
+    List<Task> assignedTasks = new ArrayList<Task>();
 
     //
-    // Compute average map and reduce task numbers across pool
+    // Compute (running + pending) map and reduce task numbers across pool
     //
     int remainingReduceLoad = 0;
     int remainingMapLoad = 0;
     synchronized (jobQueue) {
       for (JobInProgress job : jobQueue) {
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-          int totalMapTasks = job.desiredMaps();
-          int totalReduceTasks = job.desiredReduces();
-          remainingMapLoad += (totalMapTasks - job.finishedMaps());
-          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
+          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
+          if (job.scheduleReduces()) {
+            remainingReduceLoad +=
+              (job.desiredReduces() - job.finishedReduces());
+          }
         }
       }
     }
 
-    // find out the maximum number of maps or reduces that we are willing
-    // to run on any node.
-    int maxMapLoad = 0;
-    int maxReduceLoad = 0;
-    if (numTaskTrackers > 0) {
-      maxMapLoad = Math.min(maxCurrentMapTasks,
-                            (int) Math.ceil((double) remainingMapLoad /
-                                            numTaskTrackers));
-      maxReduceLoad = Math.min(maxCurrentReduceTasks,
-                               (int) Math.ceil((double) remainingReduceLoad
-                                               / numTaskTrackers));
+    // Compute the 'load factor' for maps and reduces
+    double mapLoadFactor = 0.0;
+    if (clusterMapCapacity > 0) {
+      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
+    }
+    double reduceLoadFactor = 0.0;
+    if (clusterReduceCapacity > 0) {
+      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
     }
 
-    int totalMaps = clusterStatus.getMapTasks();
-    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
-    int totalReduces = clusterStatus.getReduceTasks();
-    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();
-
     //
-    // In the below steps, we allocate first a map task (if appropriate),
-    // and then a reduce task if appropriate.  We go through all jobs
+    // In the below steps, we allocate first map tasks (if appropriate),
+    // and then reduce tasks if appropriate.  We go through all jobs
     // in order of job arrival; jobs only get serviced if their
     // predecessors are serviced, too.
     //
 
     //
-    // We hand a task to the current taskTracker if the given machine
+    // We assign tasks to the current taskTracker if the given machine
     // has a workload that's less than the maximum load of that kind of
     // task.
+    // However, if the cluster is close to getting loaded i.e. we don't
+    // have enough _padding_ for speculative executions etc., we only
+    // schedule the "highest priority" task i.e. the task from the job
+    // with the highest priority.
     //
-
-    if (numMaps < maxMapLoad) {
-
-      int totalNeededMaps = 0;
+
+    final int trackerCurrentMapCapacity =
+      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
+               trackerMapCapacity);
+    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
+    boolean exceededMapPadding = false;
+    if (availableMapSlots > 0) {
+      exceededMapPadding =
+        exceededPadding(true, clusterStatus, trackerMapCapacity);
+    }
+
+    int numLocalMaps = 0;
+    int numNonLocalMaps = 0;
+    scheduleMaps:
+    for (int i=0; i < availableMapSlots; ++i) {
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;
           }
 
-          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
-              taskTrackerManager.getNumberOfUniqueHosts());
+          Task t = null;
+
+          // Try to schedule a node-local or rack-local Map task
+          t =
+            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
+                                      taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
-            return Collections.singletonList(t);
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little
-          // room for failures and speculative executions; don't
-          // schedule tasks to the hilt.
-          //
-          totalNeededMaps += job.desiredMaps();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = Math.min(maxCurrentMapTasks,
-                               (int)(totalNeededMaps * padFraction));
-          }
-          if (totalMaps + padding >= totalMapTaskCapacity) {
+            assignedTasks.add(t);
+            ++numLocalMaps;
+
+            // Don't assign map tasks to the hilt!
+            // Leave some free slots in the cluster for future task-failures,
+            // speculative tasks etc. beyond the highest priority job
+            if (exceededMapPadding) {
+              break scheduleMaps;
+            }
+
+            // Try all jobs again for the next Map task
             break;
           }
+
+          // Try to schedule an off-switch or speculative Map task
+          t =
+            job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
+                                         taskTrackerManager.getNumberOfUniqueHosts());
+
+          if (t != null) {
+            assignedTasks.add(t);
+            ++numNonLocalMaps;
+
+            // We assign at most 1 off-switch or speculative task
+            // This is to prevent TaskTrackers from stealing local-tasks
+            // from other TaskTrackers.
+            break scheduleMaps;
+          }
         }
       }
     }
+    int assignedMaps = assignedTasks.size();
 
     //
     // Same thing, but for reduce tasks
+    // However we _never_ assign more than 1 reduce task per heartbeat
     //
-    if (numReduces < maxReduceLoad) {
-
-      int totalNeededReduces = 0;
+    final int trackerCurrentReduceCapacity =
+      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
+               trackerReduceCapacity);
+    final int availableReduceSlots =
+      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
+    boolean exceededReducePadding = false;
+    if (availableReduceSlots > 0) {
+      exceededReducePadding = exceededPadding(false, clusterStatus,
+                                              trackerReduceCapacity);
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING ||
@@ -184,31 +223,84 @@ class JobQueueTaskScheduler extends TaskScheduler {
             continue;
           }
 
-          Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
-              taskTrackerManager.getNumberOfUniqueHosts());
+          Task t =
+            job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+                                    taskTrackerManager.getNumberOfUniqueHosts()
+                                    );
           if (t != null) {
-            return Collections.singletonList(t);
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little
-          // room for failures and speculative executions; don't
-          // schedule tasks to the hilt.
-          //
-          totalNeededReduces += job.desiredReduces();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding =
-              Math.min(maxCurrentReduceTasks,
-                       (int) (totalNeededReduces * padFraction));
+            assignedTasks.add(t);
+            break;
           }
-          if (totalReduces + padding >= totalReduceTaskCapacity) {
+
+          // Don't assign reduce tasks to the hilt!
+          // Leave some free slots in the cluster for future task-failures,
+          // speculative tasks etc. beyond the highest priority job
+          if (exceededReducePadding) {
             break;
           }
         }
       }
     }
-    return null;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
+                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
+                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
+                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
+                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
+                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
+                trackerCurrentReduceCapacity + "," + trackerRunningReduces +
+                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
+                ", " + (assignedTasks.size()-assignedMaps) + "]");
+    }
+
+    return assignedTasks;
+  }
+
+  private boolean exceededPadding(boolean isMapTask,
+                                  ClusterStatus clusterStatus,
+                                  int maxTaskTrackerSlots) {
+    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    int totalTasks =
+      (isMapTask) ? clusterStatus.getMapTasks() :
+        clusterStatus.getReduceTasks();
+    int totalTaskCapacity =
+      isMapTask ? clusterStatus.getMaxMapTasks() :
+        clusterStatus.getMaxReduceTasks();
+
+    Collection<JobInProgress> jobQueue =
+      jobQueueJobInProgressListener.getJobQueue();
+
+    boolean exceededPadding = false;
+    synchronized (jobQueue) {
+      int totalNeededTasks = 0;
+      for (JobInProgress job : jobQueue) {
+        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+            job.numReduceTasks == 0) {
+          continue;
+        }
+
+        //
+        // Beyond the highest-priority task, reserve a little
+        // room for failures and speculative executions; don't
+        // schedule tasks to the hilt.
+        //
+        totalNeededTasks +=
+          isMapTask ? job.desiredMaps() : job.desiredReduces();
+        int padding = 0;
+        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+          padding =
+            Math.min(maxTaskTrackerSlots,
+                     (int) (totalNeededTasks * padFraction));
+        }
+        if (totalTasks + padding >= totalTaskCapacity) {
+          exceededPadding = true;
+          break;
+        }
+      }
+    }
+
+    return exceededPadding;
   }
 
   @Override
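
Illustration (not part of the patch): a minimal, self-contained sketch of the per-tracker map-capacity arithmetic that the new assignTasks() uses, with hypothetical numbers. The class name and all values below are invented for this example.

// Sketch only: mirrors the mapLoadFactor / trackerCurrentMapCapacity /
// availableMapSlots computation from JobQueueTaskScheduler.assignTasks().
public class LoadFactorExample {
  public static void main(String[] args) {
    // Hypothetical cluster and tracker state
    int clusterMapCapacity = 100;  // total map slots in the cluster
    int remainingMapLoad = 40;     // running + pending maps across all jobs
    int trackerMapCapacity = 4;    // map slots on this TaskTracker
    int trackerRunningMaps = 1;    // maps already running on it

    // Same arithmetic as the patch
    double mapLoadFactor =
      (clusterMapCapacity > 0) ? (double) remainingMapLoad / clusterMapCapacity
                               : 0.0;
    int trackerCurrentMapCapacity =
      Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
               trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;

    // 40/100 = 0.4 load factor; ceil(0.4 * 4) = 2 usable slots; 2 - 1 = 1
    // map task may be assigned on this heartbeat.
    System.out.println("loadFactor=" + mapLoadFactor +
                       ", currentCapacity=" + trackerCurrentMapCapacity +
                       ", availableSlots=" + availableMapSlots);
  }
}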