浏览代码

MAPREDUCE-4079. Allow MR AppMaster to limit ephemeral port range.(bobby via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1329696 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 13 年之前
父节点
当前提交
afeba56c53
共有 16 个文件被更改,包括 262 次插入34 次删除
  1. 60 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
  2. 5 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
  3. 22 8
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
  4. 20 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
  5. 44 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
  6. 24 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
  7. 33 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
  8. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  9. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
  10. 7 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  11. 8 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  12. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcServerFactory.java
  13. 13 4
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java
  14. 2 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java
  15. 9 4
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java
  16. 9 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java

+ 60 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java

@@ -940,11 +940,57 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
    * bound may be omitted meaning all values up to or over. So the string 
    * bound may be omitted meaning all values up to or over. So the string 
    * above means 2, 3, 5, and 7, 8, 9, ...
    * above means 2, 3, 5, and 7, 8, 9, ...
    */
    */
-  public static class IntegerRanges {
+  public static class IntegerRanges implements Iterable<Integer>{
     private static class Range {
     private static class Range {
       int start;
       int start;
       int end;
       int end;
     }
     }
+    
+    private static class RangeNumberIterator implements Iterator<Integer> {
+      Iterator<Range> internal;
+      int at;
+      int end;
+
+      public RangeNumberIterator(List<Range> ranges) {
+        if (ranges != null) {
+          internal = ranges.iterator();
+        }
+        at = -1;
+        end = -2;
+      }
+      
+      @Override
+      public boolean hasNext() {
+        if (at <= end) {
+          return true;
+        } else if (internal != null){
+          return internal.hasNext();
+        }
+        return false;
+      }
+
+      @Override
+      public Integer next() {
+        if (at <= end) {
+          at++;
+          return at - 1;
+        } else if (internal != null){
+          Range found = internal.next();
+          if (found != null) {
+            at = found.start;
+            end = found.end;
+            at++;
+            return at - 1;
+          }
+        }
+        return null;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
 
 
     List<Range> ranges = new ArrayList<Range>();
     List<Range> ranges = new ArrayList<Range>();
     
     
@@ -1003,6 +1049,13 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       return false;
       return false;
     }
     }
     
     
+    /**
+     * @return true if there are no values in this range, else false.
+     */
+    public boolean isEmpty() {
+      return ranges == null || ranges.isEmpty();
+    }
+    
     @Override
     @Override
     public String toString() {
     public String toString() {
       StringBuilder result = new StringBuilder();
       StringBuilder result = new StringBuilder();
@@ -1019,6 +1072,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       }
       }
       return result.toString();
       return result.toString();
     }
     }
+
+    @Override
+    public Iterator<Integer> iterator() {
+      return new RangeNumberIterator(ranges);
+    }
+    
   }
   }
 
 
   /**
   /**

+ 5 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java

@@ -223,16 +223,19 @@ public class AvroRpcEngine implements RpcEngine {
 
 
   /** Construct a server for a protocol implementation instance listening on a
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
    * port and address. */
+  @Override
   public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
   public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
                               int port, int numHandlers, int numReaders,
                               int port, int numHandlers, int numReaders,
                               int queueSizePerHandler, boolean verbose,
                               int queueSizePerHandler, boolean verbose,
                               Configuration conf, 
                               Configuration conf, 
-                       SecretManager<? extends TokenIdentifier> secretManager
+                       SecretManager<? extends TokenIdentifier> secretManager,
+                       String portRangeConfig
                               ) throws IOException {
                               ) throws IOException {
     return ENGINE.getServer(TunnelProtocol.class,
     return ENGINE.getServer(TunnelProtocol.class,
                             new TunnelResponder(iface, impl),
                             new TunnelResponder(iface, impl),
                             bindAddress, port, numHandlers, numReaders,
                             bindAddress, port, numHandlers, numReaders,
-                            queueSizePerHandler, verbose, conf, secretManager);
+                            queueSizePerHandler, verbose, conf, secretManager,
+                            portRangeConfig);
   }
   }
 
 
 }
 }

