@@ -19,9 +19,9 @@
package org.apache.hadoop.mapreduce.jobhistory;

import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -81,12 +81,11 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.mockito.Mockito;

import com.fasterxml.jackson.databind.JsonNode;
@@ -102,7 +101,7 @@ public class TestJobHistoryEventHandler {
private static MiniDFSCluster dfsCluster = null;
private static String coreSitePath;

- @BeforeAll
+ @BeforeClass
public static void setUpClass() throws Exception {
coreSitePath = "." + File.separator + "target" + File.separator +
"test-classes" + File.separator + "core-site.xml";
@@ -110,18 +109,17 @@ public class TestJobHistoryEventHandler {
dfsCluster = new MiniDFSCluster.Builder(conf).build();
}

- @AfterAll
+ @AfterClass
public static void cleanUpClass() throws Exception {
dfsCluster.shutdown();
}

- @AfterEach
+ @After
public void cleanTest() throws Exception {
new File(coreSitePath).delete();
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testFirstFlushOnCompletionEvent() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
@@ -164,8 +162,7 @@ public class TestJobHistoryEventHandler {
}
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testMaxUnflushedCompletionEvents() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
@@ -210,8 +207,7 @@ public class TestJobHistoryEventHandler {
}
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testUnflushedTimer() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
@@ -236,26 +232,25 @@ public class TestJobHistoryEventHandler {
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));

- for (int i = 0; i < 100; i++) {
+ for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}

handleNextNEvents(jheh, 9);
- Assertions.assertTrue(jheh.getFlushTimerStatus());
+ Assert.assertTrue(jheh.getFlushTimerStatus());
verify(mockWriter, times(0)).flush();

Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
verify(mockWriter).flush();
- Assertions.assertFalse(jheh.getFlushTimerStatus());
+ Assert.assertFalse(jheh.getFlushTimerStatus());
} finally {
jheh.stop();
verify(mockWriter).close();
}
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testBatchedFlushJobEndMultiplier() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
@@ -300,8 +295,7 @@ public class TestJobHistoryEventHandler {
}

// In case of all types of events, process Done files if it's last AM retry
- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testProcessDoneFilesOnLastAMRetry() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
@@ -315,12 +309,12 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));

handleEvent(jheh, new JobHistoryEvent(t.jobId,
- new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+ 0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));

handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
@@ -329,13 +323,13 @@ public class TestJobHistoryEventHandler {
verify(jheh, times(2)).processDoneFiles(any(JobId.class));

handleEvent(jheh, new JobHistoryEvent(t.jobId,
- new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+ 0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(3)).processDoneFiles(any(JobId.class));

handleEvent(jheh, new JobHistoryEvent(t.jobId,
- new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+ 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(4)).processDoneFiles(any(JobId.class));

mockWriter = jheh.getEventWriter();
@@ -347,8 +341,7 @@ public class TestJobHistoryEventHandler {
}

// Skip processing Done files in case of ERROR, if it's not last AM retry
- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testProcessDoneFilesNotLastAMRetry() throws Exception {
TestParams t = new TestParams(false);
Configuration conf = new Configuration();
@@ -361,13 +354,13 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(t.jobId);

// skip processing done files
handleEvent(jheh, new JobHistoryEvent(t.jobId,
- new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+ 0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(0)).processDoneFiles(t.jobId);

handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
@@ -376,13 +369,13 @@ public class TestJobHistoryEventHandler {
verify(jheh, times(1)).processDoneFiles(t.jobId);

handleEvent(jheh, new JobHistoryEvent(t.jobId,
- new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+ 0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(2)).processDoneFiles(t.jobId);

handleEvent(jheh, new JobHistoryEvent(t.jobId,
- new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+ 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(3)).processDoneFiles(t.jobId);

mockWriter = jheh.getEventWriter();
@@ -428,15 +421,16 @@ public class TestJobHistoryEventHandler {

// load the job_conf.xml in JHS directory and verify property redaction.
Path jhsJobConfFile = getJobConfInIntermediateDoneDir(conf, params.jobId);
- Assertions.assertTrue(FileContext.getFileContext(conf).util().exists(jhsJobConfFile),
- "The job_conf.xml file is not in the JHS directory");
+ Assert.assertTrue("The job_conf.xml file is not in the JHS directory",
+ FileContext.getFileContext(conf).util().exists(jhsJobConfFile));
Configuration jhsJobConf = new Configuration();

try (InputStream input = FileSystem.get(conf).open(jhsJobConfFile)) {
jhsJobConf.addResource(input);
- Assertions.assertEquals(MRJobConfUtil.REDACTION_REPLACEMENT_VAL,
- jhsJobConf.get(sensitivePropertyName),
- sensitivePropertyName + " is not redacted in HDFS.");
+ Assert.assertEquals(
+ sensitivePropertyName + " is not redacted in HDFS.",
+ MRJobConfUtil.REDACTION_REPLACEMENT_VAL,
+ jhsJobConf.get(sensitivePropertyName));
}
} finally {
jheh.stop();
@@ -462,20 +456,19 @@ public class TestJobHistoryEventHandler {
fs.delete(new Path(intermDoneDirPrefix), true);
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testDefaultFsIsUsedForHistory() throws Exception {
// Create default configuration pointing to the minicluster
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
- dfsCluster.getURI().toString());
+ dfsCluster.getURI().toString());
FileOutputStream os = new FileOutputStream(coreSitePath);
conf.writeXml(os);
os.close();

// simulate execution under a non-default namenode
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
- "file:///");
+ "file:///");

TestParams t = new TestParams();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);
@@ -497,11 +490,11 @@ public class TestJobHistoryEventHandler {
// If we got here then event handler worked but we don't know with which
// file system. Now we check that history stuff was written to minicluster
FileSystem dfsFileSystem = dfsCluster.getFileSystem();
- assertTrue(dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0,
- "Minicluster contains some history files");
+ assertTrue("Minicluster contains some history files",
+ dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
FileSystem localFileSystem = LocalFileSystem.get(conf);
- assertFalse(localFileSystem.exists(new Path(t.dfsWorkDir)),
- "No history directory on non-default file system");
+ assertFalse("No history directory on non-default file system",
+ localFileSystem.exists(new Path(t.dfsWorkDir)));
} finally {
jheh.stop();
purgeHdfsHistoryIntermediateDoneDirectory(conf);
@@ -516,7 +509,7 @@ public class TestJobHistoryEventHandler {
"/mapred/history/done_intermediate");
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
- Assertions.assertEquals("/mapred/history/done_intermediate/" +
+ Assert.assertEquals("/mapred/history/done_intermediate/" +
System.getProperty("user.name"), pathStr);

// Test fully qualified path
@@ -530,14 +523,13 @@ public class TestJobHistoryEventHandler {
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
"file:///");
pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
- Assertions.assertEquals(dfsCluster.getURI().toString() +
+ Assert.assertEquals(dfsCluster.getURI().toString() +
"/mapred/history/done_intermediate/" + System.getProperty("user.name"),
pathStr);
}

// test AMStartedEvent for submitTime and startTime
- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testAMStartedEvent() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
@@ -579,8 +571,7 @@ public class TestJobHistoryEventHandler {

// Have JobHistoryEventHandler handle some events and make sure they get
// stored to the Timeline store
- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testTimelineEventHandling() throws Exception {
TestParams t = new TestParams(RunningAppContext.class, false);
Configuration conf = new YarnConfiguration();
@@ -607,13 +598,13 @@ public class TestJobHistoryEventHandler {
jheh.getDispatcher().await();
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null, null);
- Assertions.assertEquals(1, entities.getEntities().size());
+ Assert.assertEquals(1, entities.getEntities().size());
TimelineEntity tEntity = entities.getEntities().get(0);
- Assertions.assertEquals(t.jobId.toString(), tEntity.getEntityId());
- Assertions.assertEquals(1, tEntity.getEvents().size());
- Assertions.assertEquals(EventType.AM_STARTED.toString(),
+ Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+ Assert.assertEquals(1, tEntity.getEvents().size());
+ Assert.assertEquals(EventType.AM_STARTED.toString(),
tEntity.getEvents().get(0).getEventType());
- Assertions.assertEquals(currentTime - 10,
+ Assert.assertEquals(currentTime - 10,
tEntity.getEvents().get(0).getTimestamp());

handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -624,17 +615,17 @@ public class TestJobHistoryEventHandler {
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
- Assertions.assertEquals(1, entities.getEntities().size());
+ Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
- Assertions.assertEquals(t.jobId.toString(), tEntity.getEntityId());
- Assertions.assertEquals(2, tEntity.getEvents().size());
- Assertions.assertEquals(EventType.JOB_SUBMITTED.toString(),
+ Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+ Assert.assertEquals(2, tEntity.getEvents().size());
+ Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
tEntity.getEvents().get(0).getEventType());
- Assertions.assertEquals(EventType.AM_STARTED.toString(),
+ Assert.assertEquals(EventType.AM_STARTED.toString(),
tEntity.getEvents().get(1).getEventType());
- Assertions.assertEquals(currentTime + 10,
+ Assert.assertEquals(currentTime + 10,
tEntity.getEvents().get(0).getTimestamp());
- Assertions.assertEquals(currentTime - 10,
+ Assert.assertEquals(currentTime - 10,
tEntity.getEvents().get(1).getTimestamp());

handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -643,80 +634,80 @@ public class TestJobHistoryEventHandler {
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
- Assertions.assertEquals(1, entities.getEntities().size());
+ Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
- Assertions.assertEquals(t.jobId.toString(), tEntity.getEntityId());
- Assertions.assertEquals(3, tEntity.getEvents().size());
- Assertions.assertEquals(EventType.JOB_SUBMITTED.toString(),
+ Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+ Assert.assertEquals(3, tEntity.getEvents().size());
+ Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
tEntity.getEvents().get(0).getEventType());
- Assertions.assertEquals(EventType.AM_STARTED.toString(),
+ Assert.assertEquals(EventType.AM_STARTED.toString(),
tEntity.getEvents().get(1).getEventType());
- Assertions.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+ Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
tEntity.getEvents().get(2).getEventType());
- Assertions.assertEquals(currentTime + 10,
+ Assert.assertEquals(currentTime + 10,
tEntity.getEvents().get(0).getTimestamp());
- Assertions.assertEquals(currentTime - 10,
+ Assert.assertEquals(currentTime - 10,
tEntity.getEvents().get(1).getTimestamp());
- Assertions.assertEquals(currentTime - 20,
+ Assert.assertEquals(currentTime - 20,
tEntity.getEvents().get(2).getTimestamp());

handleEvent(jheh, new JobHistoryEvent(t.jobId,
- new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
+ new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
0, 0, 0, new Counters(), new Counters(), new Counters()), currentTime));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
- null, null, null, null, null, null);
- Assertions.assertEquals(1, entities.getEntities().size());
+ null, null, null, null, null, null);
+ Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
- Assertions.assertEquals(t.jobId.toString(), tEntity.getEntityId());
- Assertions.assertEquals(4, tEntity.getEvents().size());
- Assertions.assertEquals(EventType.JOB_SUBMITTED.toString(),
+ Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+ Assert.assertEquals(4, tEntity.getEvents().size());
+ Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
tEntity.getEvents().get(0).getEventType());
- Assertions.assertEquals(EventType.JOB_FINISHED.toString(),
+ Assert.assertEquals(EventType.JOB_FINISHED.toString(),
tEntity.getEvents().get(1).getEventType());
- Assertions.assertEquals(EventType.AM_STARTED.toString(),
+ Assert.assertEquals(EventType.AM_STARTED.toString(),
tEntity.getEvents().get(2).getEventType());
- Assertions.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+ Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
tEntity.getEvents().get(3).getEventType());
- Assertions.assertEquals(currentTime + 10,
+ Assert.assertEquals(currentTime + 10,
tEntity.getEvents().get(0).getTimestamp());
- Assertions.assertEquals(currentTime,
+ Assert.assertEquals(currentTime,
tEntity.getEvents().get(1).getTimestamp());
- Assertions.assertEquals(currentTime - 10,
+ Assert.assertEquals(currentTime - 10,
tEntity.getEvents().get(2).getTimestamp());
- Assertions.assertEquals(currentTime - 20,
+ Assert.assertEquals(currentTime - 20,
tEntity.getEvents().get(3).getTimestamp());

handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
0, 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString()),
- currentTime + 20));
+ currentTime + 20));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
- Assertions.assertEquals(1, entities.getEntities().size());
+ Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
- Assertions.assertEquals(t.jobId.toString(), tEntity.getEntityId());
- Assertions.assertEquals(5, tEntity.getEvents().size());
- Assertions.assertEquals(EventType.JOB_KILLED.toString(),
+ Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+ Assert.assertEquals(5, tEntity.getEvents().size());
+ Assert.assertEquals(EventType.JOB_KILLED.toString(),
tEntity.getEvents().get(0).getEventType());
- Assertions.assertEquals(EventType.JOB_SUBMITTED.toString(),
+ Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
tEntity.getEvents().get(1).getEventType());
- Assertions.assertEquals(EventType.JOB_FINISHED.toString(),
+ Assert.assertEquals(EventType.JOB_FINISHED.toString(),
tEntity.getEvents().get(2).getEventType());
- Assertions.assertEquals(EventType.AM_STARTED.toString(),
+ Assert.assertEquals(EventType.AM_STARTED.toString(),
tEntity.getEvents().get(3).getEventType());
- Assertions.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+ Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
tEntity.getEvents().get(4).getEventType());
- Assertions.assertEquals(currentTime + 20,
+ Assert.assertEquals(currentTime + 20,
tEntity.getEvents().get(0).getTimestamp());
- Assertions.assertEquals(currentTime + 10,
+ Assert.assertEquals(currentTime + 10,
tEntity.getEvents().get(1).getTimestamp());
- Assertions.assertEquals(currentTime,
+ Assert.assertEquals(currentTime,
tEntity.getEvents().get(2).getTimestamp());
- Assertions.assertEquals(currentTime - 10,
+ Assert.assertEquals(currentTime - 10,
tEntity.getEvents().get(3).getTimestamp());
- Assertions.assertEquals(currentTime - 20,
+ Assert.assertEquals(currentTime - 20,
tEntity.getEvents().get(4).getTimestamp());

handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -724,13 +715,13 @@ public class TestJobHistoryEventHandler {
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null);
- Assertions.assertEquals(1, entities.getEntities().size());
+ Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
- Assertions.assertEquals(t.taskID.toString(), tEntity.getEntityId());
- Assertions.assertEquals(1, tEntity.getEvents().size());
- Assertions.assertEquals(EventType.TASK_STARTED.toString(),
+ Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
+ Assert.assertEquals(1, tEntity.getEvents().size());
+ Assert.assertEquals(EventType.TASK_STARTED.toString(),
tEntity.getEvents().get(0).getEventType());
- Assertions.assertEquals(TaskType.MAP.toString(),
+ Assert.assertEquals(TaskType.MAP.toString(),
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));

handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -738,31 +729,30 @@ public class TestJobHistoryEventHandler {
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null);
- Assertions.assertEquals(1, entities.getEntities().size());
+ Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
- Assertions.assertEquals(t.taskID.toString(), tEntity.getEntityId());
- Assertions.assertEquals(2, tEntity.getEvents().size());
- Assertions.assertEquals(EventType.TASK_STARTED.toString(),
+ Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
+ Assert.assertEquals(2, tEntity.getEvents().size());
+ Assert.assertEquals(EventType.TASK_STARTED.toString(),
tEntity.getEvents().get(1).getEventType());
- Assertions.assertEquals(TaskType.REDUCE.toString(),
+ Assert.assertEquals(TaskType.REDUCE.toString(),
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
- Assertions.assertEquals(TaskType.MAP.toString(),
+ Assert.assertEquals(TaskType.MAP.toString(),
tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
}
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testCountersToJSON() throws Exception {
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
Counters counters = new Counters();
CounterGroup group1 = counters.addGroup("DOCTORS",
- "Incarnations of the Doctor");
+ "Incarnations of the Doctor");
group1.addCounter("PETER_CAPALDI", "Peter Capaldi", 12);
group1.addCounter("MATT_SMITH", "Matt Smith", 11);
group1.addCounter("DAVID_TENNANT", "David Tennant", 10);
CounterGroup group2 = counters.addGroup("COMPANIONS",
- "Companions of the Doctor");
+ "Companions of the Doctor");
group2.addCounter("CLARA_OSWALD", "Clara Oswald", 6);
group2.addCounter("RORY_WILLIAMS", "Rory Williams", 5);
group2.addCounter("AMY_POND", "Amy Pond", 4);
@@ -785,31 +775,30 @@ public class TestJobHistoryEventHandler {
+ "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":"
+ "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\","
+ "\"VALUE\":12}]}]";
- Assertions.assertEquals(expected, jsonStr);
+ Assert.assertEquals(expected, jsonStr);
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testCountersToJSONEmpty() throws Exception {
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
Counters counters = null;
JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
String expected = "[]";
- Assertions.assertEquals(expected, jsonStr);
+ Assert.assertEquals(expected, jsonStr);

counters = new Counters();
jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[]";
- Assertions.assertEquals(expected, jsonStr);
+ Assert.assertEquals(expected, jsonStr);

counters.addGroup("DOCTORS", "Incarnations of the Doctor");
jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
+ "Doctor\",\"COUNTERS\":[]}]";
- Assertions.assertEquals(expected, jsonStr);
+ Assert.assertEquals(expected, jsonStr);
}

private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
@@ -923,9 +912,8 @@ public class TestJobHistoryEventHandler {
}
jheh.stop();
//Make sure events were handled
- assertTrue(jheh.eventsHandled == 4,
- "handleEvent should've been called only 4 times but was "
- + jheh.eventsHandled);
+ assertTrue("handleEvent should've been called only 4 times but was "
+ + jheh.eventsHandled, jheh.eventsHandled == 4);

//Create a new jheh because the last stop closed the eventWriter etc.
jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
@@ -946,15 +934,14 @@ public class TestJobHistoryEventHandler {
}
jheh.stop();
//Make sure events were handled, 4 + 1 finish event
- assertTrue(jheh.eventsHandled == 5, "handleEvent should've been called only 5 times but was "
- + jheh.eventsHandled);
- assertTrue(jheh.lastEventHandled.getHistoryEvent()
- instanceof JobUnsuccessfulCompletionEvent,
- "Last event handled wasn't JobUnsuccessfulCompletionEvent");
+ assertTrue("handleEvent should've been called only 5 times but was "
+ + jheh.eventsHandled, jheh.eventsHandled == 5);
+ assertTrue("Last event handled wasn't JobUnsuccessfulCompletionEvent",
+ jheh.lastEventHandled.getHistoryEvent()
+ instanceof JobUnsuccessfulCompletionEvent);
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testSetTrackingURLAfterHistoryIsWritten() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
@@ -985,8 +972,7 @@ public class TestJobHistoryEventHandler {
}
}

- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testDontSetTrackingURLIfHistoryWriteFailed() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
@@ -1017,8 +1003,7 @@ public class TestJobHistoryEventHandler {
jheh.stop();
}
}
- @Test
- @Timeout(50000)
+ @Test (timeout=50000)
public void testDontSetTrackingURLIfHistoryWriteThrows() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
@@ -1054,8 +1039,7 @@ public class TestJobHistoryEventHandler {
}
}

- @Test
- @Timeout(50000)
+ @Test(timeout = 50000)
public void testJobHistoryFilePermissions() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();