|
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertNull;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.net.UnknownHostException;
|
|
@@ -39,16 +41,22 @@ import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
@@ -56,8 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
@@ -86,6 +94,7 @@ import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.runner.RunWith;
|
|
|
import org.junit.runners.Parameterized;
|
|
|
+import org.mortbay.log.Log;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
|
|
|
@@ -361,6 +370,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
private static final String R = "Default";
|
|
|
private static final String A = "QueueA";
|
|
|
private static final String B = "QueueB";
|
|
|
+ private static final String B1 = "QueueB1";
|
|
|
+ private static final String B2 = "QueueB2";
|
|
|
//don't ever create the below queue ;-)
|
|
|
private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue";
|
|
|
private static final String USER_1 = "user1";
|
|
@@ -391,6 +402,24 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
|
|
|
}
|
|
|
|
|
|
+ private void setupQueueConfigurationChildOfB(CapacitySchedulerConfiguration conf) {
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
|
|
|
+ final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
|
|
|
+ conf.setCapacity(Q_R, 100);
|
|
|
+ final String Q_A = Q_R + "." + A;
|
|
|
+ final String Q_B = Q_R + "." + B;
|
|
|
+ final String Q_B1 = Q_B + "." + B1;
|
|
|
+ final String Q_B2 = Q_B + "." + B2;
|
|
|
+ conf.setQueues(Q_R, new String[] {A, B});
|
|
|
+ conf.setCapacity(Q_A, 50);
|
|
|
+ conf.setCapacity(Q_B, 50);
|
|
|
+ conf.setQueues(Q_B, new String[] {B1, B2});
|
|
|
+ conf.setCapacity(Q_B1, 50);
|
|
|
+ conf.setCapacity(Q_B2, 50);
|
|
|
+ conf.setDouble(CapacitySchedulerConfiguration
|
|
|
+ .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
|
|
|
+ }
|
|
|
+
|
|
|
// Test CS recovery with multi-level queues and multi-users:
|
|
|
// 1. setup 2 NMs each with 8GB memory;
|
|
|
// 2. setup 2 level queues: Default -> (QueueA, QueueB)
|
|
@@ -513,18 +542,106 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
|
|
|
totalUsedResource.getVirtualCores());
|
|
|
}
|
|
|
-
|
|
|
- //Test that we receive a meaningful exit-causing exception if a queue
|
|
|
- //is removed during recovery
|
|
|
+
|
|
|
+ private void verifyAppRecoveryWithWrongQueueConfig(
|
|
|
+ CapacitySchedulerConfiguration csConf, RMApp app, String diagnostics,
|
|
|
+ MemoryRMStateStore memStore, RMState state) throws Exception {
|
|
|
+ // Restart RM with fail-fast as false. App should be killed.
|
|
|
+ csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, false);
|
|
|
+ rm2 = new MockRM(csConf, memStore);
|
|
|
+ rm2.start();
|
|
|
+ // Wait for app to be killed.
|
|
|
+ rm2.waitForState(app.getApplicationId(), RMAppState.KILLED);
|
|
|
+ ApplicationReport report = rm2.getApplicationReport(app.getApplicationId());
|
|
|
+ assertEquals(report.getFinalApplicationStatus(),
|
|
|
+ FinalApplicationStatus.KILLED);
|
|
|
+ assertEquals(report.getYarnApplicationState(), YarnApplicationState.KILLED);
|
|
|
+ assertEquals(report.getDiagnostics(), diagnostics);
|
|
|
+
|
|
|
+ // Remove updated app info(app being KILLED) from state store and reinstate
|
|
|
+ // state store to previous state i.e. which indicates app is RUNNING.
|
|
|
+ // This is to simulate app recovery with fail fast config as true.
|
|
|
+ for(Map.Entry<ApplicationId, ApplicationStateData> entry :
|
|
|
+ state.getApplicationState().entrySet()) {
|
|
|
+ ApplicationStateData appState = mock(ApplicationStateData.class);
|
|
|
+ ApplicationSubmissionContext ctxt =
|
|
|
+ mock(ApplicationSubmissionContext.class);
|
|
|
+ when(appState.getApplicationSubmissionContext()).thenReturn(ctxt);
|
|
|
+ when(ctxt.getApplicationId()).thenReturn(entry.getKey());
|
|
|
+ memStore.removeApplicationStateInternal(appState);
|
|
|
+ memStore.storeApplicationStateInternal(
|
|
|
+ entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now restart RM with fail-fast as true. QueueException should be thrown.
|
|
|
+ csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true);
|
|
|
+ MockRM rm = new MockRM(csConf, memStore);
|
|
|
+ try {
|
|
|
+ rm.start();
|
|
|
+ Assert.fail("QueueException must have been thrown");
|
|
|
+ } catch (QueueInvalidException e) {
|
|
|
+ } finally {
|
|
|
+ rm.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //Test behavior of an app if queue is changed from leaf to parent during
|
|
|
+ //recovery. Test case does following:
|
|
|
+ //1. Add an app to QueueB and start the attempt.
|
|
|
+ //2. Add 2 subqueues(QueueB1 and QueueB2) to QueueB, restart the RM, once with
|
|
|
+ // fail fast config as false and once with fail fast as true.
|
|
|
+ //3. Verify that app was killed if fail fast is false.
|
|
|
+ //4. Verify that QueueException was thrown if fail fast is true.
|
|
|
+ @Test (timeout = 30000)
|
|
|
+ public void testCapacityLeafQueueBecomesParentOnRecovery() throws Exception {
|
|
|
+ if (getSchedulerType() != SchedulerType.CAPACITY) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
|
|
+ conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
|
|
+ DominantResourceCalculator.class.getName());
|
|
|
+ CapacitySchedulerConfiguration csConf =
|
|
|
+ new CapacitySchedulerConfiguration(conf);
|
|
|
+ setupQueueConfiguration(csConf);
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ memStore.init(csConf);
|
|
|
+ rm1 = new MockRM(csConf, memStore);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm =
|
|
|
+ new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
|
|
|
+ nm.registerNode();
|
|
|
+
|
|
|
+ // Submit an app to QueueB.
|
|
|
+ RMApp app = rm1.submitApp(1024, "app", USER_2, null, B);
|
|
|
+ MockRM.launchAndRegisterAM(app, rm1, nm);
|
|
|
+ assertEquals(rm1.getApplicationReport(app.getApplicationId()).
|
|
|
+ getYarnApplicationState(), YarnApplicationState.RUNNING);
|
|
|
+
|
|
|
+ // Take a copy of state store so that it can be reset to this state.
|
|
|
+ RMState state = memStore.loadState();
|
|
|
+
|
|
|
+ // Change scheduler config with child queues added to QueueB.
|
|
|
+ csConf = new CapacitySchedulerConfiguration(conf);
|
|
|
+ setupQueueConfigurationChildOfB(csConf);
|
|
|
+
|
|
|
+ String diags = "Application killed on recovery as it was submitted to " +
|
|
|
+ "queue QueueB which is no longer a leaf queue after restart.";
|
|
|
+ verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags, memStore, state);
|
|
|
+ }
|
|
|
+
|
|
|
+ //Test behavior of an app if queue is removed during recovery. Test case does
|
|
|
+ //following:
|
|
|
//1. Add some apps to two queues, attempt to add an app to a non-existant
|
|
|
// queue to verify that the new logic is not in effect during normal app
|
|
|
// submission
|
|
|
- //2. Remove one of the queues, restart the RM
|
|
|
- //3. Verify that the expected exception was thrown
|
|
|
- @Test (timeout = 30000, expected = QueueNotFoundException.class)
|
|
|
+ //2. Remove one of the queues, restart the RM, once with fail fast config as
|
|
|
+ // false and once with fail fast as true.
|
|
|
+ //3. Verify that app was killed if fail fast is false.
|
|
|
+ //4. Verify that QueueException was thrown if fail fast is true.
|
|
|
+ @Test (timeout = 30000)
|
|
|
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
|
|
|
if (getSchedulerType() != SchedulerType.CAPACITY) {
|
|
|
- throw new QueueNotFoundException("Dummy");
|
|
|
+ return;
|
|
|
}
|
|
|
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
|
|
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
|
@@ -549,7 +666,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
|
|
|
RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
|
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
|
|
|
-
|
|
|
+ assertEquals(rm1.getApplicationReport(app2.getApplicationId()).
|
|
|
+ getYarnApplicationState(), YarnApplicationState.RUNNING);
|
|
|
+
|
|
|
//Submit an app with a non existant queue to make sure it does not
|
|
|
//cause a fatal failure in the non-recovery case
|
|
|
RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null,
|
|
@@ -560,12 +679,16 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
rm1.clearQueueMetrics(app1_2);
|
|
|
rm1.clearQueueMetrics(app2);
|
|
|
|
|
|
- // Re-start RM
|
|
|
- csConf =
|
|
|
- new CapacitySchedulerConfiguration(conf);
|
|
|
+ // Take a copy of state store so that it can be reset to this state.
|
|
|
+ RMState state = memStore.loadState();
|
|
|
+
|
|
|
+ // Set new configuration with QueueB removed.
|
|
|
+ csConf = new CapacitySchedulerConfiguration(conf);
|
|
|
setupQueueConfigurationOnlyA(csConf);
|
|
|
- rm2 = new MockRM(csConf, memStore);
|
|
|
- rm2.start();
|
|
|
+
|
|
|
+ String diags = "Application killed on recovery as it was submitted to " +
|
|
|
+ "queue QueueB which no longer exists after restart.";
|
|
|
+ verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags, memStore, state);
|
|
|
}
|
|
|
|
|
|
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|