|
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
@@ -74,6 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
@@ -93,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
@@ -117,6 +121,7 @@ public class MockRM extends ResourceManager {
|
|
|
private static final int WAIT_MS_PER_LOOP = 10;
|
|
|
|
|
|
private final boolean useNullRMNodeLabelsManager;
|
|
|
+ private boolean disableDrainEventsImplicitly;
|
|
|
|
|
|
public MockRM() {
|
|
|
this(new YarnConfiguration());
|
|
@@ -135,13 +140,41 @@ public class MockRM extends ResourceManager {
|
|
|
super();
|
|
|
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
|
|
|
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
|
|
- if(store != null) {
|
|
|
+ if (store != null) {
|
|
|
setRMStateStore(store);
|
|
|
+ } else {
|
|
|
+ Class storeClass = getRMContext().getStateStore().getClass();
|
|
|
+ if (storeClass.equals(MemoryRMStateStore.class)) {
|
|
|
+ MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore();
|
|
|
+ mockStateStore.init(conf);
|
|
|
+ setRMStateStore(mockStateStore);
|
|
|
+ } else if (storeClass.equals(NullRMStateStore.class)) {
|
|
|
+ MockRMNullStateStore mockStateStore = new MockRMNullStateStore();
|
|
|
+ mockStateStore.init(conf);
|
|
|
+ setRMStateStore(mockStateStore);
|
|
|
+ }
|
|
|
}
|
|
|
Logger rootLogger = LogManager.getRootLogger();
|
|
|
rootLogger.setLevel(Level.DEBUG);
|
|
|
+ disableDrainEventsImplicitly = false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ public class MockRMMemoryStateStore extends MemoryRMStateStore {
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ @Override
|
|
|
+ protected EventHandler getRMStateStoreEventHandler() {
|
|
|
+ return rmStateStoreEventHandler;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public class MockRMNullStateStore extends NullRMStateStore {
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ @Override
|
|
|
+ protected EventHandler getRMStateStoreEventHandler() {
|
|
|
+ return rmStateStoreEventHandler;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager()
|
|
|
throws InstantiationException, IllegalAccessException {
|
|
@@ -159,6 +192,16 @@ public class MockRM extends ResourceManager {
|
|
|
return new DrainDispatcher();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
|
|
+ return new EventHandler<SchedulerEvent>() {
|
|
|
+ @Override
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
+ scheduler.handle(event);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
public void drainEvents() {
|
|
|
Dispatcher rmDispatcher = getRmDispatcher();
|
|
|
if (rmDispatcher instanceof DrainDispatcher) {
|
|
@@ -170,6 +213,7 @@ public class MockRM extends ResourceManager {
|
|
|
|
|
|
private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates)
|
|
|
throws InterruptedException {
|
|
|
+ drainEventsImplicitly();
|
|
|
RMApp app = getRMContext().getRMApps().get(appId);
|
|
|
Assert.assertNotNull("app shouldn't be null", app);
|
|
|
final int timeoutMsecs = 80 * SECOND;
|
|
@@ -200,6 +244,7 @@ public class MockRM extends ResourceManager {
|
|
|
*/
|
|
|
public void waitForState(ApplicationId appId, RMAppState finalState)
|
|
|
throws InterruptedException {
|
|
|
+ drainEventsImplicitly();
|
|
|
RMApp app = getRMContext().getRMApps().get(appId);
|
|
|
Assert.assertNotNull("app shouldn't be null", app);
|
|
|
final int timeoutMsecs = 80 * SECOND;
|
|
@@ -245,6 +290,7 @@ public class MockRM extends ResourceManager {
|
|
|
public void waitForState(ApplicationAttemptId attemptId,
|
|
|
RMAppAttemptState finalState, int timeoutMsecs)
|
|
|
throws InterruptedException {
|
|
|
+ drainEventsImplicitly();
|
|
|
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
|
|
|
Assert.assertNotNull("app shouldn't be null", app);
|
|
|
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
|
@@ -295,6 +341,7 @@ public class MockRM extends ResourceManager {
|
|
|
|
|
|
public void waitForContainerToComplete(RMAppAttempt attempt,
|
|
|
NMContainerStatus completedContainer) throws InterruptedException {
|
|
|
+ drainEventsImplicitly();
|
|
|
int timeWaiting = 0;
|
|
|
while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) {
|
|
|
List<ContainerStatus> containers = attempt.getJustFinishedContainers();
|
|
@@ -394,6 +441,7 @@ public class MockRM extends ResourceManager {
|
|
|
*/
|
|
|
public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
|
|
|
RMContainerState containerState, int timeoutMsecs) throws Exception {
|
|
|
+ drainEventsImplicitly();
|
|
|
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
|
|
int timeWaiting = 0;
|
|
|
while (container == null) {
|
|
@@ -404,6 +452,7 @@ public class MockRM extends ResourceManager {
|
|
|
for (MockNM nm : nms) {
|
|
|
nm.nodeHeartbeat(true);
|
|
|
}
|
|
|
+ drainEventsImplicitly();
|
|
|
container = getResourceScheduler().getRMContainer(containerId);
|
|
|
LOG.info("Waiting for container " + containerId + " to be "
|
|
|
+ containerState + ", container is null right now.");
|
|
@@ -421,6 +470,7 @@ public class MockRM extends ResourceManager {
|
|
|
for (MockNM nm : nms) {
|
|
|
nm.nodeHeartbeat(true);
|
|
|
}
|
|
|
+ drainEventsImplicitly();
|
|
|
Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
timeWaiting += WAIT_MS_PER_LOOP;
|
|
|
}
|
|
@@ -698,7 +748,7 @@ public class MockRM extends ResourceManager {
|
|
|
public MockNM registerNode(String nodeIdStr, int memory) throws Exception {
|
|
|
MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService());
|
|
|
nm.registerNode();
|
|
|
- drainEvents();
|
|
|
+ drainEventsImplicitly();
|
|
|
return nm;
|
|
|
}
|
|
|
|
|
@@ -707,7 +757,7 @@ public class MockRM extends ResourceManager {
|
|
|
MockNM nm =
|
|
|
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService());
|
|
|
nm.registerNode();
|
|
|
- drainEvents();
|
|
|
+ drainEventsImplicitly();
|
|
|
return nm;
|
|
|
}
|
|
|
|
|
@@ -717,7 +767,7 @@ public class MockRM extends ResourceManager {
|
|
|
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(),
|
|
|
YarnVersionInfo.getVersion());
|
|
|
nm.registerNode(runningApplications);
|
|
|
- drainEvents();
|
|
|
+ drainEventsImplicitly();
|
|
|
return nm;
|
|
|
}
|
|
|
|
|
@@ -725,12 +775,14 @@ public class MockRM extends ResourceManager {
|
|
|
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
|
|
nm.getNodeId());
|
|
|
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
|
|
|
+ drainEventsImplicitly();
|
|
|
}
|
|
|
|
|
|
public void sendNodeLost(MockNM nm) throws Exception {
|
|
|
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
|
|
nm.getNodeId());
|
|
|
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
|
|
|
+ drainEventsImplicitly();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -743,6 +795,7 @@ public class MockRM extends ResourceManager {
|
|
|
*/
|
|
|
public void waitForState(NodeId nodeId, NodeState finalState)
|
|
|
throws InterruptedException {
|
|
|
+ drainEventsImplicitly();
|
|
|
RMNode node = getRMContext().getRMNodes().get(nodeId);
|
|
|
if (node == null) {
|
|
|
node = getRMContext().getInactiveRMNodes().get(nodeId);
|
|
@@ -774,7 +827,9 @@ public class MockRM extends ResourceManager {
|
|
|
public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
|
|
|
ApplicationClientProtocol client = getClientRMService();
|
|
|
KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
|
|
|
- return client.forceKillApplication(req);
|
|
|
+ KillApplicationResponse response = client.forceKillApplication(req);
|
|
|
+ drainEventsImplicitly();
|
|
|
+ return response;
|
|
|
}
|
|
|
|
|
|
public FailApplicationAttemptResponse failApplicationAttempt(
|
|
@@ -782,7 +837,10 @@ public class MockRM extends ResourceManager {
|
|
|
ApplicationClientProtocol client = getClientRMService();
|
|
|
FailApplicationAttemptRequest req =
|
|
|
FailApplicationAttemptRequest.newInstance(attemptId);
|
|
|
- return client.failApplicationAttempt(req);
|
|
|
+ FailApplicationAttemptResponse response =
|
|
|
+ client.failApplicationAttempt(req);
|
|
|
+ drainEventsImplicitly();
|
|
|
+ return response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -807,6 +865,7 @@ public class MockRM extends ResourceManager {
|
|
|
.getEventHandler()
|
|
|
.handle(
|
|
|
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED));
|
|
|
+ drainEventsImplicitly();
|
|
|
return am;
|
|
|
}
|
|
|
|
|
@@ -817,6 +876,7 @@ public class MockRM extends ResourceManager {
|
|
|
getRMContext().getDispatcher().getEventHandler()
|
|
|
.handle(new RMAppAttemptEvent(appAttemptId,
|
|
|
RMAppAttemptEventType.LAUNCH_FAILED, "Failed"));
|
|
|
+ drainEventsImplicitly();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -966,6 +1026,7 @@ public class MockRM extends ResourceManager {
|
|
|
am.unregisterAppAttempt(req,true);
|
|
|
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
|
|
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
+ rm.drainEventsImplicitly();
|
|
|
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
|
|
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
|
|
|
}
|
|
@@ -974,6 +1035,7 @@ public class MockRM extends ResourceManager {
|
|
|
private static void waitForSchedulerAppAttemptAdded(
|
|
|
ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
|
|
|
int tick = 0;
|
|
|
+ rm.drainEventsImplicitly();
|
|
|
// Wait for at most 5 sec
|
|
|
while (null == ((AbstractYarnScheduler) rm.getResourceScheduler())
|
|
|
.getApplicationAttempt(attemptId) && tick < 50) {
|
|
@@ -1015,9 +1077,11 @@ public class MockRM extends ResourceManager {
|
|
|
*/
|
|
|
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
|
|
throws Exception {
|
|
|
+ rm.drainEventsImplicitly();
|
|
|
RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
|
|
|
LOG.info("Launch AM " + attempt.getAppAttemptId());
|
|
|
nm.nodeHeartbeat(true);
|
|
|
+ rm.drainEventsImplicitly();
|
|
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
|
|
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
|
|
return am;
|
|
@@ -1025,12 +1089,14 @@ public class MockRM extends ResourceManager {
|
|
|
|
|
|
public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm)
|
|
|
throws Exception {
|
|
|
+ rm.drainEventsImplicitly();
|
|
|
// UAMs go directly to LAUNCHED state
|
|
|
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
|
|
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
|
|
|
LOG.info("Launch AM " + attempt.getAppAttemptId());
|
|
|
nm.nodeHeartbeat(true);
|
|
|
+ rm.drainEventsImplicitly();
|
|
|
MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
|
|
|
attempt.getAppAttemptId());
|
|
|
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
|
@@ -1067,6 +1133,7 @@ public class MockRM extends ResourceManager {
|
|
|
throws IOException, YarnException {
|
|
|
ApplicationClientProtocol client = getClientRMService();
|
|
|
client.updateReservation(request);
|
|
|
+ drainEventsImplicitly();
|
|
|
}
|
|
|
|
|
|
// Explicitly reset queue metrics for testing.
|
|
@@ -1087,6 +1154,7 @@ public class MockRM extends ResourceManager {
|
|
|
SignalContainerRequest req =
|
|
|
SignalContainerRequest.newInstance(containerId, command);
|
|
|
client.signalToContainer(req);
|
|
|
+ drainEventsImplicitly();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1099,6 +1167,7 @@ public class MockRM extends ResourceManager {
|
|
|
public void waitForAppRemovedFromScheduler(ApplicationId appId)
|
|
|
throws InterruptedException {
|
|
|
int timeWaiting = 0;
|
|
|
+ drainEventsImplicitly();
|
|
|
|
|
|
Map<ApplicationId, SchedulerApplication> apps =
|
|
|
((AbstractYarnScheduler) getResourceScheduler())
|
|
@@ -1116,6 +1185,20 @@ public class MockRM extends ResourceManager {
|
|
|
LOG.info("app is removed from scheduler, " + appId);
|
|
|
}
|
|
|
|
|
|
+ private void drainEventsImplicitly() {
|
|
|
+ if (!disableDrainEventsImplicitly) {
|
|
|
+ drainEvents();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void disableDrainEventsImplicitly() {
|
|
|
+ disableDrainEventsImplicitly = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void enableDrainEventsImplicityly() {
|
|
|
+ disableDrainEventsImplicitly = false;
|
|
|
+ }
|
|
|
+
|
|
|
public RMApp submitApp(int masterMemory, Priority priority,
|
|
|
Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception {
|
|
|
Resource resource = Resource.newInstance(masterMemory, 0);
|