ソースを参照

MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.

(cherry picked from commit f4e2b3cc0b1f4e49c306bc09a9dddd0495225bb2)
(cherry picked from commit fb5b0ebb459cc8812084090a7ce7ac29e2ad147c)
(cherry picked from commit 3a3dcf0bb689d6aa8bc524a31799067702814674)
Tsuyoshi Ozawa 10 年 前
コミット
6c0ad99b9d

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -38,6 +38,10 @@ Release 2.6.1 - UNRELEASED
     MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
     MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
     (Gera Shegalov via jlowe)
     (Gera Shegalov via jlowe)
 
 
+    MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between
+    copySucceeded() in one thread and copyFailed() in another thread on the
+    same host. (Junping Du via ozawa)
+
 Release 2.6.0 - 2014-11-18
 Release 2.6.0 - 2014-11-18
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 11 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java

@@ -239,7 +239,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   }
   }
   
   
   private void updateStatus() {
   private void updateStatus() {
-    updateStatus(null);	
+    updateStatus(null);
   }
   }
 
 
   public synchronized void hostFailed(String hostname) {
   public synchronized void hostFailed(String hostname) {
@@ -263,9 +263,17 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       failureCounts.put(mapId, new IntWritable(1));
       failureCounts.put(mapId, new IntWritable(1));
     }
     }
     String hostname = host.getHostName();
     String hostname = host.getHostName();
+    IntWritable hostFailedNum = hostFailures.get(hostname);
+    // MAPREDUCE-6361: hostname could get cleanup from hostFailures in another
+    // thread with copySucceeded.
+    // In this case, add back hostname to hostFailures to get rid of NPE issue.
+    if (hostFailedNum == null) {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
     //report failure if already retried maxHostFailures times
     //report failure if already retried maxHostFailures times
-    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
-    
+    boolean hostFail = hostFailures.get(hostname).get() >
+        getMaxHostFailures() ? true : false;
+
     if (failures >= abortFailureLimit) {
     if (failures >= abortFailureLimit) {
       try {
       try {
         throw new IOException(failures + " failures downloading " + mapId);
         throw new IOException(failures + " failures downloading " + mapId);

+ 70 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java

@@ -223,4 +223,74 @@ public class TestShuffleScheduler {
         + " Aggregated copy rate(10 of 10 at 2.00 MB/s)", progress.toString());
         + " Aggregated copy rate(10 of 10 at 2.00 MB/s)", progress.toString());
 
 
   }
   }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = job.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+    CombineOutputCollector<K, V>  mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(
+            mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+        status, null, null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+
+    MapHost host1 = new MapHost("host1", null);
+    TaskAttemptID failedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 0), 0);
+
+    TaskAttemptID succeedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 1), 1);
+
+    // handle output fetch failure for failedAttemptID, part I
+    scheduler.hostFailed(host1.getHostName());
+
+    // handle output fetch succeed for succeedAttemptID
+    long bytes = (long)500 * 1024 * 1024;
+    scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
+
+    // handle output fetch failure for failedAttemptID, part II
+    // for MAPREDUCE-6361: verify no NPE exception get thrown out
+    scheduler.copyFailed(failedAttemptID, host1, true, false);
+  }
 }
 }