Procházet zdrojové kódy

HADOOP-527. Permit specification of the local address that various Hadoop daemons should bind to. Contributed by Philippe Gassmann.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@447615 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting před 19 roky
rodič
revize
11fd232c8e

+ 3 - 0
CHANGES.txt

@@ -20,6 +20,9 @@ Trunk (unreleased changes)
 5. HADOOP-533.  Fix unit test to not modify conf directory.
   (Hairong Kuang via cutting)
 
+6. HADOOP-527.  Permit specification of the local address that various
+   Hadoop daemons should bind to.  (Philippe Gassmann via cutting)
+
 
 Release 0.6.2 (unreleased)
 

+ 40 - 0
conf/hadoop-default.xml

@@ -103,6 +103,14 @@ creations/deletions), or "all".</description>
   literal string "local" or a host:port for DFS.</description>
 </property>
 
+<property>
+  <name>dfs.datanode.bindAddress</name>
+  <value>0.0.0.0</value>
+  <description>
+    the address where the datanode will listen to.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.port</name>
   <value>50010</value>
@@ -111,6 +119,14 @@ creations/deletions), or "all".</description>
 </description>
 </property>
 
+<property>
+  <name>dfs.info.bindAddress</name>
+  <value>0.0.0.0</value>
+  <description>
+    the address where the dfs namenode web ui will listen on.
+  </description>
+</property>
+
 <property>
   <name>dfs.info.port</name>
   <value>50070</value>
@@ -237,6 +253,14 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.job.tracker.info.bindAddress</name>
+  <value>0.0.0.0</value>
+  <description>
+    the address where the job tracker info webserver will be binded on.
+  </description>
+</property>
+
 <property>
   <name>mapred.job.tracker.info.port</name>
   <value>50030</value>
@@ -252,6 +276,14 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.task.tracker.report.bindAddress</name>
+  <value>0.0.0.0</value>
+  <description>
+    the address where the maperd tracker report server will be binded on.
+  </description>
+</property>
+
 <property>
   <name>mapred.task.tracker.report.port</name>
   <value>50050</value>
@@ -430,6 +462,14 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>tasktracker.http.bindAddress</name>
+  <value>0.0.0.0</value>
+  <description>
+    the address where the task tracker http server will be binded on.
+  </description>
+</property>
+
 <property>
   <name>tasktracker.http.port</name>
   <value>50060</value>

+ 4 - 2
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -156,7 +156,8 @@ public class DataNode implements FSConstants, Runnable {
              dataDirs,
              createSocketAddr(conf.get("fs.default.name", "local")), conf);
         int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
