瀏覽代碼

HADOOP-3780. Remove asynchronous resolution of network topology in the
JobTracker (Amar Kamat via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@685330 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 年之前
父節點
當前提交
9543dcdc1a

+ 3 - 0
CHANGES.txt

@@ -172,6 +172,9 @@ Trunk (unreleased changes)
     HADOOP-3851. Fix spelling mistake in FSNamesystemMetrics. (Steve Loughran 
     via omalley)
 
+    HADOOP-3780. Remove asynchronous resolution of network topology in the 
+    JobTracker (Amar Kamat via omalley)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 6 - 70
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -87,7 +87,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
-  private ResolutionThread resThread = new ResolutionThread();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
   private final TaskScheduler taskScheduler;
@@ -688,7 +687,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
     this.expireTrackersThread.start();
-    this.resThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
     taskScheduler.start();
@@ -758,15 +756,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         ex.printStackTrace();
       }
     }
-    if (this.resThread != null) {
-      LOG.info("Stopping DNSToSwitchMapping Resolution thread");
-      this.resThread.interrupt();
-      try {
-        this.resThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
     if (this.completedJobsStoreThread != null &&
         this.completedJobsStoreThread.isAlive()) {
       LOG.info("Stopping completedJobsStore thread");
@@ -1211,6 +1200,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       }
     }
       
+    // Register the tracker if its not registered
+    if (getNode(trackerName) == null) {
+      // Making the network location resolution inline .. 
+      resolveAndAddToTopology(status.getHost());
+    }
+    
     // Process this heartbeat 
     short newResponseId = (short)(responseId + 1);
     if (!processHeartbeat(status, initialContact)) {
@@ -1390,7 +1385,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
         if (initialContact) {
           trackerExpiryQueue.add(trackerStatus);
-          resThread.addToResolutionQueue(trackerStatus);
         }
       }
     }
@@ -1400,64 +1394,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return true;
   }
 
-  private class ResolutionThread extends Thread {
-    private LinkedBlockingQueue<TaskTrackerStatus> queue = 
-      new LinkedBlockingQueue <TaskTrackerStatus>();
-    public ResolutionThread() {
-      setName("DNSToSwitchMapping reolution Thread");
-      setDaemon(true);
-    }
-    public void addToResolutionQueue(TaskTrackerStatus t) {
-      while (!queue.add(t)) {
-        LOG.warn("Couldn't add to the Resolution queue now. Will " +
-                 "try again");
-        try {
-          Thread.sleep(2000);
-        } catch (InterruptedException ie) {}
-      }
-    }
-    @Override
-    public void run() {
-      while (!isInterrupted()) {
-        try {
-          List <TaskTrackerStatus> statuses = 
-            new ArrayList<TaskTrackerStatus>(queue.size());
-          // Block if the queue is empty
-          statuses.add(queue.take());
-          queue.drainTo(statuses);
-          List<String> dnHosts = new ArrayList<String>(statuses.size());
-          for (TaskTrackerStatus t : statuses) {
-            dnHosts.add(t.getHost());
-          }
-          List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
-          if (rName == null) {
-            LOG.error("The resolve call returned null! Using " + 
-                NetworkTopology.DEFAULT_RACK + " for some hosts");
-            rName = new ArrayList<String>(dnHosts.size());
-            for (int i = 0; i < dnHosts.size(); i++) {
-              rName.add(NetworkTopology.DEFAULT_RACK);
-            }
-          }
-          int i = 0;
-          for (String m : rName) {
-            String host = statuses.get(i++).getHost();
-            String networkLoc = NodeBase.normalize(m);
-            addHostToNodeMapping(host, networkLoc);
-            numResolved++;
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn(getName() + " exiting, got interrupted: " + 
-                   StringUtils.stringifyException(ie));
-          return;
-        } catch (Throwable t) {
-          LOG.error(getName() + " got an exception: " +
-              StringUtils.stringifyException(t));
-        }
-      }
-      LOG.warn(getName() + " exiting...");
-    }
-  }
-
   /**
    * A tracker wants to know if any of its Tasks have been
    * closed (because the job completed, whether successfully or not)

+ 0 - 7
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -426,13 +426,6 @@ public class MiniMRCluster {
       taskTrackerThread.start();
     }
 
-    // Wait till the MR cluster stabilizes
-    while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
-      try {
-        Thread.sleep(20);
-      } catch (InterruptedException ie) {
-      }
-    }
     waitUntilIdle();
   }