Explorar o código

HADOOP-1739. Let OS always choose the tasktracker's umbilical port.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@571275 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting %!s(int64=18) %!d(string=hai) anos
pai
achega
5eaab0a93d

+ 4 - 0
CHANGES.txt

@@ -135,6 +135,10 @@ Trunk (unreleased changes)
     HADOOP-1803.  Generalize build.xml to make files in all
     HADOOP-1803.  Generalize build.xml to make files in all
     src/contrib/*/bin directories executable.  (stack via cutting)
     src/contrib/*/bin directories executable.  (stack via cutting)
 
 
+    HADOOP-1739.  Let OS always choose the tasktracker's umbilical
+    port.  Also switch default address for umbilical connections to
+    loopback.  (cutting)
+
 Release 0.14.1 - (unreleased)
 Release 0.14.1 - (unreleased)
 
 
   BUG FIXES
   BUG FIXES

+ 6 - 18
conf/hadoop-default.xml

@@ -468,18 +468,9 @@ creations/deletions), or "all".</description>
 
 
 <property>
 <property>
   <name>mapred.task.tracker.report.bindAddress</name>
   <name>mapred.task.tracker.report.bindAddress</name>
-  <value>0.0.0.0</value>
-  <description>
-    the address where the maperd tracker report server will be binded on.
-  </description>
-</property>
-
-<property>
-  <name>mapred.task.tracker.report.port</name>
-  <value>50050</value>
-  <description>The port number that the MapReduce task tracker report server uses as a starting
-               point to look for a free port to listen on.
-  </description>
+  <value>127.0.0.1</value>
+  <description>The interface that task processes use to communicate
+  with their parent tasktracker process.</description>
 </property>
 </property>
 
 
 <property>
 <property>
@@ -617,14 +608,11 @@ creations/deletions), or "all".</description>
   <description>Java opts for the task tracker child processes.  Subsumes
   <description>Java opts for the task tracker child processes.  Subsumes
   'mapred.child.heap.size' (If a mapred.child.heap.size value is found
   'mapred.child.heap.size' (If a mapred.child.heap.size value is found
   in a configuration, its maximum heap size will be used and a warning
   in a configuration, its maximum heap size will be used and a warning
-  emitted that heap.size has been deprecated). Also, the following symbols,
-  if present, will be interpolated: @taskid@ is replaced by current TaskID;
-  and @port@ will be replaced by mapred.task.tracker.report.port + 1 (A second
-  child will fail with a port-in-use if mapred.tasktracker.tasks.maximum is
-  greater than one). Any other occurrences of '@' will go unchanged. For
+  emitted that heap.size has been deprecated). Also, the following symbol,
+  if present, will be interpolated: @taskid@ is replaced by current TaskID.
+  Any other occurrences of '@' will go unchanged. For
   example, to enable verbose gc logging to a file named for the taskid in
   example, to enable verbose gc logging to a file named for the taskid in
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
-
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   </description>
   </description>
 </property>
 </property>

+ 5 - 6
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.*;
 import java.io.*;
 import java.io.*;
+import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.List;
 import java.util.Vector;
 import java.util.Vector;
 import java.net.URI;
 import java.net.URI;
