|
@@ -210,8 +210,9 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
private static final Allocation EMPTY_ALLOCATION =
|
|
|
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
|
|
|
@Override
|
|
|
- public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
|
|
- List<ResourceRequest> ask, List<Container> release) {
|
|
|
+ public synchronized Allocation allocate(
|
|
|
+ ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
|
|
|
+ List<Container> release) {
|
|
|
SchedulerApp application = getApplication(applicationAttemptId);
|
|
|
if (application == null) {
|
|
|
LOG.error("Calling allocate on removed " +
|
|
@@ -227,31 +228,28 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
containerCompleted(releasedContainer, RMContainerEventType.RELEASED);
|
|
|
}
|
|
|
|
|
|
- synchronized (application) {
|
|
|
+ if (!ask.isEmpty()) {
|
|
|
+ LOG.debug("allocate: pre-update" +
|
|
|
+ " applicationId=" + applicationAttemptId +
|
|
|
+ " application=" + application);
|
|
|
+ application.showRequests();
|
|
|
|
|
|
- if (!ask.isEmpty()) {
|
|
|
- LOG.debug("allocate: pre-update" +
|
|
|
- " applicationId=" + applicationAttemptId +
|
|
|
- " application=" + application);
|
|
|
- application.showRequests();
|
|
|
-
|
|
|
- // Update application requests
|
|
|
- application.updateResourceRequests(ask);
|
|
|
-
|
|
|
- LOG.debug("allocate: post-update" +
|
|
|
- " applicationId=" + applicationAttemptId +
|
|
|
- " application=" + application);
|
|
|
- application.showRequests();
|
|
|
-
|
|
|
- LOG.debug("allocate:" +
|
|
|
- " applicationId=" + applicationAttemptId +
|
|
|
- " #ask=" + ask.size());
|
|
|
- }
|
|
|
+ // Update application requests
|
|
|
+ application.updateResourceRequests(ask);
|
|
|
|
|
|
- return new Allocation(
|
|
|
- application.pullNewlyAllocatedContainers(),
|
|
|
- application.getHeadroom());
|
|
|
+ LOG.debug("allocate: post-update" +
|
|
|
+ " applicationId=" + applicationAttemptId +
|
|
|
+ " application=" + application);
|
|
|
+ application.showRequests();
|
|
|
+
|
|
|
+ LOG.debug("allocate:" +
|
|
|
+ " applicationId=" + applicationAttemptId +
|
|
|
+ " #ask=" + ask.size());
|
|
|
}
|
|
|
+
|
|
|
+ return new Allocation(
|
|
|
+ application.pullNewlyAllocatedContainers(),
|
|
|
+ application.getHeadroom());
|
|
|
}
|
|
|
|
|
|
private void normalizeRequests(List<ResourceRequest> asks) {
|
|
@@ -268,16 +266,16 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
ask.setCapability(Resources.createResource(memory));
|
|
|
}
|
|
|
|
|
|
- private synchronized SchedulerApp getApplication(
|
|
|
+ private SchedulerApp getApplication(
|
|
|
ApplicationAttemptId applicationAttemptId) {
|
|
|
return applications.get(applicationAttemptId);
|
|
|
}
|
|
|
|
|
|
- private synchronized SchedulerNode getNode(NodeId nodeId) {
|
|
|
+ private SchedulerNode getNode(NodeId nodeId) {
|
|
|
return nodes.get(nodeId);
|
|
|
}
|
|
|
|
|
|
- private synchronized void addApplication(ApplicationAttemptId appAttemptId,
|
|
|
+ private void addApplication(ApplicationAttemptId appAttemptId,
|
|
|
String queueName, String user) {
|
|
|
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
|
|
|
appAttemptId, queueName, user, null);
|
|
@@ -292,7 +290,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
RMAppAttemptEventType.APP_ACCEPTED));
|
|
|
}
|
|
|
|
|
|
- private synchronized void doneApplication(
|
|
|
+ private void doneApplication(
|
|
|
ApplicationAttemptId applicationAttemptId,
|
|
|
RMAppAttemptState rmAppAttemptFinalState)
|
|
|
throws IOException {
|
|
@@ -319,7 +317,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
*
|
|
|
* @param node node on which resources are available to be allocated
|
|
|
*/
|
|
|
- private synchronized void assignContainers(SchedulerNode node) {
|
|
|
+ private void assignContainers(SchedulerNode node) {
|
|
|
LOG.debug("assignContainers:" +
|
|
|
" node=" + node.getRMNode().getNodeAddress() +
|
|
|
" #applications=" + applications.size());
|
|
@@ -534,7 +532,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
return assignedContainers;
|
|
|
}
|
|
|
|
|
|
- private synchronized void nodeUpdate(RMNode rmNode,
|
|
|
+ private void nodeUpdate(RMNode rmNode,
|
|
|
Map<ApplicationId, List<Container>> remoteContainers) {
|
|
|
SchedulerNode node = getNode(rmNode.getNodeID());
|
|
|
|
|
@@ -548,18 +546,19 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- LOG.info("Node heartbeat " + rmNode.getNodeID() +
|
|
|
- " available resource = " + node.getAvailableResource());
|
|
|
-
|
|
|
if (Resources.greaterThanOrEqual(node.getAvailableResource(),
|
|
|
minimumAllocation)) {
|
|
|
+ LOG.info("Node heartbeat " + rmNode.getNodeID() +
|
|
|
+ " available resource = " + node.getAvailableResource());
|
|
|
+
|
|
|
assignContainers(node);
|
|
|
+
|
|
|
+ LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
|
|
|
+ + node.getAvailableResource());
|
|
|
}
|
|
|
|
|
|
metrics.setAvailableResourcesToQueue(
|
|
|
Resources.subtract(clusterResource, usedResource));
|
|
|
- LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
|
|
|
- + node.getAvailableResource());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -632,7 +631,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
}
|
|
|
|
|
|
@Lock(FifoScheduler.class)
|
|
|
- private synchronized void containerCompleted(Container container, RMContainerEventType event) {
|
|
|
+ private void containerCompleted(Container container, RMContainerEventType event) {
|
|
|
// Get the application for the finished container
|
|
|
ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
|
|
|
SchedulerApp application = getApplication(applicationAttemptId);
|
|
@@ -664,7 +663,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
|
|
|
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
|
|
|
|
|
|
- private synchronized void removeNode(RMNode nodeInfo) {
|
|
|
+ private void removeNode(RMNode nodeInfo) {
|
|
|
SchedulerNode node = getNode(nodeInfo.getNodeID());
|
|
|
// Kill running containers
|
|
|
for(Container container : node.getRunningContainers()) {
|
|
@@ -689,7 +688,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|
|
return DEFAULT_QUEUE.getQueueUserAclInfo(null);
|
|
|
}
|
|
|
|
|
|
- private synchronized void addNode(RMNode nodeManager) {
|
|
|
+ private void addNode(RMNode nodeManager) {
|
|
|
this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
|
|
|
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
|
|
|
}
|