+ 22 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -514,7 +514,8 @@ public class RPC {
                                  final boolean verbose, Configuration conf) 
                                  final boolean verbose, Configuration conf) 
     throws IOException {
     throws IOException {
     return getServer(instance.getClass(),         // use impl class for protocol
     return getServer(instance.getClass(),         // use impl class for protocol
-                     instance, bindAddress, port, numHandlers, false, conf, null);
+                     instance, bindAddress, port, numHandlers, false, conf, null,
+                     null);
   }
   }
 
 
   /** Construct a server for a protocol implementation instance. */
   /** Construct a server for a protocol implementation instance. */
@@ -522,7 +523,8 @@ public class RPC {
                                  Object instance, String bindAddress,
                                  Object instance, String bindAddress,
                                  int port, Configuration conf) 
                                  int port, Configuration conf) 
     throws IOException {
     throws IOException {
-    return getServer(protocol, instance, bindAddress, port, 1, false, conf, null);
+    return getServer(protocol, instance, bindAddress, port, 1, false, conf, null,
+        null);
   }
   }
 
 
   /** Construct a server for a protocol implementation instance.
   /** Construct a server for a protocol implementation instance.
@@ -536,7 +538,7 @@ public class RPC {
     throws IOException {
     throws IOException {
     
     
     return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
     return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
-                 conf, null);
+                 conf, null, null);
   }
   }
   
   
   /** Construct a server for a protocol implementation instance. */
   /** Construct a server for a protocol implementation instance. */
@@ -546,10 +548,20 @@ public class RPC {
                                  boolean verbose, Configuration conf,
                                  boolean verbose, Configuration conf,
                                  SecretManager<? extends TokenIdentifier> secretManager) 
                                  SecretManager<? extends TokenIdentifier> secretManager) 
     throws IOException {
     throws IOException {
-    
+    return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
+        conf, secretManager, null);
+  }
+  
+  public static Server getServer(Class<?> protocol,
+      Object instance, String bindAddress, int port,
+      int numHandlers,
+      boolean verbose, Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig) 
+  throws IOException {
     return getProtocolEngine(protocol, conf)
     return getProtocolEngine(protocol, conf)
       .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
       .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
-                 verbose, conf, secretManager);
+                 verbose, conf, secretManager, portRangeConfig);
   }
   }
 
 
   /** Construct a server for a protocol implementation instance. */
   /** Construct a server for a protocol implementation instance. */
@@ -562,7 +574,8 @@ public class RPC {
     
     
     return getProtocolEngine(protocol, conf)
     return getProtocolEngine(protocol, conf)
       .getServer(protocol, instance, bindAddress, port, numHandlers,
       .getServer(protocol, instance, bindAddress, port, numHandlers,
-                 numReaders, queueSizePerHandler, verbose, conf, secretManager);
+                 numReaders, queueSizePerHandler, verbose, conf, secretManager,
+                 null);
   }
   }
 
 
   /** An RPC Server. */
   /** An RPC Server. */
@@ -572,9 +585,10 @@ public class RPC {
                      Class<? extends Writable> paramClass, int handlerCount,
                      Class<? extends Writable> paramClass, int handlerCount,
                      int numReaders, int queueSizePerHandler,
                      int numReaders, int queueSizePerHandler,
                      Configuration conf, String serverName, 
                      Configuration conf, String serverName, 
-                     SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+                     SecretManager<? extends TokenIdentifier> secretManager,
+                     String portRangeConfig) throws IOException {
       super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
       super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
-            conf, serverName, secretManager);
+            conf, serverName, secretManager, portRangeConfig);
     }
     }
   }
   }
 
 

+ 20 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java

@@ -49,12 +49,30 @@ public interface RpcEngine {
                 UserGroupInformation ticket, Configuration conf)
                 UserGroupInformation ticket, Configuration conf)
     throws IOException, InterruptedException;
     throws IOException, InterruptedException;
 
 
-  /** Construct a server for a protocol implementation instance. */
+  /** 
+   * Construct a server for a protocol implementation instance.
+   * 
+   * @param protocol the class of protocol to use
+   * @param instance the instance of protocol whose methods will be called
+   * @param conf the configuration to use
+   * @param bindAddress the address to bind on to listen for connection
+   * @param port the port to listen for connections on
+   * @param numHandlers the number of method handler threads to run
+   * @param numReaders the number of reader threads to run
+   * @param queueSizePerHandler the size of the queue per hander thread
+   * @param verbose whether each call should be logged
+   * @param secretManager The secret manager to use to validate incoming requests.
+   * @param portRangeConfig A config parameter that can be used to restrict
+   *        the range of ports used when port is 0 (an ephemeral port)
+   * @return The Server instance
+   * @throws IOException on any error
+   */
   RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
   RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
                        int port, int numHandlers, int numReaders,
                        int port, int numHandlers, int numReaders,
                        int queueSizePerHandler, boolean verbose,
                        int queueSizePerHandler, boolean verbose,
                        Configuration conf, 
                        Configuration conf, 
