|
@@ -31,34 +31,47 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
|
|
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
|
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
|
|
|
public class TestNMTimelinePublisher {
|
|
|
private static final String MEMORY_ID = "MEMORY";
|
|
|
private static final String CPU_ID = "CPU";
|
|
|
|
|
|
- @Test
|
|
|
- public void testContainerResourceUsage() {
|
|
|
- Context context = mock(Context.class);
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
|
|
|
- when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
|
|
|
+ private NMTimelinePublisher publisher;
|
|
|
+ private DummyTimelineClient timelineClient;
|
|
|
+ private Configuration conf;
|
|
|
+ private DrainDispatcher dispatcher;
|
|
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ @Before public void setup() throws Exception {
|
|
|
+ conf = new Configuration();
|
|
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
|
|
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
|
|
+ conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
|
|
|
+ 3000L);
|
|
|
+ timelineClient = new DummyTimelineClient(null);
|
|
|
+ Context context = createMockContext();
|
|
|
+ dispatcher = new DrainDispatcher();
|
|
|
|
|
|
- NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
|
|
|
+ publisher = new NMTimelinePublisher(context) {
|
|
|
public void createTimelineClient(ApplicationId appId) {
|
|
|
if (!getAppToClientMap().containsKey(appId)) {
|
|
|
timelineClient.init(getConfig());
|
|
@@ -66,15 +79,73 @@ public class TestNMTimelinePublisher {
|
|
|
getAppToClientMap().put(appId, timelineClient);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Override protected AsyncDispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
};
|
|
|
publisher.init(conf);
|
|
|
publisher.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Context createMockContext() {
|
|
|
+ Context context = mock(Context.class);
|
|
|
+ when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
|
|
|
+ return context;
|
|
|
+ }
|
|
|
+
|
|
|
+ @After public void tearDown() throws Exception {
|
|
|
+ if (publisher != null) {
|
|
|
+ publisher.stop();
|
|
|
+ }
|
|
|
+ if (timelineClient != null) {
|
|
|
+ timelineClient.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test public void testPublishContainerFinish() throws Exception {
|
|
|
+ ApplicationId appId = ApplicationId.newInstance(0, 2);
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
+ ApplicationAttemptId.newInstance(appId, 1);
|
|
|
+ ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
|
|
|
+
|
|
|
+ String diag = "test-diagnostics";
|
|
|
+ int exitStatus = 0;
|
|
|
+ ContainerStatus cStatus = mock(ContainerStatus.class);
|
|
|
+ when(cStatus.getContainerId()).thenReturn(cId);
|
|
|
+ when(cStatus.getDiagnostics()).thenReturn(diag);
|
|
|
+ when(cStatus.getExitStatus()).thenReturn(exitStatus);
|
|
|
+ long timeStamp = System.currentTimeMillis();
|
|
|
+
|
|
|
+ ApplicationContainerFinishedEvent finishedEvent =
|
|
|
+ new ApplicationContainerFinishedEvent(cStatus, timeStamp);
|
|
|
+
|
|
|
+ publisher.createTimelineClient(appId);
|
|
|
+ publisher.publishApplicationEvent(finishedEvent);
|
|
|
+ publisher.stopTimelineClient(appId);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ ContainerEntity cEntity = new ContainerEntity();
|
|
|
+ cEntity.setId(cId.toString());
|
|
|
+ TimelineEntity[] lastPublishedEntities =
|
|
|
+ timelineClient.getLastPublishedEntities();
|
|
|
+
|
|
|
+ Assert.assertNotNull(lastPublishedEntities);
|
|
|
+ Assert.assertEquals(1, lastPublishedEntities.length);
|
|
|
+ TimelineEntity entity = lastPublishedEntities[0];
|
|
|
+ Assert.assertTrue(cEntity.equals(entity));
|
|
|
+ Assert.assertEquals(diag,
|
|
|
+ entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
|
|
+ Assert.assertEquals(exitStatus,
|
|
|
+ entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test public void testContainerResourceUsage() {
|
|
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
|
|
publisher.createTimelineClient(appId);
|
|
|
Container aContainer = mock(Container.class);
|
|
|
- when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
|
|
|
- ApplicationAttemptId.newInstance(appId, 1),
|
|
|
- 0L));
|
|
|
+ when(aContainer.getContainerId()).thenReturn(ContainerId
|
|
|
+ .newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L));
|
|
|
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
|
|
|
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
|
|
|
timelineClient.reset();
|
|
@@ -91,7 +162,6 @@ public class TestNMTimelinePublisher {
|
|
|
(float) ResourceCalculatorProcessTree.UNAVAILABLE);
|
|
|
verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
|
|
|
ResourceCalculatorProcessTree.UNAVAILABLE);
|
|
|
- publisher.stop();
|
|
|
}
|
|
|
|
|
|
private void verifyPublishedResourceUsageMetrics(
|
|
@@ -151,8 +221,12 @@ public class TestNMTimelinePublisher {
|
|
|
|
|
|
private TimelineEntity[] lastPublishedEntities;
|
|
|
|
|
|
- @Override
|
|
|
- public void putEntitiesAsync(TimelineEntity... entities)
|
|
|
+ @Override public void putEntitiesAsync(TimelineEntity... entities)
|
|
|
+ throws IOException, YarnException {
|
|
|
+ this.lastPublishedEntities = entities;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override public void putEntities(TimelineEntity... entities)
|
|
|
throws IOException, YarnException {
|
|
|
this.lastPublishedEntities = entities;
|
|
|
}
|