Browse Source

YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)

(cherry picked from commit 256697acd5ec16bca022ae86e22f9882b3309d8b)
Karthik Kambatla 10 years ago
parent
commit
268af259b5
22 changed files with 1344 additions and 2 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 15 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
  5. 83 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java
  6. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java
  7. 93 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java
  8. 79 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java
  9. 49 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java
  10. 52 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java
  11. 67 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java
  12. 51 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java
  13. 78 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java
  14. 75 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java
  15. 93 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java
  16. 73 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java
  17. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto
  18. 18 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  19. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
  20. 140 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java
  21. 105 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java
  22. 188 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -15,6 +15,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2183. [YARN-1492] Cleaner service for cache manager.
     (Chris Trezzo and Sangjin Lee via kasha)
 
+    YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. 
+    (Chris Trezzo and Sangjin Lee via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

+ 15 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
@@ -1446,6 +1445,21 @@ public class YarnConfiguration extends Configuration {
       SCM_CLEANER_PREFIX + "resource-sleep-ms";
   public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L;
 
+  /** The address of the node manager interface in the SCM. */
+  public static final String SCM_UPLOADER_SERVER_ADDRESS = SHARED_CACHE_PREFIX
+      + "uploader.server.address";
+  public static final int DEFAULT_SCM_UPLOADER_SERVER_PORT = 8046;
+  public static final String DEFAULT_SCM_UPLOADER_SERVER_ADDRESS = "0.0.0.0:"
+      + DEFAULT_SCM_UPLOADER_SERVER_PORT;
+
+  /**
+   * The number of SCM threads used to handle notify requests from the node
+   * manager.
+   */
+  public static final String SCM_UPLOADER_SERVER_THREAD_COUNT =
+      SHARED_CACHE_PREFIX + "uploader.server.thread-count";
+  public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1433,6 +1433,20 @@
     <value>0</value>
   </property>
 
+  <property>
+    <description>The address of the node manager interface in the SCM
+    (shared cache manager)</description>
+    <name>yarn.sharedcache.uploader.server.address</name>
+    <value>0.0.0.0:8046</value>
+  </property>
+
+  <property>
+    <description>The number of threads used to handle shared cache manager
+    requests from the node manager (50 by default)</description>
+    <name>yarn.sharedcache.uploader.server.thread-count</name>
+    <value>50</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml

@@ -153,6 +153,7 @@
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>ResourceTracker.proto</include>
+                  <include>SCMUploader.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

+ 83 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+
+/**
+ * <p>
+ * The protocol between a <code>NodeManager's</code>
+ * <code>SharedCacheUploadService</code> and the
+ * <code>SharedCacheManager.</code>
+ * </p>
+ */
+@Private
+@Unstable
+public interface SCMUploaderProtocol {
+  /**
+   * <p>
+   * The method used by the NodeManager's <code>SharedCacheUploadService</code>
+   * to notify the shared cache manager of a newly cached resource.
+   * </p>
+   *
+   * <p>
+   * The <code>SharedCacheManager</code> responds with whether or not the
+   * NodeManager should delete the uploaded file.
+   * </p>
+   *
+   * @param request notify the shared cache manager of a newly uploaded resource
+   *          to the shared cache
+   * @return response indicating if the newly uploaded resource should be
+   *         deleted
+   * @throws YarnException
+   * @throws IOException
+   */
+  public SCMUploaderNotifyResponse
+      notify(SCMUploaderNotifyRequest request)
+      throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The method used by the NodeManager's <code>SharedCacheUploadService</code>
+   * to request whether a resource can be uploaded.
+   * </p>
+   *
+   * <p>
+   * The <code>SharedCacheManager</code> responds with whether or not the
+   * NodeManager can upload the file.
+   * </p>
+   *
+   * @param request whether the resource can be uploaded to the shared cache
+   * @return response indicating if resource can be uploaded to the shared cache
+   * @throws YarnException
+   * @throws IOException
+   */
+  public SCMUploaderCanUploadResponse
+      canUpload(SCMUploaderCanUploadRequest request)
+      throws YarnException, IOException;
+
+}

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.SCMUploaderProtocol.SCMUploaderProtocolService;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB",
+    protocolVersion = 1)
+public interface SCMUploaderProtocolPB extends
+    SCMUploaderProtocolService.BlockingInterface {
+
+}

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class SCMUploaderProtocolPBClientImpl implements
+    SCMUploaderProtocol, Closeable {
+
+  private SCMUploaderProtocolPB proxy;
+
+  public SCMUploaderProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, SCMUploaderProtocolPB.class,
+      ProtobufRpcEngine.class);
+    proxy =
+        RPC.getProxy(SCMUploaderProtocolPB.class, clientVersion, addr, conf);
+  }
+
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+      this.proxy = null;
+    }
+  }
+
+  @Override
+  public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request)
+      throws YarnException, IOException {
+    SCMUploaderNotifyRequestProto requestProto =
+        ((SCMUploaderNotifyRequestPBImpl) request).getProto();
+    try {
+      return new SCMUploaderNotifyResponsePBImpl(proxy.notify(null,
+          requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public SCMUploaderCanUploadResponse canUpload(
+      SCMUploaderCanUploadRequest request) throws YarnException, IOException {
+    SCMUploaderCanUploadRequestProto requestProto =
+        ((SCMUploaderCanUploadRequestPBImpl)request).getProto();
+    try {
+      return new SCMUploaderCanUploadResponsePBImpl(proxy.canUpload(null,
+          requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+}

+ 79 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class SCMUploaderProtocolPBServiceImpl implements
+    SCMUploaderProtocolPB {
+
+  private SCMUploaderProtocol real;
+
+  public SCMUploaderProtocolPBServiceImpl(SCMUploaderProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public SCMUploaderNotifyResponseProto notify(RpcController controller,
+      SCMUploaderNotifyRequestProto proto) throws ServiceException {
+    SCMUploaderNotifyRequestPBImpl request =
+        new SCMUploaderNotifyRequestPBImpl(proto);
+    try {
+      SCMUploaderNotifyResponse response = real.notify(request);
+      return ((SCMUploaderNotifyResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public SCMUploaderCanUploadResponseProto canUpload(RpcController controller,
+      SCMUploaderCanUploadRequestProto proto)
+      throws ServiceException {
+    SCMUploaderCanUploadRequestPBImpl request =
+        new SCMUploaderCanUploadRequestPBImpl(proto);
+    try {
+      SCMUploaderCanUploadResponse response = real.canUpload(request);
+      return ((SCMUploaderCanUploadResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The request from the NodeManager to the <code>SharedCacheManager</code> that
+ * requests whether it can upload a resource in the shared cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderCanUploadRequest {
+
+  /**
+   * Get the <code>key</code> of the resource that would be uploaded to the
+   * shared cache.
+   *
+   * @return <code>key</code>
+   */
+  public abstract String getResourceKey();
+
+  /**
+   * Set the <code>key</code> of the resource that would be uploaded to the
+   * shared cache.
+   *
+   * @param key unique identifier for the resource
+   */
+  public abstract void setResourceKey(String key);
+}

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the NodeManager that indicates
+ * whether the NodeManager can upload the resource to the shared cache. If it is
+ * not accepted by SCM, the NodeManager should not upload it to the shared
+ * cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderCanUploadResponse {
+
+  /**
+   * Get whether or not the node manager can upload the resource to the shared
+   * cache.
+   *
+   * @return boolean True if the resource can be uploaded, false otherwise.
+   */
+  public abstract boolean getUploadable();
+
+  /**
+   * Set whether or not the node manager can upload the resource to the shared
+   * cache.
+   *
+   * @param b True if the resource can be uploaded, false otherwise.
+   */
+  public abstract void setUploadable(boolean b);
+
+}

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The request from the NodeManager to the <code>SharedCacheManager</code> that
+ * notifies that a resource has been uploaded to the shared cache. The
+ * <code>SharedCacheManager</code> may reject the resource for various reasons,
+ * in which case the NodeManager should remove it from the shared cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderNotifyRequest {
+
+  /**
+   * Get the filename of the resource that was just uploaded to the shared
+   * cache.
+   *
+   * @return the filename
+   */
+  public abstract String getFileName();
+
+  /**
+   * Set the filename of the resource that was just uploaded to the shared
+   * cache.
+   *
+   * @param filename the name of the file
+   */
+  public abstract void setFilename(String filename);
+
+  /**
+   * Get the <code>key</code> of the resource that was just uploaded to the
+   * shared cache.
+   *
+   * @return <code>key</code>
+   */
+  public abstract String getResourceKey();
+
+  /**
+   * Set the <code>key</code> of the resource that was just uploaded to the
+   * shared cache.
+   *
+   * @param key unique identifier for the resource
+   */
+  public abstract void setResourceKey(String key);
+}

+ 51 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the NodeManager that indicates
+ * whether the NodeManager needs to delete the cached resource it was sending
+ * the notification for.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderNotifyResponse {
+
+  /**
+   * Get whether or not the shared cache manager has accepted the notified
+   * resource (i.e. the uploaded file should remain in the cache).
+   *
+   * @return boolean True if the resource has been accepted, false otherwise.
+   */
+  public abstract boolean getAccepted();
+
+  /**
+   * Set whether or not the shared cache manager has accepted the notified
+   * resource (i.e. the uploaded file should remain in the cache).
+   *
+   * @param b True if the resource has been accepted, false otherwise.
+   */
+  public abstract void setAccepted(boolean b);
+
+}

+ 78 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java

@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+
+public class SCMUploaderCanUploadRequestPBImpl
+    extends SCMUploaderCanUploadRequest {
+  SCMUploaderCanUploadRequestProto proto =
+      SCMUploaderCanUploadRequestProto.getDefaultInstance();
+  SCMUploaderCanUploadRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderCanUploadRequestPBImpl() {
+    builder = SCMUploaderCanUploadRequestProto.newBuilder();
+  }
+
+  public SCMUploaderCanUploadRequestPBImpl(
+      SCMUploaderCanUploadRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderCanUploadRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public String getResourceKey() {
+    SCMUploaderCanUploadRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasResourceKey()) ? p.getResourceKey() : null;
+  }
+
+  @Override
+  public void setResourceKey(String key) {
+    maybeInitBuilder();
+    if (key == null) {
+      builder.clearResourceKey();
+      return;
+    }
+    builder.setResourceKey(key);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderCanUploadRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

+ 75 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+
+public class SCMUploaderCanUploadResponsePBImpl
+    extends SCMUploaderCanUploadResponse {
+  SCMUploaderCanUploadResponseProto proto =
+      SCMUploaderCanUploadResponseProto.getDefaultInstance();
+  SCMUploaderCanUploadResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderCanUploadResponsePBImpl() {
+    builder = SCMUploaderCanUploadResponseProto.newBuilder();
+  }
+
+  public SCMUploaderCanUploadResponsePBImpl(
+      SCMUploaderCanUploadResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderCanUploadResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public boolean getUploadable() {
+    SCMUploaderCanUploadResponseProtoOrBuilder p = viaProto ? proto : builder;
+    // Default to true, when in doubt allow the upload
+    return (p.hasUploadable()) ? p.getUploadable() : true;
+  }
+
+  @Override
+  public void setUploadable(boolean b) {
+    maybeInitBuilder();
+    builder.setUploadable(b);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderCanUploadResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+
+public class SCMUploaderNotifyRequestPBImpl extends SCMUploaderNotifyRequest {
+  SCMUploaderNotifyRequestProto proto =
+      SCMUploaderNotifyRequestProto.getDefaultInstance();
+  SCMUploaderNotifyRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderNotifyRequestPBImpl() {
+    builder = SCMUploaderNotifyRequestProto.newBuilder();
+  }
+
+  public SCMUploaderNotifyRequestPBImpl(
+      SCMUploaderNotifyRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderNotifyRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public String getResourceKey() {
+    SCMUploaderNotifyRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasResourceKey()) ? p.getResourceKey() : null;
+  }
+
+  @Override
+  public void setResourceKey(String key) {
+    maybeInitBuilder();
+    if (key == null) {
+      builder.clearResourceKey();
+      return;
+    }
+    builder.setResourceKey(key);
+  }
+
+  @Override
+  public String getFileName() {
+    SCMUploaderNotifyRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasFilename()) ? p.getFilename() : null;
+  }
+
+  @Override
+  public void setFilename(String filename) {
+    maybeInitBuilder();
+    if (filename == null) {
+      builder.clearFilename();
+      return;
+    }
+    builder.setFilename(filename);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderNotifyRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

+ 73 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java

@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+
+public class SCMUploaderNotifyResponsePBImpl extends SCMUploaderNotifyResponse {
+  SCMUploaderNotifyResponseProto proto =
+      SCMUploaderNotifyResponseProto.getDefaultInstance();
+  SCMUploaderNotifyResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderNotifyResponsePBImpl() {
+    builder = SCMUploaderNotifyResponseProto.newBuilder();
+  }
+
+  public SCMUploaderNotifyResponsePBImpl(SCMUploaderNotifyResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderNotifyResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public boolean getAccepted() {
+    SCMUploaderNotifyResponseProtoOrBuilder p = viaProto ? proto : builder;
+    // Default to true, when in doubt just leave the file in the cache
+    return (p.hasAccepted()) ? p.getAccepted() : true;
+  }
+
+  @Override
+  public void setAccepted(boolean b) {
+    maybeInitBuilder();
+    builder.setAccepted(b);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderNotifyResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "SCMUploaderProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+service SCMUploaderProtocolService {
+  rpc notify(SCMUploaderNotifyRequestProto) returns (SCMUploaderNotifyResponseProto);
+  rpc canUpload(SCMUploaderCanUploadRequestProto) returns (SCMUploaderCanUploadResponseProto);
+}

+ 18 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -75,4 +75,21 @@ message NMContainerStatusProto {
   optional string diagnostics = 5 [default = "N/A"];
   optional int32 container_exit_status = 6;
   optional int64 creation_time = 7;
-}
+}
+
+message SCMUploaderNotifyRequestProto {
+  optional string resource_key = 1;
+  optional string filename = 2;
+}
+
+message SCMUploaderNotifyResponseProto {
+  optional bool accepted = 1;
+}
+
+message SCMUploaderCanUploadRequestProto {
+  optional string resource_key = 1;
+}
+
+message SCMUploaderCanUploadResponseProto {
+  optional bool uploadable = 1;
+}

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java

@@ -67,6 +67,10 @@ public class SharedCacheManager extends CompositeService {
     CleanerService cs = createCleanerService(store);
     addService(cs);
 
+    SharedCacheUploaderService nms =
+        createNMCacheUploaderSCMProtocolService(store);
+    addService(nms);
+
     // init metrics
     DefaultMetricsSystem.initialize("SharedCacheManager");
     JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -97,6 +101,11 @@ public class SharedCacheManager extends CompositeService {
     return new CleanerService(store);
   }
 
+  private SharedCacheUploaderService
+      createNMCacheUploaderSCMProtocolService(SCMStore store) {
+    return new SharedCacheUploaderService(store);
+  }
+
   @Override
   protected void serviceStop() throws Exception {
 

+ 140 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java

@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+/**
+ * This service handles all rpc calls from the NodeManager uploader to the
+ * shared cache manager.
+ */
+public class SharedCacheUploaderService extends AbstractService
+    implements SCMUploaderProtocol {
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private Server server;
+  InetSocketAddress bindAddress;
+  private final SCMStore store;
+  private SharedCacheUploaderMetrics metrics;
+
+  public SharedCacheUploaderService(SCMStore store) {
+    super(SharedCacheUploaderService.class.getName());
+    this.store = store;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.bindAddress = getBindAddress(conf);
+
+    super.serviceInit(conf);
+  }
+
+  InetSocketAddress getBindAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    this.metrics = SharedCacheUploaderMetrics.initSingleton(conf);
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    this.server =
+        rpc.getServer(SCMUploaderProtocol.class, this, bindAddress,
+            conf, null, // Secret manager null for now (security not supported)
+            conf.getInt(YarnConfiguration.SCM_UPLOADER_SERVER_THREAD_COUNT,
+                YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT));
+
+    // TODO (YARN-2774): Enable service authorization
+
+    this.server.start();
+    bindAddress =
+        conf.updateConnectAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+            server.getListenerAddress());
+
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.server != null) {
+      this.server.stop();
+      this.server = null;
+    }
+
+    super.serviceStop();
+  }
+
+  @Override
+  public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request)
+      throws YarnException, IOException {
+    SCMUploaderNotifyResponse response =
+        recordFactory.newRecordInstance(SCMUploaderNotifyResponse.class);
+
+    // TODO (YARN-2774): proper security/authorization needs to be implemented
+
+    String filename =
+        store.addResource(request.getResourceKey(), request.getFileName());
+
+    boolean accepted = filename.equals(request.getFileName());
+
+    if (accepted) {
+      this.metrics.incAcceptedUploads();
+    } else {
+      this.metrics.incRejectedUploads();
+    }
+
+    response.setAccepted(accepted);
+
+    return response;
+  }
+
+  @Override
+  public SCMUploaderCanUploadResponse canUpload(
+      SCMUploaderCanUploadRequest request) throws YarnException, IOException {
+    // TODO (YARN-2781): we may want to have a more flexible policy of
+    // instructing the node manager to upload only if it meets a certain
+    // criteria
+    // until then we return true for now
+    SCMUploaderCanUploadResponse response =
+        recordFactory.newRecordInstance(SCMUploaderCanUploadResponse.class);
+    response.setUploadable(true);
+    return response;
+  }
+}

+ 105 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining shared cache uploader requests metrics
+ * and publishing them through the metrics interfaces.
+ */
+@Private
+@Evolving
+@Metrics(about="shared cache upload metrics", context="yarn")
+public class SharedCacheUploaderMetrics {
+
+  static final Log LOG =
+      LogFactory.getLog(SharedCacheUploaderMetrics.class);
+  final MetricsRegistry registry;
+
+  SharedCacheUploaderMetrics() {
+    registry = new MetricsRegistry("SharedCacheUploaderRequests");
+    LOG.debug("Initialized "+ registry);
+  }
+
+  enum Singleton {
+    INSTANCE;
+
+    SharedCacheUploaderMetrics impl;
+
+    synchronized SharedCacheUploaderMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create();
+      }
+      return impl;
+    }
+  }
+
+  public static SharedCacheUploaderMetrics
+      initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  public static SharedCacheUploaderMetrics getInstance() {
+    SharedCacheUploaderMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null)
+      throw new IllegalStateException(
+          "The SharedCacheUploaderMetrics singleton instance is not"
+          + "initialized. Have you called init first?");
+    return topMetrics;
+  }
+
+  static SharedCacheUploaderMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    SharedCacheUploaderMetrics metrics =
+        new SharedCacheUploaderMetrics();
+    ms.register("SharedCacheUploaderRequests", null, metrics);
+    return metrics;
+  }
+
+  @Metric("Number of accepted uploads") MutableCounterLong acceptedUploads;
+  @Metric("Number of rejected uploads") MutableCounterLong rejectedUploads;
+
+  /**
+   * One accepted upload event
+   */
+  public void incAcceptedUploads() {
+    acceptedUploads.incr();
+  }
+
+  /**
+   * One rejected upload event
+   */
+  public void incRejectedUploads() {
+    rejectedUploads.incr();
+  }
+
+  public long getAcceptedUploads() { return acceptedUploads.value(); }
+  public long getRejectUploads() { return rejectedUploads.value(); }
+}

+ 188 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java

@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Basic unit tests for the NodeManger to SCM Protocol Service.
+ */
+public class TestSharedCacheUploaderService {
+  private static File testDir = null;
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testDir = new File("target",
+        TestSharedCacheUploaderService.class.getCanonicalName());
+    testDir.delete();
+    testDir.mkdirs();
+    testDir = testDir.getAbsoluteFile();
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testDir != null) {
+      testDir.delete();
+    }
+  }
+
+  private SharedCacheUploaderService service;
+  private SCMUploaderProtocol proxy;
+  private SCMStore store;
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  @Before
+  public void startUp() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.SCM_STORE_CLASS,
+        InMemorySCMStore.class.getName());
+    conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
+    AppChecker appChecker = mock(AppChecker.class);
+    store = new InMemorySCMStore(appChecker);
+    store.init(conf);
+    store.start();
+
+    service = new SharedCacheUploaderService(store);
+    service.init(conf);
+    service.start();
+
+    YarnRPC rpc = YarnRPC.create(new Configuration());
+
+    InetSocketAddress scmAddress =
+        conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+
+    proxy =
+        (SCMUploaderProtocol) rpc.getProxy(
+            SCMUploaderProtocol.class, scmAddress, conf);
+  }
+
+  @After
+  public void cleanUp() {
+    if (store != null) {
+      store.stop();
+    }
+
+    if (service != null) {
+      service.stop();
+    }
+
+    if (proxy != null) {
+      RPC.stopProxy(proxy);
+    }
+  }
+
+  @Test
+  public void testNotify_noEntry() throws Exception {
+    long accepted =
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads();
+
+    SCMUploaderNotifyRequest request =
+        recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+    request.setResourceKey("key1");
+    request.setFilename("foo.jar");
+    assertTrue(proxy.notify(request).getAccepted());
+    Collection<SharedCacheResourceReference> set =
+        store.getResourceReferences("key1");
+    assertNotNull(set);
+    assertEquals(0, set.size());
+
+    assertEquals(
+        "NM upload metrics aren't updated.", 1,
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() -
+            accepted);
+
+  }
+
+  @Test
+  public void testNotify_entryExists_differentName() throws Exception {
+
+    long rejected =
+        SharedCacheUploaderMetrics.getInstance().getRejectUploads();
+
+    store.addResource("key1", "foo.jar");
+    SCMUploaderNotifyRequest request =
+        recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+    request.setResourceKey("key1");
+    request.setFilename("foobar.jar");
+    assertFalse(proxy.notify(request).getAccepted());
+    Collection<SharedCacheResourceReference> set =
+        store.getResourceReferences("key1");
+    assertNotNull(set);
+    assertEquals(0, set.size());
+    assertEquals(
+        "NM upload metrics aren't updated.", 1,
+        SharedCacheUploaderMetrics.getInstance().getRejectUploads() -
+            rejected);
+
+  }
+
+  @Test
+  public void testNotify_entryExists_sameName() throws Exception {
+
+    long accepted =
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads();
+
+    store.addResource("key1", "foo.jar");
+    SCMUploaderNotifyRequest request =
+        recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+    request.setResourceKey("key1");
+    request.setFilename("foo.jar");
+    assertTrue(proxy.notify(request).getAccepted());
+    Collection<SharedCacheResourceReference> set =
+        store.getResourceReferences("key1");
+    assertNotNull(set);
+    assertEquals(0, set.size());
+    assertEquals(
+        "NM upload metrics aren't updated.", 1,
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() -
+            accepted);
+
+  }
+}