|
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
|
|
@@ -62,6 +63,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
|
|
@@ -73,6 +75,7 @@ import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
@@ -104,6 +107,8 @@ import org.junit.After;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
|
+
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
public class TestRMContainerAllocator {
|
|
public class TestRMContainerAllocator {
|
|
|
|
|
|
@@ -526,7 +531,7 @@ public class TestRMContainerAllocator {
|
|
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
|
|
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
|
|
.getContainerAllocator();
|
|
.getContainerAllocator();
|
|
|
|
|
|
- mrApp.waitForState(job, JobState.RUNNING);
|
|
|
|
|
|
+ mrApp.waitForInternalState((JobImpl) job, JobStateInternal.RUNNING);
|
|
|
|
|
|
amDispatcher.await();
|
|
amDispatcher.await();
|
|
// Wait till all map-attempts request for containers
|
|
// Wait till all map-attempts request for containers
|
|
@@ -678,7 +683,7 @@ public class TestRMContainerAllocator {
|
|
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
|
|
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
|
|
.getContainerAllocator();
|
|
.getContainerAllocator();
|
|
|
|
|
|
- mrApp.waitForState(job, JobState.RUNNING);
|
|
|
|
|
|
+ mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING);
|
|
|
|
|
|
amDispatcher.await();
|
|
amDispatcher.await();
|
|
// Wait till all map-attempts request for containers
|
|
// Wait till all map-attempts request for containers
|
|
@@ -1497,7 +1502,15 @@ public class TestRMContainerAllocator {
|
|
}
|
|
}
|
|
|
|
|
|
// API to be used by tests
|
|
// API to be used by tests
|
|
- public List<TaskAttemptContainerAssignedEvent> schedule() {
|
|
|
|
|
|
+ public List<TaskAttemptContainerAssignedEvent> schedule()
|
|
|
|
+ throws Exception {
|
|
|
|
+ // before doing heartbeat with RM, drain all the outstanding events to
|
|
|
|
+ // ensure all the requests before this heartbeat is to be handled
|
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
|
+ public Boolean get() {
|
|
|
|
+ return eventQueue.isEmpty();
|
|
|
|
+ }
|
|
|
|
+ }, 100, 10000);
|
|
// run the scheduler
|
|
// run the scheduler
|
|
try {
|
|
try {
|
|
super.heartbeat();
|
|
super.heartbeat();
|
|
@@ -1533,7 +1546,6 @@ public class TestRMContainerAllocator {
|
|
public boolean isUnregistered() {
|
|
public boolean isUnregistered() {
|
|
return isUnregistered;
|
|
return isUnregistered;
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|