|
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -32,7 +34,6 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
@@ -43,8 +44,8 @@ import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
@@ -53,17 +54,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
|
|
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
|
|
+import org.apache.hadoop.yarn.service.AbstractService;
|
|
|
import org.apache.hadoop.yarn.service.CompositeService;
|
|
|
import org.apache.hadoop.yarn.service.Service;
|
|
|
import org.apache.hadoop.yarn.webapp.WebApp;
|
|
@@ -97,6 +97,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
|
private ContainerAllocationExpirer containerAllocationExpirer;
|
|
|
protected NMLivelinessMonitor nmLivelinessMonitor;
|
|
|
protected NodesListManager nodesListManager;
|
|
|
+ private SchedulerEventDispatcher schedulerDispatcher;
|
|
|
|
|
|
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
|
|
private WebApp webApp;
|
|
@@ -136,7 +137,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
|
this.conf = new YarnConfiguration(conf);
|
|
|
// Initialize the scheduler
|
|
|
this.scheduler = createScheduler();
|
|
|
- this.rmDispatcher.register(SchedulerEventType.class, scheduler);
|
|
|
+ this.schedulerDispatcher = new SchedulerEventDispatcher(this.scheduler);
|
|
|
+ addService(this.schedulerDispatcher);
|
|
|
+ this.rmDispatcher.register(SchedulerEventType.class,
|
|
|
+ this.schedulerDispatcher);
|
|
|
|
|
|
// Register event handler for RmAppEvents
|
|
|
this.rmDispatcher.register(RMAppEventType.class,
|
|
@@ -211,6 +215,73 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
|
return new AMLivelinessMonitor(this.rmDispatcher);
|
|
|
}
|
|
|
|
|
|
+ @Private
|
|
|
+ public static final class SchedulerEventDispatcher extends AbstractService
|
|
|
+ implements EventHandler<SchedulerEvent> {
|
|
|
+
|
|
|
+ private final ResourceScheduler scheduler;
|
|
|
+ private final BlockingQueue<SchedulerEvent> eventQueue =
|
|
|
+ new LinkedBlockingQueue<SchedulerEvent>();
|
|
|
+ private final Thread eventProcessor;
|
|
|
+
|
|
|
+ public SchedulerEventDispatcher(ResourceScheduler scheduler) {
|
|
|
+ super(SchedulerEventDispatcher.class.getName());
|
|
|
+ this.scheduler = scheduler;
|
|
|
+ this.eventProcessor = new Thread(new EventProcessor());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void start() {
|
|
|
+ this.eventProcessor.start();
|
|
|
+ super.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private final class EventProcessor implements Runnable {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+
|
|
|
+ SchedulerEvent event;
|
|
|
+
|
|
|
+ while (!Thread.currentThread().isInterrupted()) {
|
|
|
+ try {
|
|
|
+ event = eventQueue.take();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.error("Returning, interrupted : " + e);
|
|
|
+ return; // TODO: Kill RM.
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ scheduler.handle(event);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error in handling event type " + event.getType()
|
|
|
+ + " to the scheduler", t);
|
|
|
+ return; // TODO: Kill RM.
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void stop() {
|
|
|
+ this.eventProcessor.interrupt();
|
|
|
+ try {
|
|
|
+ this.eventProcessor.join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+ super.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
+ try {
|
|
|
+ this.eventQueue.put(event);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Private
|
|
|
public static final class ApplicationEventDispatcher implements
|
|
|
EventHandler<RMAppEvent> {
|