|
@@ -18,15 +18,15 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Matchers.anyLong;
|
|
|
import static org.mockito.Matchers.isA;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.timeout;
|
|
|
-import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
-import com.google.common.base.Supplier;
|
|
|
import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|
@@ -113,7 +114,9 @@ import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.Mockito;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
|
@@ -896,7 +899,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
memStore.init(conf);
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
+ MockRM rm1 = new MockRM(conf, memStore) {
|
|
|
+ @Override
|
|
|
+ protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
|
|
+ return spy(super.createSystemMetricsPublisher());
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rms.add(rm1);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -925,6 +934,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
|
|
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
|
|
|
|
|
+ verify(rm1.getRMContext().getSystemMetricsPublisher(),Mockito.times(3))
|
|
|
+ .appCreated(any(RMApp.class), anyLong());
|
|
|
// restart rm
|
|
|
|
|
|
MockRM rm2 = new MockRM(conf, memStore) {
|
|
@@ -932,10 +943,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
protected RMAppManager createRMAppManager() {
|
|
|
return spy(super.createRMAppManager());
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
|
|
+ return spy(super.createSystemMetricsPublisher());
|
|
|
+ }
|
|
|
};
|
|
|
rms.add(rm2);
|
|
|
rm2.start();
|
|
|
|
|
|
+ verify(rm2.getRMContext().getSystemMetricsPublisher(),Mockito.times(3))
|
|
|
+ .appCreated(any(RMApp.class), anyLong());
|
|
|
+
|
|
|
GetApplicationsRequest request1 =
|
|
|
GetApplicationsRequest.newInstance(EnumSet.of(
|
|
|
YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
|