Pārlūkot izejas kodu

HDFS-6376. Distcp data between two HA clusters requires another configuration. Contributed by Dave Marion and Haohui Mai.

Jing Zhao 10 gadi atpakaļ
vecāks
revīzija
c6107f566f

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -441,6 +441,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6886. Use single editlog record for creating file + overwrite. (Yi Liu
     via jing9)
 
+    HDFS-6376. Distcp data between two HA clusters requires another configuration.
+    (Dave Marion and Haohui Mai via jing9)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -530,6 +530,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   
   public static final String  DFS_NAMESERVICES = "dfs.nameservices";
   public static final String  DFS_NAMESERVICE_ID = "dfs.nameservice.id";
+  public static final String  DFS_INTERNAL_NAMESERVICES_KEY = "dfs.internal.nameservices";
   public static final String  DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
   public static final int     DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
   public static final String  DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";

+ 72 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -60,6 +60,7 @@ import java.util.Set;
 
 import javax.net.SocketFactory;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Option;
@@ -612,7 +613,7 @@ public class DFSUtil {
     String keySuffix = concatSuffixes(suffixes);
     return addSuffix(key, keySuffix);
   }
-  
+
   /**
    * Returns the configured address for all NameNodes in the cluster.
    * @param conf configuration
@@ -621,14 +622,25 @@ public class DFSUtil {
    * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
    */
   private static Map<String, Map<String, InetSocketAddress>>