-        this.infoServer = new StatusHttpServer("datanode", infoServerPort, true);
+        String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
+        this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
         //create a servlet to serve full-file content
         try {
           this.infoServer.addServlet(null, "/streamFile/*",
@@ -208,9 +209,10 @@ public class DataNode implements FSConstants, Runnable {
       // find free port
       ServerSocket ss = null;
       int tmpPort = conf.getInt("dfs.datanode.port", 50010);
+      String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
       while (ss == null) {
         try {
-          ss = new ServerSocket(tmpPort);
+          ss = new ServerSocket(tmpPort,0,InetAddress.getByName(bindAddress));
           LOG.info("Opened server at " + tmpPort);
         } catch (IOException ie) {
           LOG.info("Could not open server at " + tmpPort + ", trying new port");

+ 3 - 1
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -116,6 +116,7 @@ class FSNamesystem implements FSConstants {
     //
     StatusHttpServer infoServer;
     int infoPort;
+    String infoBindAddress;
     Date startTime;
     
     //
@@ -190,7 +191,8 @@ class FSNamesystem implements FSConstants {
     public FSNamesystem(File dir, Configuration conf) throws IOException {
         fsNamesystemObject = this;
         this.infoPort = conf.getInt("dfs.info.port", 50070);
-        this.infoServer = new StatusHttpServer("dfs", infoPort, false);
+        this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
+        this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
         this.infoServer.start();
         InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
         this.localMachine = addr.getHostName();

+ 6 - 5
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -23,6 +23,8 @@ import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 
 import java.io.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.Metrics;
@@ -121,18 +123,17 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
      * Create a NameNode at the default location
      */
     public NameNode(Configuration conf) throws IOException {
-        this(getDir(conf),
-             DataNode.createSocketAddr
-             (conf.get("fs.default.name", "local")).getPort(), conf);
+       this(getDir(conf),DataNode.createSocketAddr(conf.get("fs.default.name", "local")).getHostName(),
+                       DataNode.createSocketAddr(conf.get("fs.default.name", "local")).getPort(), conf);
     }
 
     /**
      * Create a NameNode at the specified location and start it.
      */
-    public NameNode(File dir, int port, Configuration conf) throws IOException {
+    public NameNode(File dir, String bindAddress, int port, Configuration conf) throws IOException {
         this.namesystem = new FSNamesystem(dir, conf);
         this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
-        this.server = RPC.getServer(this, port, handlerCount, false, conf);
+        this.server = RPC.getServer(this, bindAddress, port, handlerCount, false, conf);
         this.datanodeStartupPeriod =
             conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
         this.server.start();

+ 59 - 7
src/java/org/apache/hadoop/ipc/RPC.java

@@ -277,19 +277,42 @@ public class RPC {
   }
 
   /** Construct a server for a protocol implementation instance listening on a
-   * port. */
+   * port and address. */
+  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) {
+    return getServer(instance, bindAddress, port, 1, false, conf);
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address. */
+  public static Server getServer(final Object instance, final String bindAddress, final int port,
+                                 final int numHandlers,
+                                 final boolean verbose, Configuration conf) {
+    return new Server(instance, conf, bindAddress,port, numHandlers, verbose);
+  }
+
+  
+  /** Construct a server for a protocol implementation instance listening on a
+   * port.
+   * 
+   * @deprecated the bind address should always be specified
+   */
   public static Server getServer(final Object instance, final int port, Configuration conf) {
     return getServer(instance, port, 1, false, conf);
   }
 
   /** Construct a server for a protocol implementation instance listening on a
-   * port. */
-  public static Server getServer(final Object instance, final int port,
+   * port. 
+   *
+   * @deprecated the bind address should always be specified
+   */
+  public static Server getServer(final Object instance,final int port,
                                  final int numHandlers,
                                  final boolean verbose, Configuration conf) {
     return new Server(instance, conf, port, numHandlers, verbose);
   }
-        
+  
+  
+  
   /** An RPC Server. */
   public static class Server extends org.apache.hadoop.ipc.Server {
     private Object instance;
@@ -300,26 +323,55 @@ public class RPC {
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param port the port to listen for connections on
+     * 
+     * @deprecated the bind address should always be specified
      */
     public Server(Object instance, Configuration conf, int port) {
-      this(instance, conf, port, 1, false);
+      this(instance, conf,  "0.0.0.0", port, 1, false);
+    }
+
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     */
+    public Server(Object instance, Configuration conf, String bindAddress, int port) {
+      this(instance, conf,  bindAddress, port, 1, false);
     }
 
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, int port,
+    public Server(Object instance, Configuration conf, String bindAddress,  int port,
                   int numHandlers, boolean verbose) {
-      super(port, Invocation.class, numHandlers, conf);
+      super(bindAddress, port, Invocation.class, numHandlers, conf);
       this.instance = instance;
       this.implementation = instance.getClass();
       this.verbose = verbose;
     }
 
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * 
+     * @deprecated the bind address should always be specified
+     */
+    public Server(Object instance, Configuration conf,  int port,
+                  int numHandlers, boolean verbose) {
+      super("0.0.0.0", port, Invocation.class, numHandlers, conf);
+      this.instance = instance;
+      this.implementation = instance.getClass();
+      this.verbose = verbose;
+    }
     public Writable call(Writable param) throws IOException {
       try {
         Invocation call = (Invocation)param;

+ 17 - 5
src/java/org/apache/hadoop/ipc/Server.java

@@ -69,7 +69,7 @@ public abstract class Server {
   public static Server get() {
     return (Server)SERVER.get();
   }
-
+  private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int maxQueuedCalls;                     // max number of queued calls
@@ -125,7 +125,7 @@ public abstract class Server {
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
     
     public Listener() throws IOException {
-      address = new InetSocketAddress(port);
+      address = new InetSocketAddress(bindAddress,port);
       // Create a new server socket and set to non blocking mode
       acceptChannel = ServerSocketChannel.open();
       acceptChannel.configureBlocking(false);
@@ -515,12 +515,13 @@ public abstract class Server {
     }
 
   }
-  
-  /** Constructs a server listening on the named port.  Parameters passed must
+  /** Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
+   * 
    */
-  protected Server(int port, Class paramClass, int handlerCount, Configuration conf) {
+  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) {
+    this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
     this.paramClass = paramClass;
@@ -531,6 +532,17 @@ public abstract class Server {
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
   }
+  
+  
+  /** Constructs a server listening on the named port.  Parameters passed must
+   * be of the named class.  The <code>handlerCount</handlerCount> determines
+   * the number of handler threads that will be used to process calls.
+   * 
+   * @deprecated the bind address should always be specified
+   */
+  protected Server(int port, Class paramClass, int handlerCount, Configuration conf) {
+    this("0.0.0.0",port,paramClass,handlerCount,conf);
+  }
 
   /** Sets the timeout used for network i/o. */
   public void setTimeout(int timeout) { this.timeout = timeout; }

+ 4 - 2
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -453,6 +453,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
     // Used to provide an HTML view on Job, Task, and TaskTracker structures
     StatusHttpServer infoServer;
+    String infoBindAddress;
     int infoPort;
 
     Server interTrackerServer;
@@ -494,7 +495,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         InetSocketAddress addr = getAddress(conf);
         this.localMachine = addr.getHostName();
         this.port = addr.getPort();
-        this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
+        this.interTrackerServer = RPC.getServer(this,addr.getHostName(), addr.getPort(), 10, false, conf);
         this.interTrackerServer.start();
         Properties p = System.getProperties();
         for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
@@ -504,7 +505,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         }
 
         this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
-        this.infoServer = new StatusHttpServer("job", infoPort, false);
+        this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0");
+        this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
         this.infoServer.start();
 
         this.startTime = System.currentTimeMillis();

+ 2 - 1
src/java/org/apache/hadoop/mapred/StatusHttpServer.java

@@ -52,12 +52,13 @@ public class StatusHttpServer {
    * @param findPort whether the server should start at the given port and 
    *        increment by 1 until it finds a free port.
    */
-  public StatusHttpServer(String name, int port, 
+  public StatusHttpServer(String name, String bindAddress, int port, 
                           boolean findPort) throws IOException {
     webServer = new org.mortbay.jetty.Server();
     this.findPort = findPort;
     listener = new SocketListener();
     listener.setPort(port);
+    listener.setHost(bindAddress);
     webServer.addListener(listener);
 
     // set up the context for "/logs/"

+ 7 - 3
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -56,7 +56,8 @@ public class TaskTracker
     String taskTrackerName;
     String localHostname;
     InetSocketAddress jobTrackAddr;
-
+    
+    String taskReportBindAddress;
     int taskReportPort;
 
     Server taskReportServer = null;
@@ -189,11 +190,13 @@ public class TaskTracker
         
         // port numbers
         this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
+        // bind address
+        this.taskReportBindAddress = this.fConf.get("mapred.task.tracker.report.bindAddress", "0.0.0.0");
 
         // RPC initialization
         while (true) {
             try {
-                this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);
+                this.taskReportServer = RPC.getServer(this, this.taskReportBindAddress, this.taskReportPort, maxCurrentTasks, false, this.fConf);
                 this.taskReportServer.start();
                 break;
             } catch (BindException e) {
@@ -354,7 +357,8 @@ public class TaskTracker
       this.mapOutputFile = new MapOutputFile();
       this.mapOutputFile.setConf(conf);
       int httpPort = conf.getInt("tasktracker.http.port", 50060);
-      this.server = new StatusHttpServer("task", httpPort, true);
+      String httpBindAddress = conf.get("tasktracker.http.bindAddress", "0.0.0.0");;
+      this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);
       int workerThreads = conf.getInt("tasktracker.http.threads", 40);
       server.setThreads(1, workerThreads);
       server.start();

+ 1 - 1
src/test/org/apache/hadoop/dfs/ClusterTestDFS.java

@@ -217,7 +217,7 @@ public class ClusterTestDFS extends TestCase implements FSConstants {
 
     int nameNodePort = 9000 + testCycleNumber++; // ToDo: settable base port
     String nameNodeSocketAddr = "localhost:" + nameNodePort;
-    NameNode nameNodeDaemon = new NameNode(new File(nameFSDir), nameNodePort, conf);
+    NameNode nameNodeDaemon = new NameNode(new File(nameFSDir), "localhost", nameNodePort, conf);
     DFSClient dfsClient = null;
     try {
       //

+ 1 - 1
src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java

@@ -350,7 +350,7 @@ public class ClusterTestDFSNamespaceLogging extends TestCase implements FSConsta
 	
     NameNode.format(conf);
     
-    nameNodeDaemon = new NameNode(new File(nameFSDir), nameNodePort, conf);
+    nameNodeDaemon = new NameNode(new File(nameFSDir), "localhost", nameNodePort, conf);
 
      //
       //        start DataNodes

+ 5 - 4
src/test/org/apache/hadoop/ipc/TestIPC.java

@@ -41,12 +41,13 @@ public class TestIPC extends TestCase {
   private static final Random RANDOM = new Random();
 
   private static final int PORT = 1234;
+  private static final String ADDRESS = "0.0.0.0";
 
   private static class TestServer extends Server {
     private boolean sleep;
 
-    public TestServer(int port, int handlerCount, boolean sleep) {
-      super(port, LongWritable.class, handlerCount, conf);
+    public TestServer(String bindAddress, int port, int handlerCount, boolean sleep) {
+      super(bindAddress, port, LongWritable.class, handlerCount, conf);
       this.setTimeout(1000);
       this.sleep = sleep;
     }
@@ -134,7 +135,7 @@ public class TestIPC extends TestCase {
   public void testSerial(int handlerCount, boolean handlerSleep, 
                           int clientCount, int callerCount, int callCount)
     throws Exception {
-    Server server = new TestServer(PORT, handlerCount, handlerSleep);
+    Server server = new TestServer(ADDRESS, PORT, handlerCount, handlerSleep);
     server.start();
 
     Client[] clients = new Client[clientCount];
@@ -167,7 +168,7 @@ public class TestIPC extends TestCase {
     throws Exception {
     Server[] servers = new Server[serverCount];
     for (int i = 0; i < serverCount; i++) {
-      servers[i] = new TestServer(PORT+i, handlerCount, handlerSleep);
+      servers[i] = new TestServer(ADDRESS, PORT+i, handlerCount, handlerSleep);
       servers[i].start();
     }
 

+ 2 - 1
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.VersionedProtocol;
 /** Unit tests for RPC. */
 public class TestRPC extends TestCase {
   private static final int PORT = 1234;
+  private static final String ADDRESS = "0.0.0.0";
 
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.TestRPC");
@@ -101,7 +102,7 @@ public class TestRPC extends TestCase {
   }
 
   public void testCalls() throws Exception {
-    Server server = RPC.getServer(new TestImpl(), PORT, conf);
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, PORT, conf);
     server.start();
 
     InetSocketAddress addr = new InetSocketAddress(PORT);