
YARN-5390. Federation Subcluster Resolver. Contributed by Ellen Hui.

(cherry picked from commit d3dc461a935c2af4ec3f0312ff0c26918c408467)
Subru Krishnan · 9 years ago · commit d19b677301
10 changed files with 522 additions and 0 deletions
  1. +8 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. +7 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  3. +10 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
  4. +67 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
  5. +164 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/DefaultSubClusterResolverImpl.java
  6. +58 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java
  7. +17 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/package-info.java
  8. +184 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
  9. +4 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
  10. +3 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes-malformed

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2555,6 +2555,14 @@ public class YarnConfiguration extends Configuration {
      SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
  public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;

+  ////////////////////////////////
+  // Federation Configs
+  ////////////////////////////////
+
+  public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
+  public static final String FEDERATION_MACHINE_LIST =
+      FEDERATION_PREFIX + "machine-list";
+
  ////////////////////////////////
  // Other Configs
  ////////////////////////////////

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2688,6 +2688,13 @@
  </property>
  <!-- Other Configuration -->

+  <property>
+    <description>
+      Machine list file to be loaded by the FederationSubCluster Resolver
+    </description>
+    <name>yarn.federation.machine-list</name>
+  </property>
+
  <property>
    <description>The interval that the yarn client library uses to poll the
    completion status of the asynchronous API of application client protocol.
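
Note that yarn-default.xml only documents the yarn.federation.machine-list key; no default value is shipped. In a deployment the path would typically be supplied via yarn-site.xml, along the lines of the illustrative snippet below (the file location is a placeholder; each line of the referenced file follows the nodeName,subClusterId,rackName format described in DefaultSubClusterResolverImpl later in this commit):

    <property>
      <name>yarn.federation.machine-list</name>
      <value>/etc/hadoop/conf/machine-list</value>
    </property>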

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml

@@ -171,6 +171,16 @@
          </filesets>
        </configuration>
      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/nodes</exclude>
+            <exclude>src/test/resources/nodes-malformed</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
    </plugins>
  </build>
</project>

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java

@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.federation.resolver;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Map;
+
+/**
+ * Partial implementation of {@link SubClusterResolver}, containing basic
+ * implementations of the read methods.
+ */
+public abstract class AbstractSubClusterResolver implements SubClusterResolver {
+  private Map<String, SubClusterId> nodeToSubCluster =
+      new HashMap<String, SubClusterId>();
+  private Map<String, Set<SubClusterId>> rackToSubClusters =
+      new HashMap<String, Set<SubClusterId>>();
+
+  @Override
+  public SubClusterId getSubClusterForNode(String nodename)
+      throws YarnException {
+    SubClusterId subClusterId = this.nodeToSubCluster.get(nodename);
+
+    if (subClusterId == null) {
+      throw new YarnException("Cannot find subClusterId for node " + nodename);
+    }
+
+    return subClusterId;
+  }
+
+  @Override
+  public Set<SubClusterId> getSubClustersForRack(String rackname)
+      throws YarnException {
+    if (!rackToSubClusters.containsKey(rackname)) {
+      throw new YarnException("Cannot resolve rack " + rackname);
+    }
+
+    return rackToSubClusters.get(rackname);
+  }
+
+  protected Map<String, SubClusterId> getNodeToSubCluster() {
+    return nodeToSubCluster;
+  }
+
+  protected Map<String, Set<SubClusterId>> getRackToSubClusters() {
+    return rackToSubClusters;
+  }
+}
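
As a rough illustration of how this base class is meant to be extended (not part of this change; the class name and the node, rack, and sub-cluster values are made up), a custom resolver can populate the protected maps from load() and inherit the lookup methods:

    package org.apache.hadoop.yarn.server.federation.resolver;

    import java.util.HashSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

    /** Hypothetical resolver with a hard-coded topology, for illustration only. */
    public class StaticSubClusterResolver extends AbstractSubClusterResolver {
      private Configuration conf;

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return this.conf;
      }

      @Override
      public void load() {
        // Map one node and its rack to a single sub-cluster.
        SubClusterId sc = SubClusterId.newInstance("subcluster1");
        getNodeToSubCluster().put("node1", sc);
        getRackToSubClusters().put("rack1", new HashSet<SubClusterId>());
        getRackToSubClusters().get("rack1").add(sc);
      }
    }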

+ 164 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/DefaultSubClusterResolverImpl.java

@@ -0,0 +1,164 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.federation.resolver;
+
+import java.io.BufferedReader;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * Default simple sub-cluster and rack resolver class.
+ *
+ * This class expects a three-column comma separated file, specified in
+ * yarn.federation.machine-list. Each line of the file should be of the format:
+ *
+ * nodeName, subClusterId, rackName
+ *
+ * Lines that do not follow this format will be ignored. This resolver only
+ * loads the file when load() is explicitly called; it will not react to changes
+ * to the file.
+ *
+ * It is case-insensitive on the rack and node names and ignores
+ * leading/trailing whitespace.
+ *
+ */
+public class DefaultSubClusterResolverImpl extends AbstractSubClusterResolver
+    implements SubClusterResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultSubClusterResolverImpl.class);
+  private Configuration conf;
+
+  // Index of the node hostname in the machine info file.
+  private static final int NODE_NAME_INDEX = 0;
+
+  // Index of the sub-cluster ID in the machine info file.
+  private static final int SUBCLUSTER_ID_INDEX = 1;
+
+  // Index of the rack name in the machine info file.
+  private static final int RACK_NAME_INDEX = 2;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public SubClusterId getSubClusterForNode(String nodename)
+      throws YarnException {
+    return super.getSubClusterForNode(nodename.toUpperCase());
+  }
+
+  @Override
+  public void load() {
+    String fileName =
+        this.conf.get(YarnConfiguration.FEDERATION_MACHINE_LIST, "");
+
+    try {
+      if (fileName == null || fileName.trim().length() == 0) {
+        LOG.info(
+            "The machine list file path is not specified in the configuration");
+        return;
+      }
+
+      Path file = null;
+      BufferedReader reader = null;
+
+      try {
+        file = Paths.get(fileName);
+      } catch (InvalidPathException e) {
+        LOG.info("The configured machine list file path {} does not exist",
+            fileName);
+        return;
+      }
+
+      try {
+        reader = Files.newBufferedReader(file, Charset.defaultCharset());
+        String line = null;
+        while ((line = reader.readLine()) != null) {
+          String[] tokens = line.split(",");
+          if (tokens.length == 3) {
+
+            String nodeName = tokens[NODE_NAME_INDEX].trim().toUpperCase();
+            SubClusterId subClusterId =
+                SubClusterId.newInstance(tokens[SUBCLUSTER_ID_INDEX].trim());
+            String rackName = tokens[RACK_NAME_INDEX].trim().toUpperCase();
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Loading node into resolver: {} --> {}", nodeName,
+                  subClusterId);
+              LOG.debug("Loading rack into resolver: {} --> {} ", rackName,
+                  subClusterId);
+            }
+
+            this.getNodeToSubCluster().put(nodeName, subClusterId);
+            loadRackToSubCluster(rackName, subClusterId);
+          } else {
+            LOG.warn("Skipping malformed line in machine list: " + line);
+          }
+        }
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+      LOG.info("Successfully loaded file {}", fileName);
+
+    } catch (Exception e) {
+      LOG.error("Failed to parse file " + fileName, e);
+    }
+  }
+
+  private void loadRackToSubCluster(String rackName,
+      SubClusterId subClusterId) {
+    String rackNameUpper = rackName.toUpperCase();
+
+    if (!this.getRackToSubClusters().containsKey(rackNameUpper)) {
+      this.getRackToSubClusters().put(rackNameUpper,
+          new HashSet<SubClusterId>());
+    }
+
+    this.getRackToSubClusters().get(rackNameUpper).add(subClusterId);
+
+  }
+
+  @Override
+  public Set<SubClusterId> getSubClustersForRack(String rackname)
+      throws YarnException {
+    return super.getSubClustersForRack(rackname.toUpperCase());
+  }
+}
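
For reference, a minimal usage sketch of the implementation above (the driver class name and the machine-list path are placeholders; the node and rack names match the nodes test resource added later in this commit):

    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
    import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
    import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

    public final class ResolverUsageSketch {
      public static void main(String[] args) throws YarnException {
        Configuration conf = new YarnConfiguration();
        // Placeholder path; point this at a nodeName,subClusterId,rackName file.
        conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST,
            "/etc/hadoop/conf/machine-list");

        SubClusterResolver resolver = new DefaultSubClusterResolverImpl();
        resolver.setConf(conf);
        resolver.load();

        // Both lookups throw YarnException if the name cannot be resolved.
        SubClusterId home = resolver.getSubClusterForNode("node1");
        Set<SubClusterId> onRack = resolver.getSubClustersForRack("rack1");
        System.out.println(home + " / " + onRack);
      }
    }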

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java

@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.federation.resolver;
+
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+/**
+ * A utility that helps to determine the sub-cluster that a specified node
+ * belongs to.
+ */
+public interface SubClusterResolver extends Configurable {
+
+  /**
+   * Obtain the sub-cluster that a specified node belongs to.
+   *
+   * @param nodename the node whose sub-cluster is to be determined
+   * @return the sub-cluster as identified by the {@link SubClusterId} that the
+   *         node belongs to
+   * @throws YarnException if the node's sub-cluster cannot be resolved
+   */
+  SubClusterId getSubClusterForNode(String nodename) throws YarnException;
+
+  /**
+   * Obtain the sub-clusters that have nodes on a specified rack.
+   *
+   * @param rackname the name of the rack
+   * @return the sub-clusters as identified by the {@link SubClusterId} that
+   *         have nodes on the given rack
+   * @throws YarnException if the sub-cluster of any node on the rack cannot be
+   *           resolved, or if the rack name is not recognized
+   */
+  Set<SubClusterId> getSubClustersForRack(String rackname) throws YarnException;
+
+  /**
+   * Load the nodes to subCluster mapping from the file.
+   */
+  void load();
+}

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/package-info.java

@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.resolver;

+ 184 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java

@@ -0,0 +1,184 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.federation.resolver;
+
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test {@link SubClusterResolver} against correct and malformed Federation
+ * machine lists.
+ */
+public class TestDefaultSubClusterResolver {
+  private static YarnConfiguration conf;
+  private static SubClusterResolver resolver;
+
+  public static void setUpGoodFile() {
+    conf = new YarnConfiguration();
+    resolver = new DefaultSubClusterResolverImpl();
+
+    URL url =
+        Thread.currentThread().getContextClassLoader().getResource("nodes");
+    if (url == null) {
+      throw new RuntimeException(
+          "Could not find 'nodes' dummy file in classpath");
+    }
+
+    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+    resolver.setConf(conf);
+    resolver.load();
+  }
+
+  private void setUpMalformedFile() {
+    conf = new YarnConfiguration();
+    resolver = new DefaultSubClusterResolverImpl();
+
+    URL url = Thread.currentThread().getContextClassLoader()
+        .getResource("nodes-malformed");
+    if (url == null) {
+      throw new RuntimeException(
+          "Could not find 'nodes-malformed' dummy file in classpath");
+    }
+
+    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+    resolver.setConf(conf);
+    resolver.load();
+  }
+
+  private void setUpNonExistentFile() {
+    conf = new YarnConfiguration();
+    resolver = new DefaultSubClusterResolverImpl();
+
+    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, "fileDoesNotExist");
+    resolver.setConf(conf);
+    resolver.load();
+  }
+
+  @Test
+  public void testGetSubClusterForNode() throws YarnException {
+    setUpGoodFile();
+
+    // All lowercase, no whitespace in machine list file
+    Assert.assertEquals(SubClusterId.newInstance("subcluster1"),
+        resolver.getSubClusterForNode("node1"));
+    // Leading and trailing whitespace in machine list file
+    Assert.assertEquals(SubClusterId.newInstance("subcluster2"),
+        resolver.getSubClusterForNode("node2"));
+    // Node name capitalization in machine list file
+    Assert.assertEquals(SubClusterId.newInstance("subcluster3"),
+        resolver.getSubClusterForNode("node3"));
+
+    try {
+      resolver.getSubClusterForNode("nodeDoesNotExist");
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Cannot find subClusterId for node"));
+    }
+  }
+
+  @Test
+  public void testGetSubClusterForNodeMalformedFile() throws YarnException {
+    setUpMalformedFile();
+
+    try {
+      resolver.getSubClusterForNode("node1");
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Cannot find subClusterId for node"));
+    }
+
+    try {
+      resolver.getSubClusterForNode("node2");
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Cannot find subClusterId for node"));
+    }
+
+    Assert.assertEquals(SubClusterId.newInstance("subcluster3"),
+        resolver.getSubClusterForNode("node3"));
+
+    try {
+      resolver.getSubClusterForNode("nodeDoesNotExist");
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Cannot find subClusterId for node"));
+    }
+  }
+
+  @Test
+  public void testGetSubClusterForNodeNoFile() throws YarnException {
+    setUpNonExistentFile();
+
+    try {
+      resolver.getSubClusterForNode("node1");
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Cannot find subClusterId for node"));
+    }
+  }
+
+  @Test
+  public void testGetSubClustersForRack() throws YarnException {
+    setUpGoodFile();
+
+    Set<SubClusterId> rack1Expected = new HashSet<SubClusterId>();
+    rack1Expected.add(SubClusterId.newInstance("subcluster1"));
+    rack1Expected.add(SubClusterId.newInstance("subcluster2"));
+
+    Set<SubClusterId> rack2Expected = new HashSet<SubClusterId>();
+    rack2Expected.add(SubClusterId.newInstance("subcluster3"));
+
+    // Two subclusters have nodes in rack1
+    Assert.assertEquals(rack1Expected, resolver.getSubClustersForRack("rack1"));
+
+    // Two nodes are in rack2, but both belong to subcluster3
+    Assert.assertEquals(rack2Expected, resolver.getSubClustersForRack("rack2"));
+
+    try {
+      resolver.getSubClustersForRack("rackDoesNotExist");
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Cannot resolve rack"));
+    }
+  }
+
+  @Test
+  public void testGetSubClustersForRackNoFile() throws YarnException {
+    setUpNonExistentFile();
+
+    try {
+      resolver.getSubClustersForRack("rack1");
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Cannot resolve rack"));
+    }
+  }
+}

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes

@@ -0,0 +1,4 @@
+node1,subcluster1,rack1
+ node2 , subcluster2, RACK1
+noDE3,subcluster3, rack2
+node4, subcluster3, rack2

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes-malformed

@@ -0,0 +1,3 @@
+node1,
+node2,subcluster2,subCluster2, rack1
+node3,subcluster3, rack2