-    getAddresses(Configuration conf,
-      String defaultAddress, String... keys) {
+    getAddresses(Configuration conf, String defaultAddress, String... keys) {
     Collection<String> nameserviceIds = getNameServiceIds(conf);
-    
+    return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
+  }
+
+  /**
+   * Returns the configured address for all NameNodes in the cluster.
+   * @param conf configuration
+   * @param nsIds
+   *@param defaultAddress default address to return in case key is not found.
+   * @param keys Set of keys to look for in the order of preference   @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
+   */
+  private static Map<String, Map<String, InetSocketAddress>>
+    getAddressesForNsIds(Configuration conf, Collection<String> nsIds,
+                         String defaultAddress, String... keys) {
     // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
     // across all of the configured nameservices and namenodes.
     Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
-    for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
+    for (String nsId : emptyAsSingletonNull(nsIds)) {
       Map<String, InetSocketAddress> isas =
         getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
       if (!isas.isEmpty()) {
@@ -773,8 +785,7 @@ public class DFSUtil {
 
   /**
    * Returns list of InetSocketAddresses corresponding to namenodes from the
-   * configuration. Note this is to be used by datanodes to get the list of
-   * namenode addresses to talk to.
+   * configuration.
    * 
    * Returns namenode address specifically configured for datanodes (using
    * service ports), if found. If not, regular RPC address configured for other
@@ -805,7 +816,60 @@ public class DFSUtil {
     }
     return addressList;
   }
-  
+
+  /**
+   * Returns list of InetSocketAddresses corresponding to the namenode
+   * that manages this cluster. Note this is to be used by datanodes to get
+   * the list of namenode addresses to talk to.
+   *
+   * Returns namenode address specifically configured for datanodes (using
+   * service ports), if found. If not, regular RPC address configured for other
+   * clients is returned.
+   *
+   * @param conf configuration
+   * @return list of InetSocketAddress
+   * @throws IOException on error
+   */
+  public static Map<String, Map<String, InetSocketAddress>>
+    getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException {
+    // Use default address as fall back
+    String defaultAddress;
+    try {
+      defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf));
+    } catch (IllegalArgumentException e) {
+      defaultAddress = null;
+    }
+
+    Collection<String> parentNameServices = conf.getTrimmedStringCollection
+            (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
+
+    if (parentNameServices.isEmpty()) {
+      parentNameServices = conf.getTrimmedStringCollection
+              (DFSConfigKeys.DFS_NAMESERVICES);
+    } else {
+      // Ensure that the internal service is ineed in the list of all available
+      // nameservices.
+      Set<String> availableNameServices = Sets.newHashSet(conf
+              .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
+      for (String nsId : parentNameServices) {
+        if (!availableNameServices.contains(nsId)) {
+          throw new IOException("Unknown nameservice: " + nsId);
+        }
+      }
+    }
+
+    Map<String, Map<String, InetSocketAddress>> addressList =
+            getAddressesForNsIds(conf, parentNameServices, defaultAddress,
+                    DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+    if (addressList.isEmpty()) {
+      throw new IOException("Incorrect configuration: namenode address "
+              + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
+              + DFS_NAMENODE_RPC_ADDRESS_KEY
+              + " is not configured.");
+    }
+    return addressList;
+  }
+
   /**
    * Flatten the given map, as returned by other functions in this class,
    * into a flat list of {@link ConfiguredNNAddress} instances.

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java

@@ -149,12 +149,12 @@ class BlockPoolManager {
   
   void refreshNamenodes(Configuration conf)
       throws IOException {
-    LOG.info("Refresh request received for nameservices: "
-        + conf.get(DFSConfigKeys.DFS_NAMESERVICES));
-    
-    Map<String, Map<String, InetSocketAddress>> newAddressMap = 
-      DFSUtil.getNNServiceRpcAddresses(conf);
-    
+    LOG.info("Refresh request received for nameservices: " + conf.get
+            (DFSConfigKeys.DFS_NAMESERVICES));
+
+    Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
+            .getNNServiceRpcAddressesForCluster(conf);
+
     synchronized (refreshNamenodesLock) {
       doRefreshNamenodes(newAddressMap);
     }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java

@@ -186,7 +186,7 @@ public class GetConf extends Configured implements Tool {
   static class NameNodesCommandHandler extends CommandHandler {
     @Override
     int doWorkInternal(GetConf tool, String []args) throws IOException {
-      tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf()));
+      tool.printMap(DFSUtil.getNNServiceRpcAddressesForCluster(tool.getConf()));
       return 0;
     }
   }
@@ -223,7 +223,7 @@ public class GetConf extends Configured implements Tool {
     public int doWorkInternal(GetConf tool, String []args) throws IOException {
       Configuration config = tool.getConf();
       List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap(
-          DFSUtil.getNNServiceRpcAddresses(config));
+          DFSUtil.getNNServiceRpcAddressesForCluster(config));
       if (!cnnlist.isEmpty()) {
         for (ConfiguredNNAddress cnn : cnnlist) {
           InetSocketAddress rpc = cnn.getAddress();

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1115,6 +1115,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.internal.nameservices</name>
+  <value></value>
+  <description>
+    Comma-separated list of nameservices that belong to this cluster.
+    Datanode will report to all the nameservices in this list. By default
+    this is set to the value of dfs.nameservices.
+  </description>
+</property>
+
 <property>
   <name>dfs.ha.namenodes.EXAMPLENAMESERVICE</name>
   <value></value>

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
@@ -865,4 +866,29 @@ public class TestDFSUtil {
     // let's make sure that a password that doesn't exist returns null
     Assert.assertEquals(null, DFSUtil.getPassword(conf,"invalid-alias"));
   }
+
+  @Test
+  public void testGetNNServiceRpcAddressesForNsIds() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFS_NAMESERVICES, "nn1,nn2");
+    conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn1");
+    // Test - configured list of namenodes are returned
+    final String NN1_ADDRESS = "localhost:9000";
+    final String NN2_ADDRESS = "localhost:9001";
+    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn1"),
+            NN1_ADDRESS);
+    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
+            NN2_ADDRESS);
+
+    Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
+            .getNNServiceRpcAddressesForCluster(conf);
+    assertEquals(1, nnMap.size());
+    assertTrue(nnMap.containsKey("nn1"));
+    conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3");
+    try {
+      DFSUtil.getNNServiceRpcAddressesForCluster(conf);
+      fail("Should fail for misconfiguration");
+    } catch (IOException ignored) {
+    }
+  }
 }

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java

@@ -23,15 +23,18 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -130,6 +133,25 @@ public class TestBlockPoolManager {
         "refresh #2\n", log.toString());
   }
 
+  @Test
+  public void testInternalNameService() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1,ns2,ns3");
+    addNN(conf, "ns1", "mock1:8020");
+    addNN(conf, "ns2", "mock1:8020");
+    addNN(conf, "ns3", "mock1:8020");
+    conf.set(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
+    bpm.refreshNamenodes(conf);
+    assertEquals("create #1\n", log.toString());
+    @SuppressWarnings("unchecked")
+    Map<String, BPOfferService> map = (Map<String, BPOfferService>) Whitebox
+            .getInternalState(bpm, "bpByNameserviceId");
+    Assert.assertFalse(map.containsKey("ns2"));
+    Assert.assertFalse(map.containsKey("ns3"));
+    Assert.assertTrue(map.containsKey("ns1"));
+    log.setLength(0);
+  }
+
   private static void addNN(Configuration conf, String ns, String addr) {
     String key = DFSUtil.addKeySuffixes(
         DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);

+ 22 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java

@@ -18,11 +18,13 @@
 package org.apache.hadoop.hdfs.tools;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -121,13 +123,13 @@ public class TestGetConf {
       TestType type, HdfsConfiguration conf) throws IOException {
     switch (type) {
     case NAMENODE:
-      return DFSUtil.getNNServiceRpcAddresses(conf);
+      return DFSUtil.getNNServiceRpcAddressesForCluster(conf);
     case BACKUP:
       return DFSUtil.getBackupNodeAddresses(conf);
     case SECONDARY:
       return DFSUtil.getSecondaryNameNodeAddresses(conf);
     case NNRPCADDRESSES:
-      return DFSUtil.getNNServiceRpcAddresses(conf);
+      return DFSUtil.getNNServiceRpcAddressesForCluster(conf);
     }
     return null;
   }
@@ -226,7 +228,7 @@ public class TestGetConf {
     String[] actual = toStringArray(list);
     Arrays.sort(actual);
     Arrays.sort(expected);
-    assertTrue(Arrays.equals(expected, actual));
+    assertArrayEquals(expected, actual);
 
     // Test GetConf returned addresses
     getAddressListFromTool(type, conf, checkPort, list);
@@ -425,7 +427,23 @@ public class TestGetConf {
     assertEquals(hostsFile.toUri().getPath(),ret.trim());
     cleanupFile(localFileSys, excludeFile.getParent());
   }
-  
+
+  @Test
+  public void testIncludeInternalNameServices() throws Exception {
+    final int nsCount = 10;
+    final int remoteNsCount = 4;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    setupNameServices(conf, nsCount);
+    setupAddress(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000);
+    setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500);
+    conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
+    setupStaticHostResolution(nsCount);
+
+    String[] includedNN = new String[] {"nn1:1001"};
+    verifyAddresses(conf, TestType.NAMENODE, false, includedNN);
+    verifyAddresses(conf, TestType.NNRPCADDRESSES, true, includedNN);
+  }
+
   private void writeConfigFile(Path name, ArrayList<String> nodes) 
       throws IOException {
       // delete if it already exists