|
@@ -1254,10 +1254,17 @@ public class CapacityScheduler extends
|
|
|
*/
|
|
|
private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps,
|
|
|
FiCaSchedulerNode node, boolean withNodeHeartbeat) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(
|
|
|
+ "Trying to schedule on node: " + node.getNodeName() + ", available: "
|
|
|
+ + node.getUnallocatedResource());
|
|
|
+ }
|
|
|
+
|
|
|
// Backward compatible way to make sure previous behavior which allocation
|
|
|
// driven by node heartbeat works.
|
|
|
if (getNode(node.getNodeID()) != node) {
|
|
|
- LOG.error("Trying to schedule on a removed node, please double check.");
|
|
|
+ LOG.error("Trying to schedule on a removed node, please double check, "
|
|
|
+ + "nodeId=" + node.getNodeID());
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -1271,14 +1278,19 @@ public class CapacityScheduler extends
|
|
|
FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
|
|
|
reservedContainer.getContainerId());
|
|
|
if (reservedApplication == null) {
|
|
|
- LOG.error("Trying to schedule for a finished app, please double check.");
|
|
|
+ LOG.error(
|
|
|
+ "Trying to schedule for a finished app, please double check. nodeId="
|
|
|
+ + node.getNodeID() + " container=" + reservedContainer
|
|
|
+ .getContainerId());
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
// Try to fulfill the reservation
|
|
|
- LOG.info(
|
|
|
- "Trying to fulfill reservation for application " + reservedApplication
|
|
|
- .getApplicationId() + " on node: " + node.getNodeID());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Trying to fulfill reservation for application "
|
|
|
+ + reservedApplication.getApplicationId() + " on node: " + node
|
|
|
+ .getNodeID());
|
|
|
+ }
|
|
|
|
|
|
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
|
|
|
assignment = queue.assignContainers(getClusterResource(), ps,
|
|
@@ -1342,12 +1354,6 @@ public class CapacityScheduler extends
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(
|
|
|
- "Trying to schedule on node: " + node.getNodeName() + ", available: "
|
|
|
- + node.getUnallocatedResource());
|
|
|
- }
|
|
|
-
|
|
|
return allocateOrReserveNewContainers(ps, withNodeHeartbeat);
|
|
|
}
|
|
|
|
|
@@ -2578,6 +2584,7 @@ public class CapacityScheduler extends
|
|
|
LOG.debug("Try to commit allocation proposal=" + request);
|
|
|
}
|
|
|
|
|
|
+ boolean isSuccess = false;
|
|
|
if (attemptId != null) {
|
|
|
FiCaSchedulerApp app = getApplicationAttempt(attemptId);
|
|
|
// Required sanity check for attemptId - when async-scheduling enabled,
|
|
@@ -2589,6 +2596,7 @@ public class CapacityScheduler extends
|
|
|
CapacitySchedulerMetrics.getMetrics()
|
|
|
.addCommitSuccess(commitSuccess);
|
|
|
LOG.info("Allocation proposal accepted");
|
|
|
+ isSuccess = true;
|
|
|
} else{
|
|
|
long commitFailed = System.nanoTime() - commitStart;
|
|
|
CapacitySchedulerMetrics.getMetrics()
|
|
@@ -2596,6 +2604,11 @@ public class CapacityScheduler extends
|
|
|
LOG.info("Failed to accept allocation proposal");
|
|
|
}
|
|
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Allocation proposal accepted=" + isSuccess + ", proposal="
|
|
|
+ + request);
|
|
|
+ }
|
|
|
+
|
|
|
// Update unconfirmed allocated resource.
|
|
|
if (updateUnconfirmedAllocatedResource) {
|
|
|
app.decUnconfirmedRes(request.getTotalAllocatedResource());
|