
MAPREDUCE-3834. Changed MR AM to not add the same rack entry multiple times into the container request table when multiple hosts for a split happen to be on the same rack. Contributed by Siddarth Seth.
svn merge --ignore-ancestry -c 1241734 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1241735 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 years ago
parent
commit
595ea04385
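
For context, a minimal sketch of the idea behind this fix, assuming a hypothetical `resolveRack` helper in place of `RackResolver.resolve(host).getNetworkLocation()` and illustrative host names: when several of a split's data-local hosts sit on the same rack, building one rack entry per host duplicates that rack in the container request, whereas collecting the resolved racks into a `Set` requests each rack only once.

```java
import java.util.HashSet;
import java.util.Set;

public class RackDedupSketch {

  // Hypothetical stand-in for RackResolver.resolve(host).getNetworkLocation();
  // here every host maps to the same rack, which is the case the fix targets.
  static String resolveRack(String host) {
    return "/default-rack";
  }

  public static void main(String[] args) {
    String[] dataLocalHosts = {"host1", "host2", "host3"};

    // Before the fix: one rack entry per host, so the same rack can
    // appear multiple times in the request.
    String[] racksWithDuplicates = new String[dataLocalHosts.length];
    for (int i = 0; i < dataLocalHosts.length; i++) {
      racksWithDuplicates[i] = resolveRack(dataLocalHosts[i]);
    }
    System.out.println("before: " + racksWithDuplicates.length + " rack entries"); // 3

    // After the fix: a Set collapses duplicate racks before the
    // request is built, so each rack is asked for only once.
    Set<String> racks = new HashSet<String>();
    for (String host : dataLocalHosts) {
      racks.add(resolveRack(host));
    }
    String[] deduped = racks.toArray(new String[racks.size()]);
    System.out.println("after: " + deduped.length + " rack entry"); // 1
  }
}
```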

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -721,6 +721,10 @@ Release 0.23.1 - Unreleased
     the same FS scheme, instead of randomly using one. (Mahadev Konar via
     sseth)
 
+    MAPREDUCE-3834. Changed MR AM to not add the same rack entry multiple times
+    into the container request table when multiple hosts for a split happen to
+    be on the same rack. (Siddarth Seth via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 6 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -27,9 +27,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -1079,14 +1081,14 @@ public abstract class TaskAttemptImpl implements
                 taskAttempt.attemptId, 
                 taskAttempt.resourceCapability));
       } else {
-        int i = 0;
-        String[] racks = new String[taskAttempt.dataLocalHosts.length];
+        Set<String> racks = new HashSet<String>(); 
         for (String host : taskAttempt.dataLocalHosts) {
-          racks[i++] = RackResolver.resolve(host).getNetworkLocation();
+          racks.add(RackResolver.resolve(host).getNetworkLocation());
         }
         taskAttempt.eventHandler.handle(new ContainerRequestEvent(
             taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
-                .resolveHosts(taskAttempt.dataLocalHosts), racks));
+                .resolveHosts(taskAttempt.dataLocalHosts), racks
+                .toArray(new String[racks.size()])));
       }
     }
   }
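
The change above swaps the fixed-length racks array (one slot per data-local host) for a HashSet, so duplicate network locations collapse before toArray(new String[racks.size()]) produces the String[] that ContainerRequestEvent expects; the hosts themselves are still passed through resolveHosts unchanged.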

+ 33 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -81,7 +81,39 @@ public class TestTaskAttempt{
     MRApp app = new FailingAttemptsMRApp(0, 1);
     testMRAppHistory(app);
   }
-  
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testSingleRackRequest() throws Exception {
+    TaskAttemptImpl.RequestContainerTransition rct =
+        new TaskAttemptImpl.RequestContainerTransition(false);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    String[] hosts = new String[3];
+    hosts[0] = "host1";
+    hosts[1] = "host2";
+    hosts[2] = "host3";
+    TaskSplitMetaInfo splitInfo =
+        new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+
+    TaskAttemptImpl mockTaskAttempt =
+        createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+    TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+
+    rct.transition(mockTaskAttempt, mockTAEvent);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+      Assert.fail("Second Event not of type ContainerRequestEvent");
+    }
+    ContainerRequestEvent cre =
+        (ContainerRequestEvent) arg.getAllValues().get(1);
+    String[] requestedRacks = cre.getRacks();
+    //Only a single occurrence of /DefaultRack
+    assertEquals(1, requestedRacks.length);
+  }
+ 
   @SuppressWarnings("rawtypes")
   @Test
   public void testHostResolveAttempt() throws Exception {