Browse Source

YARN-2448. Changed ApplicationMasterProtocol to expose RM-recognized resource types to the AMs. Contributed by Varun Vasudev.

(cherry picked from commit b67d5ba7842cc10695d987f217027848a5a8c3d8)
Vinod Kumar Vavilapalli 10 năm trước cách đây
mục cha
commit
b4b59ef749
10 tập tin đã thay đổi với 202 bổ sung34 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
  3. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  4. 73 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
  5. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  6. 8 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  7. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
  8. 14 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  9. 9 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  10. 51 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -170,6 +170,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2515. Updated ConverterUtils#toContainerId to parse epoch.
     (Tsuyoshi OZAWA via jianhe)
 
+    YARN-2448. Changed ApplicationMasterProtocol to expose RM-recognized resource
+    types to the AMs. (Varun Vasudev via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.api.protocolrecords;
 
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -180,4 +182,25 @@ public abstract class RegisterApplicationMasterResponse {
   @Private
   @Unstable
   public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
+
+  /**
+   * Get a set of the resource types considered by the scheduler.
+   *
+   * @return a Map of RM settings
+   */
+  @Public
+  @Unstable
+  public abstract EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes();
+
+  /**
+   * Set the resource types used by the scheduler.
+   *
+   * @param types
+   *          a set of the resource types that the scheduler considers during
+   *          scheduling
+   */
+  @Private
+  @Unstable
+  public abstract void setSchedulerResourceTypes(
+      EnumSet<SchedulerResourceTypes> types);
 }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -47,6 +47,7 @@ message RegisterApplicationMasterResponseProto {
   repeated ContainerProto containers_from_previous_attempts = 4;
   optional string queue = 5;
   repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
+  repeated SchedulerResourceTypes scheduler_resource_types = 7;
 }
 
 message FinishApplicationMasterRequestProto {
@@ -88,6 +89,11 @@ message AllocateResponseProto {
   optional hadoop.common.TokenProto am_rm_token = 12;
 }
 
+enum SchedulerResourceTypes {
+  MEMORY = 0;
+  CPU = 1;
+}
+
 //////////////////////////////////////////////////////
 /////// client_RM_Protocol ///////////////////////////
 //////////////////////////////////////////////////////

+ 73 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java

@@ -20,11 +20,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
 
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -43,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
@@ -61,6 +58,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
   private Map<ApplicationAccessType, String> applicationACLS = null;
   private List<Container> containersFromPreviousAttempts = null;
   private List<NMToken> nmTokens = null;
+  private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
 
   public RegisterApplicationMasterResponsePBImpl() {
     builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -122,6 +120,9 @@ public class RegisterApplicationMasterResponsePBImpl extends
       Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
       builder.addAllNmTokensFromPreviousAttempts(iterable);
     }
+    if(schedulerResourceTypes != null) {
+      addSchedulerResourceTypes();
+    }
   }
 
 
@@ -364,6 +365,73 @@ public class RegisterApplicationMasterResponsePBImpl extends
     };
   }
 
+  @Override
+  public EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes() {
+    initSchedulerResourceTypes();
+    return this.schedulerResourceTypes;
+  }
+
+  private void initSchedulerResourceTypes() {
+    if (this.schedulerResourceTypes != null) {
+      return;
+    }
+    RegisterApplicationMasterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+
+    List<SchedulerResourceTypes> list = p.getSchedulerResourceTypesList();
+    if (list.isEmpty()) {
+      this.schedulerResourceTypes =
+          EnumSet.noneOf(SchedulerResourceTypes.class);
+    } else {
+      this.schedulerResourceTypes = EnumSet.copyOf(list);
+    }
+  }
+
+  private void addSchedulerResourceTypes() {
+    maybeInitBuilder();
+    builder.clearSchedulerResourceTypes();
+    if (schedulerResourceTypes == null) {
+      return;
+    }
+    Iterable<? extends SchedulerResourceTypes> values =
+        new Iterable<SchedulerResourceTypes>() {
+
+          @Override
+          public Iterator<SchedulerResourceTypes> iterator() {
+            return new Iterator<SchedulerResourceTypes>() {
+              Iterator<SchedulerResourceTypes> settingsIterator =
+                  schedulerResourceTypes.iterator();
+
+              @Override
+              public boolean hasNext() {
+                return settingsIterator.hasNext();
+              }
+
+              @Override
+              public SchedulerResourceTypes next() {
+                return settingsIterator.next();
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+          }
+        };
+    this.builder.addAllSchedulerResourceTypes(values);
+  }
+
+  @Override
+  public void setSchedulerResourceTypes(EnumSet<SchedulerResourceTypes> types) {
+    if (types == null) {
+      return;
+    }
+    initSchedulerResourceTypes();
+    this.schedulerResourceTypes.clear();
+    this.schedulerResourceTypes.addAll(types);
+  }
+
   private Resource convertFromProtoFormat(ResourceProto resource) {
     return new ResourcePBImpl(resource);
   }

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -22,11 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -329,6 +325,10 @@ public class ApplicationMasterService extends AbstractService implements
             + transferredContainers.size() + " containers from previous"
             + " attempts and " + nmTokens.size() + " NM tokens.");
       }
