|
@@ -89,7 +89,6 @@ import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
@@ -143,52 +142,12 @@ public class TestAppManager extends AppManagerTestBase{
|
|
|
return list;
|
|
|
}
|
|
|
|
|
|
- private static List<RMApp> newRMAppsMixedLogAggregationStatus(int n,
|
|
|
- long time, RMAppState state) {
|
|
|
- List<RMApp> list = Lists.newArrayList();
|
|
|
- for (int i = 0; i < n; ++i) {
|
|
|
- MockRMApp rmApp = new MockRMApp(i, time, state);
|
|
|
- rmApp.setLogAggregationEnabled(true);
|
|
|
- rmApp.setLogAggregationFinished(i % 2 == 0);
|
|
|
- list.add(rmApp);
|
|
|
- }
|
|
|
- return list;
|
|
|
- }
|
|
|
-
|
|
|
public RMContext mockRMContext(int n, long time) {
|
|
|
- final ConcurrentMap<ApplicationId, RMApp> map = createRMAppsMap(n, time);
|
|
|
- return createMockRMContextInternal(map);
|
|
|
- }
|
|
|
-
|
|
|
- public RMContext mockRMContextWithMixedLogAggregationStatus(int n,
|
|
|
- long time) {
|
|
|
- final ConcurrentMap<ApplicationId, RMApp> map =
|
|
|
- createRMAppsMapMixedLogAggStatus(n, time);
|
|
|
- return createMockRMContextInternal(map);
|
|
|
- }
|
|
|
-
|
|
|
- private ConcurrentMap<ApplicationId, RMApp> createRMAppsMap(int n,
|
|
|
- long time) {
|
|
|
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
|
|
|
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
|
|
|
for (RMApp app : apps) {
|
|
|
map.put(app.getApplicationId(), app);
|
|
|
}
|
|
|
- return map;
|
|
|
- }
|
|
|
-
|
|
|
- private ConcurrentMap<ApplicationId, RMApp> createRMAppsMapMixedLogAggStatus(
|
|
|
- int n, long time) {
|
|
|
- final List<RMApp> apps =
|
|
|
- newRMAppsMixedLogAggregationStatus(n, time, RMAppState.FINISHED);
|
|
|
- final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
|
|
|
- for (RMApp app : apps) {
|
|
|
- map.put(app.getApplicationId(), app);
|
|
|
- }
|
|
|
- return map;
|
|
|
- }
|
|
|
-
|
|
|
- private RMContext createMockRMContextInternal(ConcurrentMap<ApplicationId, RMApp> map) {
|
|
|
Dispatcher rmDispatcher = new AsyncDispatcher();
|
|
|
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
|
|
|
rmDispatcher);
|
|
@@ -240,12 +199,8 @@ public class TestAppManager extends AppManagerTestBase{
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void addToCompletedApps(TestRMAppManager appMonitor,
|
|
|
- RMContext rmContext) {
|
|
|
- // ensure applications are finished in order by their IDs
|
|
|
- List<RMApp> sortedApps = new ArrayList<>(rmContext.getRMApps().values());
|
|
|
- sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId()));
|
|
|
- for (RMApp app : sortedApps) {
|
|
|
+ protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
|
|
|
+ for (RMApp app : rmContext.getRMApps().values()) {
|
|
|
if (app.getState() == RMAppState.FINISHED
|
|
|
|| app.getState() == RMAppState.KILLED
|
|
|
|| app.getState() == RMAppState.FAILED) {
|
|
@@ -654,32 +609,18 @@ public class TestAppManager extends AppManagerTestBase{
|
|
|
addToCompletedApps(appMonitor, rmContext);
|
|
|
Assert.assertEquals("Number of completed apps incorrect", allApps,
|
|
|
appMonitor.getCompletedAppsListSize());
|
|
|
-
|
|
|
- int numRemoveAppsFromStateStore = allApps - maxAppsInStateStore;
|
|
|
- Set<ApplicationId> appsShouldBeRemovedFromStateStore = appMonitor
|
|
|
- .getFirstNCompletedApps(numRemoveAppsFromStateStore);
|
|
|
appMonitor.checkAppNumCompletedLimit();
|
|
|
|
|
|
- Set<ApplicationId> removedAppsFromStateStore = appMonitor
|
|
|
- .getRemovedAppsFromStateStore(numRemoveAppsFromStateStore);
|
|
|
-
|
|
|
Assert.assertEquals("Number of apps incorrect after # completed check",
|
|
|
maxAppsInMemory, rmContext.getRMApps().size());
|
|
|
Assert.assertEquals("Number of completed apps incorrect after check",
|
|
|
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
|
|
|
|
|
|
+ int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
|
|
|
verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
|
|
|
.removeApplication(isA(RMApp.class));
|
|
|
Assert.assertEquals(maxAppsInStateStore,
|
|
|
appMonitor.getNumberOfCompletedAppsInStateStore());
|
|
|
-
|
|
|
- List<ApplicationId> completedApps = appMonitor.getCompletedApps();
|
|
|
- Assert.assertEquals(maxAppsInMemory, completedApps.size());
|
|
|
- Assert.assertEquals(numRemoveAppsFromStateStore,
|
|
|
- removedAppsFromStateStore.size());
|
|
|
- Assert.assertEquals(numRemoveAppsFromStateStore,
|
|
|
- Sets.intersection(appsShouldBeRemovedFromStateStore,
|
|
|
- removedAppsFromStateStore).size());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -697,12 +638,9 @@ public class TestAppManager extends AppManagerTestBase{
|
|
|
addToCompletedApps(appMonitor, rmContext);
|
|
|
Assert.assertEquals("Number of completed apps incorrect", allApps,
|
|
|
appMonitor.getCompletedAppsListSize());
|
|
|
-
|
|
|
- int numRemoveApps = allApps - maxAppsInMemory;
|
|
|
- Set<ApplicationId> appsShouldBeRemoved = appMonitor
|
|
|
- .getFirstNCompletedApps(numRemoveApps);
|
|
|
appMonitor.checkAppNumCompletedLimit();
|
|
|
|
|
|
+ int numRemoveApps = allApps - maxAppsInMemory;
|
|
|
Assert.assertEquals("Number of apps incorrect after # completed check",
|
|
|
maxAppsInMemory, rmContext.getRMApps().size());
|
|
|
Assert.assertEquals("Number of completed apps incorrect after check",
|
|
@@ -711,56 +649,6 @@ public class TestAppManager extends AppManagerTestBase{
|
|
|
isA(RMApp.class));
|
|
|
Assert.assertEquals(maxAppsInMemory,
|
|
|
appMonitor.getNumberOfCompletedAppsInStateStore());
|
|
|
-
|
|
|
- List<ApplicationId> completedApps = appMonitor.getCompletedApps();
|
|
|
- Assert.assertEquals(maxAppsInMemory, completedApps.size());
|
|
|
- Assert.assertEquals(numRemoveApps, appsShouldBeRemoved.size());
|
|
|
- assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() {
|
|
|
- long now = System.currentTimeMillis();
|
|
|
- final int allApps = 10;
|
|
|
- RMContext rmContext =
|
|
|
- mockRMContextWithMixedLogAggregationStatus(allApps, now - 20000);
|
|
|
- Configuration conf = new YarnConfiguration();
|
|
|
- int maxAppsInMemory = 2;
|
|
|
- conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
|
|
|
- maxAppsInMemory);
|
|
|
- // greater than maxCompletedAppsInMemory, reset to
|
|
|
- // RM_MAX_COMPLETED_APPLICATIONS.
|
|
|
- conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
|
|
|
- 1000);
|
|
|
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
|
|
|
-
|
|
|
- addToCompletedApps(appMonitor, rmContext);
|
|
|
- Assert.assertEquals("Number of completed apps incorrect", allApps,
|
|
|
- appMonitor.getCompletedAppsListSize());
|
|
|
-
|
|
|
- int numRemoveApps = allApps - maxAppsInMemory;
|
|
|
- int effectiveNumRemoveApps = numRemoveApps / 2;
|
|
|
- //only apps with even ID would be deleted due to log aggregation status
|
|
|
- int expectedNumberOfAppsInMemory = maxAppsInMemory + effectiveNumRemoveApps;
|
|
|
-
|
|
|
- Set<ApplicationId> appsShouldBeRemoved = appMonitor
|
|
|
- .getCompletedAppsWithEvenIdsInRange(numRemoveApps);
|
|
|
- appMonitor.checkAppNumCompletedLimit();
|
|
|
-
|
|
|
- Assert.assertEquals("Number of apps incorrect after # completed check",
|
|
|
- expectedNumberOfAppsInMemory, rmContext.getRMApps().size());
|
|
|
- Assert.assertEquals("Number of completed apps incorrect after check",
|
|
|
- expectedNumberOfAppsInMemory, appMonitor.getCompletedAppsListSize());
|
|
|
- verify(rmContext.getStateStore(), times(effectiveNumRemoveApps))
|
|
|
- .removeApplication(isA(RMApp.class));
|
|
|
- Assert.assertEquals(expectedNumberOfAppsInMemory,
|
|
|
- appMonitor.getNumberOfCompletedAppsInStateStore());
|
|
|
-
|
|
|
- List<ApplicationId> completedApps = appMonitor.getCompletedApps();
|
|
|
-
|
|
|
- Assert.assertEquals(expectedNumberOfAppsInMemory, completedApps.size());
|
|
|
- Assert.assertEquals(effectiveNumRemoveApps, appsShouldBeRemoved.size());
|
|
|
- assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
|
|
|
}
|
|
|
|
|
|
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
|