|
@@ -28,6 +28,7 @@ import java.util.Set;
|
|
|
import java.util.Timer;
|
|
|
import java.util.TimerTask;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -72,8 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|
|
- .LeafQueue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
@@ -94,7 +93,7 @@ public abstract class AbstractYarnScheduler
|
|
|
|
|
|
protected Resource minimumAllocation;
|
|
|
|
|
|
- protected RMContext rmContext;
|
|
|
+ protected volatile RMContext rmContext;
|
|
|
|
|
|
private volatile Priority maxClusterLevelAppPriority;
|
|
|
|
|
@@ -112,6 +111,18 @@ public abstract class AbstractYarnScheduler
|
|
|
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
|
|
|
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
|
|
|
|
|
|
+ protected final ReentrantReadWriteLock.ReadLock readLock;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Use writeLock for any of the operations below:
|
|
|
+ * - queue change (hierarchy / configuration / container allocation)
|
|
|
+ * - application (add/remove/allocate-container, but not including container
|
|
|
+ * finish)
|
|
|
+ * - node (add/remove/change-resource/container-allocation, but not including
|
|
|
+ * container finish)
|
|
|
+ */
|
|
|
+ protected final ReentrantReadWriteLock.WriteLock writeLock;
|
|
|
+
|
|
|
/**
|
|
|
* Construct the service.
|
|
|
*
|
|
@@ -119,6 +130,9 @@ public abstract class AbstractYarnScheduler
|
|
|
*/
|
|
|
public AbstractYarnScheduler(String name) {
|
|
|
super(name);
|
|
|
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
+ readLock = lock.readLock();
|
|
|
+ writeLock = lock.writeLock();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -141,6 +155,10 @@ public abstract class AbstractYarnScheduler
|
|
|
return nodeTracker;
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * YARN-3136 removed synchronized lock for this method for performance
|
|
|
+ * purposes
|
|
|
+ */
|
|
|
public List<Container> getTransferredContainers(
|
|
|
ApplicationAttemptId currentAttempt) {
|
|
|
ApplicationId appId = currentAttempt.getApplicationId();
|
|
@@ -155,9 +173,8 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
Collection<RMContainer> liveContainers =
|
|
|
app.getCurrentAppAttempt().getLiveContainers();
|
|
|
- ContainerId amContainerId =
|
|
|
- rmContext.getRMApps().get(appId).getCurrentAppAttempt()
|
|
|
- .getMasterContainer().getId();
|
|
|
+ ContainerId amContainerId = rmContext.getRMApps().get(appId)
|
|
|
+ .getCurrentAppAttempt().getMasterContainer().getId();
|
|
|
for (RMContainer rmContainer : liveContainers) {
|
|
|
if (!rmContainer.getContainerId().equals(amContainerId)) {
|
|
|
containerList.add(rmContainer.getContainer());
|
|
@@ -211,54 +228,59 @@ public abstract class AbstractYarnScheduler
|
|
|
nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
|
|
|
}
|
|
|
|
|
|
- protected synchronized void containerLaunchedOnNode(
|
|
|
+ protected void containerLaunchedOnNode(
|
|
|
ContainerId containerId, SchedulerNode node) {
|
|
|
- // Get the application for the finished container
|
|
|
- SchedulerApplicationAttempt application =
|
|
|
- getCurrentAttemptForContainer(containerId);
|
|
|
- if (application == null) {
|
|
|
- LOG.info("Unknown application " + containerId.getApplicationAttemptId()
|
|
|
- .getApplicationId() + " launched container " + containerId
|
|
|
- + " on node: " + node);
|
|
|
- this.rmContext.getDispatcher().getEventHandler()
|
|
|
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
|
|
- return;
|
|
|
- }
|
|
|
+ try {
|
|
|
+ readLock.lock();
|
|
|
+ // Get the application for the finished container
|
|
|
+ SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
|
|
|
+ containerId);
|
|
|
+ if (application == null) {
|
|
|
+ LOG.info("Unknown application " + containerId.getApplicationAttemptId()
|
|
|
+ .getApplicationId() + " launched container " + containerId
|
|
|
+ + " on node: " + node);
|
|
|
+ this.rmContext.getDispatcher().getEventHandler().handle(
|
|
|
+ new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- application.containerLaunchedOnNode(containerId, node.getNodeID());
|
|
|
+ application.containerLaunchedOnNode(containerId, node.getNodeID());
|
|
|
+ } finally {
|
|
|
+ readLock.unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected void containerIncreasedOnNode(ContainerId containerId,
|
|
|
SchedulerNode node, Container increasedContainerReportedByNM) {
|
|
|
+ /*
|
|
|
+ * No lock is required, as this method is protected by scheduler's writeLock
|
|
|
+ */
|
|
|
// Get the application for the finished container
|
|
|
- SchedulerApplicationAttempt application =
|
|
|
- getCurrentAttemptForContainer(containerId);
|
|
|
+ SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
|
|
|
+ containerId);
|
|
|
if (application == null) {
|
|
|
- LOG.info("Unknown application "
|
|
|
- + containerId.getApplicationAttemptId().getApplicationId()
|
|
|
- + " increased container " + containerId + " on node: " + node);
|
|
|
- this.rmContext.getDispatcher().getEventHandler()
|
|
|
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
|
|
+ LOG.info("Unknown application " + containerId.getApplicationAttemptId()
|
|
|
+ .getApplicationId() + " increased container " + containerId
|
|
|
+ + " on node: " + node);
|
|
|
+ this.rmContext.getDispatcher().getEventHandler().handle(
|
|
|
+ new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
|
|
return;
|
|
|
}
|
|
|
- LeafQueue leafQueue = (LeafQueue) application.getQueue();
|
|
|
- synchronized (leafQueue) {
|
|
|
- RMContainer rmContainer = getRMContainer(containerId);
|
|
|
- if (rmContainer == null) {
|
|
|
- // Some unknown container sneaked into the system. Kill it.
|
|
|
- this.rmContext.getDispatcher().getEventHandler()
|
|
|
- .handle(new RMNodeCleanContainerEvent(
|
|
|
- node.getNodeID(), containerId));
|
|
|
- return;
|
|
|
- }
|
|
|
- rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(
|
|
|
- containerId, increasedContainerReportedByNM.getResource()));
|
|
|
+
|
|
|
+ RMContainer rmContainer = getRMContainer(containerId);
|
|
|
+ if (rmContainer == null) {
|
|
|
+ // Some unknown container sneaked into the system. Kill it.
|
|
|
+ this.rmContext.getDispatcher().getEventHandler().handle(
|
|
|
+ new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
|
|
+ return;
|
|
|
}
|
|
|
+ rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
|
|
|
+ increasedContainerReportedByNM.getResource()));
|
|
|
}
|
|
|
|
|
|
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
|
|
|
- SchedulerApplication<T> app =
|
|
|
- applications.get(applicationAttemptId.getApplicationId());
|
|
|
+ SchedulerApplication<T> app = applications.get(
|
|
|
+ applicationAttemptId.getApplicationId());
|
|
|
return app == null ? null : app.getCurrentAppAttempt();
|
|
|
}
|
|
|
|
|
@@ -338,96 +360,101 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized void recoverContainersOnNode(
|
|
|
+ public void recoverContainersOnNode(
|
|
|
List<NMContainerStatus> containerReports, RMNode nm) {
|
|
|
- if (!rmContext.isWorkPreservingRecoveryEnabled()
|
|
|
- || containerReports == null
|
|
|
- || (containerReports != null && containerReports.isEmpty())) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- for (NMContainerStatus container : containerReports) {
|
|
|
- ApplicationId appId =
|
|
|
- container.getContainerId().getApplicationAttemptId().getApplicationId();
|
|
|
- RMApp rmApp = rmContext.getRMApps().get(appId);
|
|
|
- if (rmApp == null) {
|
|
|
- LOG.error("Skip recovering container " + container
|
|
|
- + " for unknown application.");
|
|
|
- killOrphanContainerOnNode(nm, container);
|
|
|
- continue;
|
|
|
+ try {
|
|
|
+ writeLock.lock();
|
|
|
+ if (!rmContext.isWorkPreservingRecoveryEnabled()
|
|
|
+ || containerReports == null || (containerReports != null
|
|
|
+ && containerReports.isEmpty())) {
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
- SchedulerApplication<T> schedulerApp = applications.get(appId);
|
|
|
- if (schedulerApp == null) {
|
|
|
- LOG.info("Skip recovering container " + container
|
|
|
- + " for unknown SchedulerApplication. Application current state is "
|
|
|
- + rmApp.getState());
|
|
|
- killOrphanContainerOnNode(nm, container);
|
|
|
- continue;
|
|
|
- }
|
|
|
+ for (NMContainerStatus container : containerReports) {
|
|
|
+ ApplicationId appId =
|
|
|
+ container.getContainerId().getApplicationAttemptId()
|
|
|
+ .getApplicationId();
|
|
|
+ RMApp rmApp = rmContext.getRMApps().get(appId);
|
|
|
+ if (rmApp == null) {
|
|
|
+ LOG.error("Skip recovering container " + container
|
|
|
+ + " for unknown application.");
|
|
|
+ killOrphanContainerOnNode(nm, container);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- LOG.info("Recovering container " + container);
|
|
|
- SchedulerApplicationAttempt schedulerAttempt =
|
|
|
- schedulerApp.getCurrentAppAttempt();
|
|
|
-
|
|
|
- if (!rmApp.getApplicationSubmissionContext()
|
|
|
- .getKeepContainersAcrossApplicationAttempts()) {
|
|
|
- // Do not recover containers for stopped attempt or previous attempt.
|
|
|
- if (schedulerAttempt.isStopped()
|
|
|
- || !schedulerAttempt.getApplicationAttemptId().equals(
|
|
|
- container.getContainerId().getApplicationAttemptId())) {
|
|
|
- LOG.info("Skip recovering container " + container
|
|
|
- + " for already stopped attempt.");
|
|
|
+ SchedulerApplication<T> schedulerApp = applications.get(appId);
|
|
|
+ if (schedulerApp == null) {
|
|
|
+ LOG.info("Skip recovering container " + container
|
|
|
+ + " for unknown SchedulerApplication. "
|
|
|
+ + "Application current state is " + rmApp.getState());
|
|
|
killOrphanContainerOnNode(nm, container);
|
|
|
continue;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- // create container
|
|
|
- RMContainer rmContainer = recoverAndCreateContainer(container, nm);
|
|
|
-
|
|
|
- // recover RMContainer
|
|
|
- rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
|
|
|
- container));
|
|
|
-
|
|
|
- // recover scheduler node
|
|
|
- SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
|
|
|
- schedulerNode.recoverContainer(rmContainer);
|
|
|
-
|
|
|
- // recover queue: update headroom etc.
|
|
|
- Queue queue = schedulerAttempt.getQueue();
|
|
|
- queue.recoverContainer(
|
|
|
- getClusterResource(), schedulerAttempt, rmContainer);
|
|
|
-
|
|
|
- // recover scheduler attempt
|
|
|
- schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
|
|
|
-
|
|
|
- // set master container for the current running AMContainer for this
|
|
|
- // attempt.
|
|
|
- RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
|
|
|
- if (appAttempt != null) {
|
|
|
- Container masterContainer = appAttempt.getMasterContainer();
|
|
|
-
|
|
|
- // Mark current running AMContainer's RMContainer based on the master
|
|
|
- // container ID stored in AppAttempt.
|
|
|
- if (masterContainer != null
|
|
|
- && masterContainer.getId().equals(rmContainer.getContainerId())) {
|
|
|
- ((RMContainerImpl)rmContainer).setAMContainer(true);
|
|
|
+ LOG.info("Recovering container " + container);
|
|
|
+ SchedulerApplicationAttempt schedulerAttempt =
|
|
|
+ schedulerApp.getCurrentAppAttempt();
|
|
|
+
|
|
|
+ if (!rmApp.getApplicationSubmissionContext()
|
|
|
+ .getKeepContainersAcrossApplicationAttempts()) {
|
|
|
+ // Do not recover containers for stopped attempt or previous attempt.
|
|
|
+ if (schedulerAttempt.isStopped() || !schedulerAttempt
|
|
|
+ .getApplicationAttemptId().equals(
|
|
|
+ container.getContainerId().getApplicationAttemptId())) {
|
|
|
+ LOG.info("Skip recovering container " + container
|
|
|
+ + " for already stopped attempt.");
|
|
|
+ killOrphanContainerOnNode(nm, container);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- synchronized (schedulerAttempt) {
|
|
|
- Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
|
|
|
- if (releases.contains(container.getContainerId())) {
|
|
|
+ // create container
|
|
|
+ RMContainer rmContainer = recoverAndCreateContainer(container, nm);
|
|
|
+
|
|
|
+ // recover RMContainer
|
|
|
+ rmContainer.handle(
|
|
|
+ new RMContainerRecoverEvent(container.getContainerId(), container));
|
|
|
+
|
|
|
+ // recover scheduler node
|
|
|
+ SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
|
|
|
+ schedulerNode.recoverContainer(rmContainer);
|
|
|
+
|
|
|
+ // recover queue: update headroom etc.
|
|
|
+ Queue queue = schedulerAttempt.getQueue();
|
|
|
+ queue.recoverContainer(getClusterResource(), schedulerAttempt,
|
|
|
+ rmContainer);
|
|
|
+
|
|
|
+ // recover scheduler attempt
|
|
|
+ schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
|
|
|
+
|
|
|
+ // set master container for the current running AMContainer for this
|
|
|
+ // attempt.
|
|
|
+ RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
|
|
|
+ if (appAttempt != null) {
|
|
|
+ Container masterContainer = appAttempt.getMasterContainer();
|
|
|
+
|
|
|
+ // Mark current running AMContainer's RMContainer based on the master
|
|
|
+ // container ID stored in AppAttempt.
|
|
|
+ if (masterContainer != null && masterContainer.getId().equals(
|
|
|
+ rmContainer.getContainerId())) {
|
|
|
+ ((RMContainerImpl) rmContainer).setAMContainer(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (schedulerAttempt.getPendingRelease().remove(
|
|
|
+ container.getContainerId())) {
|
|
|
// release the container
|
|
|
- rmContainer.handle(new RMContainerFinishedEvent(container
|
|
|
- .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
|
|
|
- container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
|
|
|
- RMContainerEventType.RELEASED));
|
|
|
- releases.remove(container.getContainerId());
|
|
|
+ rmContainer.handle(
|
|
|
+ new RMContainerFinishedEvent(container.getContainerId(),
|
|
|
+ SchedulerUtils
|
|
|
+ .createAbnormalContainerStatus(container.getContainerId(),
|
|
|
+ SchedulerUtils.RELEASED_CONTAINER),
|
|
|
+ RMContainerEventType.RELEASED));
|
|
|
LOG.info(container.getContainerId() + " is released by application.");
|
|
|
}
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ writeLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -492,17 +519,15 @@ public abstract class AbstractYarnScheduler
|
|
|
for (SchedulerApplication<T> app : applications.values()) {
|
|
|
T attempt = app.getCurrentAppAttempt();
|
|
|
if (attempt != null) {
|
|
|
- synchronized (attempt) {
|
|
|
- for (ContainerId containerId : attempt.getPendingRelease()) {
|
|
|
- RMAuditLogger.logFailure(app.getUser(),
|
|
|
- AuditConstants.RELEASE_CONTAINER,
|
|
|
- "Unauthorized access or invalid container", "Scheduler",
|
|
|
- "Trying to release container not owned by app "
|
|
|
- + "or with invalid id.", attempt.getApplicationId(),
|
|
|
- containerId, null);
|
|
|
- }
|
|
|
- attempt.getPendingRelease().clear();
|
|
|
+ for (ContainerId containerId : attempt.getPendingRelease()) {
|
|
|
+ RMAuditLogger.logFailure(app.getUser(),
|
|
|
+ AuditConstants.RELEASE_CONTAINER,
|
|
|
+ "Unauthorized access or invalid container", "Scheduler",
|
|
|
+ "Trying to release container not owned by app "
|
|
|
+ + "or with invalid id.", attempt.getApplicationId(),
|
|
|
+ containerId, null);
|
|
|
}
|
|
|
+ attempt.getPendingRelease().clear();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -558,9 +583,7 @@ public abstract class AbstractYarnScheduler
|
|
|
< nmExpireInterval) {
|
|
|
LOG.info(containerId + " doesn't exist. Add the container"
|
|
|
+ " to the release request cache as it maybe on recovery.");
|
|
|
- synchronized (attempt) {
|
|
|
- attempt.getPendingRelease().add(containerId);
|
|
|
- }
|
|
|
+ attempt.getPendingRelease().add(containerId);
|
|
|
} else {
|
|
|
RMAuditLogger.logFailure(attempt.getUser(),
|
|
|
AuditConstants.RELEASE_CONTAINER,
|
|
@@ -603,81 +626,92 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void moveAllApps(String sourceQueue, String destQueue)
|
|
|
+ public void moveAllApps(String sourceQueue, String destQueue)
|
|
|
throws YarnException {
|
|
|
- // check if destination queue is a valid leaf queue
|
|
|
try {
|
|
|
- getQueueInfo(destQueue, false, false);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn(e);
|
|
|
- throw new YarnException(e);
|
|
|
- }
|
|
|
- // check if source queue is a valid
|
|
|
- List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
|
|
|
- if (apps == null) {
|
|
|
- String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
|
|
|
- LOG.warn(errMsg);
|
|
|
- throw new YarnException(errMsg);
|
|
|
- }
|
|
|
- // generate move events for each pending/running app
|
|
|
- for (ApplicationAttemptId app : apps) {
|
|
|
- SettableFuture<Object> future = SettableFuture.create();
|
|
|
- this.rmContext
|
|
|
- .getDispatcher()
|
|
|
- .getEventHandler()
|
|
|
- .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
|
|
|
+ writeLock.lock();
|
|
|
+ // check if destination queue is a valid leaf queue
|
|
|
+ try {
|
|
|
+ getQueueInfo(destQueue, false, false);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn(e);
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+ // check if source queue is a valid
|
|
|
+ List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
|
|
|
+ if (apps == null) {
|
|
|
+ String errMsg =
|
|
|
+ "The specified Queue: " + sourceQueue + " doesn't exist";
|
|
|
+ LOG.warn(errMsg);
|
|
|
+ throw new YarnException(errMsg);
|
|
|
+ }
|
|
|
+ // generate move events for each pending/running app
|
|
|
+ for (ApplicationAttemptId app : apps) {
|
|
|
+ SettableFuture<Object> future = SettableFuture.create();
|
|
|
+ this.rmContext.getDispatcher().getEventHandler().handle(
|
|
|
+ new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ writeLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void killAllAppsInQueue(String queueName)
|
|
|
+ public void killAllAppsInQueue(String queueName)
|
|
|
throws YarnException {
|
|
|
- // check if queue is a valid
|
|
|
- List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
|
|
|
- if (apps == null) {
|
|
|
- String errMsg = "The specified Queue: " + queueName + " doesn't exist";
|
|
|
- LOG.warn(errMsg);
|
|
|
- throw new YarnException(errMsg);
|
|
|
- }
|
|
|
- // generate kill events for each pending/running app
|
|
|
- for (ApplicationAttemptId app : apps) {
|
|
|
- this.rmContext
|
|
|
- .getDispatcher()
|
|
|
- .getEventHandler()
|
|
|
- .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
|
|
|
- "Application killed due to expiry of reservation queue " +
|
|
|
- queueName + "."));
|
|
|
+ try {
|
|
|
+ writeLock.lock();
|
|
|
+ // check if queue is a valid
|
|
|
+ List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
|
|
|
+ if (apps == null) {
|
|
|
+ String errMsg = "The specified Queue: " + queueName + " doesn't exist";
|
|
|
+ LOG.warn(errMsg);
|
|
|
+ throw new YarnException(errMsg);
|
|
|
+ }
|
|
|
+ // generate kill events for each pending/running app
|
|
|
+ for (ApplicationAttemptId app : apps) {
|
|
|
+ this.rmContext.getDispatcher().getEventHandler().handle(
|
|
|
+ new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
|
|
|
+ "Application killed due to expiry of reservation queue "
|
|
|
+ + queueName + "."));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ writeLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Process resource update on a node.
|
|
|
*/
|
|
|
- public synchronized void updateNodeResource(RMNode nm,
|
|
|
+ public void updateNodeResource(RMNode nm,
|
|
|
ResourceOption resourceOption) {
|
|
|
- SchedulerNode node = getSchedulerNode(nm.getNodeID());
|
|
|
- Resource newResource = resourceOption.getResource();
|
|
|
- Resource oldResource = node.getTotalResource();
|
|
|
- if(!oldResource.equals(newResource)) {
|
|
|
- // Notify NodeLabelsManager about this change
|
|
|
- rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
|
|
|
- newResource);
|
|
|
-
|
|
|
- // Log resource change
|
|
|
- LOG.info("Update resource on node: " + node.getNodeName()
|
|
|
- + " from: " + oldResource + ", to: "
|
|
|
- + newResource);
|
|
|
-
|
|
|
- nodeTracker.removeNode(nm.getNodeID());
|
|
|
-
|
|
|
- // update resource to node
|
|
|
- node.updateTotalResource(newResource);
|
|
|
-
|
|
|
- nodeTracker.addNode((N) node);
|
|
|
- } else {
|
|
|
- // Log resource change
|
|
|
- LOG.warn("Update resource on node: " + node.getNodeName()
|
|
|
- + " with the same resource: " + newResource);
|
|
|
+ try {
|
|
|
+ writeLock.lock();
|
|
|
+ SchedulerNode node = getSchedulerNode(nm.getNodeID());
|
|
|
+ Resource newResource = resourceOption.getResource();
|
|
|
+ Resource oldResource = node.getTotalResource();
|
|
|
+ if (!oldResource.equals(newResource)) {
|
|
|
+ // Notify NodeLabelsManager about this change
|
|
|
+ rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
|
|
|
+ newResource);
|
|
|
+
|
|
|
+ // Log resource change
|
|
|
+ LOG.info("Update resource on node: " + node.getNodeName() + " from: "
|
|
|
+ + oldResource + ", to: " + newResource);
|
|
|
+
|
|
|
+ nodeTracker.removeNode(nm.getNodeID());
|
|
|
+
|
|
|
+ // update resource to node
|
|
|
+ node.updateTotalResource(newResource);
|
|
|
+
|
|
|
+ nodeTracker.addNode((N) node);
|
|
|
+ } else{
|
|
|
+ // Log resource change
|
|
|
+ LOG.warn("Update resource on node: " + node.getNodeName()
|
|
|
+ + " with the same resource: " + newResource);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ writeLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -735,7 +769,7 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void setClusterMaxPriority(Configuration conf)
|
|
|
+ public void setClusterMaxPriority(Configuration conf)
|
|
|
throws YarnException {
|
|
|
try {
|
|
|
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
|