|
@@ -20,9 +20,11 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
|
|
|
|
|
import static org.apache.hadoop.test.GenericTestUtils.waitFor;
|
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
|
+import static org.junit.jupiter.api.Assertions.fail;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.times;
|
|
@@ -41,10 +43,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
|
|
|
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
|
|
-import org.junit.After;
|
|
|
-import org.junit.Assert;
|
|
|
-import org.junit.Before;
|
|
|
-import org.junit.BeforeClass;
|
|
|
+import org.junit.jupiter.api.AfterEach;
|
|
|
+import org.junit.jupiter.api.BeforeEach;
|
|
|
+import org.junit.jupiter.api.BeforeAll;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -111,7 +112,7 @@ import org.apache.log4j.AppenderSkeleton;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.apache.log4j.spi.LoggingEvent;
|
|
|
-import org.junit.Test;
|
|
|
+import org.junit.jupiter.api.Test;
|
|
|
import org.mockito.ArgumentCaptor;
|
|
|
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
|
@@ -151,17 +152,17 @@ public class TestTaskAttempt{
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @BeforeClass
|
|
|
+ @BeforeAll
|
|
|
public static void setupBeforeClass() {
|
|
|
ResourceUtils.resetResourceTypes(new Configuration());
|
|
|
}
|
|
|
|
|
|
- @Before
|
|
|
+ @BeforeEach
|
|
|
public void before() {
|
|
|
TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear();
|
|
|
}
|
|
|
|
|
|
- @After
|
|
|
+ @AfterEach
|
|
|
public void tearDown() {
|
|
|
ResourceUtils.resetResourceTypes(new Configuration());
|
|
|
}
|
|
@@ -289,7 +290,7 @@ public class TestTaskAttempt{
|
|
|
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
|
|
|
verify(eventHandler, times(2)).handle(arg.capture());
|
|
|
if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
|
|
|
- Assert.fail("Second Event not of type ContainerRequestEvent");
|
|
|
+ fail("Second Event not of type ContainerRequestEvent");
|
|
|
}
|
|
|
ContainerRequestEvent cre =
|
|
|
(ContainerRequestEvent) arg.getAllValues().get(1);
|
|
@@ -323,7 +324,7 @@ public class TestTaskAttempt{
|
|
|
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
|
|
|
verify(eventHandler, times(2)).handle(arg.capture());
|
|
|
if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
|
|
|
- Assert.fail("Second Event not of type ContainerRequestEvent");
|
|
|
+ fail("Second Event not of type ContainerRequestEvent");
|
|
|
}
|
|
|
Map<String, Boolean> expected = new HashMap<String, Boolean>();
|
|
|
expected.put("host1", true);
|
|
@@ -361,16 +362,16 @@ public class TestTaskAttempt{
|
|
|
Job job = app.submit(conf);
|
|
|
app.waitForState(job, JobState.RUNNING);
|
|
|
Map<TaskId, Task> tasks = job.getTasks();
|
|
|
- Assert.assertEquals("Num tasks is not correct", 2, tasks.size());
|
|
|
+ assertEquals(2, tasks.size(), "Num tasks is not correct");
|
|
|
Iterator<Task> taskIter = tasks.values().iterator();
|
|
|
Task mTask = taskIter.next();
|
|
|
app.waitForState(mTask, TaskState.RUNNING);
|
|
|
Task rTask = taskIter.next();
|
|
|
app.waitForState(rTask, TaskState.RUNNING);
|
|
|
Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
|
|
|
- Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size());
|
|
|
+ assertEquals(1, mAttempts.size(), "Num attempts is not correct");
|
|
|
Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
|
|
|
- Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size());
|
|
|
+ assertEquals(1, rAttempts.size(), "Num attempts is not correct");
|
|
|
TaskAttempt mta = mAttempts.values().iterator().next();
|
|
|
TaskAttempt rta = rAttempts.values().iterator().next();
|
|
|
app.waitForState(mta, TaskAttemptState.RUNNING);
|
|
@@ -392,21 +393,21 @@ public class TestTaskAttempt{
|
|
|
|
|
|
int memoryMb = (int) containerResource.getMemorySize();
|
|
|
int vcores = containerResource.getVirtualCores();
|
|
|
- Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
|
|
|
+ assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
|
|
|
counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
|
|
|
- Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
|
|
|
+ assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
|
|
|
counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue());
|
|
|
- Assert.assertEquals(1,
|
|
|
+ assertEquals(1,
|
|
|
counters.findCounter(JobCounter.MILLIS_MAPS).getValue());
|
|
|
- Assert.assertEquals(1,
|
|
|
+ assertEquals(1,
|
|
|
counters.findCounter(JobCounter.MILLIS_REDUCES).getValue());
|
|
|
- Assert.assertEquals(memoryMb,
|
|
|
+ assertEquals(memoryMb,
|
|
|
counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue());
|
|
|
- Assert.assertEquals(memoryMb,
|
|
|
+ assertEquals(memoryMb,
|
|
|
counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue());
|
|
|
- Assert.assertEquals(vcores,
|
|
|
+ assertEquals(vcores,
|
|
|
counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue());
|
|
|
- Assert.assertEquals(vcores,
|
|
|
+ assertEquals(vcores,
|
|
|
counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue());
|
|
|
}
|
|
|
|
|
@@ -452,23 +453,23 @@ public class TestTaskAttempt{
|
|
|
app.waitForState(job, JobState.FAILED);
|
|
|
Map<TaskId, Task> tasks = job.getTasks();
|
|
|
|
|
|
- Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
|
|
|
+ assertEquals(1, tasks.size(), "Num tasks is not correct");
|
|
|
Task task = tasks.values().iterator().next();
|
|
|
- Assert.assertEquals("Task state not correct", TaskState.FAILED, task
|
|
|
- .getReport().getTaskState());
|
|
|
+ assertEquals(TaskState.FAILED, task
|
|
|
+ .getReport().getTaskState(), "Task state not correct");
|
|
|
Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
|
|
|
.getAttempts();
|
|
|
- Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
|
|
|
+ assertEquals(4, attempts.size(), "Num attempts is not correct");
|
|
|
|
|
|
Iterator<TaskAttempt> it = attempts.values().iterator();
|
|
|
TaskAttemptReport report = it.next().getReport();
|
|
|
- Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
|
|
|
- report.getTaskAttemptState());
|
|
|
- Assert.assertEquals("Diagnostic Information is not Correct",
|
|
|
- "Test Diagnostic Event", report.getDiagnosticInfo());
|
|
|
+ assertEquals(TaskAttemptState.FAILED,
|
|
|
+ report.getTaskAttemptState(), "Attempt state not correct");
|
|
|
+ assertEquals("Test Diagnostic Event", report.getDiagnosticInfo(),
|
|
|
+ "Diagnostic Information is not Correct");
|
|
|
report = it.next().getReport();
|
|
|
- Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
|
|
|
- report.getTaskAttemptState());
|
|
|
+ assertEquals(TaskAttemptState.FAILED,
|
|
|
+ report.getTaskAttemptState(), "Attempt state not correct");
|
|
|
}
|
|
|
|
|
|
private void testTaskAttemptAssignedFailHistory
|
|
@@ -477,8 +478,8 @@ public class TestTaskAttempt{
|
|
|
Job job = app.submit(conf);
|
|
|
app.waitForState(job, JobState.FAILED);
|
|
|
Map<TaskId, Task> tasks = job.getTasks();
|
|
|
- Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
|
|
|
- Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
|
|
|
+ assertTrue(app.getTaStartJHEvent(), "No Ta Started JH Event");
|
|
|
+ assertTrue(app.getTaFailedJHEvent(), "No Ta Failed JH Event");
|
|
|
}
|
|
|
|
|
|
private void testTaskAttemptAssignedKilledHistory
|
|
@@ -518,8 +519,8 @@ public class TestTaskAttempt{
|
|
|
if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.EventType.MAP_ATTEMPT_FAILED) {
|
|
|
TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion) event
|
|
|
.getHistoryEvent().getDatum();
|
|
|
- Assert.assertEquals("Diagnostic Information is not Correct",
|
|
|
- "Test Diagnostic Event", datum.get(8).toString());
|
|
|
+ assertEquals("Test Diagnostic Event", datum.get(8).toString(),
|
|
|
+ "Diagnostic Information is not Correct");
|
|
|
}
|
|
|
}
|
|
|
};
|
|
@@ -638,8 +639,8 @@ public class TestTaskAttempt{
|
|
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
|
|
|
assertFalse(eventHandler.internalError);
|
|
|
- assertEquals("Task attempt is not assigned on the local node",
|
|
|
- Locality.NODE_LOCAL, taImpl.getLocality());
|
|
|
+ assertEquals(Locality.NODE_LOCAL, taImpl.getLocality(),
|
|
|
+ "Task attempt is not assigned on the local node");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -695,10 +696,10 @@ public class TestTaskAttempt{
|
|
|
.isEqualTo(TaskAttemptState.RUNNING);
|
|
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
|
|
- assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
|
|
- eventHandler.internalError);
|
|
|
- assertEquals("Task attempt is not assigned on the local rack",
|
|
|
- Locality.RACK_LOCAL, taImpl.getLocality());
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_CONTAINER_CLEANED");
|
|
|
+ assertEquals(Locality.RACK_LOCAL, taImpl.getLocality(),
|
|
|
+ "Task attempt is not assigned on the local rack");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -757,10 +758,10 @@ public class TestTaskAttempt{
|
|
|
.isEqualTo(TaskAttemptState.COMMIT_PENDING);
|
|
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
|
|
- assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
|
|
- eventHandler.internalError);
|
|
|
- assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
|
|
|
- taImpl.getLocality());
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_CONTAINER_CLEANED");
|
|
|
+ assertEquals(Locality.OFF_SWITCH,
|
|
|
+ taImpl.getLocality(), "Task attempt is assigned locally");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -832,8 +833,8 @@ public class TestTaskAttempt{
|
|
|
assertThat(taImpl.getState())
|
|
|
.withFailMessage("Task attempt is not in FAILED state, still")
|
|
|
.isEqualTo(TaskAttemptState.FAILED);
|
|
|
- assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
|
|
- eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_CONTAINER_CLEANED");
|
|
|
}
|
|
|
|
|
|
|
|
@@ -883,16 +884,14 @@ public class TestTaskAttempt{
|
|
|
TaskAttemptEventType.TA_SCHEDULE));
|
|
|
taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
|
|
|
"Task got killed"));
|
|
|
- assertFalse(
|
|
|
- "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
|
|
|
- eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task");
|
|
|
try {
|
|
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
TaskAttemptEventType.TA_KILL));
|
|
|
- Assert.assertTrue("No exception on UNASSIGNED STATE KILL event", true);
|
|
|
+ assertTrue(true, "No exception on UNASSIGNED STATE KILL event");
|
|
|
} catch (Exception e) {
|
|
|
- Assert.assertFalse(
|
|
|
- "Exception not expected for UNASSIGNED STATE KILL event", true);
|
|
|
+ fail("Exception not expected for UNASSIGNED STATE KILL event");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -962,8 +961,8 @@ public class TestTaskAttempt{
|
|
|
assertThat(taImpl.getState())
|
|
|
.withFailMessage("Task attempt is not in KILLED state, still")
|
|
|
.isEqualTo(TaskAttemptState.KILLED);
|
|
|
- assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
|
|
- eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_CONTAINER_CLEANED");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1009,9 +1008,8 @@ public class TestTaskAttempt{
|
|
|
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
|
|
|
taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
|
|
|
"Task got killed"));
|
|
|
- assertFalse(
|
|
|
- "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
|
|
|
- eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1072,8 +1070,8 @@ public class TestTaskAttempt{
|
|
|
.withFailMessage("Task attempt is not in SUCCEEDED state")
|
|
|
.isEqualTo(TaskAttemptState.SUCCEEDED);
|
|
|
|
|
|
- assertTrue("Task Attempt finish time is not greater than 0",
|
|
|
- taImpl.getFinishTime() > 0);
|
|
|
+ assertTrue(taImpl.getFinishTime() > 0,
|
|
|
+ "Task Attempt finish time is not greater than 0");
|
|
|
|
|
|
Long finishTime = taImpl.getFinishTime();
|
|
|
Thread.sleep(5);
|
|
@@ -1084,9 +1082,9 @@ public class TestTaskAttempt{
|
|
|
.withFailMessage("Task attempt is not in FAILED state")
|
|
|
.isEqualTo(TaskAttemptState.FAILED);
|
|
|
|
|
|
- assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
|
|
|
- + " Task attempt finish time is not the same ",
|
|
|
- finishTime, Long.valueOf(taImpl.getFinishTime()));
|
|
|
+ assertEquals(finishTime, Long.valueOf(taImpl.getFinishTime()),
|
|
|
+ "After TA_TOO_MANY_FETCH_FAILURE,"
|
|
|
+ + " Task attempt finish time is not the same ");
|
|
|
}
|
|
|
|
|
|
private void containerKillBeforeAssignment(boolean scheduleAttempt)
|
|
@@ -1114,7 +1112,7 @@ public class TestTaskAttempt{
|
|
|
assertThat(taImpl.getInternalState())
|
|
|
.withFailMessage("Task attempt's internal state is not KILLED")
|
|
|
.isEqualTo(TaskAttemptStateInternal.KILLED);
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
TaskEvent event = eventHandler.lastTaskEvent;
|
|
|
assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
|
|
|
// In NEW state, new map attempt should not be rescheduled.
|
|
@@ -1238,8 +1236,8 @@ public class TestTaskAttempt{
|
|
|
.isEqualTo(TaskAttemptState.RUNNING);
|
|
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
TaskAttemptEventType.TA_KILL));
|
|
|
- assertFalse("InternalError occurred trying to handle TA_KILL",
|
|
|
- eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_KILL");
|
|
|
assertThat(taImpl.getInternalState())
|
|
|
.withFailMessage("Task should be in KILL_CONTAINER_CLEANUP state")
|
|
|
.isEqualTo(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP);
|
|
@@ -1301,8 +1299,8 @@ public class TestTaskAttempt{
|
|
|
.isEqualTo(TaskAttemptStateInternal.COMMIT_PENDING);
|
|
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
TaskAttemptEventType.TA_KILL));
|
|
|
- assertFalse("InternalError occurred trying to handle TA_KILL",
|
|
|
- eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError,
|
|
|
+ "InternalError occurred trying to handle TA_KILL");
|
|
|
assertThat(taImpl.getInternalState())
|
|
|
.withFailMessage("Task should be in KILL_CONTAINER_CLEANUP state")
|
|
|
.isEqualTo(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP);
|
|
@@ -1348,7 +1346,7 @@ public class TestTaskAttempt{
|
|
|
.withFailMessage("Task attempt is not in KILLED state")
|
|
|
.isEqualTo(TaskAttemptState.KILLED);
|
|
|
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1359,32 +1357,30 @@ public class TestTaskAttempt{
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_DONE));
|
|
|
|
|
|
- assertEquals("Task attempt is not in SUCCEEDED state",
|
|
|
- TaskAttemptState.SUCCEEDED, taImpl.getState());
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_FINISHING_CONTAINER",
|
|
|
- TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptState.SUCCEEDED, taImpl.getState(),
|
|
|
+ "Task attempt is not in SUCCEEDED state");
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_FINISHING_CONTAINER");
|
|
|
|
|
|
// If the map only task is killed when it is in SUCCESS_FINISHING_CONTAINER
|
|
|
// state, the state will move to SUCCESS_CONTAINER_CLEANUP
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_KILL));
|
|
|
- assertEquals("Task attempt is not in SUCCEEDED state",
|
|
|
- TaskAttemptState.SUCCEEDED, taImpl.getState());
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_CONTAINER_CLEANUP",
|
|
|
- TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptState.SUCCEEDED,
|
|
|
+ taImpl.getState(), "Task attempt is not in SUCCEEDED state");
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_CONTAINER_CLEANUP");
|
|
|
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
|
|
- assertEquals("Task attempt is not in SUCCEEDED state",
|
|
|
- TaskAttemptState.SUCCEEDED, taImpl.getState());
|
|
|
- assertEquals("Task attempt's internal state is not SUCCEEDED state",
|
|
|
- TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptState.SUCCEEDED, taImpl.getState(),
|
|
|
+ "Task attempt is not in SUCCEEDED state");
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState(),
|
|
|
+ "Task attempt's internal state is not SUCCEEDED state");
|
|
|
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1414,7 +1410,7 @@ public class TestTaskAttempt{
|
|
|
assertThat(taImpl.getInternalState())
|
|
|
.withFailMessage("Task attempt's internal state is not KILLED")
|
|
|
.isEqualTo(TaskAttemptStateInternal.KILLED);
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
TaskEvent event = eventHandler.lastTaskEvent;
|
|
|
assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
|
|
|
// Send an attempt killed event to TaskImpl forwarding the same reschedule
|
|
@@ -1430,22 +1426,21 @@ public class TestTaskAttempt{
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_DONE));
|
|
|
|
|
|
- assertEquals("Task attempt is not in SUCCEEDED state",
|
|
|
- TaskAttemptState.SUCCEEDED, taImpl.getState());
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_FINISHING_CONTAINER",
|
|
|
- TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptState.SUCCEEDED, taImpl.getState(),
|
|
|
+ "Task attempt is not in SUCCEEDED state");
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_FINISHING_CONTAINER");
|
|
|
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
|
|
// Succeeded
|
|
|
taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
|
|
|
- assertEquals("Task attempt is not in SUCCEEDED state",
|
|
|
- TaskAttemptState.SUCCEEDED, taImpl.getState());
|
|
|
- assertEquals("Task attempt's internal state is not SUCCEEDED",
|
|
|
- TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState());
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertEquals(TaskAttemptState.SUCCEEDED, taImpl.getState(),
|
|
|
+ "Task attempt is not in SUCCEEDED state");
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCEEDED,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not SUCCEEDED");
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
TaskEvent event = eventHandler.lastTaskEvent;
|
|
|
assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, event.getType());
|
|
|
}
|
|
@@ -1498,7 +1493,7 @@ public class TestTaskAttempt{
|
|
|
.withFailMessage("Task attempt is not in FAILED state")
|
|
|
.isEqualTo(TaskAttemptState.FAILED);
|
|
|
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1531,7 +1526,7 @@ public class TestTaskAttempt{
|
|
|
.withFailMessage("Task attempt is not in FAILED state")
|
|
|
.isEqualTo(TaskAttemptState.FAILED);
|
|
|
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1561,7 +1556,7 @@ public class TestTaskAttempt{
|
|
|
"SUCCESS_FINISHING_CONTAINER")
|
|
|
.isEqualTo(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
|
|
|
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1592,7 +1587,7 @@ public class TestTaskAttempt{
|
|
|
"SUCCESS_CONTAINER_CLEANUP")
|
|
|
.isEqualTo(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP);
|
|
|
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1619,7 +1614,7 @@ public class TestTaskAttempt{
|
|
|
"FAIL_CONTAINER_CLEANUP")
|
|
|
.isEqualTo(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
|
|
|
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1636,8 +1631,8 @@ public class TestTaskAttempt{
|
|
|
ResourceInformation resourceInfo =
|
|
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
getResourceInformation(CUSTOM_RESOURCE_NAME);
|
|
|
- assertEquals("Expecting the default unit (G)",
|
|
|
- "G", resourceInfo.getUnits());
|
|
|
+ assertEquals("G", resourceInfo.getUnits(),
|
|
|
+ "Expecting the default unit (G)");
|
|
|
assertEquals(7L, resourceInfo.getValue());
|
|
|
}
|
|
|
|
|
@@ -1654,8 +1649,8 @@ public class TestTaskAttempt{
|
|
|
ResourceInformation resourceInfo =
|
|
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
getResourceInformation(CUSTOM_RESOURCE_NAME);
|
|
|
- assertEquals("Expecting the specified unit (m)",
|
|
|
- "m", resourceInfo.getUnits());
|
|
|
+ assertEquals("m", resourceInfo.getUnits(),
|
|
|
+ "Expecting the specified unit (m)");
|
|
|
assertEquals(3L, resourceInfo.getValue());
|
|
|
}
|
|
|
|
|
@@ -1752,18 +1747,20 @@ public class TestTaskAttempt{
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(expected=IllegalArgumentException.class)
|
|
|
+ @Test
|
|
|
public void testReducerMemoryRequestMultipleName() {
|
|
|
- EventHandler eventHandler = mock(EventHandler.class);
|
|
|
- Clock clock = SystemClock.getInstance();
|
|
|
- JobConf jobConf = new JobConf();
|
|
|
- for (String memoryName : ImmutableList.of(
|
|
|
- MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
|
|
- MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
|
|
- jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
|
|
|
- "3Gi");
|
|
|
- }
|
|
|
- createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
+ assertThrows(IllegalArgumentException.class, ()->{
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
+ for (String memoryName : ImmutableList.of(
|
|
|
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
|
|
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
|
|
|
+ "3Gi");
|
|
|
+ }
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1853,21 +1850,23 @@ public class TestTaskAttempt{
|
|
|
containerRequestEvents.add((ContainerRequestEvent) e);
|
|
|
}
|
|
|
}
|
|
|
- assertEquals("Expected one ContainerRequestEvent after scheduling "
|
|
|
- + "task attempt", 1, containerRequestEvents.size());
|
|
|
+ assertEquals(1, containerRequestEvents.size(),
|
|
|
+ "Expected one ContainerRequestEvent after scheduling task attempt");
|
|
|
|
|
|
return containerRequestEvents.get(0).getCapability();
|
|
|
}
|
|
|
|
|
|
- @Test(expected=IllegalArgumentException.class)
|
|
|
+ @Test
|
|
|
public void testReducerCustomResourceTypeWithInvalidUnit() {
|
|
|
- initResourceTypes();
|
|
|
- EventHandler eventHandler = mock(EventHandler.class);
|
|
|
- Clock clock = SystemClock.getInstance();
|
|
|
- JobConf jobConf = new JobConf();
|
|
|
- jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
|
|
|
- + CUSTOM_RESOURCE_NAME, "3z");
|
|
|
- createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
+ assertThrows(IllegalArgumentException.class, () -> {
|
|
|
+ initResourceTypes();
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
|
|
|
+ + CUSTOM_RESOURCE_NAME, "3z");
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1882,22 +1881,19 @@ public class TestTaskAttempt{
|
|
|
// move in two steps to the desired state (cannot get there directly)
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_DONE));
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_FINISHING_CONTAINER",
|
|
|
- TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_FINISHING_CONTAINER");
|
|
|
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_TIMED_OUT));
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_CONTAINER_CLEANUP",
|
|
|
- TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_CONTAINER_CLEANUP");
|
|
|
|
|
|
taImpl.handle(new TaskAttemptKillEvent(mapTAId, "", true));
|
|
|
- assertEquals("Task attempt is not in KILLED state",
|
|
|
- TaskAttemptState.KILLED,
|
|
|
- taImpl.getState());
|
|
|
+ assertEquals(TaskAttemptState.KILLED,
|
|
|
+ taImpl.getState(), "Task attempt is not in KILLED state");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1912,24 +1908,21 @@ public class TestTaskAttempt{
|
|
|
// move in two steps to the desired state (cannot get there directly)
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_DONE));
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_FINISHING_CONTAINER",
|
|
|
- TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_FINISHING_CONTAINER");
|
|
|
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_TIMED_OUT));
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_CONTAINER_CLEANUP",
|
|
|
- TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_CONTAINER_CLEANUP");
|
|
|
|
|
|
taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(taImpl.getID(),
|
|
|
reduceTAId, "Host"));
|
|
|
- assertEquals("Task attempt is not in FAILED state",
|
|
|
- TaskAttemptState.FAILED,
|
|
|
- taImpl.getState());
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertEquals(TaskAttemptState.FAILED,
|
|
|
+ taImpl.getState(), "Task attempt is not in FAILED state");
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
private void initResourceTypes() {
|
|
@@ -1951,17 +1944,15 @@ public class TestTaskAttempt{
|
|
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
TaskAttemptEventType.TA_DONE));
|
|
|
|
|
|
- assertEquals("Task attempt's internal state is not " +
|
|
|
- "SUCCESS_FINISHING_CONTAINER",
|
|
|
- TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
- taImpl.getInternalState());
|
|
|
+ assertEquals(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
|
|
|
+ taImpl.getInternalState(), "Task attempt's internal state is not " +
|
|
|
+ "SUCCESS_FINISHING_CONTAINER");
|
|
|
|
|
|
taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(taImpl.getID(),
|
|
|
reduceTAId, "Host"));
|
|
|
- assertEquals("Task attempt is not in FAILED state",
|
|
|
- TaskAttemptState.FAILED,
|
|
|
- taImpl.getState());
|
|
|
- assertFalse("InternalError occurred", eventHandler.internalError);
|
|
|
+ assertEquals(TaskAttemptState.FAILED,
|
|
|
+ taImpl.getState(), "Task attempt is not in FAILED state");
|
|
|
+ assertFalse(eventHandler.internalError, "InternalError occurred");
|
|
|
}
|
|
|
|
|
|
private void setupTaskAttemptFinishingMonitor(
|