
YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)

(cherry picked from commit 2bf025278a318b0452fdc9ece4427b4c42124e39)
Arun Suresh 9 years ago
commit c282a08f38
44 changed files with 2633 additions and 45 deletions
  1. 42 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
  3. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
  4. 78 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
  5. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
  6. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
  7. 151 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
  8. 143 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
  9. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
  10. 102 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
  11. 180 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
  12. 304 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
  13. 38 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
  14. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  15. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  16. 31 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  17. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
  18. 42 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
  19. 124 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
  20. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
  21. 416 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
  22. 185 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
  23. 23 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
  24. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
  25. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  26. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  27. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  28. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
  29. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
  30. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
  31. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
  32. 212 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
  33. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
  34. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java
  35. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
  36. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
  37. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
  38. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
  39. 27 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  40. 162 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
  41. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  42. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  43. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  44. 170 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -296,6 +296,48 @@ public class YarnConfiguration extends Configuration {
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
 
+  /** Is Distributed Scheduling Enabled. */
+  public static final String DIST_SCHEDULING_ENABLED =
+      YARN_PREFIX + "distributed-scheduling.enabled";
+  public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
+
+  /** Minimum allocatable container memory for Distributed Scheduling. */
+  public static final String DIST_SCHEDULING_MIN_MEMORY =
+      YARN_PREFIX + "distributed-scheduling.min-memory";
+  public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
+
+  /** Minimum allocatable container vcores for Distributed Scheduling. */
+  public static final String DIST_SCHEDULING_MIN_VCORES =
+      YARN_PREFIX + "distributed-scheduling.min-vcores";
+  public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
+
+  /** Maximum allocatable container memory for Distributed Scheduling. */
+  public static final String DIST_SCHEDULING_MAX_MEMORY =
+      YARN_PREFIX + "distributed-scheduling.max-memory";
+  public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
+
+  /** Maximum allocatable container vcores for Distributed Scheduling. */
+  public static final String DIST_SCHEDULING_MAX_VCORES =
+      YARN_PREFIX + "distributed-scheduling.max-vcores";
+  public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
+
+  /** Incremental allocatable container memory for Distributed Scheduling. */
+  public static final String DIST_SCHEDULING_INCR_MEMORY =
+      YARN_PREFIX + "distributed-scheduling.incr-memory";
+  public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
+
+  /** Incremental allocatable container vcores for Distributed Scheduling. */
+  public static final String DIST_SCHEDULING_INCR_VCORES =
+      YARN_PREFIX + "distributed-scheduling.incr-vcores";
+  public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
+
+  /** Container token expiry for containers allocated via Distributed
+   * Scheduling. */
+  public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
+      YARN_PREFIX + "distributed-scheduling.container-token-expiry";
+  public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
+      600000;
+
   /**
    * Enable/disable intermediate-data encryption at YARN level. For now, this
    * only is used by the FileSystemRMStateStore to setup right file-system
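
The keys above are plain YarnConfiguration entries. A minimal sketch of how a component might read them, assuming only that hadoop-yarn-api with this patch is on the classpath (the class name below is illustrative, not part of the commit):

import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical example; not part of this commit.
public class DistSchedulingConfigExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Distributed scheduling stays off unless yarn.distributed-scheduling.enabled is set.
    boolean enabled = conf.getBoolean(
        YarnConfiguration.DIST_SCHEDULING_ENABLED,
        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
    // Memory bounds (in MB) for containers allocated by the local scheduler.
    int minMemory = conf.getInt(
        YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
        YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT);
    int maxMemory = conf.getInt(
        YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
        YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT);
    System.out.println("distributed scheduling enabled=" + enabled
        + ", container memory range=[" + minMemory + "MB, " + maxMemory + "MB]");
  }
}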

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java

@@ -37,8 +37,10 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
@@ -294,4 +296,17 @@ public class ProtoUtils {
   public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
     return ExecutionType.valueOf(e.name());
   }
+
+  /*
+   * Resource
+   */
+  public static synchronized YarnProtos.ResourceProto convertToProtoFormat(
+      Resource r) {
+    return ((ResourcePBImpl) r).getProto();
+  }
+
+  public static Resource convertFromProtoFormat(
+      YarnProtos.ResourceProto resource) {
+    return new ResourcePBImpl(resource);
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml

@@ -142,6 +142,7 @@
               <source>
                 <directory>${basedir}/src/main/proto</directory>
                 <includes>
+                  <include>distributed_scheduler_protocol.proto</include>
                   <include>yarn_server_common_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>

+ 78 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java

@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is
+ * used by the <code>LocalScheduler</code> running on the NodeManager to wrap
+ * the request / response objects of the <code>registerApplicationMaster</code>
+ * and <code>allocate</code> methods of the protocol with additional information
+ * required to perform Distributed Scheduling.
+ * </p>
+ */
+public interface DistributedSchedulerProtocol
+    extends ApplicationMasterProtocol {
+
+  /**
+   * <p> Extends the <code>registerApplicationMaster</code> to wrap the response
+   * with additional metadata.</p>
+   *
+   * @param request ApplicationMaster registration request
+   * @return A <code>DistSchedRegisterResponse</code> that contains a standard
+   *         AM registration response along with additional information required
+   *         for Distributed Scheduling
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Public
+  @Unstable
+  @Idempotent
+  DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException;
+
+  /**
+   * <p> Extends the <code>allocate</code> to wrap the response with additional
+   * metadata.</p>
+   *
+   * @param request ApplicationMaster allocate request
+   * @return A <code>DistSchedAllocateResponse</code> that contains a standard
+   *         AM allocate response along with additional information required
+   *         for Distributed Scheduling
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Public
+  @Unstable
+  @Idempotent
+  DistSchedAllocateResponse allocateForDistributedScheduling(
+      AllocateRequest request) throws YarnException, IOException;
+}
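
A hedged sketch of how an NM-side component might drive this protocol once it holds a proxy; the helper class and method here are illustrative only and do not appear in the patch (the real caller is the LocalScheduler added below):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;

// Hypothetical example; not part of this commit.
public class DistSchedRegistrationExample {
  public static List<NodeId> registerAndGetNodes(DistributedSchedulerProtocol rm,
      RegisterApplicationMasterRequest request) throws YarnException, IOException {
    // The wrapped response carries the normal AM registration payload plus the
    // extra metadata needed for distributed scheduling.
    DistSchedRegisterResponse response =
        rm.registerApplicationMasterForDistributedScheduling(request);
    System.out.println("container ids start at " + response.getContainerIdStart()
        + ", token expiry = " + response.getContainerTokenExpiryInterval() + " ms");
    // Candidate nodes on which OPPORTUNISTIC containers may be placed locally.
    return response.getNodesForScheduling();
  }
}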

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+
+import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol;
+
+@Private
+@Unstable
+@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
+    protocolVersion = 1)
+public interface DistributedSchedulerProtocolPB extends
+    DistributedSchedulerProtocol.DistributedSchedulerProtocolService.BlockingInterface,
+    ApplicationMasterProtocolService.BlockingInterface {
+}

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java

@@ -78,6 +78,10 @@ public class ServerRMProxy<T> extends RMProxy<T> {
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    } else if (protocol == DistributedSchedulerProtocol.class) {
+      return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
     } else {
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to ResourceManager: " +

+ 151 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java

@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class DistributedSchedulerProtocolPBClientImpl implements
+    DistributedSchedulerProtocol, Closeable {
+
+  private DistributedSchedulerProtocolPB proxy;
+
+  public DistributedSchedulerProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion,
+        addr, conf);
+  }
+
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
+        ((RegisterApplicationMasterRequestPBImpl) request).getProto();
+    try {
+      return new DistSchedRegisterResponsePBImpl(
+          proxy.registerApplicationMasterForDistributedScheduling(
+              null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    YarnServiceProtos.AllocateRequestProto requestProto =
+        ((AllocateRequestPBImpl) request).getProto();
+    try {
+      return new DistSchedAllocateResponsePBImpl(
+          proxy.allocateForDistributedScheduling(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
+        ((RegisterApplicationMasterRequestPBImpl) request).getProto();
+    try {
+      return new RegisterApplicationMasterResponsePBImpl(
+          proxy.registerApplicationMaster(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    YarnServiceProtos.FinishApplicationMasterRequestProto requestProto =
+        ((FinishApplicationMasterRequestPBImpl) request).getProto();
+    try {
+      return new FinishApplicationMasterResponsePBImpl(
+          proxy.finishApplicationMaster(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    YarnServiceProtos.AllocateRequestProto requestProto =
+        ((AllocateRequestPBImpl) request).getProto();
+    try {
+      return new AllocateResponsePBImpl(proxy.allocate(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+}

+ 143 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java

@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
+
+import java.io.IOException;
+
+public class DistributedSchedulerProtocolPBServiceImpl implements
+    DistributedSchedulerProtocolPB {
+
+  private DistributedSchedulerProtocol real;
+
+  public DistributedSchedulerProtocolPBServiceImpl(
+      DistributedSchedulerProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+  registerApplicationMasterForDistributedScheduling(RpcController controller,
+      RegisterApplicationMasterRequestProto proto) throws
+      ServiceException {
+    RegisterApplicationMasterRequestPBImpl request = new
+        RegisterApplicationMasterRequestPBImpl(proto);
+    try {
+      DistSchedRegisterResponse response =
+          real.registerApplicationMasterForDistributedScheduling(request);
+      return ((DistSchedRegisterResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+  allocateForDistributedScheduling(RpcController controller,
+      AllocateRequestProto proto) throws ServiceException {
+    AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+    try {
+      DistSchedAllocateResponse response = real
+          .allocateForDistributedScheduling(request);
+      return ((DistSchedAllocateResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServiceProtos.AllocateResponseProto allocate(RpcController arg0,
+      AllocateRequestProto proto) throws ServiceException {
+    AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+    try {
+      AllocateResponse response = real.allocate(request);
+      return ((AllocateResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServiceProtos.FinishApplicationMasterResponseProto
+  finishApplicationMaster(
+      RpcController arg0, YarnServiceProtos
+      .FinishApplicationMasterRequestProto proto)
+      throws ServiceException {
+    FinishApplicationMasterRequestPBImpl request = new
+        FinishApplicationMasterRequestPBImpl(proto);
+    try {
+      FinishApplicationMasterResponse response = real.finishApplicationMaster
+          (request);
+      return ((FinishApplicationMasterResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public YarnServiceProtos.RegisterApplicationMasterResponseProto
+  registerApplicationMaster(
+      RpcController arg0, RegisterApplicationMasterRequestProto proto)
+      throws ServiceException {
+    RegisterApplicationMasterRequestPBImpl request = new
+        RegisterApplicationMasterRequestPBImpl(proto);
+    try {
+      RegisterApplicationMasterResponse response = real
+          .registerApplicationMaster(request);
+      return ((RegisterApplicationMasterResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+@Public
+@Unstable
+public abstract class DistSchedAllocateResponse {
+
+  @Public
+  @Unstable
+  public static DistSchedAllocateResponse newInstance(AllocateResponse
+      allResp) {
+    DistSchedAllocateResponse response =
+        Records.newRecord(DistSchedAllocateResponse.class);
+    response.setAllocateResponse(allResp);
+    return  response;
+  }
+
+  @Public
+  @Unstable
+  public abstract void setAllocateResponse(AllocateResponse response);
+
+  @Public
+  @Unstable
+  public abstract AllocateResponse getAllocateResponse();
+
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+}

+ 102 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+@Public
+@Unstable
+public abstract class DistSchedRegisterResponse {
+
+  @Public
+  @Unstable
+  public static DistSchedRegisterResponse newInstance
+      (RegisterApplicationMasterResponse regAMResp) {
+    DistSchedRegisterResponse response =
+        Records.newRecord(DistSchedRegisterResponse.class);
+    response.setRegisterResponse(regAMResp);
+    return response;
+  }
+
+  @Public
+  @Unstable
+  public abstract void setRegisterResponse(
+      RegisterApplicationMasterResponse resp);
+
+  @Public
+  @Unstable
+  public abstract RegisterApplicationMasterResponse getRegisterResponse();
+
+  @Public
+  @Unstable
+  public abstract void setMinAllocatableCapabilty(Resource minResource);
+
+  @Public
+  @Unstable
+  public abstract Resource getMinAllocatableCapabilty();
+
+  @Public
+  @Unstable
+  public abstract void setMaxAllocatableCapabilty(Resource maxResource);
+
+  @Public
+  @Unstable
+  public abstract Resource getMaxAllocatableCapabilty();
+
+  @Public
+  @Unstable
+  public abstract void setIncrAllocatableCapabilty(Resource maxResource);
+
+  @Public
+  @Unstable
+  public abstract Resource getIncrAllocatableCapabilty();
+
+  @Public
+  @Unstable
+  public abstract void setContainerTokenExpiryInterval(int interval);
+
+  @Public
+  @Unstable
+  public abstract int getContainerTokenExpiryInterval();
+
+  @Public
+  @Unstable
+  public abstract void setContainerIdStart(long containerIdStart);
+
+  @Public
+  @Unstable
+  public abstract long getContainerIdStart();
+
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+
+}
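
A hedged sketch of how the RM side might fill in this record before returning it (the real population happens in DistributedSchedulingService, added later in this patch); the concrete values and class name here are illustrative:

import java.util.Arrays;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;

// Hypothetical example; not part of this commit.
public class DistSchedRegisterResponseExample {
  public static DistSchedRegisterResponse build(
      RegisterApplicationMasterResponse amResponse) {
    DistSchedRegisterResponse response =
        DistSchedRegisterResponse.newInstance(amResponse);
    // Allocation bounds mirroring the yarn.distributed-scheduling.* defaults.
    response.setMinAllocatableCapabilty(Resource.newInstance(512, 1));
    response.setMaxAllocatableCapabilty(Resource.newInstance(2048, 4));
    response.setIncrAllocatableCapabilty(Resource.newInstance(512, 1));
    response.setContainerTokenExpiryInterval(600000);
    response.setContainerIdStart(0);
    response.setNodesForScheduling(
        Arrays.asList(NodeId.newInstance("node1.example.com", 8041)));
    return response;
  }
}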

+ 180 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java

@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedAllocateResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
+
+  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
+      YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance();
+  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private AllocateResponse allocateResponse;
+  private List<NodeId> nodesForScheduling;
+
+  public DistSchedAllocateResponsePBImpl() {
+    builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
+  }
+
+  public DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    if (this.nodesForScheduling != null) {
+      builder.clearNodesForScheduling();
+      Iterable<YarnProtos.NodeIdProto> iterable =
+          getNodeIdProtoIterable(this.nodesForScheduling);
+      builder.addAllNodesForScheduling(iterable);
+    }
+    if (this.allocateResponse != null) {
+      builder.setAllocateResponse(
+          ((AllocateResponsePBImpl)this.allocateResponse).getProto());
+    }
+  }
+  @Override
+  public void setAllocateResponse(AllocateResponse response) {
+    maybeInitBuilder();
+    if(allocateResponse == null) {
+      builder.clearAllocateResponse();
+    }
+    this.allocateResponse = response;
+  }
+
+  @Override
+  public AllocateResponse getAllocateResponse() {
+    if (this.allocateResponse != null) {
+      return this.allocateResponse;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasAllocateResponse()) {
+      return null;
+    }
+
+    this.allocateResponse =
+        new AllocateResponsePBImpl(p.getAllocateResponse());
+    return this.allocateResponse;
+  }
+
+  @Override
+  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+    maybeInitBuilder();
+    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+      if (this.nodesForScheduling != null) {
+        this.nodesForScheduling.clear();
+      }
+      builder.clearNodesForScheduling();
+      return;
+    }
+    this.nodesForScheduling = new ArrayList<>();
+    this.nodesForScheduling.addAll(nodesForScheduling);
+  }
+
+  @Override
+  public List<NodeId> getNodesForScheduling() {
+    if (nodesForScheduling != null) {
+      return nodesForScheduling;
+    }
+    initLocalNodesForSchedulingList();
+    return nodesForScheduling;
+  }
+
+  private synchronized void initLocalNodesForSchedulingList() {
+    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+    nodesForScheduling = new ArrayList<>();
+    if (list != null) {
+      for (YarnProtos.NodeIdProto t : list) {
+        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+      }
+    }
+  }
+  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+      final List<NodeId> nodeList) {
+    maybeInitBuilder();
+    return new Iterable<YarnProtos.NodeIdProto>() {
+      @Override
+      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+        return new Iterator<YarnProtos.NodeIdProto>() {
+
+          Iterator<NodeId> iter = nodeList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public YarnProtos.NodeIdProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+}

+ 304 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java

@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedRegisterResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
+
+  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
+      YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance();
+  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private Resource maxAllocatableCapability;
+  private Resource minAllocatableCapability;
+  private Resource incrAllocatableCapability;
+  private List<NodeId> nodesForScheduling;
+  private RegisterApplicationMasterResponse registerApplicationMasterResponse;
+
+  public DistSchedRegisterResponsePBImpl() {
+    builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
+  }
+
+  public DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    if (this.nodesForScheduling != null) {
+      builder.clearNodesForScheduling();
+      Iterable<YarnProtos.NodeIdProto> iterable =
+          getNodeIdProtoIterable(this.nodesForScheduling);
+      builder.addAllNodesForScheduling(iterable);
+    }
+    if (this.maxAllocatableCapability != null) {
+      builder.setMaxAllocCapability(
+          ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
+    }
+    if (this.minAllocatableCapability != null) {
+      builder.setMinAllocCapability(
+          ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
+    }
+    if (this.registerApplicationMasterResponse != null) {
+      builder.setRegisterResponse(
+          ((RegisterApplicationMasterResponsePBImpl)
+              this.registerApplicationMasterResponse).getProto());
+    }
+  }
+
+  @Override
+  public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
+    maybeInitBuilder();
+    if(registerApplicationMasterResponse == null) {
+      builder.clearRegisterResponse();
+    }
+    this.registerApplicationMasterResponse = resp;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse getRegisterResponse() {
+    if (this.registerApplicationMasterResponse != null) {
+      return this.registerApplicationMasterResponse;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasRegisterResponse()) {
+      return null;
+    }
+
+    this.registerApplicationMasterResponse =
+        new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
+    return this.registerApplicationMasterResponse;
+  }
+
+  @Override
+  public void setMaxAllocatableCapabilty(Resource maxResource) {
+    maybeInitBuilder();
+    if(maxAllocatableCapability == null) {
+      builder.clearMaxAllocCapability();
+    }
+    this.maxAllocatableCapability = maxResource;
+  }
+
+  @Override
+  public Resource getMaxAllocatableCapabilty() {
+    if (this.maxAllocatableCapability != null) {
+      return this.maxAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasMaxAllocCapability()) {
+      return null;
+    }
+
+    this.maxAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
+    return this.maxAllocatableCapability;
+  }
+
+  @Override
+  public void setMinAllocatableCapabilty(Resource minResource) {
+    maybeInitBuilder();
+    if(minAllocatableCapability == null) {
+      builder.clearMinAllocCapability();
+    }
+    this.minAllocatableCapability = minResource;
+  }
+
+  @Override
+  public Resource getMinAllocatableCapabilty() {
+    if (this.minAllocatableCapability != null) {
+      return this.minAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasMinAllocCapability()) {
+      return null;
+    }
+
+    this.minAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
+    return this.minAllocatableCapability;
+  }
+
+  @Override
+  public void setIncrAllocatableCapabilty(Resource incrResource) {
+    maybeInitBuilder();
+    if(incrAllocatableCapability == null) {
+      builder.clearIncrAllocCapability();
+    }
+    this.incrAllocatableCapability = incrResource;
+  }
+
+  @Override
+  public Resource getIncrAllocatableCapabilty() {
+    if (this.incrAllocatableCapability != null) {
+      return this.incrAllocatableCapability;
+    }
+
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasIncrAllocCapability()) {
+      return null;
+    }
+
+    this.incrAllocatableCapability =
+        ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
+    return this.incrAllocatableCapability;
+  }
+
+  @Override
+  public void setContainerTokenExpiryInterval(int interval) {
+    maybeInitBuilder();
+    builder.setContainerTokenExpiryInterval(interval);
+  }
+
+  @Override
+  public int getContainerTokenExpiryInterval() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasContainerTokenExpiryInterval()) {
+      return 0;
+    }
+    return p.getContainerTokenExpiryInterval();
+  }
+
+  @Override
+  public void setContainerIdStart(long containerIdStart) {
+    maybeInitBuilder();
+    builder.setContainerIdStart(containerIdStart);
+  }
+
+  @Override
+  public long getContainerIdStart() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasContainerIdStart()) {
+      return 0;
+    }
+    return p.getContainerIdStart();
+  }
+
+
+  @Override
+  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+    maybeInitBuilder();
+    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+      if (this.nodesForScheduling != null) {
+        this.nodesForScheduling.clear();
+      }
+      builder.clearNodesForScheduling();
+      return;
+    }
+    this.nodesForScheduling = new ArrayList<>();
+    this.nodesForScheduling.addAll(nodesForScheduling);
+  }
+
+  @Override
+  public List<NodeId> getNodesForScheduling() {
+    if (nodesForScheduling != null) {
+      return nodesForScheduling;
+    }
+    initLocalNodesForSchedulingList();
+    return nodesForScheduling;
+  }
+
+  private synchronized void initLocalNodesForSchedulingList() {
+    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+    nodesForScheduling = new ArrayList<>();
+    if (list != null) {
+      for (YarnProtos.NodeIdProto t : list) {
+        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+      }
+    }
+  }
+  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+      final List<NodeId> nodeList) {
+    maybeInitBuilder();
+    return new Iterable<YarnProtos.NodeIdProto>() {
+      @Override
+      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+        return new Iterator<YarnProtos.NodeIdProto>() {
+
+          Iterator<NodeId> iter = nodeList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public YarnProtos.NodeIdProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are public and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "DistributedSchedulerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_service_protos.proto";
+import "yarn_server_common_service_protos.proto";
+
+
+service DistributedSchedulerProtocolService {
+  rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
+  rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
+}

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -26,6 +26,21 @@ import "yarn_protos.proto";
 import "yarn_server_common_protos.proto";
 import "yarn_service_protos.proto";
 
+message DistSchedRegisterResponseProto {
+  optional RegisterApplicationMasterResponseProto register_response = 1;
+  optional ResourceProto max_alloc_capability = 2;
+  optional ResourceProto min_alloc_capability = 3;
+  optional ResourceProto incr_alloc_capability = 4;
+  optional int32 container_token_expiry_interval = 5;
+  optional int64 container_id_start = 6;
+  repeated NodeIdProto nodes_for_scheduling = 7;
+}
+
+message DistSchedAllocateResponseProto {
+  optional AllocateResponseProto allocate_response = 1;
+  repeated NodeIdProto nodes_for_scheduling = 2;
+}
+
 message NodeLabelsProto {
   repeated NodeLabelProto nodeLabels = 1;
 }
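
A minimal sketch of how the allocate-side wrapper defined above round-trips through its PB record, assuming the classes generated from this file and the PBImpl added earlier in this patch; the example class is illustrative only:

import java.util.Arrays;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;

// Hypothetical example; not part of this commit.
public class DistSchedProtoRoundTripExample {
  public static DistSchedAllocateResponseProto toProto(AllocateResponse allocateResponse) {
    // allocateResponse is expected to be a PB-backed record (the default in YARN).
    DistSchedAllocateResponse record =
        DistSchedAllocateResponse.newInstance(allocateResponse);
    record.setNodesForScheduling(
        Arrays.asList(NodeId.newInstance("node1.example.com", 8041)));
    // getProto() merges the local fields into the DistSchedAllocateResponseProto message.
    return ((DistSchedAllocateResponsePBImpl) record).getProto();
  }
}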

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -106,4 +107,8 @@ public interface Context {
    * queued and killed.
    */
   QueuingContext getQueuingContext();
+
+  boolean isDistributedSchedulingEnabled();
+
+  OpportunisticContainerAllocator getContainerAllocator();
 }

+ 31 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -195,9 +196,9 @@ public class NodeManager extends CompositeService
   protected NMContext createNMContext(
       NMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInNM nmTokenSecretManager,
-      NMStateStoreService stateStore) {
+      NMStateStoreService stateStore, boolean isDistSchedulerEnabled) {
     return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
-        dirsHandler, aclsManager, stateStore);
+        dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled);
   }
 
   protected void doSecureLogin() throws IOException {
@@ -318,8 +319,12 @@ public class NodeManager extends CompositeService
             getNodeHealthScriptRunner(conf), dirsHandler);
     addService(nodeHealthChecker);
 
+    boolean isDistSchedulingEnabled =
+        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+            YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+
     this.context = createNMContext(containerTokenSecretManager,
-        nmTokenSecretManager, nmStore);
+        nmTokenSecretManager, nmStore, isDistSchedulingEnabled);
 
     nodeLabelsProvider = createNodeLabelsProvider(conf);
 
@@ -348,6 +353,10 @@ public class NodeManager extends CompositeService
     addService(webServer);
     ((NMContext) context).setWebServer(webServer);
 
+    ((NMContext) context).setQueueableContainerAllocator(
+        new OpportunisticContainerAllocator(nodeStatusUpdater, context,
+            webServer.getPort()));
+
     dispatcher.register(ContainerManagerEventType.class, containerManager);
     dispatcher.register(NodeManagerEventType.class, this);
     addService(dispatcher);
@@ -468,13 +477,16 @@ public class NodeManager extends CompositeService
     private final ConcurrentLinkedQueue<LogAggregationReport>
         logAggregationReportForApps;
     private NodeStatusUpdater nodeStatusUpdater;
+    private final boolean isDistSchedulingEnabled;
+
+    private OpportunisticContainerAllocator containerAllocator;
 
     private final QueuingContext queuingContext;
 
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
-        NMStateStoreService stateStore) {
+        NMStateStoreService stateStore, boolean isDistSchedulingEnabled) {
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
       this.dirsHandler = dirsHandler;
@@ -486,6 +498,7 @@ public class NodeManager extends CompositeService
       this.logAggregationReportForApps = new ConcurrentLinkedQueue<
           LogAggregationReport>();
       this.queuingContext = new QueuingNMContext();
+      this.isDistSchedulingEnabled = isDistSchedulingEnabled;
     }
 
     /**
@@ -611,6 +624,20 @@ public class NodeManager extends CompositeService
     public QueuingContext getQueuingContext() {
       return this.queuingContext;
     }
+
+    public boolean isDistributedSchedulingEnabled() {
+      return isDistSchedulingEnabled;
+    }
+
+    public void setQueueableContainerAllocator(
+        OpportunisticContainerAllocator containerAllocator) {
+      this.containerAllocator = containerAllocator;
+    }
+
+    @Override
+    public OpportunisticContainerAllocator getContainerAllocator() {
+      return containerAllocator;
+    }
   }
 
   /**

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java

@@ -64,6 +64,8 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
@@ -465,6 +467,12 @@ public class AMRMProxyService extends AbstractService implements
       interceptorClassNames.add(item.trim());
     }
 
+    // Make sure LocalScheduler is present at the beginning
+    // of the chain.
+    if (this.nmContext.isDistributedSchedulingEnabled()) {
+      interceptorClassNames.add(0, LocalScheduler.class.getName());
+    }
+
     return interceptorClassNames;
   }
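With distributed scheduling switched on, LocalScheduler is always the first interceptor in each application's pipeline, ahead of any user-configured interceptors and the terminal DefaultRequestInterceptor. A minimal sketch of the NM-side opt-in, assuming only the new flag introduced by this change (the AMRMProxy itself must already be enabled; its own key is pre-existing and not shown here):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// NodeManager reads this flag during service init and passes it to createNMContext();
// AMRMProxyService then prepends LocalScheduler to every interceptor chain.
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);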
 

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java

@@ -21,6 +21,14 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+
+import java.io.IOException;
 
 /**
  * Implements the RequestInterceptor interface and provides common functionality
@@ -99,4 +107,38 @@ public abstract class AbstractRequestInterceptor implements
   public AMRMProxyApplicationContext getApplicationContext() {
     return this.appContext;
   }
+
+  /**
+   * Default implementation that invokes the distributed scheduling version
+   * of the allocate method.
+   *
+   * @param request ApplicationMaster allocate request
+   * @return Distributed Scheduler Allocate Response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    return (this.nextInterceptor != null) ?
+        this.nextInterceptor.allocateForDistributedScheduling(request) : null;
+  }
+
+  /**
+   * Default implementation that invokes the distributed scheduling version
+   * of the register method.
+   *
+   * @param request ApplicationMaster registration request
+   * @return Distributed Scheduler Register Response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return (this.nextInterceptor != null) ? this.nextInterceptor
+        .registerApplicationMasterForDistributedScheduling(request) : null;
+  }
 }

+ 124 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java

@@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 
+import com.google.common.base.Joiner;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -33,9 +38,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+    .DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +62,7 @@ public final class DefaultRequestInterceptor extends
     AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(DefaultRequestInterceptor.class);
-  private ApplicationMasterProtocol rmClient;
+  private DistributedSchedulerProtocol rmClient;
   private UserGroupInformation user = null;
 
   @Override
@@ -65,11 +77,12 @@ public final class DefaultRequestInterceptor extends
       final Configuration conf = this.getConf();
 
       rmClient =
-          user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+          user.doAs(new PrivilegedExceptionAction<DistributedSchedulerProtocol>() {
             @Override
-            public ApplicationMasterProtocol run() throws Exception {
-              return ClientRMProxy.createRMProxy(conf,
-                  ApplicationMasterProtocol.class);
+            public DistributedSchedulerProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ServerRMProxy.createRMProxy(conf,
+                  DistributedSchedulerProtocol.class);
             }
           });
     } catch (IOException e) {
@@ -109,6 +122,32 @@ public final class DefaultRequestInterceptor extends
     return allocateResponse;
   }
 
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+        "request to the real YARN RM");
+    return rmClient.registerApplicationMasterForDistributedScheduling(request);
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Forwarding allocateForDistributedScheduling request" +
+          "to the real YARN RM");
+    }
+    DistSchedAllocateResponse allocateResponse =
+        rmClient.allocateForDistributedScheduling(request);
+    if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+      updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+    }
+
+    return allocateResponse;
+  }
+
   @Override
   public FinishApplicationMasterResponse finishApplicationMaster(
       final FinishApplicationMasterRequest request) throws YarnException,
@@ -139,7 +178,85 @@ public final class DefaultRequestInterceptor extends
   }
 
   @VisibleForTesting
-  public void setRMClient(ApplicationMasterProtocol rmClient) {
-    this.rmClient = rmClient;
+  public void setRMClient(final ApplicationMasterProtocol rmClient) {
+    if (rmClient instanceof DistributedSchedulerProtocol) {
+      this.rmClient = (DistributedSchedulerProtocol)rmClient;
+    } else {
+      this.rmClient = new DistributedSchedulerProtocol() {
+        @Override
+        public RegisterApplicationMasterResponse registerApplicationMaster
+            (RegisterApplicationMasterRequest request) throws YarnException,
+            IOException {
+          return rmClient.registerApplicationMaster(request);
+        }
+
+        @Override
+        public FinishApplicationMasterResponse finishApplicationMaster
+            (FinishApplicationMasterRequest request) throws YarnException,
+            IOException {
+          return rmClient.finishApplicationMaster(request);
+        }
+
+        @Override
+        public AllocateResponse allocate(AllocateRequest request) throws
+            YarnException, IOException {
+          return rmClient.allocate(request);
+        }
+
+        @Override
+        public DistSchedRegisterResponse
+        registerApplicationMasterForDistributedScheduling
+            (RegisterApplicationMasterRequest request) throws YarnException,
+            IOException {
+          throw new IOException("Not Supported !!");
+        }
+
+        @Override
+        public DistSchedAllocateResponse
+        allocateForDistributedScheduling(AllocateRequest request) throws
+            YarnException, IOException {
+          throw new IOException("Not Supported !!");
+        }
+      };
+    }
+  }
+
+  private static void setAMRMTokenService(final Configuration conf)
+      throws IOException {
+    for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : UserGroupInformation
+        .getCurrentUser().getTokens()) {
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        token.setService(getAMRMTokenService(conf));
+      }
+    }
+  }
+
+  @InterfaceStability.Unstable
+  public static Text getAMRMTokenService(Configuration conf) {
+    return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+  }
+
+  @InterfaceStability.Unstable
+  public static Text getTokenService(Configuration conf, String address,
+      String defaultAddr, int defaultPort) {
+    if (HAUtil.isHAEnabled(conf)) {
+      // Build a list of service addresses to form the service name
+      ArrayList<String> services = new ArrayList<String>();
+      YarnConfiguration yarnConf = new YarnConfiguration(conf);
+      for (String rmId : HAUtil.getRMHAIds(conf)) {
+        // Set RM_ID to get the corresponding RM_ADDRESS
+        yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
+        services.add(SecurityUtil.buildTokenService(
+            yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
+            .toString());
+      }
+      return new Text(Joiner.on(',').join(services));
+    }
+
+    // Non-HA case - no need to set RM_ID
+    return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
+        defaultAddr, defaultPort));
   }
 }
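Because the interceptor now reaches the RM over DistributedSchedulerProtocol via ServerRMProxy, the AMRM token picked up from the UGI must carry the scheduler address as its service, which is what setAMRMTokenService()/getTokenService() arrange. A hedged sketch of the HA case, using hypothetical loopback addresses; the exact service string depends on how token services are resolved in the deployment:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;

// Two-RM HA setup (illustrative): the token service becomes the comma-joined
// list of each RM's scheduler address, so one AMRM token matches either RM.
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS + ".rm1", "127.0.0.1:8030");
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS + ".rm2", "127.0.0.1:8031");
Text service = DefaultRequestInterceptor.getAMRMTokenService(conf);
// e.g. "127.0.0.1:8030,127.0.0.1:8031" in this setup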

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java

@@ -19,14 +19,14 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
 
 /**
  * Defines the contract to be implemented by the request intercepter classes,
  * that can be used to intercept and inspect messages sent from the application
  * master to the resource manager.
  */
-public interface RequestInterceptor extends ApplicationMasterProtocol,
+public interface RequestInterceptor extends DistributedSchedulerProtocol,
     Configurable {
   /**
    * This method is called for initializing the intercepter. This is guaranteed

+ 416 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java

@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy
+    .AMRMProxyApplicationContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.security
+    .NMTokenSecretManagerInNM;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * <p>The LocalScheduler runs on the NodeManager and is modelled as an
+ * <code>AMRMProxy</code> request interceptor. It is responsible for the
+ * following:</p>
+ * <ul>
+ *   <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
+ *   response objects to extract instructions from the
+ *   <code>ClusterManager</code> running on the ResourceManager to aid in
+ *   making scheduling decisions</li>
+ *   <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
+ *   containers for the outstanding OPPORTUNISTIC resource requests</li>
+ * </ul>
+ */
+public final class LocalScheduler extends AbstractRequestInterceptor {
+
+  static class PartitionedResourceRequests {
+    private List<ResourceRequest> guaranteed = new ArrayList<>();
+    private List<ResourceRequest> opportunistic = new ArrayList<>();
+    public List<ResourceRequest> getGuaranteed() {
+      return guaranteed;
+    }
+    public List<ResourceRequest> getOpportunistic() {
+      return opportunistic;
+    }
+  }
+
+  static class DistSchedulerParams {
+    Resource maxResource;
+    Resource minResource;
+    Resource incrementResource;
+    int containerTokenExpiryInterval;
+  }
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(LocalScheduler.class);
+
+  // Currently just used to keep track of allocated Containers
+  // Can be used for reporting stats later
+  private Set<ContainerId> containersAllocated = new HashSet<>();
+
+  private DistSchedulerParams appParams = new DistSchedulerParams();
+  private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
+      new OpportunisticContainerAllocator.ContainerIdCounter();
+  private Map<String, NodeId> nodeList = new HashMap<>();
+
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
+  final Set<String> blacklist = new HashSet<>();
+
+  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
+  // Resource Name (Host/rack/any) and capability. This mapping is required
+  // to match a received Container to an outstanding OPPORTUNISTIC
+  // ResourceRequests (ask)
+  final TreeMap<Priority, Map<Resource, ResourceRequest>>
+      outstandingOpReqs = new TreeMap<>();
+
+  private ApplicationAttemptId applicationAttemptId;
+  private OpportunisticContainerAllocator containerAllocator;
+  private NMTokenSecretManagerInNM nmSecretManager;
+  private String appSubmitter;
+
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    initLocal(appContext.getApplicationAttemptId(),
+        appContext.getNMCotext().getContainerAllocator(),
+        appContext.getNMCotext().getNMTokenSecretManager(),
+        appContext.getUser());
+  }
+
+  @VisibleForTesting
+  void initLocal(ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerAllocator containerAllocator,
+      NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
+    this.applicationAttemptId = applicationAttemptId;
+    this.containerAllocator = containerAllocator;
+    this.nmSecretManager = nmSecretManager;
+    this.appSubmitter = appSubmitter;
+  }
+
+  /**
+   * Route register call to the corresponding distributed scheduling method viz.
+   * registerApplicationMasterForDistributedScheduling, and return response to
+   * the caller after stripping away Distributed Scheduling information.
+   *
+   * @param request
+   *          registration request
+   * @return Register Response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return registerApplicationMasterForDistributedScheduling(request)
+        .getRegisterResponse();
+  }
+
+  /**
+   * Route allocate call to the allocateForDistributedScheduling method and
+   * return response to the caller after stripping away Distributed Scheduling
+   * information.
+   *
+   * @param request
+   *          allocation request
+   * @return Allocate Response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return allocateForDistributedScheduling(request).getAllocateResponse();
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().finishApplicationMaster(request);
+  }
+
+  /**
+   * Check if we already have an NMToken. If not, generate the token and
+   * add it to the response.
+   * @param response
+   * @param nmTokens
+   * @param allocatedContainers
+   */
+  private void updateResponseWithNMTokens(AllocateResponse response,
+      List<NMToken> nmTokens, List<Container> allocatedContainers) {
+    List<NMToken> newTokens = new ArrayList<>();
+    if (allocatedContainers.size() > 0) {
+      response.getAllocatedContainers().addAll(allocatedContainers);
+      for (Container alloc : allocatedContainers) {
+        if (!nodeTokens.containsKey(alloc.getNodeId())) {
+          newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
+        }
+      }
+      List<NMToken> retTokens = new ArrayList<>(nmTokens);
+      retTokens.addAll(newTokens);
+      response.setNMTokens(retTokens);
+    }
+  }
+
+  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
+      askList) {
+    PartitionedResourceRequests partitionedRequests =
+        new PartitionedResourceRequests();
+    for (ResourceRequest rr : askList) {
+      if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        partitionedRequests.getOpportunistic().add(rr);
+      } else {
+        partitionedRequests.getGuaranteed().add(rr);
+      }
+    }
+    return partitionedRequests;
+  }
+
+  private void updateParameters(
+      DistSchedRegisterResponse registerResponse) {
+    appParams.minResource = registerResponse.getMinAllocatableCapabilty();
+    appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
+    appParams.incrementResource =
+        registerResponse.getIncrAllocatableCapabilty();
+    if (appParams.incrementResource == null) {
+      appParams.incrementResource = appParams.minResource;
+    }
+    appParams.containerTokenExpiryInterval = registerResponse
+        .getContainerTokenExpiryInterval();
+
+    containerIdCounter
+        .resetContainerIdCounter(registerResponse.getContainerIdStart());
+    setNodeList(registerResponse.getNodesForScheduling());
+  }
+
+  /**
+   * Takes a list of ResourceRequests (asks), extracts the key information viz.
+   * (Priority, ResourceName, Capability) and adds it to the outstanding
+   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
+   * the current YARN constraint that only a single ResourceRequest can exist
+   * at a given Priority and Capability.
+   * @param resourceAsks
+   */
+  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
+    for (ResourceRequest request : resourceAsks) {
+      Priority priority = request.getPriority();
+
+      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
+      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
+        continue;
+      }
+
+      if (request.getNumContainers() == 0) {
+        continue;
+      }
+
+      Map<Resource, ResourceRequest> reqMap =
+          this.outstandingOpReqs.get(priority);
+      if (reqMap == null) {
+        reqMap = new HashMap<>();
+        this.outstandingOpReqs.put(priority, reqMap);
+      }
+
+      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
+      if (resourceRequest == null) {
+        resourceRequest = request;
+        reqMap.put(request.getCapability(), request);
+      } else {
+        resourceRequest.setNumContainers(
+            resourceRequest.getNumContainers() + request.getNumContainers());
+      }
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+            + ", with capability = " + request.getCapability() + " ) : "
+            + resourceRequest.getNumContainers());
+      }
+    }
+  }
+
+  /**
+   * This method matches a returned list of Container Allocations to any
+   * outstanding OPPORTUNISTIC ResourceRequest
+   * @param capability
+   * @param allocatedContainers
+   */
+  public void matchAllocationToOutstandingRequest(Resource capability,
+      List<Container> allocatedContainers) {
+    for (Container c : allocatedContainers) {
+      containersAllocated.add(c.getId());
+      Map<Resource, ResourceRequest> asks =
+          outstandingOpReqs.get(c.getPriority());
+
+      if (asks == null)
+        continue;
+
+      ResourceRequest rr = asks.get(capability);
+      if (rr != null) {
+        rr.setNumContainers(rr.getNumContainers() - 1);
+        if (rr.getNumContainers() == 0) {
+          asks.remove(capability);
+        }
+      }
+    }
+  }
+
+  private void setNodeList(List<NodeId> nodeList) {
+    this.nodeList.clear();
+    addToNodeList(nodeList);
+  }
+
+  private void addToNodeList(List<NodeId> nodes) {
+    for (NodeId n : nodes) {
+      this.nodeList.put(n.getHost(), n);
+    }
+  }
+
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Forwarding registration request to the" +
+        "Distributed Scheduler Service on YARN RM");
+    DistSchedRegisterResponse dsResp = getNextInterceptor()
+        .registerApplicationMasterForDistributedScheduling(request);
+    updateParameters(dsResp);
+    return dsResp;
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    LOG.info("Forwarding allocate request to the" +
+        "Distributed Scheduler Service on YARN RM");
+    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
+    PartitionedResourceRequests partitionedAsks = partitionAskList(request
+        .getAskList());
+
+    List<ContainerId> releasedContainers = request.getReleaseList();
+    int numReleasedContainers = releasedContainers.size();
+    if (numReleasedContainers > 0) {
+      LOG.info("AttemptID: " + applicationAttemptId + " released: "
+          + numReleasedContainers);
+      containersAllocated.removeAll(releasedContainers);
+    }
+
+    // Also, update black list
+    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+    if (rbr != null) {
+      blacklist.removeAll(rbr.getBlacklistRemovals());
+      blacklist.addAll(rbr.getBlacklistAdditions());
+    }
+
+    // Add OPPORTUNISTIC reqs to the outstanding reqs
+    addToOutstandingReqs(partitionedAsks.getOpportunistic());
+
+    List<Container> allocatedContainers = new ArrayList<>();
+    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
+      // Allocated containers:
+      //  Key = Requested Capability,
+      //  Value = List of Containers of the given capability (the actual
+      //          container size might differ from what was requested, which
+      //          is why we need the requested capability (key) to match
+      //          against the outstanding reqs)
+      Map<Resource, List<Container>> allocated =
+          containerAllocator.allocate(this.appParams, containerIdCounter,
+              outstandingOpReqs.get(priority).values(), blacklist,
+              applicationAttemptId, nodeList, appSubmitter);
+      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
+        allocatedContainers.addAll(e.getValue());
+      }
+    }
+
+    // Send all the GUARANTEED Reqs to RM
+    request.setAskList(partitionedAsks.getGuaranteed());
+    DistSchedAllocateResponse dsResp =
+        getNextInterceptor().allocateForDistributedScheduling(request);
+
+    // Update host to nodeId mapping
+    setNodeList(dsResp.getNodesForScheduling());
+    List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
+    for (NMToken nmToken : nmTokens) {
+      nodeTokens.put(nmToken.getNodeId(), nmToken);
+    }
+
+    List<ContainerStatus> completedContainers =
+        dsResp.getAllocateResponse().getCompletedContainersStatuses();
+
+    // Only account for opportunistic containers
+    for (ContainerStatus cs : completedContainers) {
+      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        containersAllocated.remove(cs.getContainerId());
+      }
+    }
+
+    // Check if we have NM tokens for all the allocated containers. If not
+    // generate one and update the response.
+    updateResponseWithNMTokens(
+        dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Number of opportunistic containers currently allocated by" +
+              "application: " + containersAllocated.size());
+    }
+    return dsResp;
+  }
+}
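From the ApplicationMaster's point of view nothing changes on the wire: it tags individual ResourceRequests with an ExecutionType and LocalScheduler splits the ask list. A minimal sketch of an OPPORTUNISTIC ask, mirroring the requests built in TestLocalScheduler below (capability, count and priority are illustrative):

import java.util.Arrays;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records;

// An ANY-location OPPORTUNISTIC ask: partitionAskList() keeps it on the NM for the
// OpportunisticContainerAllocator, while GUARANTEED asks in the same AllocateRequest
// are forwarded unchanged to the RM.
ResourceRequest oppAsk = Records.newRecord(ResourceRequest.class);
oppAsk.setResourceName(ResourceRequest.ANY);   // only ANY requests are matched for now
oppAsk.setExecutionType(ExecutionType.OPPORTUNISTIC);
oppAsk.setCapability(Resource.newInstance(1024, 1));
oppAsk.setNumContainers(4);
oppAsk.setPriority(Priority.newInstance(100));

AllocateRequest req = Records.newRecord(AllocateRequest.class);
req.setAskList(Arrays.asList(oppAsk));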

+ 185 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java

@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p>The OpportunisticContainerAllocator allocates containers on a given list
+ * of Nodes after normalizing the requested container sizes to the allowable
+ * limits specified by the <code>ClusterManager</code> running on the RM. It
+ * tries to distribute the containers as evenly as possible. It also uses the
+ * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for
+ * the allocated containers.</p>
+ */
+public class OpportunisticContainerAllocator {
+
+  private static final Log LOG =
+      LogFactory.getLog(OpportunisticContainerAllocator.class);
+
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DominantResourceCalculator();
+
+  static class ContainerIdCounter {
+    final AtomicLong containerIdCounter = new AtomicLong(1);
+
+    void resetContainerIdCounter(long containerIdStart) {
+      this.containerIdCounter.set(containerIdStart);
+    }
+
+    long generateContainerId() {
+      return this.containerIdCounter.decrementAndGet();
+    }
+  }
+
+  private final NodeStatusUpdater nodeStatusUpdater;
+  private final Context context;
+  private int webpagePort;
+
+  public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
+      Context context, int webpagePort) {
+    this.nodeStatusUpdater = nodeStatusUpdater;
+    this.context = context;
+    this.webpagePort = webpagePort;
+  }
+
+  public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
+      ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
+      Set<String> blacklist, ApplicationAttemptId appAttId,
+      Map<String, NodeId> allNodes, String userName) throws YarnException {
+    Map<Resource, List<Container>> containers = new HashMap<>();
+    Set<String> nodesAllocated = new HashSet<>();
+    int numAsks = resourceAsks.size();
+    for (ResourceRequest anyAsk : resourceAsks) {
+      allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
+          allNodes, userName, containers, nodesAllocated, anyAsk);
+    }
+    if (numAsks > 0) {
+      LOG.info("Opportunistic allocation requested for: " + numAsks
+          + " containers; allocated = " + containers.size());
+    }
+    return containers;
+  }
+
+  private void allocateOpportunisticContainers(DistSchedulerParams appParams,
+      ContainerIdCounter idCounter, Set<String> blacklist,
+      ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
+      Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
+      ResourceRequest anyAsk) throws YarnException {
+    int toAllocate = anyAsk.getNumContainers()
+        - (containers.isEmpty() ?
+        0 : containers.get(anyAsk.getCapability()).size());
+
+    List<String> topKNodesLeft = new ArrayList<>();
+    for (String s : allNodes.keySet()) {
+      // Bias away from whatever we have already allocated and respect blacklist
+      if (nodesAllocated.contains(s) || blacklist.contains(s)) {
+        continue;
+      }
+      topKNodesLeft.add(s);
+    }
+    int numAllocated = 0;
+    int nextNodeToAllocate = 0;
+    for (int numCont = 0; numCont < toAllocate; numCont++) {
+      String topNode = topKNodesLeft.get(nextNodeToAllocate);
+      nextNodeToAllocate++;
+      nextNodeToAllocate %= topKNodesLeft.size();
+      NodeId nodeId = allNodes.get(topNode);
+      Container container = buildContainer(appParams, idCounter, anyAsk, id,
+          userName, nodeId);
+      List<Container> cList = containers.get(anyAsk.getCapability());
+      if (cList == null) {
+        cList = new ArrayList<>();
+        containers.put(anyAsk.getCapability(), cList);
+      }
+      cList.add(container);
+      numAllocated++;
+      LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+    }
+  }
+
+  private Container buildContainer(DistSchedulerParams appParams,
+      ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
+      String userName, NodeId nodeId) throws YarnException {
+    ContainerId cId =
+        ContainerId.newContainerId(id, idCounter.generateContainerId());
+
+    // Normalize the resource asks (similar to what the RM scheduler does
+    // before accepting an ask)
+    Resource capability = normalizeCapability(appParams, rr);
+
+    long currTime = System.currentTimeMillis();
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(
+            cId, nodeId.getHost(), userName, capability,
+            currTime + appParams.containerTokenExpiryInterval,
+            context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
+            nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
+            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+            ExecutionType.OPPORTUNISTIC);
+    byte[] pwd =
+        context.getContainerTokenSecretManager().createPassword(
+            containerTokenIdentifier);
+    Token containerToken = newContainerToken(nodeId, pwd,
+        containerTokenIdentifier);
+    Container container = BuilderUtils.newContainer(
+        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
+        capability, rr.getPriority(), containerToken);
+    return container;
+  }
+
+  private Resource normalizeCapability(DistSchedulerParams appParams,
+      ResourceRequest ask) {
+    return Resources.normalize(RESOURCE_CALCULATOR,
+        ask.getCapability(), appParams.minResource, appParams.maxResource,
+        appParams.incrementResource);
+  }
+
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
+        nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
+
+}
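The even spread comes from a simple modulo round-robin over the candidate node list (the nodes advertised by the RM, excluding blacklisted and already-used hosts). A standalone sketch of that arithmetic, with hypothetical hosts, reproducing the distribution asserted in TestLocalScheduler:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Four containers spread over the candidate list [a, b]: the index advances
// modulo the list size, so each host receives two containers.
List<String> candidates = Arrays.asList("a", "b");
Map<String, Integer> perNode = new HashMap<>();
int next = 0;
for (int i = 0; i < 4; i++) {
  String host = candidates.get(next);
  next = (next + 1) % candidates.size();
  Integer count = perNode.get(host);
  perNode.put(host, count == null ? 1 : count + 1);
}
// perNode -> {a=2, b=2}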

+ 23 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java

@@ -29,7 +29,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -50,7 +53,7 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
   private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
   private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
   private final NMStateStoreService stateStore;
-  private NodeId nodeId;                                                      
+  private NodeId nodeId;
   
   public NMTokenSecretManagerInNM() {
     this(new NMNullStateStoreService());
@@ -276,4 +279,23 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
       LOG.error("Unable to remove master key for application " + attempt, e);
     }
   }
+
+  /**
+   * Used by the Distributed Scheduler framework to generate NMTokens
+   * @param applicationSubmitter
+   * @param container
+   * @return NMToken
+   */
+  public NMToken generateNMToken(
+      String applicationSubmitter, Container container) {
+    this.readLock.lock();
+    try {
+      Token token =
+          createNMToken(container.getId().getApplicationAttemptId(),
+              container.getNodeId(), applicationSubmitter);
+      return NMToken.newInstance(container.getNodeId(), token);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java

@@ -80,7 +80,7 @@ public class TestEventFlow {
     
     Context context = new NMContext(new NMContainerTokenSecretManager(conf),
         new NMTokenSecretManagerInNM(), null, null,
-        new NMNullStateStoreService()) {
+        new NMNullStateStoreService(), false) {
       @Override
       public int getHttpPort() {
         return 1234;

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -1583,7 +1583,7 @@ public class TestNodeStatusUpdater {
       protected NMContext createNMContext(
           NMContainerTokenSecretManager containerTokenSecretManager,
           NMTokenSecretManagerInNM nmTokenSecretManager,
-          NMStateStoreService store) {
+          NMStateStoreService store, boolean isDistributedSchedulingEnabled) {
         return new MyNMContext(containerTokenSecretManager,
           nmTokenSecretManager);
       }
@@ -1818,7 +1818,7 @@ public class TestNodeStatusUpdater {
         NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager) {
       super(containerTokenSecretManager, nmTokenSecretManager, null, null,
-          new NMNullStateStoreService());
+          new NMNullStateStoreService(), false);
     }
 
     @Override

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -688,5 +689,14 @@ public abstract class BaseAMRMProxyTest {
     public QueuingContext getQueuingContext() {
       return null;
     }
+
+    public boolean isDistributedSchedulingEnabled() {
+      return false;
+    }
+
+    @Override
+    public OpportunisticContainerAllocator getContainerAllocator() {
+      return null;
+    }
   }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -110,7 +110,7 @@ public abstract class BaseContainerManagerTest {
   protected Configuration conf = new YarnConfiguration();
   protected Context context = new NMContext(new NMContainerTokenSecretManager(
     conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+    new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -475,7 +475,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
       NMStateStoreService stateStore) {
     NMContext context = new NMContext(new NMContainerTokenSecretManager(
         conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore){
+        new ApplicationACLsManager(conf), stateStore, false){
       public int getHttpPort() {
         return HTTP_PORT;
       }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java

@@ -113,7 +113,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
   private static final String INVALID_JAVA_HOME = "/no/jvm/here";
   protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
     conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+    new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java

@@ -81,7 +81,8 @@ public class TestLocalCacheDirectoryManager {
     NMContext nmContext =
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
-          new ApplicationACLsManager(conf), new NMNullStateStoreService());
+          new ApplicationACLsManager(conf), new NMNullStateStoreService(),
+            false);
     ResourceLocalizationService service =
         new ResourceLocalizationService(null, null, null, null, nmContext);
     try {

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -186,7 +186,7 @@ public class TestResourceLocalizationService {
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     nmContext = new NMContext(new NMContainerTokenSecretManager(
       conf), new NMTokenSecretManagerInNM(), null,
-      new ApplicationACLsManager(conf), new NMNullStateStoreService());
+      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
   }
 
   @After
@@ -2365,7 +2365,7 @@ public class TestResourceLocalizationService {
     NMContext nmContext =
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
-          new ApplicationACLsManager(conf), stateStore);
+          new ApplicationACLsManager(conf), stateStore, false);
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
                                       dirsHandler, nmContext);

+ 212 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java

@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.security
+    .NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security
+    .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestLocalScheduler {
+
+  @Test
+  public void testLocalScheduler() throws Exception {
+
+    Configuration conf = new Configuration();
+    LocalScheduler localScheduler = new LocalScheduler();
+
+    NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
+    Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
+    Context context = Mockito.mock(Context.class);
+    NMContainerTokenSecretManager nmContainerTokenSecretManager = new
+        NMContainerTokenSecretManager(conf);
+    MasterKey mKey = new MasterKey() {
+      @Override
+      public int getKeyId() {
+        return 1;
+      }
+      @Override
+      public void setKeyId(int keyId) {}
+      @Override
+      public ByteBuffer getBytes() {
+        return ByteBuffer.allocate(8);
+      }
+      @Override
+      public void setBytes(ByteBuffer bytes) {}
+    };
+    nmContainerTokenSecretManager.setMasterKey(mKey);
+    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
+        (nmContainerTokenSecretManager);
+    OpportunisticContainerAllocator containerAllocator =
+        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+
+    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
+        new NMTokenSecretManagerInNM();
+    nmTokenSecretManagerInNM.setMasterKey(mKey);
+    localScheduler.initLocal(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
+        containerAllocator, nmTokenSecretManagerInNM, "test");
+
+    RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
+    localScheduler.setNextInterceptor(finalReqIntcptr);
+
+    DistSchedRegisterResponse distSchedRegisterResponse =
+        Records.newRecord(DistSchedRegisterResponse.class);
+    distSchedRegisterResponse.setRegisterResponse(
+        Records.newRecord(RegisterApplicationMasterResponse.class));
+    distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
+    distSchedRegisterResponse.setContainerIdStart(0);
+    distSchedRegisterResponse.setMaxAllocatableCapabilty(
+        Resource.newInstance(1024, 4));
+    distSchedRegisterResponse.setMinAllocatableCapabilty(
+        Resource.newInstance(512, 2));
+    distSchedRegisterResponse.setNodesForScheduling(Arrays.asList(
+        NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
+    Mockito.when(
+        finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
+            Mockito.any(RegisterApplicationMasterRequest.class)))
+        .thenReturn(distSchedRegisterResponse);
+
+    localScheduler.registerApplicationMaster(
+        Records.newRecord(RegisterApplicationMasterRequest.class));
+
+    Mockito.when(
+        finalReqIntcptr.allocateForDistributedScheduling(
+            Mockito.any(AllocateRequest.class)))
+        .thenAnswer(new Answer<DistSchedAllocateResponse>() {
+          @Override
+          public DistSchedAllocateResponse answer(InvocationOnMock
+              invocationOnMock) throws Throwable {
+            return createAllocateResponse(Arrays.asList(
+                NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
+          }
+        });
+
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+    ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
+    guaranteedReq.setExecutionType(ExecutionType.GUARANTEED);
+    guaranteedReq.setNumContainers(5);
+    guaranteedReq.setCapability(Resource.newInstance(2048, 2));
+    guaranteedReq.setRelaxLocality(true);
+    guaranteedReq.setResourceName("*");
+    ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
+    opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    opportunisticReq.setNumContainers(4);
+    opportunisticReq.setCapability(Resource.newInstance(1024, 4));
+    opportunisticReq.setPriority(Priority.newInstance(100));
+    opportunisticReq.setRelaxLocality(true);
+    opportunisticReq.setResourceName("*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+    // Verify 4 containers were allocated
+    AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest);
+    Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
+
+    // Verify equal distribution on hosts a and b
+    // and none on c and d
+    Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse);
+    Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
+    Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
+    Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
+    Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));
+
+    // New Allocate request
+    allocateRequest = Records.newRecord(AllocateRequest.class);
+    opportunisticReq = Records.newRecord(ResourceRequest.class);
+    opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    opportunisticReq.setNumContainers(6);
+    opportunisticReq.setCapability(Resource.newInstance(512, 3));
+    opportunisticReq.setPriority(Priority.newInstance(100));
+    opportunisticReq.setRelaxLocality(true);
+    opportunisticReq.setResourceName("*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+    // Verify 6 containers were allocated
+    allocateResponse = localScheduler.allocate(allocateRequest);
+    Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
+
+    // Verify new containers are equally distributed on hosts c and d
+    // and none on a and b
+    allocs = mapAllocs(allocateResponse);
+    Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
+    Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
+    Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
+    Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
+  }
+
+  private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
+    DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
+        (DistSchedAllocateResponse.class);
+    distSchedAllocateResponse.setAllocateResponse(
+        Records.newRecord(AllocateResponse.class));
+    distSchedAllocateResponse.setNodesForScheduling(nodes);
+    return distSchedAllocateResponse;
+  }
+
+  private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
+      allocateResponse) {
+    Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
+    for (Container c : allocateResponse.getAllocatedContainers()) {
+      List<ContainerId> cIds = allocs.get(c.getNodeId());
+      if (cIds == null) {
+        cIds = new ArrayList<>();
+        allocs.put(c.getNodeId(), cIds);
+      }
+      cIds.add(c.getId());
+    }
+    return allocs;
+  }
+
+}
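
The assertions above capture the intended behaviour: GUARANTEED asks are passed through to the RM, while OPPORTUNISTIC asks are satisfied locally by spreading containers evenly over whichever node list the RM most recently returned for scheduling. A minimal sketch of that round-robin split, assuming only java.util collections and the real NodeId type; roundRobinAssign is a hypothetical helper written purely for illustration, not the allocator's actual code:

    // Sketch of the even split the test asserts (java.util.List, Map,
    // LinkedHashMap and org.apache.hadoop.yarn.api.records.NodeId assumed).
    static Map<NodeId, Integer> roundRobinAssign(List<NodeId> nodes, int numContainers) {
      Map<NodeId, Integer> perNode = new LinkedHashMap<>();
      for (int i = 0; i < numContainers; i++) {
        NodeId target = nodes.get(i % nodes.size()); // cycle through the node list
        perNode.merge(target, 1, Integer::sum);
      }
      return perNode;
    }

    // Nodes [a, b] and 4 asks give {a=2, b=2}; after the RM returns [c, d],
    // 6 asks give {c=3, d=3} -- matching the assertions in the test above.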

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java

@@ -96,7 +96,7 @@ public class TestContainerLogsPage {
     healthChecker.init(conf);
     LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
     NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
     // Add an application and the corresponding containers
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     String user = "nobody";
@@ -136,7 +136,7 @@ public class TestContainerLogsPage {
     when(dirsHandlerForFullDisk.getLogDirsForRead()).
         thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
     nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
     nmContext.getApplications().put(appId, app);
     container.setState(ContainerState.RUNNING);
     nmContext.getContainers().put(container1, container);
@@ -158,7 +158,7 @@ public class TestContainerLogsPage {
     LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
     NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
     // Add an application and the corresponding containers
     String user = "nobody";
     long clusterTimeStamp = 1234;

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java

@@ -62,7 +62,8 @@ public class TestNMAppsPage {
     Configuration conf = new Configuration();
     final NMContext nmcontext = new NMContext(
         new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(),
-        null, new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        null, new ApplicationACLsManager(conf), new NMNullStateStoreService(),
+        false);
     Injector injector = WebAppTests.createMockInjector(NMContext.class,
         nmcontext, new Module() {
           @Override

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java

@@ -87,7 +87,7 @@ public class TestNMWebServer {
 
   private int startNMWebAppServer(String webAddr) {
     Context nmContext = new NodeManager.NMContext(null, null, null, null,
-        null);
+        null, false);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -150,7 +150,7 @@ public class TestNMWebServer {
   @Test
   public void testNMWebApp() throws IOException, YarnException {
     Context nmContext = new NodeManager.NMContext(null, null, null, null,
-        null);
+        null, false);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java

@@ -111,7 +111,7 @@ public class TestNMWebServices extends JerseyTestBase {
       healthChecker.init(conf);
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null);
+          aclsManager, null, false);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java

@@ -104,7 +104,7 @@ public class TestNMWebServicesApps extends JerseyTestBase {
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null);
+          aclsManager, null, false);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java

@@ -136,7 +136,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null) {
+          aclsManager, null, false) {
         public NodeId getNodeId() {
           return NodeId.newInstance("testhost.foo.com", 8042);
         };

+ 27 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -89,6 +91,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+    .AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -104,21 +108,27 @@ public class ApplicationMasterService extends AbstractService implements
   private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
   private final AMLivelinessMonitor amLivelinessMonitor;
   private YarnScheduler rScheduler;
-  private InetSocketAddress masterServiceAddress;
-  private Server server;
-  private final RecordFactory recordFactory =
+  protected InetSocketAddress masterServiceAddress;
+  protected Server server;
+  protected final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
-  private final RMContext rmContext;
+  protected final RMContext rmContext;
 
-  public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
-    super(ApplicationMasterService.class.getName());
+  public ApplicationMasterService(String name, RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(name);
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
     this.rmContext = rmContext;
   }
 
+  public ApplicationMasterService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    this(ApplicationMasterService.class.getName(), rmContext, scheduler);
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     masterServiceAddress = conf.getSocketAddr(
@@ -139,11 +149,8 @@ public class ApplicationMasterService extends AbstractService implements
     serverConf.set(
         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         SaslRpcServer.AuthMethod.TOKEN.toString());
-    this.server =
-      rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
-          serverConf, this.rmContext.getAMRMTokenSecretManager(),
-          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    this.server = getServer(rpc, serverConf, masterServiceAddress,
+        this.rmContext.getAMRMTokenSecretManager());
     
     // Enable service authorization?
     if (conf.getBoolean(
@@ -158,7 +165,7 @@ public class ApplicationMasterService extends AbstractService implements
       }
       refreshServiceAcls(conf, RMPolicyProvider.getInstance());
     }
-    
+
     this.server.start();
     this.masterServiceAddress =
         conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
@@ -168,6 +175,14 @@ public class ApplicationMasterService extends AbstractService implements
     super.serviceStart();
   }
 
+  protected Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    return rpc.getServer(ApplicationMasterProtocol.class, this, addr,
+        serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+  }
+
   @Private
   public InetSocketAddress getBindAddress() {
     return this.masterServiceAddress;

+ 162 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+    .AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+public class DistributedSchedulingService extends ApplicationMasterService
+    implements DistributedSchedulerProtocol {
+
+  public DistributedSchedulingService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+  }
+
+  @Override
+  public Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+        addr, serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    // To support applications running on NMs that DO NOT support
+    // Distributed Scheduling...
+    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        ApplicationMasterProtocolPB.class,
+        ApplicationMasterProtocolService.newReflectiveBlockingService(
+            new ApplicationMasterProtocolPBServiceImpl(this)));
+    return server;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return super.allocate(request);
+  }
+
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    RegisterApplicationMasterResponse response =
+        registerApplicationMaster(request);
+    DistSchedRegisterResponse dsResp = recordFactory
+        .newRecordInstance(DistSchedRegisterResponse.class);
+    dsResp.setRegisterResponse(response);
+    dsResp.setMinAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setMaxAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setIncrAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setContainerTokenExpiryInterval(
+        getConfig().getInt(
+            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+            YarnConfiguration.
+                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+    dsResp.setContainerIdStart(
+        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+    // Set nodes to be used for scheduling
+    // TODO: The actual computation of the list will happen in YARN-4412
+    // TODO: Till then, send the complete list
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+    return dsResp;
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    AllocateResponse response = allocate(request);
+    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
+        (DistSchedAllocateResponse.class);
+    dsResp.setAllocateResponse(response);
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+    return dsResp;
+  }
+}
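
The setContainerIdStart call above seeds the NM-side allocator with the RM epoch shifted into the high bits of the container-id space, mirroring how AppSchedulingInfo seeds its own counter in the hunk further down; the ResourceManager change below defines EPOCH_BIT_SHIFT as 40. A small worked example of the arithmetic, for illustration only:

    // Container-id start for epoch 3, assuming EPOCH_BIT_SHIFT = 40.
    long epoch = 3L;                      // RM restart generation
    long containerIdStart = epoch << 40;  // 3 * 2^40 = 3,298,534,883,328
    // Ids minted from this start fall in the range reserved for epoch 3,
    // keeping them distinct from ids generated under earlier epochs.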

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -134,6 +134,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
+  /**
+   * Used for generation of various ids.
+   */
+  public static final int EPOCH_BIT_SHIFT = 40;
+
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
   private static long clusterTimeStamp = System.currentTimeMillis();
 
@@ -1226,6 +1231,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
+    if (this.rmContext.getYarnConfiguration().getBoolean(
+        YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+      return new DistributedSchedulingService(this.rmContext, scheduler);
+    }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }
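
Turning the flag on swaps in the DistributedSchedulingService defined above; with it off, the RM keeps the plain ApplicationMasterService. A minimal sketch of enabling it programmatically (for example before starting a MockRM in a test), using the YarnConfiguration constant added by this commit rather than spelling out the raw property key:

    // Enable distributed scheduling so createApplicationMasterService()
    // returns a DistributedSchedulingService instead of the default service.
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);

Because DistributedSchedulingService also registers the plain ApplicationMasterProtocol on the same RPC server, enabling the flag does not break application masters that only speak the vanilla protocol.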
 

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -93,7 +93,8 @@ public class AppSchedulingInfo {
     this.queue = queue;
     this.user = user;
     this.activeUsersManager = activeUsersManager;
-    this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+    this.containerIdCounter =
+        new AtomicLong(epoch << EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
   }
 

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+
+
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.apache.log4j.Level;
@@ -740,6 +742,21 @@ public class MockRM extends ResourceManager {
 
   @Override
   protected ApplicationMasterService createApplicationMasterService() {
+    if (this.rmContext.getYarnConfiguration().getBoolean(
+        YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+      return new DistributedSchedulingService(getRMContext(), scheduler) {
+        @Override
+        protected void serviceStart() {
+          // override to not start rpc handler
+        }
+
+        @Override
+        protected void serviceStop() {
+          // don't do anything
+        }
+      };
+    }
     return new ApplicationMasterService(getRMContext(), scheduler) {
       @Override
       protected void serviceStart() {

+ 170 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java

@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
+    .AMLivelinessMonitor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+public class TestDistributedSchedulingService {
+
+  // Test if the DistributedSchedulingService can handle both DSProtocol as
+  // well as AMProtocol clients
+  @Test
+  public void testRPCWrapping() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+    final RMContext rmContext = new RMContextImpl() {
+      @Override
+      public AMLivelinessMonitor getAMLivelinessMonitor() {
+        return null;
+      }
+    };
+    DistributedSchedulingService service =
+        new DistributedSchedulingService(rmContext, null) {
+          @Override
+          public RegisterApplicationMasterResponse registerApplicationMaster
+              (RegisterApplicationMasterRequest request) throws
+              YarnException, IOException {
+            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+                RegisterApplicationMasterResponse.class);
+            // Dummy Entry to Assert that we get this object back
+            resp.setQueue("dummyQueue");
+            return resp;
+          }
+
+          @Override
+          public FinishApplicationMasterResponse finishApplicationMaster
+              (FinishApplicationMasterRequest request) throws YarnException,
+              IOException {
+            FinishApplicationMasterResponse resp = factory.newRecordInstance(
+                FinishApplicationMasterResponse.class);
+            // Dummy Entry to Assert that we get this object back
+            resp.setIsUnregistered(false);
+            return resp;
+          }
+
+          @Override
+          public AllocateResponse allocate(AllocateRequest request) throws
+              YarnException, IOException {
+            AllocateResponse response = factory.newRecordInstance
+                (AllocateResponse.class);
+            response.setNumClusterNodes(12345);
+            return response;
+          }
+
+          @Override
+          public DistSchedRegisterResponse
+          registerApplicationMasterForDistributedScheduling
+              (RegisterApplicationMasterRequest request) throws
+              YarnException, IOException {
+            DistSchedRegisterResponse resp = factory.newRecordInstance(
+                DistSchedRegisterResponse.class);
+            resp.setContainerIdStart(54321L);
+            return resp;
+          }
+
+          @Override
+          public DistSchedAllocateResponse allocateForDistributedScheduling
+              (AllocateRequest request) throws YarnException, IOException {
+            DistSchedAllocateResponse resp =
+                factory.newRecordInstance(DistSchedAllocateResponse.class);
+            resp.setNodesForScheduling(
+                Arrays.asList(NodeId.newInstance("h1", 1234)));
+            return resp;
+          }
+        };
+    Server server = service.getServer(rpc, conf, addr, null);
+    server.start();
+
+    // Verify that the DistributedSchedulingService can handle vanilla
+    // ApplicationMasterProtocol clients
+    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ApplicationMasterProtocol ampProxy =
+        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
+            .class, NetUtils.getConnectAddress(server), conf);
+    RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster(
+            factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+    Assert.assertEquals("dummyQueue", regResp.getQueue());
+    FinishApplicationMasterResponse finishResp = ampProxy
+        .finishApplicationMaster(factory.newRecordInstance(
+            FinishApplicationMasterRequest.class));
+    Assert.assertEquals(false, finishResp.getIsUnregistered());
+    AllocateResponse allocResp = ampProxy
+        .allocate(factory.newRecordInstance(AllocateRequest.class));
+    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+    // Verify that the DistributedSchedulingService can handle the
+    // DistributedSchedulerProtocol clients as well
+    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    DistributedSchedulerProtocol dsProxy =
+        (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol
+            .class, NetUtils.getConnectAddress(server), conf);
+
+    DistSchedRegisterResponse dsRegResp =
+        dsProxy.registerApplicationMasterForDistributedScheduling(
+        factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+    Assert.assertEquals(54321L, dsRegResp.getContainerIdStart());
+    DistSchedAllocateResponse dsAllocResp =
+        dsProxy.allocateForDistributedScheduling(
+            factory.newRecordInstance(AllocateRequest.class));
+    Assert.assertEquals(
+        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+  }
+}