Browse Source

HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (#4028). Contributed by Viraj Jasani.

Viraj Jasani 3 years ago
parent
commit
278568203b

+ 26 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -1053,6 +1053,32 @@ public class NetUtils {
     return port;
   }
 
+  /**
+   * Return free ports. There is no guarantee they will remain free, so
+   * ports should be used immediately. The number of free ports returned by
+   * this method should match argument {@code numOfPorts}. Num of ports
+   * provided in the argument should not exceed 25.
+   *
+   * @param numOfPorts Number of free ports to acquire.
+   * @return Free ports for binding a local socket.
+   */
+  public static Set<Integer> getFreeSocketPorts(int numOfPorts) {
+    Preconditions.checkArgument(numOfPorts > 0 && numOfPorts <= 25,
+        "Valid range for num of ports is between 0 and 26");
+    final Set<Integer> freePorts = new HashSet<>(numOfPorts);
+    for (int i = 0; i < numOfPorts * 5; i++) {
+      int port = getFreeSocketPort();
+      if (port == 0) {
+        continue;
+      }
+      freePorts.add(port);
+      if (freePorts.size() == numOfPorts) {
+        return freePorts;
+      }
+    }
+    throw new IllegalStateException(numOfPorts + " free ports could not be acquired.");
+  }
+
   /**
    * Return an @{@link InetAddress} to bind to. If bindWildCardAddress is true
    * than returns null.

+ 38 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal;
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
 import static org.junit.Assert.fail;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -45,13 +46,16 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.test.GenericTestUtils;
 
-public class MiniJournalCluster {
+public final class MiniJournalCluster implements Closeable {
+
   public static final String CLUSTER_WAITACTIVE_URI = "waitactive";
   public static class Builder {
     private String baseDir;
     private int numJournalNodes = 3;
     private boolean format = true;
     private final Configuration conf;
+    private int[] httpPorts = null;
+    private int[] rpcPorts = null;
 
     static {
       DefaultMetricsSystem.setMiniClusterMode(true);
@@ -76,6 +80,16 @@ public class MiniJournalCluster {
       return this;
     }
 
+    public Builder setHttpPorts(int... ports) {
+      this.httpPorts = ports;
+      return this;
+    }
+
+    public Builder setRpcPorts(int... ports) {
+      this.rpcPorts = ports;
+      return this;
+    }
+
     public MiniJournalCluster build() throws IOException {
       return new MiniJournalCluster(this);
     }
@@ -99,6 +113,19 @@ public class MiniJournalCluster {
   private final JNInfo[] nodes;
   
   private MiniJournalCluster(Builder b) throws IOException {
+
+    if (b.httpPorts != null && b.httpPorts.length != b.numJournalNodes) {
+      throw new IllegalArgumentException(
+          "Num of http ports (" + b.httpPorts.length + ") should match num of JournalNodes ("
+              + b.numJournalNodes + ")");
+    }
+
+    if (b.rpcPorts != null && b.rpcPorts.length != b.numJournalNodes) {
+      throw new IllegalArgumentException(
+          "Num of rpc ports (" + b.rpcPorts.length + ") should match num of JournalNodes ("
+              + b.numJournalNodes + ")");
+    }
+
     LOG.info("Starting MiniJournalCluster with " +
         b.numJournalNodes + " journal nodes");
     
@@ -173,8 +200,10 @@ public class MiniJournalCluster {
     Configuration conf = new Configuration(b.conf);
     File logDir = getStorageDir(idx);
     conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
-    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0");
-    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0");
+    int httpPort = b.httpPorts != null ? b.httpPorts[idx] : 0;
+    int rpcPort = b.rpcPorts != null ? b.rpcPorts[idx] : 0;
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:" + rpcPort);
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:" + httpPort);
     return conf;
   }
 
@@ -274,4 +303,10 @@ public class MiniJournalCluster {
           .DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
     }
   }
+
+  @Override
+  public void close() throws IOException {
+    this.shutdown();
+  }
+
 }

+ 97 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java

@@ -22,15 +22,23 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
-import org.junit.Test;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestMiniJournalCluster {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestMiniJournalCluster.class);
+
   @Test
   public void testStartStop() throws IOException {
     Configuration conf = new Configuration();
@@ -52,4 +60,92 @@ public class TestMiniJournalCluster {
       c.shutdown();
     }
   }
+
+  @Test
+  public void testStartStopWithPorts() throws Exception {
+    Configuration conf = new Configuration();
+
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of http ports (1) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setHttpPorts(8481).build();
+        });
+
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of rpc ports (2) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setRpcPorts(8481, 8482).build();
+        });
+
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of rpc ports (1) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 10000).setRpcPorts(8481)
+              .build();
+        });
+
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of http ports (4) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 1000, 2000)
+              .setRpcPorts(8481, 8482, 8483).build();
+        });
+
+    final Set<Integer> httpAndRpcPorts = NetUtils.getFreeSocketPorts(6);
+    LOG.info("Free socket ports: {}", httpAndRpcPorts);
+
+    for (Integer httpAndRpcPort : httpAndRpcPorts) {
+      assertNotEquals("None of the acquired socket port should not be zero", 0,
+          httpAndRpcPort.intValue());
+    }
+
+    final int[] httpPorts = new int[3];
+    final int[] rpcPorts = new int[3];
+    int httpPortIdx = 0;
+    int rpcPortIdx = 0;
+    for (Integer httpAndRpcPort : httpAndRpcPorts) {
+      if (httpPortIdx < 3) {
+        httpPorts[httpPortIdx++] = httpAndRpcPort;
+      } else {
+        rpcPorts[rpcPortIdx++] = httpAndRpcPort;
+      }
+    }
+
+    LOG.info("Http ports selected: {}", httpPorts);
+    LOG.info("Rpc ports selected: {}", rpcPorts);
+
+    try (MiniJournalCluster miniJournalCluster = new MiniJournalCluster.Builder(conf)
+        .setHttpPorts(httpPorts)
+        .setRpcPorts(rpcPorts).build()) {
+      miniJournalCluster.waitActive();
+      URI uri = miniJournalCluster.getQuorumJournalURI("myjournal");
+      String[] addrs = uri.getAuthority().split(";");
+      assertEquals(3, addrs.length);
+
+      assertEquals(httpPorts[0], miniJournalCluster.getJournalNode(0).getHttpAddress().getPort());
+      assertEquals(httpPorts[1], miniJournalCluster.getJournalNode(1).getHttpAddress().getPort());
+      assertEquals(httpPorts[2], miniJournalCluster.getJournalNode(2).getHttpAddress().getPort());
+
+      assertEquals(rpcPorts[0],
+          miniJournalCluster.getJournalNode(0).getRpcServer().getAddress().getPort());
+      assertEquals(rpcPorts[1],
+          miniJournalCluster.getJournalNode(1).getRpcServer().getAddress().getPort());
+      assertEquals(rpcPorts[2],
+          miniJournalCluster.getJournalNode(2).getRpcServer().getAddress().getPort());
+
+      JournalNode node = miniJournalCluster.getJournalNode(0);
+      String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
+      assertEquals(new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0").getAbsolutePath(),
+          dir);
+    }
+  }
+
 }