|
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.util.RackResolver;
|
|
|
|
|
@@ -140,6 +141,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
BlockingQueue<ContainerAllocatorEvent> eventQueue
|
|
|
= new LinkedBlockingQueue<ContainerAllocatorEvent>();
|
|
|
|
|
|
+ private ScheduleStats scheduleStats = new ScheduleStats();
|
|
|
+
|
|
|
public RMContainerAllocator(ClientService clientService, AppContext context) {
|
|
|
super(clientService, context);
|
|
|
this.stopped = new AtomicBoolean(false);
|
|
@@ -203,13 +206,10 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
@Override
|
|
|
protected synchronized void heartbeat() throws Exception {
|
|
|
- LOG.info("Before Scheduling: " + getStat());
|
|
|
+ scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
|
|
|
List<Container> allocatedContainers = getResources();
|
|
|
- LOG.info("After Scheduling: " + getStat());
|
|
|
if (allocatedContainers.size() > 0) {
|
|
|
- LOG.info("Before Assign: " + getStat());
|
|
|
scheduledRequests.assign(allocatedContainers);
|
|
|
- LOG.info("After Assign: " + getStat());
|
|
|
}
|
|
|
|
|
|
int completedMaps = getJob().getCompletedMaps();
|
|
@@ -230,6 +230,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
maxReduceRampupLimit, reduceSlowStart);
|
|
|
recalculateReduceSchedule = false;
|
|
|
}
|
|
|
+
|
|
|
+ scheduleStats.updateAndLogIfChanged("After Scheduling: ");
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -240,7 +242,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
eventHandlingThread.interrupt();
|
|
|
super.stop();
|
|
|
- LOG.info("Final Stats: " + getStat());
|
|
|
+ scheduleStats.log("Final Stats: ");
|
|
|
}
|
|
|
|
|
|
public boolean getIsReduceStarted() {
|
|
@@ -422,7 +424,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- LOG.info("Recalculating schedule...");
|
|
|
+ int headRoom = getAvailableResources() != null ?
|
|
|
+ getAvailableResources().getMemory() : 0;
|
|
|
+ LOG.info("Recalculating schedule, headroom=" + headRoom);
|
|
|
|
|
|
//check for slow start
|
|
|
if (!getIsReduceStarted()) {//not set yet
|
|
@@ -531,24 +535,6 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Synchronized to avoid findbugs warnings
|
|
|
- */
|
|
|
- private synchronized String getStat() {
|
|
|
- return "PendingReduces:" + pendingReduces.size() +
|
|
|
- " ScheduledMaps:" + scheduledRequests.maps.size() +
|
|
|
- " ScheduledReduces:" + scheduledRequests.reduces.size() +
|
|
|
- " AssignedMaps:" + assignedRequests.maps.size() +
|
|
|
- " AssignedReduces:" + assignedRequests.reduces.size() +
|
|
|
- " completedMaps:" + getJob().getCompletedMaps() +
|
|
|
- " completedReduces:" + getJob().getCompletedReduces() +
|
|
|
- " containersAllocated:" + containersAllocated +
|
|
|
- " containersReleased:" + containersReleased +
|
|
|
- " hostLocalAssigned:" + hostLocalAssigned +
|
|
|
- " rackLocalAssigned:" + rackLocalAssigned +
|
|
|
- " availableResources(headroom):" + getAvailableResources();
|
|
|
- }
|
|
|
-
|
|
|
@SuppressWarnings("unchecked")
|
|
|
private List<Container> getResources() throws Exception {
|
|
|
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
|
|
@@ -590,6 +576,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
|
|
|
//something changed
|
|
|
recalculateReduceSchedule = true;
|
|
|
+ if (LOG.isDebugEnabled() && headRoom != newHeadRoom) {
|
|
|
+ LOG.debug("headroom=" + newHeadRoom);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -1064,4 +1053,60 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private class ScheduleStats { // Keeps the last captured values of the enclosing allocator's scheduling counters and logs them.
|
|
|
+ int numPendingReduces; // last captured pendingReduces.size()
|
|
|
+ int numScheduledMaps; // last captured scheduledRequests.maps.size()
|
|
|
+ int numScheduledReduces; // last captured scheduledRequests.reduces.size()
|
|
|
+ int numAssignedMaps; // last captured assignedRequests.maps.size()
|
|
|
+ int numAssignedReduces; // last captured assignedRequests.reduces.size()
|
|
|
+ int numCompletedMaps; // last captured getJob().getCompletedMaps()
|
|
|
+ int numCompletedReduces; // last captured getJob().getCompletedReduces()
|
|
|
+ int numContainersAllocated; // last captured containersAllocated
|
|
|
+ int numContainersReleased; // last captured containersReleased
|
|
|
+
|
|
|
+ public void updateAndLogIfChanged(String msgPrefix) { // Re-reads every counter into the fields above; calls log(msgPrefix) only if at least one value differs from the previous capture.
|
|
|
+ boolean changed = false; // becomes true as soon as any stored value differs from the freshly read one
|
|
|
+
|
|
|
+ // Hold the enclosing allocator's monitor while reading its state (added to fix FindBugs warnings).
|
|
|
+ synchronized (RMContainerAllocator.this) {
|
|
|
+ changed |= (numPendingReduces != pendingReduces.size()); // each pair below: compare against the stored value, then overwrite it
|
|
|
+ numPendingReduces = pendingReduces.size();
|
|
|
+ changed |= (numScheduledMaps != scheduledRequests.maps.size());
|
|
|
+ numScheduledMaps = scheduledRequests.maps.size();
|
|
|
+ changed |= (numScheduledReduces != scheduledRequests.reduces.size());
|
|
|
+ numScheduledReduces = scheduledRequests.reduces.size();
|
|
|
+ changed |= (numAssignedMaps != assignedRequests.maps.size());
|
|
|
+ numAssignedMaps = assignedRequests.maps.size();
|
|
|
+ changed |= (numAssignedReduces != assignedRequests.reduces.size());
|
|
|
+ numAssignedReduces = assignedRequests.reduces.size();
|
|
|
+ changed |= (numCompletedMaps != getJob().getCompletedMaps()); // NOTE(review): the job counter is read twice (compare, then store); if it can change in between, a change could go unlogged - confirm
|
|
|
+ numCompletedMaps = getJob().getCompletedMaps();
|
|
|
+ changed |= (numCompletedReduces != getJob().getCompletedReduces()); // NOTE(review): same double read as for completed maps - confirm
|
|
|
+ numCompletedReduces = getJob().getCompletedReduces();
|
|
|
+ changed |= (numContainersAllocated != containersAllocated);
|
|
|
+ numContainersAllocated = containersAllocated;
|
|
|
+ changed |= (numContainersReleased != containersReleased);
|
|
|
+ numContainersReleased = containersReleased;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (changed) { // unchanged counters produce no log line
|
|
|
+ log(msgPrefix);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void log(String msgPrefix) { // Writes one INFO line: msgPrefix followed by the stored counters; does not refresh them. NOTE(review): a direct caller gets the values from the last updateAndLogIfChanged call, which may be stale - confirm this is intended
|
|
|
+ LOG.info(msgPrefix + "PendingReds:" + numPendingReduces +
|
|
|
+ " ScheduledMaps:" + numScheduledMaps +
|
|
|
+ " ScheduledReds:" + numScheduledReduces +
|
|
|
+ " AssignedMaps:" + numAssignedMaps +
|
|
|
+ " AssignedReds:" + numAssignedReduces +
|
|
|
+ " CompletedMaps:" + numCompletedMaps +
|
|
|
+ " CompletedReds:" + numCompletedReduces +
|
|
|
+ " ContAlloc:" + numContainersAllocated +
|
|
|
+ " ContRel:" + numContainersReleased +
|
|
|
+ " HostLocal:" + hostLocalAssigned + // not stored in this class: read at log time and not part of the change check above
|
|
|
+ " RackLocal:" + rackLocalAssigned); // likewise read at log time, not from a stored field
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|