Forráskód Böngészése

MAPREDUCE-3927. Shuffle hang when set map.failures.percent (Bhallamudi Venkata Siva Kamesh via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1348846 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 13 éve
szülő
commit
27d1c74a0c

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -570,6 +570,9 @@ Release 0.23.3 - UNRELEASED
 
 
     MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves)
     MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves)
 
 
+    MAPREDUCE-3927. Shuffle hang when set map.failures.percent
+    (Bhallamudi Venkata Siva Kamesh via tgraves)
+
 Release 0.23.2 - UNRELEASED
 Release 0.23.2 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -590,9 +590,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       float reduceProgress = 0f;
       float reduceProgress = 0f;
       for (Task task : this.tasks.values()) {
       for (Task task : this.tasks.values()) {
         if (task.getType() == TaskType.MAP) {
         if (task.getType() == TaskType.MAP) {
-          mapProgress += task.getProgress();
+          mapProgress += (task.isFinished() ? 1f : task.getProgress());
         } else {
         } else {
-          reduceProgress += task.getProgress();
+          reduceProgress += (task.isFinished() ? 1f : task.getProgress());
         }
         }
       }
       }
       if (this.numMapTasks != 0) {
       if (this.numMapTasks != 0) {

+ 23 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java

@@ -137,24 +137,26 @@ class ShuffleScheduler<K,V> {
 
 
       // update the status
       // update the status
       totalBytesShuffledTillNow += bytes;
       totalBytesShuffledTillNow += bytes;
-      float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
-      int mapsDone = totalMaps - remainingMaps;
-      long secsSinceStart = 
-        (System.currentTimeMillis()-startTime)/1000+1;
-
-      float transferRate = mbs/secsSinceStart;
-      progress.set((float) mapsDone / totalMaps);
-      String statusString = mapsDone + " / " + totalMaps + " copied.";
-      status.setStateString(statusString);
-      progress.setStatus("copy(" + mapsDone + " of " + totalMaps 
-          + " at " +
-          mbpsFormat.format(transferRate) +  " MB/s)");
-      
+      updateStatus();
       reduceShuffleBytes.increment(bytes);
       reduceShuffleBytes.increment(bytes);
       lastProgressTime = System.currentTimeMillis();
       lastProgressTime = System.currentTimeMillis();
-      LOG.debug("map " + mapId + " done " + statusString);
+      LOG.debug("map " + mapId + " done " + status.getStateString());
     }
     }
   }
   }
+  
+  private void updateStatus() {
+    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+    int mapsDone = totalMaps - remainingMaps;
+    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+    float transferRate = mbs / secsSinceStart;
+    progress.set((float) mapsDone / totalMaps);
+    String statusString = mapsDone + " / " + totalMaps + " copied.";
+    status.setStateString(statusString);
+
+    progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
+        + mbpsFormat.format(transferRate) + " MB/s)");
+  }
 
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
                                       boolean readError) {
                                       boolean readError) {
@@ -256,7 +258,13 @@ class ShuffleScheduler<K,V> {
   }
   }
   
   
   public synchronized void tipFailed(TaskID taskId) {
   public synchronized void tipFailed(TaskID taskId) {
-    finishedMaps[taskId.getId()] = true;
+    if (!finishedMaps[taskId.getId()]) {
+      finishedMaps[taskId.getId()] = true;
+      if (--remainingMaps == 0) {
+        notifyAll();
+      }
+      updateStatus();
+    }
   }
   }
   
   
   public synchronized void addKnownMapOutput(String hostName, 
   public synchronized void addKnownMapOutput(String hostName, 

+ 67 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestShuffleScheduler {
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testTipFailed() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+
+    ShuffleScheduler scheduler = new ShuffleScheduler(job, status, null,
+        progress, null, null, null);
+
+    JobID jobId = new JobID();
+    TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
+    scheduler.tipFailed(taskId1);
+
+    Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
+        0.0f);
+    Assert.assertFalse(scheduler.waitUntilDone(1));
+
+    TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
+    scheduler.tipFailed(taskId0);
+    Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
+        0.0f);
+    Assert.assertTrue(scheduler.waitUntilDone(1));
+  }
+}