Bladeren bron

HADOOP-3780. Fixes a problem to do with NPE due to nodes not being resolved by the resolution thread in time. Patch contributed by Ravi Gummadi for the 0.18 branch (Amar Kamat had fixed the issue for 0.19).

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@730106 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 jaren geleden
bovenliggende
commit
5df72a4fa5

+ 3 - 0
CHANGES.txt

@@ -121,6 +121,9 @@ Release 0.18.3 - Unreleased
     HADOOP-4951. Lease monitor should acquire the LeaseManager lock but not the
     Monitor lock. (szetszwo)
 
+    HADOOP-3780. Fixes a problem to do with NPE due to nodes not being
+    resolved by the resolution thread in time. (Ravia Gummadi via ddas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

+ 11 - 73
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -86,7 +86,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
-  private ResolutionThread resThread = new ResolutionThread();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
 
@@ -652,6 +651,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     this.localMachine = addr.getHostName();
     this.port = addr.getPort();
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
+
+    this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+
     this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
     this.interTrackerServer.start();
     if (LOG.isDebugEnabled()) {
@@ -741,9 +745,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       infoServer.setAttribute("fileSys", historyFS);
     }
 
-    this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
-        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
-            DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
     synchronized (this) {
@@ -769,7 +770,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
     this.expireTrackersThread.start();
-    this.resThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
     this.initJobsThread = new Thread(this.initJobs, "initJobs");
@@ -846,15 +846,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         ex.printStackTrace();
       }
     }
-    if (this.resThread != null) {
-      LOG.info("Stopping DNSToSwitchMapping Resolution thread");
-      this.resThread.interrupt();
-      try {
-        this.resThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
     if (this.completedJobsStoreThread != null &&
         this.completedJobsStoreThread.isAlive()) {
       LOG.info("Stopping completedJobsStore thread");
@@ -1287,6 +1278,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       }
     }
       
+    // Register the tracker if its not registered
+    if (getNode(trackerName) == null) {
+      // Making the network location resolution inline .. 
+      resolveAndAddToTopology(status.getHost());
+    }
+    
     // Process this heartbeat 
     short newResponseId = (short)(responseId + 1);
     if (!processHeartbeat(status, initialContact)) {
@@ -1453,7 +1450,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
         if (initialContact) {
           trackerExpiryQueue.add(trackerStatus);
-          resThread.addToResolutionQueue(trackerStatus);
         }
       }
     }
@@ -1463,64 +1459,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     return true;
   }
 
-  private class ResolutionThread extends Thread {
-    private LinkedBlockingQueue<TaskTrackerStatus> queue = 
-      new LinkedBlockingQueue <TaskTrackerStatus>();
-    public ResolutionThread() {
-      setName("DNSToSwitchMapping reolution Thread");
-      setDaemon(true);
-    }
-    public void addToResolutionQueue(TaskTrackerStatus t) {
-      while (!queue.add(t)) {
-        LOG.warn("Couldn't add to the Resolution queue now. Will " +
-                 "try again");
-        try {
-          Thread.sleep(2000);
-        } catch (InterruptedException ie) {}
-      }
-    }
-    @Override
-    public void run() {
-      while (!isInterrupted()) {
-        try {
-          List <TaskTrackerStatus> statuses = 
-            new ArrayList<TaskTrackerStatus>(queue.size());
-          // Block if the queue is empty
-          statuses.add(queue.take());
-          queue.drainTo(statuses);
-          List<String> dnHosts = new ArrayList<String>(statuses.size());
-          for (TaskTrackerStatus t : statuses) {
-            dnHosts.add(t.getHost());
-          }
-          List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
-          if (rName == null) {
-            LOG.error("The resolve call returned null! Using " + 
-                NetworkTopology.DEFAULT_RACK + " for some hosts");
-            rName = new ArrayList<String>(dnHosts.size());
-            for (int i = 0; i < dnHosts.size(); i++) {
-              rName.add(NetworkTopology.DEFAULT_RACK);
-            }
-          }
-          int i = 0;
-          for (String m : rName) {
-            String host = statuses.get(i++).getHost();
-            String networkLoc = NodeBase.normalize(m);
-            addHostToNodeMapping(host, networkLoc);
-            numResolved++;
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn(getName() + " exiting, got interrupted: " + 
-                   StringUtils.stringifyException(ie));
-          return;
-        } catch (Throwable t) {
-          LOG.error(getName() + " got an exception: " +
-              StringUtils.stringifyException(t));
-        }
-      }
-      LOG.warn(getName() + " exiting...");
-    }
-  }
-  
   /**
    * Returns a task we'd like the TaskTracker to execute right now.
    *

+ 0 - 7
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -426,13 +426,6 @@ public class MiniMRCluster {
       taskTrackerThread.start();
     }
 
-    // Wait till the MR cluster stabilizes
-    while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
-      try {
-        Thread.sleep(20);
-      } catch (InterruptedException ie) {
-      }
-    }
     waitUntilIdle();
   }