Przeglądaj źródła

MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient synchronization on updates to task Counters. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1471796 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 lat temu
rodzic
commit
daaa73b657

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -347,6 +347,9 @@ Release 2.0.5-beta - UNRELEASED
     can override Mapper.run and Reducer.run to get the old (inconsistent) 
     behaviour. (acmurthy)
 
+    MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
+    synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 17 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -86,8 +90,6 @@ public class LocalJobRunner implements ClientProtocol {
 
   private static final String jobDir =  "localRunner/";
 
-  private static final Counters EMPTY_COUNTERS = new Counters();
-
   public long getProtocolVersion(String protocol, long clientVersion) {
     return ClientProtocol.versionID;
   }
@@ -273,10 +275,10 @@ public class LocalJobRunner implements ClientProtocol {
       this.partialMapProgress = new float[numMaps];
       this.mapCounters = new Counters[numMaps];
       for (int i = 0; i < numMaps; i++) {
-        this.mapCounters[i] = EMPTY_COUNTERS;
+        this.mapCounters[i] = new Counters();
       }
 
-      this.reduceCounters = EMPTY_COUNTERS;
+      this.reduceCounters = new Counters();
     }
 
     /**
@@ -497,6 +499,15 @@ public class LocalJobRunner implements ClientProtocol {
     
     public synchronized boolean statusUpdate(TaskAttemptID taskId,
         TaskStatus taskStatus) throws IOException, InterruptedException {
+      // Serialize as we would if distributed in order to make deep copy
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      taskStatus.write(dos);
+      dos.close();
+      taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap());
+      taskStatus.readFields(new DataInputStream(
+          new ByteArrayInputStream(baos.toByteArray())));
+      
       LOG.info(taskStatus.getStateString());
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -525,10 +536,10 @@ public class LocalJobRunner implements ClientProtocol {
     public synchronized Counters getCurrentCounters() {
       if (null == mapCounters) {
         // Counters not yet initialized for job.
-        return EMPTY_COUNTERS;
+        return new Counters();
       }
 
-      Counters current = EMPTY_COUNTERS;
+      Counters current = new Counters();
       for (Counters c : mapCounters) {
         current = Counters.sum(current, c);
       }