Ver Fonte

svn merge -c 1555161 FIXES: MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps cannot be fulfilled. Contributed by Lohit Vijayarenu

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1574272 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe há 11 anos atrás
pai
commit
ded6d13064

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -28,6 +28,9 @@ Release 0.23.11 - UNRELEASED
     RMContainerAllocator$AssignedRequests.preemptReduce() violates the 
     RMContainerAllocator$AssignedRequests.preemptReduce() violates the 
     comparator contract (Gera Shegalov via kasha)
     comparator contract (Gera Shegalov via kasha)
 
 
+    MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps 
+    cannot be fulfilled. (lohit via kasha)
+
 Release 0.23.10 - 2013-12-09
 Release 0.23.10 - 2013-12-09
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -213,7 +213,8 @@ public class RMContainerAllocator extends RMContainerRequestor
 
 
     int completedMaps = getJob().getCompletedMaps();
     int completedMaps = getJob().getCompletedMaps();
     int completedTasks = completedMaps + getJob().getCompletedReduces();
     int completedTasks = completedMaps + getJob().getCompletedReduces();
-    if (lastCompletedTasks != completedTasks) {
+    if ((lastCompletedTasks != completedTasks) ||
+          (scheduledRequests.maps.size() > 0)) {
       lastCompletedTasks = completedTasks;
       lastCompletedTasks = completedTasks;
       recalculateReduceSchedule = true;
       recalculateReduceSchedule = true;
     }
     }

+ 15 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -1428,6 +1428,21 @@ public class TestRMContainerAllocator {
         numPendingReduces, 
         numPendingReduces, 
         maxReduceRampupLimit, reduceSlowStart);
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
     verify(allocator).rampDownReduces(anyInt());
+
+    // Test reduce ramp-down for when there are scheduled maps
+    // Since we have two scheduled Maps, rampDownReducers 
+    // should be invoked twice.
+    scheduledMaps = 2;
+    assignedReduces = 2;
+    doReturn(10 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, times(2)).rampDownReduces(anyInt());
   }
   }
 
 
   private static class RecalculateContainerAllocator extends MyContainerAllocator {
   private static class RecalculateContainerAllocator extends MyContainerAllocator {