Selaa lähdekoodia

Changes for invoking rack resolution in the RM and in the AM. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1135895 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 14 vuotta sitten
vanhempi
commit
5f84127fa2

+ 2 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,8 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
     MAPREDUCE-279
 
 
+    Changes for invoking rack resolution in the RM and in the AM (ddas)
+
     Fix ClassCastException in JobHistoryServer for certain jobs.
     Fix ClassCastException in JobHistoryServer for certain jobs.
     (Siddharth Seth via llu)
     (Siddharth Seth via llu)
 
 

+ 7 - 1
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -114,6 +114,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
 
 
 /**
 /**
  * Implementation of TaskAttempt interface.
  * Implementation of TaskAttempt interface.
@@ -417,6 +418,7 @@ public abstract class TaskAttemptImpl implements
     this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
     this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
     this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType()));
     this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType()));
     this.dataLocalHosts = dataLocalHosts;
     this.dataLocalHosts = dataLocalHosts;
+    RackResolver.init(conf);
 
 
     // This "this leak" is okay because the retained pointer is in an
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     //  instance variable.
@@ -872,7 +874,6 @@ public abstract class TaskAttemptImpl implements
     return tauce;
     return tauce;
   }
   }
 
 
-  private static String[] racks = new String[] {NetworkTopology.DEFAULT_RACK};
   private static class RequestContainerTransition implements
   private static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     boolean rescheduled = false;
     boolean rescheduled = false;
@@ -892,6 +893,11 @@ public abstract class TaskAttemptImpl implements
                 taskAttempt.attemptId, 
                 taskAttempt.attemptId, 
                 taskAttempt.resourceCapability));
                 taskAttempt.resourceCapability));
       } else {
       } else {
+        int i = 0;
+        String[] racks = new String[taskAttempt.dataLocalHosts.length];
+        for (String host : taskAttempt.dataLocalHosts) {
+          racks[i++] = RackResolver.resolve(host).getNetworkLocation();
+        }
         taskAttempt.eventHandler.handle(
         taskAttempt.eventHandler.handle(
             new ContainerRequestEvent(taskAttempt.attemptId, 
             new ContainerRequestEvent(taskAttempt.attemptId, 
                 taskAttempt.resourceCapability, 
                 taskAttempt.resourceCapability, 

+ 3 - 2
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.RackResolver;
 
 
 /**
 /**
  * Allocates the container from the ResourceManager scheduler.
  * Allocates the container from the ResourceManager scheduler.
@@ -136,6 +137,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     maxReducePreemptionLimit = conf.getFloat(
     maxReducePreemptionLimit = conf.getFloat(
         AMConstants.REDUCE_PREEMPTION_LIMIT,
         AMConstants.REDUCE_PREEMPTION_LIMIT,
         AMConstants.DEFAULT_REDUCE_PREEMPTION_LIMIT);
         AMConstants.DEFAULT_REDUCE_PREEMPTION_LIMIT);
+    RackResolver.init(conf);
   }
   }
 
 
   @Override
   @Override
@@ -616,8 +618,7 @@ public class RMContainerAllocator extends RMContainerRequestor
           }
           }
         }
         }
         if (assigned == null) {
         if (assigned == null) {
-          // TODO: get rack
-          String rack = "";
+          String rack = RackResolver.resolve(host).getNetworkLocation();
           list = mapsRackMapping.get(rack);
           list = mapsRackMapping.get(rack);
           while (list != null && list.size() > 0) {
           while (list != null && list.size() > 0) {
             TaskAttemptId tId = list.removeFirst();
             TaskAttemptId tId = list.removeFirst();

+ 94 - 0
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/RackResolver.java

@@ -0,0 +1,94 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.util;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+
+public class RackResolver {
+  private static DNSToSwitchMapping dnsToSwitchMapping;
+  private static boolean initCalled = false;
+  private static final Log LOG = LogFactory.getLog(RackResolver.class);
+
+  public synchronized static void init(Configuration conf) {
+    if (initCalled) {
+      return;
+    } else {
+      initCalled = true;
+    }
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+      conf.getClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
+        ScriptBasedMapping.class,
+        DNSToSwitchMapping.class);
+    try {
+      Constructor<? extends DNSToSwitchMapping> dnsToSwitchMappingConstructor
+                             = dnsToSwitchMappingClass.getConstructor();
+      dnsToSwitchMapping = dnsToSwitchMappingConstructor.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * Utility method for getting a hostname resolved to a node in the
+   * network topology. This method initializes the class with the 
+   * right resolver implementation.
+   * @param conf
+   * @param hostName
+   * @return
+   */
+  public static Node resolve(Configuration conf, String hostName) {
+    init(conf);
+    return coreResolve(hostName);
+  }
+
+  /**
+   * Utility method for getting a hostname resolved to a node in the
+   * network topology. This method doesn't initialize the class.
+   * Call {@link #init(Configuration)} explicitly.
+   * @param hostName
+   * @return
+   */
+  public static Node resolve(String hostName) {
+    if (!initCalled) {
+      throw new IllegalStateException("RackResolver class not yet initialized");
+    }
+    return coreResolve(hostName);
+  }
+  
+  private static Node coreResolve(String hostName) {
+    List <String> tmpList = new ArrayList<String>(1);
+    tmpList.add(hostName);
+    List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
+    String rName = rNameList.get(0);
+    LOG.info("Resolved " + hostName + " to " + rName);
+    return new NodeBase(hostName, rName);
+  }
+}

+ 3 - 3
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java

@@ -36,9 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -64,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
 
 
 /**
 /**
  * This class is responsible for the interaction with the NodeManagers.
  * This class is responsible for the interaction with the NodeManagers.
@@ -153,6 +152,7 @@ NodeTracker, ClusterTracker {
         this.hostsReader = null;
         this.hostsReader = null;
       }
       }
     }
     }
+    RackResolver.init(conf);
   }
   }
 
 
   private void printConfiguredHosts() {
   private void printConfiguredHosts() {
@@ -194,7 +194,7 @@ NodeTracker, ClusterTracker {
    */
    */
   @Lock(Lock.NoLock.class)
   @Lock(Lock.NoLock.class)
   public static Node resolve(String hostName) {
   public static Node resolve(String hostName) {
-    return new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
+    return RackResolver.resolve(hostName);
   }
   }
   
   
   @Lock(Lock.NoLock.class)
   @Lock(Lock.NoLock.class)