@@ -224,7 +225,6 @@ abstract class TaskRunner extends Thread {
       // The following symbols if present in mapred.child.java.opts value are
       // The following symbols if present in mapred.child.java.opts value are
       // replaced:
       // replaced:
       // + @taskid@ is interpolated with value of TaskID.
       // + @taskid@ is interpolated with value of TaskID.
-      // + Replaces @port@ with mapred.task.tracker.report.port + 1.
       // Other occurrences of @ will not be altered.
       // Other occurrences of @ will not be altered.
       //
       //
       // Example with multiple arguments and substitutions, showing
       // Example with multiple arguments and substitutions, showing
@@ -236,15 +236,12 @@ abstract class TaskRunner extends Thread {
       //     <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
       //     <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
         //     -Dcom.sun.management.jmxremote.authenticate=false \
         //     -Dcom.sun.management.jmxremote.authenticate=false \
         //     -Dcom.sun.management.jmxremote.ssl=false \
         //     -Dcom.sun.management.jmxremote.ssl=false \
-        //     -Dcom.sun.management.jmxremote.port=@port@
         //     </value>
         //     </value>
         //
         //
         String javaOpts = handleDeprecatedHeapSize(
         String javaOpts = handleDeprecatedHeapSize(
                                                    conf.get("mapred.child.java.opts", "-Xmx200m"),
                                                    conf.get("mapred.child.java.opts", "-Xmx200m"),
                                                    conf.get("mapred.child.heap.size"));
                                                    conf.get("mapred.child.heap.size"));
         javaOpts = replaceAll(javaOpts, "@taskid@", taskid);
         javaOpts = replaceAll(javaOpts, "@taskid@", taskid);
-        int port = conf.getInt("mapred.task.tracker.report.port", 50050) + 1;
-        javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
         String [] javaOptsSplit = javaOpts.split(" ");
         String [] javaOptsSplit = javaOpts.split(" ");
         //Add java.library.path; necessary for native-hadoop libraries
         //Add java.library.path; necessary for native-hadoop libraries
         String libraryPath = System.getProperty("java.library.path");
         String libraryPath = System.getProperty("java.library.path");
@@ -279,8 +276,10 @@ abstract class TaskRunner extends Thread {
 
 
         // Add main class and its arguments 
         // Add main class and its arguments 
         vargs.add(TaskTracker.Child.class.getName());  // main of Child
         vargs.add(TaskTracker.Child.class.getName());  // main of Child
-        // pass umbilical port
-        vargs.add(Integer.toString(tracker.getTaskTrackerReportPort())); 
+        // pass umbilical address
+        InetSocketAddress address = tracker.getTaskTrackerReportAddress();
+        vargs.add(address.getAddress().getHostAddress()); 
+        vargs.add(Integer.toString(address.getPort())); 
         vargs.add(taskid);                      // pass task identifier
         vargs.add(taskid);                      // pass task identifier
 
 
         // Run java
         // Run java

+ 19 - 30
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -93,8 +93,7 @@ public class TaskTracker
   String localHostname;
   String localHostname;
   InetSocketAddress jobTrackAddr;
   InetSocketAddress jobTrackAddr;
     
     
-  String taskReportBindAddress;
-  private int taskReportPort;
+  InetSocketAddress taskReportAddress;
 
 
   Server taskReportServer = null;
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
   InterTrackerProtocol jobClient;
@@ -402,31 +401,22 @@ public class TaskTracker
     
     
     this.myMetrics = new TaskTrackerMetrics();
     this.myMetrics = new TaskTrackerMetrics();
     
     
-    // port numbers
-    this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
     // bind address
     // bind address
-    this.taskReportBindAddress = this.fConf.get("mapred.task.tracker.report.bindAddress", "0.0.0.0");
+    String bindAddress =
+      this.fConf.get("mapred.task.tracker.report.bindAddress", "127.0.0.1");
 
 
     // RPC initialization
     // RPC initialization
-    while (true) {
-      try {
-        this.taskReportServer = RPC.getServer(this, this.taskReportBindAddress, this.taskReportPort, maxCurrentTasks, false, this.fConf);
-        this.taskReportServer.start();
-        break;
-      } catch (BindException e) {
-        LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");
-        this.taskReportPort++;
-      }
-        
-    }
-    // The rpc-server port can be ephemeral... 
-    // ... ensure we have the correct info
-    this.taskReportPort = taskReportServer.getListenerAddress().getPort();
-    this.fConf.setInt("mapred.task.tracker.report.port", this.taskReportPort);
-    LOG.info("TaskTracker up at: " + this.taskReportPort);
+    this.taskReportServer =
+      RPC.getServer(this, bindAddress, 0, maxCurrentTasks, false, this.fConf);
+    this.taskReportServer.start();
+
+    // get the assigned address
+    this.taskReportAddress = taskReportServer.getListenerAddress();
+    this.fConf.set("mapred.task.tracker.report.address",
+                   taskReportAddress.toString());
+    LOG.info("TaskTracker up at: " + this.taskReportAddress);
 
 
-    this.taskTrackerName = "tracker_" + 
-      localHostname + ":" + taskReportPort;
+    this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
     LOG.info("Starting tracker " + taskTrackerName);
 
 
     // Clear out temporary files that might be lying around
     // Clear out temporary files that might be lying around
@@ -777,8 +767,8 @@ public class TaskTracker
   }
   }
   
   
   /** Return the port at which the tasktracker bound to */
   /** Return the port at which the tasktracker bound to */
-  public synchronized int getTaskTrackerReportPort() {
-    return taskReportPort;
+  public synchronized InetSocketAddress getTaskTrackerReportAddress() {
+    return taskReportAddress;
   }
   }
     
     
   /** Queries the job tracker for a set of outputs ready to be copied
   /** Queries the job tracker for a set of outputs ready to be copied
@@ -1766,11 +1756,10 @@ public class TaskTracker
       LOG.debug("Child starting");
       LOG.debug("Child starting");
 
 
       JobConf defaultConf = new JobConf();
       JobConf defaultConf = new JobConf();
-      int port = Integer.parseInt(args[0]);
-      InetSocketAddress address = new InetSocketAddress
-        (defaultConf.get("mapred.task.tracker.report.bindAddress","0.0.0.0"),
-         port);
-      String taskid = args[1];
+      String host = args[0];
+      int port = Integer.parseInt(args[1]);
+      InetSocketAddress address = new InetSocketAddress(host, port);
+      String taskid = args[2];
       //set a very high idle timeout so that the connection is never closed
       //set a very high idle timeout so that the connection is never closed
       defaultConf.setInt("ipc.client.connection.maxidletime", 60*60*1000);
       defaultConf.setInt("ipc.client.connection.maxidletime", 60*60*1000);
       TaskUmbilicalProtocol umbilical =
       TaskUmbilicalProtocol umbilical =