|
@@ -26,6 +26,7 @@ import java.util.List;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
+import java.util.TreeSet;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -314,35 +315,33 @@ public class FSLeafQueue extends FSQueue {
|
|
|
return assigned;
|
|
|
}
|
|
|
|
|
|
- Comparator<Schedulable> comparator = policy.getComparator();
|
|
|
- writeLock.lock();
|
|
|
- try {
|
|
|
- Collections.sort(runnableApps, comparator);
|
|
|
- } finally {
|
|
|
- writeLock.unlock();
|
|
|
- }
|
|
|
- // Release write lock here for better performance and avoiding deadlocks.
|
|
|
- // runnableApps can be in unsorted state because of this section,
|
|
|
- // but we can accept it in practice since the probability is low.
|
|
|
+ // Apps that have resource demands.
|
|
|
+ TreeSet<FSAppAttempt> pendingForResourceApps =
|
|
|
+ new TreeSet<FSAppAttempt>(policy.getComparator());
|
|
|
readLock.lock();
|
|
|
try {
|
|
|
- for (FSAppAttempt sched : runnableApps) {
|
|
|
- if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- assigned = sched.assignContainer(node);
|
|
|
- if (!assigned.equals(Resources.none())) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Assigned container in queue:" + getName() + " " +
|
|
|
- "container:" + assigned);
|
|
|
- }
|
|
|
- break;
|
|
|
+ for (FSAppAttempt app : runnableApps) {
|
|
|
+ Resource pending = app.getAppAttemptResourceUsage().getPending();
|
|
|
+ if (!pending.equals(Resources.none())) {
|
|
|
+ pendingForResourceApps.add(app);
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
readLock.unlock();
|
|
|
}
|
|
|
+ for (FSAppAttempt sched : pendingForResourceApps) {
|
|
|
+ if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ assigned = sched.assignContainer(node);
|
|
|
+ if (!assigned.equals(Resources.none())) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigned container in queue:" + getName() + " " +
|
|
|
+ "container:" + assigned);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
return assigned;
|
|
|
}
|
|
|
|