-                       SecretManager<? extends TokenIdentifier> secretManager
+                       SecretManager<? extends TokenIdentifier> secretManager,
+                       String portRangeConfig
                        ) throws IOException;
                        ) throws IOException;
 
 
 }
 }

+ 44 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -60,6 +60,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -193,6 +194,7 @@ public abstract class Server {
   protected RpcDetailedMetrics rpcDetailedMetrics;
   protected RpcDetailedMetrics rpcDetailedMetrics;
   
   
   private Configuration conf;
   private Configuration conf;
+  private String portRangeConfig = null;
   private SecretManager<TokenIdentifier> secretManager;
   private SecretManager<TokenIdentifier> secretManager;
   private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager();
   private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager();
 
 
@@ -225,8 +227,33 @@ public abstract class Server {
    */
    */
   public static void bind(ServerSocket socket, InetSocketAddress address, 
   public static void bind(ServerSocket socket, InetSocketAddress address, 
                           int backlog) throws IOException {
                           int backlog) throws IOException {
+    bind(socket, address, backlog, null, null);
+  }
+
+  public static void bind(ServerSocket socket, InetSocketAddress address, 
+      int backlog, Configuration conf, String rangeConf) throws IOException {
     try {
     try {
-      socket.bind(address, backlog);
+      IntegerRanges range = null;
+      if (rangeConf != null) {
+        range = conf.getRange(rangeConf, "");
+      }
+      if (range == null || range.isEmpty() || (address.getPort() != 0)) {
+        socket.bind(address, backlog);
+      } else {
+        for (Integer port : range) {
+          if (socket.isBound()) break;
+          try {
+            InetSocketAddress temp = new InetSocketAddress(address.getAddress(),
+                port);
+            socket.bind(temp, backlog);
+          } catch(BindException e) {
+            //Ignored
+          }
+        }
+        if (!socket.isBound()) {
+          throw new BindException("Could not find a free port in "+range);
+        }
+      }
     } catch (SocketException e) {
     } catch (SocketException e) {
       throw NetUtils.wrapException(null,
       throw NetUtils.wrapException(null,
           0,
           0,
@@ -310,7 +337,7 @@ public abstract class Server {
       acceptChannel.configureBlocking(false);
       acceptChannel.configureBlocking(false);
 
 
       // Bind the server socket to the local host and port
       // Bind the server socket to the local host and port
-      bind(acceptChannel.socket(), address, backlogLength);
+      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       // create a selector;
       // create a selector;
       selector= Selector.open();
       selector= Selector.open();
@@ -1543,9 +1570,18 @@ public abstract class Server {
                   Configuration conf)
                   Configuration conf)
     throws IOException 
     throws IOException 
   {
   {
-    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null, null);
   }
   }
   
   
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
+  throws IOException {
+    this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, 
+        queueSizePerHandler, conf, serverName, secretManager, null);
+  }
+
   /** Constructs a server listening on the named port and address.  Parameters passed must
   /** Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    * the number of handler threads that will be used to process calls.
@@ -1554,11 +1590,13 @@ public abstract class Server {
    */
    */
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
   protected Server(String bindAddress, int port, 
   protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
-                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
-    throws IOException {
+      Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
+      Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig) 
+  throws IOException {
     this.bindAddress = bindAddress;
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.conf = conf;
+    this.portRangeConfig = portRangeConfig;
     this.port = port;
     this.port = port;
     this.paramClass = paramClass;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.handlerCount = handlerCount;

+ 24 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -282,6 +282,18 @@ public class WritableRpcEngine implements RpcEngine {
     return new Server(instance, conf, bindAddress, port, numHandlers, 
     return new Server(instance, conf, bindAddress, port, numHandlers, 
         numReaders, queueSizePerHandler, verbose, secretManager);
         numReaders, queueSizePerHandler, verbose, secretManager);
   }
   }
+  
+  @Override
+  public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
+      Object instance, String bindAddress, int port, int numHandlers,
+      int numReaders, int queueSizePerHandler, boolean verbose,
+      Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig) throws IOException {
+    return new Server(instance, conf, bindAddress, port, numHandlers, 
+        numReaders, queueSizePerHandler, verbose, secretManager,
+        portRangeConfig);
+  }
 
 
   /** An RPC Server. */
   /** An RPC Server. */
   public static class Server extends RPC.Server {
   public static class Server extends RPC.Server {
@@ -315,13 +327,23 @@ public class WritableRpcEngine implements RpcEngine {
      * @param numHandlers the number of method handler threads to run
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      * @param verbose whether each call should be logged
      */
      */
+    public Server(Object instance, Configuration conf, String bindAddress,  int port,
+        int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, 
+        SecretManager<? extends TokenIdentifier> secretManager) 
+    throws IOException {
+      this(instance, conf, bindAddress, port, numHandlers, numReaders,
+          queueSizePerHandler, verbose, secretManager, null);
+    }
+    
     public Server(Object instance, Configuration conf, String bindAddress,  int port,
     public Server(Object instance, Configuration conf, String bindAddress,  int port,
                   int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, 
                   int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, 
-                  SecretManager<? extends TokenIdentifier> secretManager) 
+                  SecretManager<? extends TokenIdentifier> secretManager, 
+                  String portRangeConfig) 
         throws IOException {
         throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, numReaders,
       super(bindAddress, port, Invocation.class, numHandlers, numReaders,
           queueSizePerHandler, conf,
           queueSizePerHandler, conf,
-          classNameBase(instance.getClass().getName()), secretManager);
+          classNameBase(instance.getClass().getName()), secretManager, 
+          portRangeConfig);
       this.instance = instance;
       this.instance = instance;
       this.verbose = verbose;
       this.verbose = verbose;
     }
     }

+ 33 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java

@@ -25,16 +25,20 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.io.StringWriter;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map;
 import java.util.Random;
 import java.util.Random;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
 import junit.framework.TestCase;
 import junit.framework.TestCase;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertArrayEquals;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.codehaus.jackson.map.ObjectMapper; 
 import org.codehaus.jackson.map.ObjectMapper; 
@@ -362,6 +366,35 @@ public class TestConfiguration extends TestCase {
     assertEquals(true, range.isIncluded(34));
     assertEquals(true, range.isIncluded(34));
     assertEquals(true, range.isIncluded(100000000));
     assertEquals(true, range.isIncluded(100000000));
   }
   }
+  
+  public void testGetRangeIterator() throws Exception {
+    Configuration config = new Configuration(false);
+    IntegerRanges ranges = config.getRange("Test", "");
+    assertFalse("Empty range has values", ranges.iterator().hasNext());
+    ranges = config.getRange("Test", "5");
+    Set<Integer> expected = new HashSet<Integer>(Arrays.asList(5));
+    Set<Integer> found = new HashSet<Integer>();
+    for(Integer i: ranges) {
+      found.add(i);
+    }
+    assertEquals(expected, found);
+
+    ranges = config.getRange("Test", "5-10,13-14");
+    expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,13,14));
+    found = new HashSet<Integer>();
+    for(Integer i: ranges) {
+      found.add(i);
+    }
+    assertEquals(expected, found);
+    
+    ranges = config.getRange("Test", "8-12, 5- 7");
+    expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,11,12));
+    found = new HashSet<Integer>();
+    for(Integer i: ranges) {
+      found.add(i);
+    }
+    assertEquals(expected, found);
+  }
 
 
   public void testHexValues() throws IOException{
   public void testHexValues() throws IOException{
     out=new BufferedWriter(new FileWriter(CONFIG));
     out=new BufferedWriter(new FileWriter(CONFIG));

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -32,6 +32,9 @@ Release 0.23.3 - UNRELEASED
 
 
     MAPREDUCE-4161. create sockets consistently (Daryn Sharp via bobby)
     MAPREDUCE-4161. create sockets consistently (Daryn Sharp via bobby)
 
 
+    MAPREDUCE-4079. Allow MR AppMaster to limit ephemeral port range.
+    (bobby via tgraves)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java

@@ -139,7 +139,8 @@ public class MRClientService extends AbstractService
         rpc.getServer(MRClientProtocol.class, protocolHandler, address,
         rpc.getServer(MRClientProtocol.class, protocolHandler, address,
             conf, secretManager,
             conf, secretManager,
             conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT, 
             conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT, 
-                MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT));
+                MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT),
+                MRJobConfig.MR_AM_JOB_CLIENT_PORT_RANGE);
     
     
     // Enable service authorization?
     // Enable service authorization?
     if (conf.getBoolean(
     if (conf.getBoolean(

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -371,6 +371,13 @@ public interface MRJobConfig {
     MR_AM_PREFIX + "job.client.thread-count";
     MR_AM_PREFIX + "job.client.thread-count";
   public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1;
   public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1;
 
 
+  /** 
+   * Range of ports that the MapReduce AM can use when binding. Leave blank
+   * if you want all possible ports.
+   */
+  public static final String MR_AM_JOB_CLIENT_PORT_RANGE = 
+    MR_AM_PREFIX + "job.client.port-range";
+  
   /** Enable blacklisting of nodes in the job.*/
   /** Enable blacklisting of nodes in the job.*/
   public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE = 
   public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE = 
     MR_AM_PREFIX  + "job.node-blacklisting.enable";
     MR_AM_PREFIX  + "job.node-blacklisting.enable";

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1236,6 +1236,14 @@
     MR AppMaster from remote tasks</description>
     MR AppMaster from remote tasks</description>
 </property>
 </property>
 
 
+<property>
+  <name>yarn.app.mapreduce.am.job.client.port-range</name>
+  <value></value>
+  <description>Range of ports that the MapReduce AM can use when binding.
+    Leave blank if you want all possible ports.  
+    For example 50000-50050,50100-50200</description>
+</property>
+
 <property>
 <property>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <value>1000</value>
   <value>1000</value>

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RpcServerFactory.java

@@ -31,6 +31,6 @@ public interface RpcServerFactory {
   public Server getServer(Class<?> protocol, Object instance,
   public Server getServer(Class<?> protocol, Object instance,
       InetSocketAddress addr, Configuration conf,
       InetSocketAddress addr, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager,
       SecretManager<? extends TokenIdentifier> secretManager,
-      int numHandlers)
+      int numHandlers, String portRangeConfig)
       throws YarnException;
       throws YarnException;
 }
 }

+ 13 - 4
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java

@@ -58,11 +58,20 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
   private RpcServerFactoryPBImpl() {
   private RpcServerFactoryPBImpl() {
   }
   }
   
   
-  @Override
   public Server getServer(Class<?> protocol, Object instance,
   public Server getServer(Class<?> protocol, Object instance,
       InetSocketAddress addr, Configuration conf,
       InetSocketAddress addr, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
       SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
       throws YarnException {
       throws YarnException {
+    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
+        null);
+  }
+  
+  @Override
+  public Server getServer(Class<?> protocol, Object instance,
+      InetSocketAddress addr, Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
+      String portRangeConfig)
+      throws YarnException {
     
     
     Constructor<?> constructor = serviceCache.get(protocol);
     Constructor<?> constructor = serviceCache.get(protocol);
     if (constructor == null) {
     if (constructor == null) {
@@ -116,7 +125,7 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
     
     
     try {
     try {
       return createServer(addr, conf, secretManager, numHandlers,
       return createServer(addr, conf, secretManager, numHandlers,
-          (BlockingService)method.invoke(null, service));
+          (BlockingService)method.invoke(null, service), portRangeConfig);
     } catch (InvocationTargetException e) {
     } catch (InvocationTargetException e) {
       throw new YarnException(e);
       throw new YarnException(e);
     } catch (IllegalAccessException e) {
     } catch (IllegalAccessException e) {
@@ -150,11 +159,11 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
 
 
   private Server createServer(InetSocketAddress addr, Configuration conf, 
   private Server createServer(InetSocketAddress addr, Configuration conf, 
       SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
       SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
-      BlockingService blockingService) throws IOException {
+      BlockingService blockingService, String portRangeConfig) throws IOException {
     RPC.setProtocolEngine(conf, BlockingService.class, ProtoOverHadoopRpcEngine.class);
     RPC.setProtocolEngine(conf, BlockingService.class, ProtoOverHadoopRpcEngine.class);
     Server server = RPC.getServer(BlockingService.class, blockingService, 
     Server server = RPC.getServer(BlockingService.class, blockingService, 
         addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
         addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
-        secretManager);
+        secretManager, portRangeConfig);
     return server;
     return server;
   }
   }
 }
 }

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java

@@ -56,12 +56,12 @@ public class HadoopYarnProtoRPC extends YarnRPC {
   public Server getServer(Class protocol, Object instance,
   public Server getServer(Class protocol, Object instance,
       InetSocketAddress addr, Configuration conf,
       InetSocketAddress addr, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager,
       SecretManager<? extends TokenIdentifier> secretManager,
-      int numHandlers) {
+      int numHandlers, String portRangeConfig) {
     LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + 
     LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + 
         " with " + numHandlers + " handlers");
         " with " + numHandlers + " handlers");
     
     
     return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, 
     return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, 
-        instance, addr, conf, secretManager, numHandlers);
+        instance, addr, conf, secretManager, numHandlers, portRangeConfig);
 
 
   }
   }
 
 

+ 9 - 4
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java

@@ -304,14 +304,18 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
      *          the number of method handler threads to run
      *          the number of method handler threads to run
      * @param verbose
      * @param verbose
      *          whether each call should be logged
      *          whether each call should be logged
+     * @param portRangeConfig
+     *          an optional config value used to limit ephemeral port ranges.
      */
      */
     public Server(Object instance, Configuration conf, String bindAddress,
     public Server(Object instance, Configuration conf, String bindAddress,
         int port, int numHandlers, int numReaders, 
         int port, int numHandlers, int numReaders, 
         int queueSizePerHandler, boolean verbose,
         int queueSizePerHandler, boolean verbose,
-        SecretManager<? extends TokenIdentifier> secretManager)
+        SecretManager<? extends TokenIdentifier> secretManager,
+        String portRangeConfig)
         throws IOException {
         throws IOException {
       super(bindAddress, port, ProtoSpecificRequestWritable.class, numHandlers,
       super(bindAddress, port, ProtoSpecificRequestWritable.class, numHandlers,
-          numReaders, queueSizePerHandler, conf, classNameBase(instance.getClass().getName()), secretManager);
+          numReaders, queueSizePerHandler, conf, classNameBase(instance.getClass().getName()), secretManager,
+          portRangeConfig);
       this.service = (BlockingService) instance;
       this.service = (BlockingService) instance;
       this.verbose = verbose;
       this.verbose = verbose;
     }
     }
@@ -383,9 +387,10 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
   public RPC.Server getServer(Class<?> protocol, Object instance,
   public RPC.Server getServer(Class<?> protocol, Object instance,
       String bindAddress, int port, int numHandlers,int numReaders, 
       String bindAddress, int port, int numHandlers,int numReaders, 
       int queueSizePerHandler, boolean verbose,
       int queueSizePerHandler, boolean verbose,
-      Configuration conf, SecretManager<? extends TokenIdentifier> secretManager)
+      Configuration conf, SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig)
       throws IOException {
       throws IOException {
     return new Server(instance, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler,
     return new Server(instance, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler,
-        verbose, secretManager);
+        verbose, secretManager, portRangeConfig);
   }
   }
 }
 }

+ 9 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java

@@ -43,8 +43,16 @@ public abstract class YarnRPC {
   public abstract Server getServer(Class protocol, Object instance,
   public abstract Server getServer(Class protocol, Object instance,
       InetSocketAddress addr, Configuration conf,
       InetSocketAddress addr, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager,
       SecretManager<? extends TokenIdentifier> secretManager,
-      int numHandlers);
+      int numHandlers, String portRangeConfig);
 
 
+  public Server getServer(Class protocol, Object instance,
+      InetSocketAddress addr, Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager,
+      int numHandlers) {
+    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
+        null);
+  }
+  
   public static YarnRPC create(Configuration conf) {
   public static YarnRPC create(Configuration conf) {
     LOG.debug("Creating YarnRPC for " + 
     LOG.debug("Creating YarnRPC for " + 
         conf.get(YarnConfiguration.IPC_RPC_IMPL));
         conf.get(YarnConfiguration.IPC_RPC_IMPL));