|
@@ -22,8 +22,10 @@ import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -69,6 +71,9 @@ class CapacitySchedulerQueue {
|
|
|
//in cluster at any given time.
|
|
|
private int maxCapacity = -1;
|
|
|
|
|
|
+ // Active users
|
|
|
+ Set<String> users = new HashSet<String>();
|
|
|
+
|
|
|
/**
|
|
|
* for each user, we need to keep track of number of slots occupied by
|
|
|
* running tasks
|
|
@@ -82,6 +87,7 @@ class CapacitySchedulerQueue {
|
|
|
void reset() {
|
|
|
numRunningTasks = 0;
|
|
|
numSlotsOccupied = 0;
|
|
|
+ users.clear();
|
|
|
numSlotsOccupiedByUser.clear();
|
|
|
}
|
|
|
|
|
@@ -119,6 +125,13 @@ class CapacitySchedulerQueue {
|
|
|
return numSlotsOccupied;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @return number of active users
|
|
|
+ */
|
|
|
+ int getNumActiveUsers() {
|
|
|
+ return users.size();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* return information about the tasks
|
|
|
*/
|
|
@@ -184,12 +197,15 @@ class CapacitySchedulerQueue {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void updateSlotsUsage(String user, int numRunningTasks, int numSlotsOccupied) {
|
|
|
+ void updateSlotsUsage(String user, int pendingTasks, int numRunningTasks, int numSlotsOccupied) {
|
|
|
this.numRunningTasks += numRunningTasks;
|
|
|
this.numSlotsOccupied += numSlotsOccupied;
|
|
|
Integer i = this.numSlotsOccupiedByUser.get(user);
|
|
|
int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
|
|
|
this.numSlotsOccupiedByUser.put(user, slots);
|
|
|
+ if (pendingTasks > 0) {
|
|
|
+ users.add(user);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -489,6 +505,16 @@ class CapacitySchedulerQueue {
|
|
|
throw new IllegalArgumentException("Illegal taskType=" + taskType);
|
|
|
}
|
|
|
|
|
|
+ int getNumActiveUsersByTaskType(TaskType taskType) {
|
|
|
+ if (taskType == TaskType.MAP) {
|
|
|
+ return mapSlots.getNumActiveUsers();
|
|
|
+ } else if (taskType == TaskType.REDUCE) {
|
|
|
+ return reduceSlots.getNumActiveUsers();
|
|
|
+ }
|
|
|
+
|
|
|
+ throw new IllegalArgumentException("Illegal taskType=" + taskType);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* A new job is added to the
|
|
|
* @param job
|
|
@@ -553,12 +579,14 @@ class CapacitySchedulerQueue {
|
|
|
* @param numRunningTasks
|
|
|
* @param numSlotsOccupied
|
|
|
*/
|
|
|
- void update(TaskType type, String user,
|
|
|
+ void update(TaskType type, JobInProgress job, String user,
|
|
|
int numRunningTasks, int numSlotsOccupied) {
|
|
|
if (type == TaskType.MAP) {
|
|
|
- mapSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
|
|
|
+ mapSlots.updateSlotsUsage(user, job.pendingMaps(),
|
|
|
+ numRunningTasks, numSlotsOccupied);
|
|
|
} else if (type == TaskType.REDUCE) {
|
|
|
- reduceSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
|
|
|
+ reduceSlots.updateSlotsUsage(user, job.pendingReduces(),
|
|
|
+ numRunningTasks, numSlotsOccupied);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -611,9 +639,9 @@ class CapacitySchedulerQueue {
|
|
|
numRunningReduceSlots,
|
|
|
numReservedReduceSlotsForThisJob));
|
|
|
|
|
|
- update(TaskType.MAP, j.getProfile().getUser(),
|
|
|
+ update(TaskType.MAP, j, j.getProfile().getUser(),
|
|
|
numMapsRunningForThisJob, numMapSlotsForThisJob);
|
|
|
- update(TaskType.REDUCE, j.getProfile().getUser(),
|
|
|
+ update(TaskType.REDUCE, j, j.getProfile().getUser(),
|
|
|
numReducesRunningForThisJob, numReduceSlotsForThisJob);
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -1127,9 +1155,13 @@ class CapacitySchedulerQueue {
|
|
|
// queue's configured capacity * user-limit-factor.
|
|
|
// Also, the queue's configured capacity should be higher than
|
|
|
// queue-hard-limit * ulMin
|
|
|
+
|
|
|
+ // All users in this queue might not need any slots of type 'taskType'
|
|
|
+ int activeUsers = Math.max(1, getNumActiveUsersByTaskType(taskType));
|
|
|
+
|
|
|
int limit =
|
|
|
Math.min(
|
|
|
- Math.max(divideAndCeil(currentCapacity, numJobsByUser.size()),
|
|
|
+ Math.max(divideAndCeil(currentCapacity, activeUsers),
|
|
|
divideAndCeil(ulMin*currentCapacity, 100)),
|
|
|
(int)(queueCapacity * ulMinFactor)
|
|
|
);
|
|
@@ -1138,9 +1170,11 @@ class CapacitySchedulerQueue {
|
|
|
limit) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("User " + user + " is over limit for queue=" + queueName +
|
|
|
+ " queueCapacity=" + queueCapacity +
|
|
|
" num slots occupied=" + getNumSlotsOccupiedByUser(user, taskType) +
|
|
|
" limit=" + limit +" numSlotsRequested=" + numSlotsRequested +
|
|
|
- " currentCapacity=" + currentCapacity);
|
|
|
+ " currentCapacity=" + currentCapacity +
|
|
|
+ " numActiveUsers=" + getNumActiveUsersByTaskType(taskType));
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -1303,4 +1337,4 @@ class CapacitySchedulerQueue {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-}
|
|
|
+}
|