+
+      response.setSchedulerResourceTypes(rScheduler
+        .getSchedulingResourceTypes());
+
       return response;
     }
   }

+ 8 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -19,13 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -45,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -502,4 +497,10 @@ public abstract class AbstractYarnScheduler
           + " with the same resource: " + newResource);
     }
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
+    return EnumSet.of(SchedulerResourceTypes.MEMORY);
+  }
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 
 /**
  * This interface is used by the components to talk to the
@@ -220,4 +222,12 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    * @throws YarnException
    */
   void killAllAppsInQueue(String queueName) throws YarnException;
+
+  /**
+   * Return a collection of the resource types that are considered when
+   * scheduling
+   *
+   * @return an EnumSet containing the resource types
+   */
+  public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes();
 }

+ 14 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -20,13 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -50,12 +44,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
@@ -89,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -1285,4 +1280,15 @@ public class CapacityScheduler extends
     }
     return (LeafQueue) ret;
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
+    if (calculator.getClass().getName()
+      .equals(DefaultResourceCalculator.class.getName())) {
+      return EnumSet.of(SchedulerResourceTypes.MEMORY);
+    }
+    return EnumSet
+      .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
+  }
 }

+ 9 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -19,13 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -50,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@@ -1529,4 +1524,11 @@ public class FairScheduler extends
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
+    return EnumSet
+      .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
+  }
 }

+ 51 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java

@@ -18,6 +18,13 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -40,8 +47,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 import static java.lang.Thread.sleep;
 import static org.mockito.Matchers.any;
@@ -259,4 +265,47 @@ public class TestApplicationMasterService {
       }
     }
   }
+
+  @Test(timeout = 3000000)
+  public void testResourceTypes() throws Exception {
+    HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>> driver =
+        new HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>>();
+
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+    YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf);
+    testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER,
+      CapacityScheduler.class, ResourceScheduler.class);
+    YarnConfiguration testCapacityDefConf = new YarnConfiguration();
+    testCapacityDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
+      CapacityScheduler.class, ResourceScheduler.class);
+    YarnConfiguration testFairDefConf = new YarnConfiguration();
+    testFairDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
+      FairScheduler.class, ResourceScheduler.class);
+
+    driver.put(conf, EnumSet.of(SchedulerResourceTypes.MEMORY));
+    driver.put(testCapacityDRConf,
+      EnumSet.of(SchedulerResourceTypes.CPU, SchedulerResourceTypes.MEMORY));
+    driver.put(testCapacityDefConf, EnumSet.of(SchedulerResourceTypes.MEMORY));
+    driver.put(testFairDefConf,
+      EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU));
+
+    for (Map.Entry<YarnConfiguration, EnumSet<SchedulerResourceTypes>> entry : driver
+      .entrySet()) {
+      EnumSet<SchedulerResourceTypes> expectedValue = entry.getValue();
+      MockRM rm = new MockRM(entry.getKey());
+      rm.start();
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+      RMApp app1 = rm.submitApp(2048);
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
+      EnumSet<SchedulerResourceTypes> types = resp.getSchedulerResourceTypes();
+      LOG.info("types = " + types.toString());
+      Assert.assertEquals(expectedValue, types);
+      rm.stop();
+    }
+  }
 }