Selaa lähdekoodia

MAPREDUCE-5833. TestRMContainerAllocator fails occasionally. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1589248 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 11 vuotta sitten
vanhempi
commit
4a91b876db

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -215,6 +215,9 @@ Release 2.4.1 - UNRELEASED
     MAPREDUCE-5827. TestSpeculativeExecutionWithMRApp fails.
     (Zhijie Shen via cnauroth)
 
+    MAPREDUCE-5833. TestRMContainerAllocator fails occasionally.
+    (Zhijie Shen via cnauroth)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -155,7 +155,8 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   private final AMPreemptionPolicy preemptionPolicy;
 
-  BlockingQueue<ContainerAllocatorEvent> eventQueue
+  @VisibleForTesting
+  protected BlockingQueue<ContainerAllocatorEvent> eventQueue
     = new LinkedBlockingQueue<ContainerAllocatorEvent>();
 
   private ScheduleStats scheduleStats = new ScheduleStats();

+ 17 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
-import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
-
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
@@ -56,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
@@ -64,17 +63,20 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -106,6 +108,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+
 @SuppressWarnings("unchecked")
 public class TestRMContainerAllocator {
 
@@ -581,7 +585,7 @@ public class TestRMContainerAllocator {
     MyContainerAllocator allocator = (MyContainerAllocator) mrApp
       .getContainerAllocator();
 
-    mrApp.waitForState(job, JobState.RUNNING);
+    mrApp.waitForInternalState((JobImpl) job, JobStateInternal.RUNNING);
 
     amDispatcher.await();
     // Wait till all map-attempts request for containers
@@ -733,7 +737,7 @@ public class TestRMContainerAllocator {
     MyContainerAllocator allocator = (MyContainerAllocator) mrApp
       .getContainerAllocator();
 
-    mrApp.waitForState(job, JobState.RUNNING);
+    mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING);
 
     amDispatcher.await();
     // Wait till all map-attempts request for containers
@@ -1554,7 +1558,15 @@ public class TestRMContainerAllocator {
     }
     
     // API to be used by tests
-    public List<TaskAttemptContainerAssignedEvent> schedule() {
+    public List<TaskAttemptContainerAssignedEvent> schedule()
+        throws Exception {
+      // before doing heartbeat with RM, drain all the outstanding events to
+      // ensure all the requests made before this heartbeat are handled
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        public Boolean get() {
+          return eventQueue.isEmpty();
+        }
+      }, 100, 10000);
       // run the scheduler
       try {
         super.heartbeat();
@@ -1590,7 +1602,6 @@ public class TestRMContainerAllocator {
     public boolean isUnregistered() {
       return isUnregistered;
     }
-        
   }
 
   @Test