Kaynağa Gözat

MAPREDUCE-2489. Jobsplits with random hostnames can make the queue unusable. (Jeffrey Naisbitt via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1154562 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 14 yıl önce
ebeveyn
işleme
59b711baf1

+ 3 - 0
CHANGES.txt

@@ -26,6 +26,9 @@ Release 0.20.205.0 - unreleased
 
     HDFS-2117. DiskChecker#mkdirsWithExistsAndPermissionCheck may
     return true even when the dir is not created. (eli)
+    
+    MAPREDUCE-2489. Jobsplits with random hostnames can make the 
+    queue unusable. (Jeffrey Naisbitt via mahadev)
 
   IMPROVEMENTS
 

+ 31 - 1
src/core/org/apache/hadoop/net/NetUtils.java

@@ -27,6 +27,7 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.channels.SocketChannel;
 import java.util.Map.Entry;
@@ -443,7 +444,36 @@ public class NetUtils {
     }
     return hostNames;
   }
-  
+
+  /**
+   * Performs a sanity check on the list of hostnames/IPs to verify they at least
+   * appear to be valid.
+   * @param names - List of hostnames/IPs
+   * @throws UnknownHostException
+   */
+  public static void verifyHostnames(String[] names) throws UnknownHostException {
+    for (String name: names) {
+      if (name == null) {
+        throw new UnknownHostException("null hostname found");
+      }
+      // The first check supports URL formats (e.g. hdfs://, etc.). 
+      // java.net.URI requires a schema, so we add a dummy one if it doesn't
+      // have one already.
+      URI uri = null;
+      try {
+        uri = new URI(name);
+        if (uri.getHost() == null) {
+          uri = new URI("http://" + name);
+        }
+      } catch (URISyntaxException e) {
+        uri = null;
+      }
+      if (uri == null || uri.getHost() == null) {
+        throw new UnknownHostException(name + " is not a valid Inet address");
+      }
+    }
+  }
+
   /**
    * Checks if {@code host} is a local host name and return {@link InetAddress}
    * corresponding to that address.

+ 9 - 2
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -36,6 +36,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.net.UnknownHostException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -531,7 +532,8 @@ public class JobInProgress {
   }
   
   private Map<Node, List<TaskInProgress>> createCache(
-                                 TaskSplitMetaInfo[] splits, int maxLevel) {
+                                 TaskSplitMetaInfo[] splits, int maxLevel)
+                                 throws UnknownHostException {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
@@ -658,7 +660,7 @@ public class JobInProgress {
    * thread so that split-computation doesn't block anyone.
    */
   public synchronized void initTasks() 
-  throws IOException, KillInterruptedException {
+  throws IOException, KillInterruptedException, UnknownHostException {
     if (tasksInited || isComplete()) {
       return;
     }
@@ -704,6 +706,11 @@ public class JobInProgress {
     }
     numMapTasks = splits.length;
 
+    // Sanity check the locations so we don't create/initialize unnecessary tasks
+    for (TaskSplitMetaInfo split : splits) {
+      NetUtils.verifyHostnames(split.getLocations());
+    }
+    
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
     this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);

+ 10 - 6
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1257,7 +1257,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
        * Adds a task-attempt in the listener
        */
       private void processTaskAttempt(String taskAttemptId, 
-                                      JobHistory.TaskAttempt attempt) {
+                                      JobHistory.TaskAttempt attempt) 
+        throws UnknownHostException {
         TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
         
         // Check if the transaction for this attempt can be committed
@@ -1512,7 +1513,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     
     private void createTaskAttempt(JobInProgress job, 
                                    TaskAttemptID attemptId, 
-                                   JobHistory.TaskAttempt attempt) {
+                                   JobHistory.TaskAttempt attempt) 
+      throws UnknownHostException {
       TaskID id = attemptId.getTaskID();
       String type = attempt.get(Keys.TASK_TYPE);
       TaskInProgress tip = job.getTaskInProgress(id);
@@ -3172,7 +3174,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * 
    * @param status Task Tracker's status
    */
-  private void addNewTracker(TaskTracker taskTracker) {
+  private void addNewTracker(TaskTracker taskTracker) throws UnknownHostException {
     TaskTrackerStatus status = taskTracker.getStatus();
     trackerExpiryQueue.add(status);
 
@@ -3196,10 +3198,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     trackers.add(taskTracker);
   }
 
-  public Node resolveAndAddToTopology(String name) {
+  public Node resolveAndAddToTopology(String name) throws UnknownHostException {
     List <String> tmpList = new ArrayList<String>(1);
     tmpList.add(name);
-    List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
+    List <String> rNameList = dnsToSwitchMapping.resolveValidHosts(tmpList);
     String rName = rNameList.get(0);
     String networkLoc = NodeBase.normalize(rName);
     return addHostToNodeMapping(name, networkLoc);
@@ -3639,7 +3641,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   private synchronized boolean processHeartbeat(
                                  TaskTrackerStatus trackerStatus, 
                                  boolean initialContact,
-                                 long timeStamp) {
+                                 long timeStamp) throws UnknownHostException {
 
     getInstrumentation().heartbeat();
 
@@ -3671,6 +3673,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           if (isBlacklisted(trackerName)) {
             faultyTrackers.incrBlacklistedTrackers(1);
           }
+          // This could now throw an UnknownHostException but only if the
+          // TaskTracker status itself has an invalid name
           addNewTracker(taskTracker);
         }
       }

+ 1 - 12
src/test/org/apache/hadoop/net/StaticMapping.java

@@ -62,17 +62,6 @@ public class StaticMapping extends Configured implements DNSToSwitchMapping {
   }
   public List<String> resolveValidHosts(List<String> names)
     throws UnknownHostException {
-    List<String> m = new ArrayList<String>();
-    synchronized (nameToRackMap) {
-      for (String name : names) {
-        String rackId;
-        if ((rackId = nameToRackMap.get(name)) != null) {
-          m.add(rackId);
-        } else {
-          throw new UnknownHostException(name);
-        }
-      }
-      return m;
-    }
+    return this.resolve(names);
   }
 }