소스 검색

Merge -r 949895:949896 from trunk to 0.20 branch. Fixes: MAPREDUCE-1372

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@949901 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 15 년 전
부모
커밋
9ad85a3b27
2개의 변경된 파일31개의 추가작업 그리고 18개의 파일을 삭제
  1. 3 0
      CHANGES.txt
  2. 28 18
      src/mapred/org/apache/hadoop/mapred/JobTracker.java

+ 3 - 0
CHANGES.txt

@@ -29,6 +29,9 @@ Release 0.20.3 - Unreleased
     HDFS-909. Wait until edits syncing is finishes before purging edits.
     (Todd Lipcon via shv)
 
+    MAPREDUCE-1372. ConcurrentModificationException in JobInProgress.
+    (Dick King and Amareshwari Sriramadasu via tomwhite)
+
   IMPROVEMENTS
 
     MAPREDUCE-1407. Update javadoc in mapreduce.{Mapper,Reducer} to match

+ 28 - 18
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -117,7 +117,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
-  private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
+  /**
+   * {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap}
+   * so that it can be safely written to and iterated on via 2 separate threads.
+   * Note: It can only be iterated from a single thread which is feasible since
+   *       the only iteration is done in {@link JobInProgress} under the 
+   *       {@link JobTracker} lock.
+   */
+  private Set<Node> nodesAtMaxLevel = 
+    Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>());
   private final TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
@@ -2388,25 +2396,27 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }
   
   private Node addHostToNodeMapping(String host, String networkLoc) {
-    Node node;
-    if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
-      node = new NodeBase(host, networkLoc);
-      clusterMap.add(node);
-      if (node.getLevel() < getNumTaskCacheLevels()) {
-        LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
-                  + " Should get at least a level of value: " 
-                  + getNumTaskCacheLevels());
-        try {
-          stopTracker();
-        } catch (IOException ie) {
-          LOG.warn("Exception encountered during shutdown: " 
-                   + StringUtils.stringifyException(ie));
-          System.exit(-1);
+    Node node = null;
+    synchronized (nodesAtMaxLevel) {
+      if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
+        node = new NodeBase(host, networkLoc);
+        clusterMap.add(node);
+        if (node.getLevel() < getNumTaskCacheLevels()) {
+          LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
+              + " Should get at least a level of value: " 
+              + getNumTaskCacheLevels());
+          try {
+            stopTracker();
+          } catch (IOException ie) {
+            LOG.warn("Exception encountered during shutdown: " 
+                + StringUtils.stringifyException(ie));
+            System.exit(-1);
+          }
         }
+        hostnameToNodeMap.put(host, node);
+        // Make an entry for the node at the max level in the cache
+        nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
       }
-      hostnameToNodeMap.put(host, node);
-      // Make an entry for the node at the max level in the cache
-      nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
     }
     return node;
   }