فهرست منبع

YARN-5557. Add localize API to the ContainerManagementProtocol. Contributed by Jian He.

Junping Du 8 سال پیش
والد
کامیت
9ef632f3b0
20فایلهای تغییر یافته به همراه577 افزوده شده و 10 حذف شده
  1. 8 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
  2. 8 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
  3. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
  4. 84 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceLocalizationRequest.java
  5. 40 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceLocalizationResponse.java
  6. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
  7. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  8. 28 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
  9. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
  10. 216 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ResourceLocalizationRequestPBImpl.java
  11. 69 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ResourceLocalizationResponsePBImpl.java
  12. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
  13. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
  14. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
  15. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
  16. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  17. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
  18. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
  19. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
  20. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java

@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -468,5 +470,11 @@ public class TestContainerLauncher {
         SignalContainerRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ResourceLocalizationResponse localize(
+        ResourceLocalizationRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 }

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java

@@ -50,6 +50,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequ
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -473,6 +475,12 @@ public class TestContainerLauncherImpl {
         SignalContainerRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ResourceLocalizationResponse localize(
+        ResourceLocalizationRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
   
   @SuppressWarnings("serial")

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequ
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -199,4 +201,18 @@ public interface ContainerManagementProtocol {
 
   SignalContainerResponse signalToContainer(SignalContainerRequest request)
       throws YarnException, IOException;
+
+  /**
+   * Localize resources required by the container.
+   * Currently, this API only works for running containers.
+   *
+   * @param request Specify the resources to be localized.
+   * @return Response that the localize request is accepted.
+   * @throws YarnException Exception specific to YARN
+   * @throws IOException IOException thrown from the RPC layer.
+   */
+  @Public
+  @Unstable
+  ResourceLocalizationResponse localize(ResourceLocalizationRequest request)
+    throws YarnException, IOException;
 }

+ 84 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceLocalizationRequest.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Map;
+
+/**
+ * The request sent by the ApplicationMaster to ask for localizing resources.
+ */
+@Public
+@Unstable
+public abstract class ResourceLocalizationRequest {
+
+  @Public
+  @Unstable
+  public static ResourceLocalizationRequest newInstance(ContainerId containerId,
+      Map<String, LocalResource> localResources) {
+    ResourceLocalizationRequest record =
+        Records.newRecord(ResourceLocalizationRequest.class);
+    record.setContainerId(containerId);
+    record.setLocalResources(localResources);
+    return record;
+  }
+
+  /**
+   * Get the <code>ContainerId</code> of the container to localize resources.
+   *
+   * @return <code>ContainerId</code> of the container to localize resources.
+   */
+  @Public
+  @Unstable
+  public abstract ContainerId getContainerId();
+
+  /**
+   * Set the <code>ContainerId</code> of the container to localize resources.
+   * @param containerId the containerId of the container.
+   */
+  @Private
+  @Unstable
+  public abstract void setContainerId(ContainerId containerId);
+
+  /**
+   * Get <code>LocalResource</code> required by the container.
+   *
+   * @return all <code>LocalResource</code> required by the container
+   */
+  @Public
+  @Unstable
+  public abstract Map<String, LocalResource> getLocalResources();
+
+  /**
+   * Set <code>LocalResource</code> required by the container. All pre-existing
+   * Map entries are cleared before adding the new Map
+   *
+   * @param localResources <code>LocalResource</code> required by the container
+   */
+  @Private
+  @Unstable
+  public abstract void setLocalResources(
+      Map<String, LocalResource> localResources);
+}

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceLocalizationResponse.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The response to the {@link ResourceLocalizationRequest}
+ */
+@Public
+@Unstable
+public abstract class ResourceLocalizationResponse {
+
+  @Private
+  @Unstable
+  public static ResourceLocalizationResponse newInstance() {
+    ResourceLocalizationResponse record =
+        Records.newRecord(ResourceLocalizationResponse.class);
+    return record;
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto

@@ -36,4 +36,5 @@ service ContainerManagementProtocolService {
   rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
   rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
   rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
+  rpc localize(ResourceLocalizationRequestProto) returns (ResourceLocalizationResponseProto);
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -263,6 +263,13 @@ message StopContainerRequestProto {
 message StopContainerResponseProto {
 }
 
+message ResourceLocalizationRequestProto {
+  optional ContainerIdProto container_id = 1;
+  repeated StringLocalResourceMapProto local_resources = 2;
+}
+
+message ResourceLocalizationResponseProto {
+}
 
 //// bulk API records
 message StartContainersRequestProto {

+ 28 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java

@@ -18,10 +18,7 @@
 
 package org.apache.hadoop.yarn.api.impl.pb.client;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -30,20 +27,24 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@@ -54,12 +55,15 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
 
-import com.google.protobuf.ServiceException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
 
 @Private
 public class ContainerManagementProtocolPBClientImpl implements ContainerManagementProtocol,
@@ -167,4 +171,18 @@ public class ContainerManagementProtocolPBClientImpl implements ContainerManagem
       return null;
     }
   }
+
+  @Override
+  public ResourceLocalizationResponse localize(
+      ResourceLocalizationRequest request) throws YarnException, IOException {
+    ResourceLocalizationRequestProto requestProto =
+        ((ResourceLocalizationRequestPBImpl) request).getProto();
+    try {
+      return new ResourceLocalizationResponsePBImpl(
+          proxy.localize(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
 }

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
@@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersReso
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@@ -49,6 +52,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProt
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationResponseProto;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -136,4 +141,19 @@ public class ContainerManagementProtocolPBServiceImpl implements ContainerManage
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public ResourceLocalizationResponseProto localize(RpcController controller,
+      ResourceLocalizationRequestProto proto) throws ServiceException {
+    ResourceLocalizationRequestPBImpl request =
+        new ResourceLocalizationRequestPBImpl(proto);
+    try {
+      ResourceLocalizationResponse response = real.localize(request);
+      return ((ResourceLocalizationResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 216 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ResourceLocalizationRequestPBImpl.java

@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class ResourceLocalizationRequestPBImpl
+    extends ResourceLocalizationRequest {
+  private ResourceLocalizationRequestProto proto =
+      ResourceLocalizationRequestProto.getDefaultInstance();
+  private ResourceLocalizationRequestProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private Map<String, LocalResource> localResources = null;
+  private ContainerId containerId;
+
+  public ResourceLocalizationRequestPBImpl() {
+    builder = ResourceLocalizationRequestProto.newBuilder();
+  }
+
+  public ResourceLocalizationRequestPBImpl(
+      ResourceLocalizationRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.containerId != null) {
+      builder.setContainerId(convertToProtoFormat(this.containerId));
+    }
+    if (this.localResources != null) {
+      addLocalResourcesToProto();
+    }
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ResourceLocalizationRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void addLocalResourcesToProto() {
+    maybeInitBuilder();
+    builder.clearLocalResources();
+    if (localResources == null) {
+      return;
+    }
+    Iterable<YarnProtos.StringLocalResourceMapProto> iterable =
+        new Iterable<YarnProtos.StringLocalResourceMapProto>() {
+
+          @Override
+          public Iterator<YarnProtos.StringLocalResourceMapProto> iterator() {
+            return new Iterator<YarnProtos.StringLocalResourceMapProto>() {
+
+              Iterator<String> keyIter = localResources.keySet().iterator();
+
+              @Override public void remove() {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override public YarnProtos.StringLocalResourceMapProto next() {
+                String key = keyIter.next();
+                return YarnProtos.StringLocalResourceMapProto.newBuilder()
+                    .setKey(key).
+                        setValue(convertToProtoFormat(localResources.get(key)))
+                    .build();
+              }
+
+              @Override public boolean hasNext() {
+                return keyIter.hasNext();
+              }
+            };
+          }
+        };
+    builder.addAllLocalResources(iterable);
+  }
+
+  public ResourceLocalizationRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void initLocalResources() {
+    if (this.localResources != null) {
+      return;
+    }
+    YarnServiceProtos.ResourceLocalizationRequestProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<YarnProtos.StringLocalResourceMapProto> list =
+        p.getLocalResourcesList();
+    this.localResources = new HashMap<>();
+
+    for (YarnProtos.StringLocalResourceMapProto c : list) {
+      this.localResources.put(c.getKey(), convertFromProtoFormat(c.getValue()));
+    }
+  }
+
+  private LocalResourcePBImpl convertFromProtoFormat(
+      YarnProtos.LocalResourceProto p) {
+    return new LocalResourcePBImpl(p);
+  }
+
+  private ContainerIdPBImpl convertFromProtoFormat(
+      YarnProtos.ContainerIdProto p) {
+    return new ContainerIdPBImpl(p);
+  }
+
+  private YarnProtos.ContainerIdProto convertToProtoFormat(ContainerId t) {
+    return ((ContainerIdPBImpl) t).getProto();
+  }
+
+  private YarnProtos.LocalResourceProto convertToProtoFormat(LocalResource t) {
+    return ((LocalResourcePBImpl) t).getProto();
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    YarnServiceProtos.ResourceLocalizationRequestProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (this.containerId != null) {
+      return this.containerId;
+    }
+    if (!p.hasContainerId()) {
+      return null;
+    }
+    this.containerId = convertFromProtoFormat(p.getContainerId());
+    return this.containerId;
+  }
+
+  @Override
+  public void setContainerId(ContainerId containerId) {
+    maybeInitBuilder();
+    if (containerId == null) {
+      builder.clearContainerId();
+    }
+    this.containerId = containerId;
+  }
+
+  @Override
+  public Map<String, LocalResource> getLocalResources() {
+    initLocalResources();
+    return this.localResources;
+  }
+
+  @Override
+  public void setLocalResources(Map<String, LocalResource> localResources) {
+    if (localResources == null) {
+      this.localResources = null;
+      builder.clearLocalResources();
+      return;
+    }
+    this.localResources = new HashMap<>(localResources);
+  }
+}

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ResourceLocalizationResponsePBImpl.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationResponseProto;
+
+public class ResourceLocalizationResponsePBImpl extends
+    ResourceLocalizationResponse {
+  ResourceLocalizationResponseProto proto =
+      ResourceLocalizationResponseProto.getDefaultInstance();
+  ResourceLocalizationResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public YarnServiceProtos.ResourceLocalizationResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  public ResourceLocalizationResponsePBImpl() {
+    builder = ResourceLocalizationResponseProto.newBuilder();
+  }
+
+  public ResourceLocalizationResponsePBImpl(
+      ResourceLocalizationResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java

@@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequ
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -198,5 +200,11 @@ public class TestContainerLaunchRPC {
           "Dummy function cause"));
       throw new YarnException(e);
     }
+
+    @Override
+    public ResourceLocalizationResponse localize(
+        ResourceLocalizationRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java

@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -180,5 +182,10 @@ public class TestContainerResourceIncreaseRPC {
         SignalContainerRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override public ResourceLocalizationResponse localize(
+        ResourceLocalizationRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 }

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java

@@ -100,6 +100,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionR
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
@@ -246,6 +248,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Repla
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto;
@@ -984,6 +987,18 @@ public class TestPBImplRecords {
         ContainerLaunchContextProto.class);
   }
 
+  @Test
+  public void testResourceLocalizationRequest() throws Exception {
+    validatePBImplRecord(ResourceLocalizationRequestPBImpl.class,
+        YarnServiceProtos.ResourceLocalizationRequestProto.class);
+  }
+
+  @Test
+  public void testResourceLocalizationResponse() throws Exception {
+    validatePBImplRecord(ResourceLocalizationResponsePBImpl.class,
+        YarnServiceProtos.ResourceLocalizationResponseProto.class);
+  }
+
   @Test
   public void testContainerPBImpl() throws Exception {
     validatePBImplRecord(ContainerPBImpl.class, ContainerProto.class);

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java

@@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResp
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -347,6 +349,12 @@ public class TestRPC {
           new Exception(EXCEPTION_CAUSE));
       throw new YarnException(e);
     }
+
+    @Override
+    public ResourceLocalizationResponse localize(
+        ResourceLocalizationRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 
   public static ContainerTokenIdentifier newContainerTokenIdentifier(

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -1514,6 +1516,14 @@ public class ContainerManagerImpl extends CompositeService implements
     return new SignalContainerResponsePBImpl();
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public ResourceLocalizationResponse localize(
+      ResourceLocalizationRequest request) throws YarnException, IOException {
+
+    return ResourceLocalizationResponse.newInstance();
+  }
+
   @SuppressWarnings("unchecked")
   private void internalSignalToContainer(SignalContainerRequest request,
       String sentBy) {

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.commons.logging.Log;
@@ -27,6 +28,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -204,4 +207,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
     // do nothing
   }
 
+  @Override
+  public ResourceLocalizationResponse localize(
+      ResourceLocalizationRequest request) throws YarnException, IOException {
+    return null;
+  }
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java

@@ -27,6 +27,8 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -324,4 +326,10 @@ public class NodeManager implements ContainerManagementProtocol {
       SignalContainerRequest request) throws YarnException, IOException {
     throw new YarnException("Not supported yet!");
   }
+
+  @Override
+  public ResourceLocalizationResponse localize(
+      ResourceLocalizationRequest request) throws YarnException, IOException {
+    return null;
+  }
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java

@@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -168,6 +170,12 @@ public class TestAMAuthorization {
         SignalContainerRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ResourceLocalizationResponse localize(
+        ResourceLocalizationRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 
   public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequ
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -156,6 +158,12 @@ public class TestApplicationMasterLauncher {
         SignalContainerRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ResourceLocalizationResponse localize(
+        ResourceLocalizationRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 
   @Test