Browse Source

YARN-2188. [YARN-1492] Client service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)

Karthik Kambatla 10 năm trước cách đây
mục cha
commit
fe1f2db5ee
22 tập tin đã thay đổi với 1570 bổ sung4 xóa
  1. 4 1
      hadoop-yarn-project/CHANGES.txt
  2. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
  3. 90 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java
  4. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java
  5. 67 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java
  6. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java
  7. 70 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java
  8. 55 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java
  9. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  10. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto
  11. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  12. 93 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java
  13. 78 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java
  14. 122 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java
  15. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java
  16. 120 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java
  17. 79 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java
  18. 20 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  19. 192 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java
  20. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
  21. 113 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java
  22. 278 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java

+ 4 - 1
hadoop-yarn-project/CHANGES.txt

@@ -46,7 +46,10 @@ Release 2.7.0 - UNRELEASED
     (Chris Trezzo and Sangjin Lee via kasha)
 
     YARN-2236. [YARN-1492] Shared Cache uploader service on the Node 
-    Manager. (Chris Trezzo and Sanjin Lee via kasha)
+    Manager. (Chris Trezzo and Sangjin Lee via kasha)
+
+    YARN-2188. [YARN-1492] Client service for cache manager. 
+    (Chris Trezzo and Sangjin Lee via kasha)
 
 
   IMPROVEMENTS

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml

@@ -96,6 +96,7 @@
                   <include>server/resourcemanager_administration_protocol.proto</include>
                   <include>application_history_client.proto</include>
                   <include>server/application_history_server.proto</include>
+                  <include>client_SCM_protocol.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

+ 90 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * <p>
+ * The protocol between clients and the <code>SharedCacheManager</code> to claim
+ * and release resources in the shared cache.
+ * </p>
+ */
+@Public
+@Unstable
+public interface ClientSCMProtocol {
+  /**
+   * <p>
+   * The interface used by clients to claim a resource with the
+   * <code>SharedCacheManager.</code> The client uses a checksum to identify the
+   * resource and an {@link ApplicationId} to identify which application will be
+   * using the resource.
+   * </p>
+   *
+   * <p>
+   * The <code>SharedCacheManager</code> responds with whether or not the
+   * resource exists in the cache. If the resource exists, a <code>Path</code>
+   * to the resource in the shared cache is returned. If the resource does not
+   * exist, the response is empty.
+   * </p>
+   *
+   * @param request request to claim a resource in the shared cache
+   * @return response indicating if the resource is already in the cache
+   * @throws YarnException
+   * @throws IOException
+   */
+  public UseSharedCacheResourceResponse use(
+      UseSharedCacheResourceRequest request) throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The interface used by clients to release a resource with the
+   * <code>SharedCacheManager.</code> This method is called once an application
+   * is no longer using a claimed resource in the shared cache. The client uses
+   * a checksum to identify the resource and an {@link ApplicationId} to
+   * identify which application is releasing the resource.
+   * </p>
+   *
+   * <p>
+   * Note: This method is an optimization and the client is not required to call
+   * it for correctness.
+   * </p>
+   *
+   * <p>
+   * Currently the <code>SharedCacheManager</code> sends an empty response.
+   * </p>
+   *
+   * @param request request to release a resource in the shared cache
+   * @return (empty) response on releasing the resource
+   * @throws YarnException
+   * @throws IOException
+   */
+  public ReleaseSharedCacheResourceResponse release(
+      ReleaseSharedCacheResourceRequest request) throws YarnException, IOException;
+
+}

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.ClientSCMProtocol.ClientSCMProtocolService;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ClientSCMProtocolPB",
+    protocolVersion = 1)
+public interface ClientSCMProtocolPB extends
+    ClientSCMProtocolService.BlockingInterface {
+
+}

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * <p>The request from clients to release a resource in the shared cache.</p>
+ */
+@Public
+@Unstable
+public abstract class ReleaseSharedCacheResourceRequest {
+
+  /**
+   * Get the <code>ApplicationId</code> of the resource to be released.
+   *
+   * @return <code>ApplicationId</code>
+   */
+  @Public
+  @Unstable
+  public abstract ApplicationId getAppId();
+
+  /**
+   * Set the <code>ApplicationId</code> of the resource to be released.
+   *
+   * @param id <code>ApplicationId</code>
+   */
+  @Public
+  @Unstable
+  public abstract void setAppId(ApplicationId id);
+
+  /**
+   * Get the <code>key</code> of the resource to be released.
+   *
+   * @return <code>key</code>
+   */
+  @Public
+  @Unstable
+  public abstract String getResourceKey();
+
+  /**
+   * Set the <code>key</code> of the resource to be released.
+   *
+   * @param key unique identifier for the resource
+   */
+  @Public
+  @Unstable
+  public abstract void setResourceKey(String key);
+}

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response to clients from the <code>SharedCacheManager</code> when
+ * releasing a resource in the shared cache.
+ * </p>
+ *
+ * <p>
+ * Currently, this is empty.
+ * </p>
+ */
+@Public
+@Unstable
+public abstract class ReleaseSharedCacheResourceResponse {
+}

+ 70 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * <p>
+ * The request from clients to the <code>SharedCacheManager</code> that claims a
+ * resource in the shared cache.
+ * </p>
+ */
+@Public
+@Unstable
+public abstract class UseSharedCacheResourceRequest {
+
+  /**
+   * Get the <code>ApplicationId</code> of the resource to be used.
+   *
+   * @return <code>ApplicationId</code>
+   */
+  @Public
+  @Unstable
+  public abstract ApplicationId getAppId();
+
+  /**
+   * Set the <code>ApplicationId</code> of the resource to be used.
+   *
+   * @param id <code>ApplicationId</code>
+   */
+  @Public
+  @Unstable
+  public abstract void setAppId(ApplicationId id);
+
+  /**
+   * Get the <code>key</code> of the resource to be used.
+   *
+   * @return <code>key</code>
+   */
+  @Public
+  @Unstable
+  public abstract String getResourceKey();
+
+  /**
+   * Set the <code>key</code> of the resource to be used.
+   *
+   * @param key unique identifier for the resource
+   */
+  @Public
+  @Unstable
+  public abstract void setResourceKey(String key);
+}

+ 55 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the client that indicates whether
+ * a requested resource exists in the cache.
+ * </p>
+ */
+@Public
+@Unstable
+public abstract class UseSharedCacheResourceResponse {
+
+  /**
+   * Get the <code>Path</code> corresponding to the requested resource in the
+   * shared cache.
+   *
+   * @return String A <code>Path</code> if the resource exists in the shared
+   *         cache, <code>null</code> otherwise
+   */
+  @Public
+  @Unstable
+  public abstract String getPath();
+
+  /**
+   * Set the <code>Path</code> corresponding to a resource in the shared cache.
+   *
+   * @param p A <code>Path</code> corresponding to a resource in the shared
+   *          cache
+   */
+  @Public
+  @Unstable
+  public abstract void setPath(String p);
+
+}

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1472,6 +1472,18 @@ public class YarnConfiguration extends Configuration {
       SHARED_CACHE_PREFIX + "uploader.server.thread-count";
   public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
 
+  /** The address of the client interface in the SCM. */
+  public static final String SCM_CLIENT_SERVER_ADDRESS =
+      SHARED_CACHE_PREFIX + "client-server.address";
+  public static final int DEFAULT_SCM_CLIENT_SERVER_PORT = 8045;
+  public static final String DEFAULT_SCM_CLIENT_SERVER_ADDRESS = "0.0.0.0:"
+      + DEFAULT_SCM_CLIENT_SERVER_PORT;
+
+  /** The number of threads used to handle shared cache manager requests. */
+  public static final String SCM_CLIENT_SERVER_THREAD_COUNT =
+      SHARED_CACHE_PREFIX + "client-server.thread-count";
+  public static final int DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT = 50;
+
   /** the checksum algorithm implementation **/
   public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
       SHARED_CACHE_PREFIX + "checksum.algo.impl";

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "ClientSCMProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_service_protos.proto";
+
+service ClientSCMProtocolService {
+  rpc use (UseSharedCacheResourceRequestProto) returns (UseSharedCacheResourceResponseProto);
+  rpc release (ReleaseSharedCacheResourceRequestProto) returns (ReleaseSharedCacheResourceResponseProto);
+}

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -306,6 +306,27 @@ message GetContainersResponseProto {
   repeated ContainerReportProto containers = 1;
 }
 
+//////////////////////////////////////////////////////
+/////// client_SCM_Protocol //////////////////////////
+//////////////////////////////////////////////////////
+
+message UseSharedCacheResourceRequestProto {
+  optional ApplicationIdProto applicationId = 1;
+  optional string resourceKey = 2;
+}
+
+message UseSharedCacheResourceResponseProto {
+  optional string path = 1;
+}
+
+message ReleaseSharedCacheResourceRequestProto {
+  optional ApplicationIdProto applicationId = 1;
+  optional string resourceKey = 2;
+}
+
+message ReleaseSharedCacheResourceResponseProto {
+}
+
 //////////////////////////////////////////////////////
 //  reservation_protocol
 //////////////////////////////////////////////////////

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.ClientSCMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
+
+import com.google.protobuf.ServiceException;
+
+public class ClientSCMProtocolPBClientImpl implements ClientSCMProtocol,
+    Closeable {
+
+  private ClientSCMProtocolPB proxy;
+
+  public ClientSCMProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, ClientSCMProtocolPB.class,
+      ProtobufRpcEngine.class);
+    proxy = RPC.getProxy(ClientSCMProtocolPB.class, clientVersion, addr, conf);
+  }
+
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+      this.proxy = null;
+    }
+  }
+
+  @Override
+  public UseSharedCacheResourceResponse use(
+      UseSharedCacheResourceRequest request) throws YarnException, IOException {
+    UseSharedCacheResourceRequestProto requestProto =
+        ((UseSharedCacheResourceRequestPBImpl) request).getProto();
+    try {
+      return new UseSharedCacheResourceResponsePBImpl(proxy.use(null,
+          requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public ReleaseSharedCacheResourceResponse release(
+      ReleaseSharedCacheResourceRequest request) throws YarnException,
+      IOException {
+    ReleaseSharedCacheResourceRequestProto requestProto =
+        ((ReleaseSharedCacheResourceRequestPBImpl) request).getProto();
+    try {
+      return new ReleaseSharedCacheResourceResponsePBImpl(proxy.release(null,
+          requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+}

+ 78 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java

@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.ClientSCMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class ClientSCMProtocolPBServiceImpl implements ClientSCMProtocolPB {
+
+  private ClientSCMProtocol real;
+
+  public ClientSCMProtocolPBServiceImpl(ClientSCMProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public UseSharedCacheResourceResponseProto use(RpcController controller,
+      UseSharedCacheResourceRequestProto proto) throws ServiceException {
+    UseSharedCacheResourceRequestPBImpl request =
+        new UseSharedCacheResourceRequestPBImpl(proto);
+    try {
+      UseSharedCacheResourceResponse response = real.use(request);
+      return ((UseSharedCacheResourceResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public ReleaseSharedCacheResourceResponseProto release(
+      RpcController controller, ReleaseSharedCacheResourceRequestProto proto)
+      throws ServiceException {
+    ReleaseSharedCacheResourceRequestPBImpl request =
+        new ReleaseSharedCacheResourceRequestPBImpl(proto);
+    try {
+      ReleaseSharedCacheResourceResponse response = real.release(request);
+      return ((ReleaseSharedCacheResourceResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

+ 122 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProtoOrBuilder;
+
+public class ReleaseSharedCacheResourceRequestPBImpl extends
+    ReleaseSharedCacheResourceRequest {
+  ReleaseSharedCacheResourceRequestProto proto =
+      ReleaseSharedCacheResourceRequestProto.getDefaultInstance();
+  ReleaseSharedCacheResourceRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private ApplicationId applicationId = null;
+
+  public ReleaseSharedCacheResourceRequestPBImpl() {
+    builder = ReleaseSharedCacheResourceRequestProto.newBuilder();
+  }
+
+  public ReleaseSharedCacheResourceRequestPBImpl(
+      ReleaseSharedCacheResourceRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public ReleaseSharedCacheResourceRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public ApplicationId getAppId() {
+    ReleaseSharedCacheResourceRequestProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (this.applicationId != null) {
+      return this.applicationId;
+    }
+    if (!p.hasApplicationId()) {
+      return null;
+    }
+    this.applicationId = convertFromProtoFormat(p.getApplicationId());
+    return this.applicationId;
+  }
+
+  @Override
+  public void setAppId(ApplicationId id) {
+    maybeInitBuilder();
+    if (id == null)
+      builder.clearApplicationId();
+    this.applicationId = id;
+  }
+
+  @Override
+  public String getResourceKey() {
+    ReleaseSharedCacheResourceRequestProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return (p.hasResourceKey()) ? p.getResourceKey() : null;
+  }
+
+  @Override
+  public void setResourceKey(String key) {
+    maybeInitBuilder();
+    if (key == null) {
+      builder.clearResourceKey();
+      return;
+    }
+    builder.setResourceKey(key);
+  }
+
+  private void mergeLocalToBuilder() {
+    if (applicationId != null) {
+      builder.setApplicationId(convertToProtoFormat(this.applicationId));
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ReleaseSharedCacheResourceRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+}

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto;
+
+public class ReleaseSharedCacheResourceResponsePBImpl extends
+    ReleaseSharedCacheResourceResponse {
+  ReleaseSharedCacheResourceResponseProto proto =
+      ReleaseSharedCacheResourceResponseProto.getDefaultInstance();
+  ReleaseSharedCacheResourceResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public ReleaseSharedCacheResourceResponsePBImpl() {
+    builder = ReleaseSharedCacheResourceResponseProto.newBuilder();
+  }
+
+  public ReleaseSharedCacheResourceResponsePBImpl(
+      ReleaseSharedCacheResourceResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public ReleaseSharedCacheResourceResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ReleaseSharedCacheResourceResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

+ 120 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProtoOrBuilder;
+
+public class UseSharedCacheResourceRequestPBImpl extends
+    UseSharedCacheResourceRequest {
+  UseSharedCacheResourceRequestProto proto = UseSharedCacheResourceRequestProto
+      .getDefaultInstance();
+  UseSharedCacheResourceRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private ApplicationId applicationId = null;
+
+  public UseSharedCacheResourceRequestPBImpl() {
+    builder = UseSharedCacheResourceRequestProto.newBuilder();
+  }
+
+  public UseSharedCacheResourceRequestPBImpl(
+      UseSharedCacheResourceRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public UseSharedCacheResourceRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public ApplicationId getAppId() {
+    UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.applicationId != null) {
+      return this.applicationId;
+    }
+    if (!p.hasApplicationId()) {
+      return null;
+    }
+    this.applicationId = convertFromProtoFormat(p.getApplicationId());
+    return this.applicationId;
+  }
+
+  @Override
+  public void setAppId(ApplicationId id) {
+    maybeInitBuilder();
+    if (id == null)
+      builder.clearApplicationId();
+    this.applicationId = id;
+  }
+
+  @Override
+  public String getResourceKey() {
+    UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasResourceKey()) ? p.getResourceKey() : null;
+  }
+
+  @Override
+  public void setResourceKey(String key) {
+    maybeInitBuilder();
+    if (key == null) {
+      builder.clearResourceKey();
+      return;
+    }
+    builder.setResourceKey(key);
+  }
+
+  private void mergeLocalToBuilder() {
+    if (applicationId != null) {
+      builder.setApplicationId(convertToProtoFormat(this.applicationId));
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = UseSharedCacheResourceRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+}

+ 79 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProtoOrBuilder;
+
+public class UseSharedCacheResourceResponsePBImpl extends
+    UseSharedCacheResourceResponse {
+  UseSharedCacheResourceResponseProto proto =
+      UseSharedCacheResourceResponseProto
+      .getDefaultInstance();
+  UseSharedCacheResourceResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public UseSharedCacheResourceResponsePBImpl() {
+    builder = UseSharedCacheResourceResponseProto.newBuilder();
+  }
+
+  public UseSharedCacheResourceResponsePBImpl(
+      UseSharedCacheResourceResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public UseSharedCacheResourceResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public String getPath() {
+    UseSharedCacheResourceResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasPath()) ? p.getPath() : null;
+  }
+
+  @Override
+  public void setPath(String path) {
+    maybeInitBuilder();
+    if (path == null) {
+      builder.clearPath();
+      return;
+    }
+    builder.setPath(path);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = UseSharedCacheResourceResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

+ 20 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1459,19 +1459,36 @@
   </property>
 
   <property>
-    <description>The algorithm used to compute checksums of files (SHA-256 by default)</description>
+    <description>The address of the client interface in the SCM
+    (shared cache manager)</description>
+    <name>yarn.sharedcache.client-server.address</name>
+    <value>0.0.0.0:8045</value>
+  </property>
+
+  <property>
+    <description>The number of threads used to handle shared cache manager
+    requests from clients (50 by default)</description>
+    <name>yarn.sharedcache.client-server.thread-count</name>
+    <value>50</value>
+  </property>
+
+  <property>
+    <description>The algorithm used to compute checksums of files (SHA-256 by
+    default)</description>
     <name>yarn.sharedcache.checksum.algo.impl</name>
     <value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value>
   </property>
 
   <property>
-    <description>The replication factor for the node manager uploader for the shared cache (10 by default)</description>
+    <description>The replication factor for the node manager uploader for the
+    shared cache (10 by default)</description>
     <name>yarn.sharedcache.nm.uploader.replication.factor</name>
     <value>10</value>
   </property>
 
   <property>
-    <description>The number of threads used to upload files from a node manager instance (20 by default)</description>
+    <description>The number of threads used to upload files from a node manager
+    instance (20 by default)</description>
     <name>yarn.sharedcache.nm.uploader.thread-count</name>
     <value>20</value>
   </property>

+ 192 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java

@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+
+/**
+ * This service handles all rpc calls from the client to the shared cache
+ * manager.
+ */
+@Private
+@Evolving
+public class ClientProtocolService extends AbstractService implements
+    ClientSCMProtocol {
+
+  private static final Log LOG = LogFactory.getLog(ClientProtocolService.class);
+
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private Server server;
+  InetSocketAddress clientBindAddress;
+  private final SCMStore store;
+  private int cacheDepth;
+  private String cacheRoot;
+  private ClientSCMMetrics metrics;
+
+  public ClientProtocolService(SCMStore store) {
+    super(ClientProtocolService.class.getName());
+    this.store = store;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.clientBindAddress = getBindAddress(conf);
+
+    this.cacheDepth = SharedCacheUtil.getCacheDepth(conf);
+
+    this.cacheRoot =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+    super.serviceInit(conf);
+  }
+
+  InetSocketAddress getBindAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    this.metrics = ClientSCMMetrics.initSingleton(conf);
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    this.server =
+        rpc.getServer(ClientSCMProtocol.class, this,
+            clientBindAddress,
+            conf, null, // Secret manager null for now (security not supported)
+            conf.getInt(YarnConfiguration.SCM_CLIENT_SERVER_THREAD_COUNT,
+                YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT));
+
+    // TODO (YARN-2774): Enable service authorization
+
+    this.server.start();
+    clientBindAddress =
+        conf.updateConnectAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+            server.getListenerAddress());
+
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.server != null) {
+      this.server.stop();
+    }
+
+    super.serviceStop();
+  }
+
+  @Override
+  public UseSharedCacheResourceResponse use(
+      UseSharedCacheResourceRequest request) throws YarnException,
+      IOException {
+
+    UseSharedCacheResourceResponse response =
+        recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class);
+
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie) {
+      LOG.info("Error getting UGI ", ie);
+      throw RPCUtil.getRemoteException(ie);
+    }
+
+    String fileName =
+        this.store.addResourceReference(request.getResourceKey(),
+            new SharedCacheResourceReference(request.getAppId(),
+                callerUGI.getShortUserName()));
+
+    if (fileName != null) {
+      response
+          .setPath(getCacheEntryFilePath(request.getResourceKey(), fileName));
+      this.metrics.incCacheHitCount();
+    } else {
+      this.metrics.incCacheMissCount();
+    }
+
+    return response;
+  }
+
+  @Override
+  public ReleaseSharedCacheResourceResponse release(
+      ReleaseSharedCacheResourceRequest request) throws YarnException,
+      IOException {
+
+    ReleaseSharedCacheResourceResponse response =
+        recordFactory
+            .newRecordInstance(ReleaseSharedCacheResourceResponse.class);
+
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie) {
+      LOG.info("Error getting UGI ", ie);
+      throw RPCUtil.getRemoteException(ie);
+    }
+
+    boolean removed =
+        this.store.removeResourceReference(
+            request.getResourceKey(),
+            new SharedCacheResourceReference(request.getAppId(), callerUGI
+                .getShortUserName()), true);
+
+    if (removed) {
+      this.metrics.incCacheRelease();
+    }
+
+    return response;
+  }
+
+  private String getCacheEntryFilePath(String checksum, String filename) {
+    return SharedCacheUtil.getCacheEntryPath(this.cacheDepth,
+        this.cacheRoot, checksum) + Path.SEPARATOR_CHAR + filename;
+  }
+}

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java

@@ -71,6 +71,9 @@ public class SharedCacheManager extends CompositeService {
         createNMCacheUploaderSCMProtocolService(store);
     addService(nms);
 
+    ClientProtocolService cps = createClientProtocolService(store);
+    addService(cps);
+
     // init metrics
     DefaultMetricsSystem.initialize("SharedCacheManager");
     JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -106,6 +109,10 @@ public class SharedCacheManager extends CompositeService {
     return new SharedCacheUploaderService(store);
   }
 
+  private ClientProtocolService createClientProtocolService(SCMStore store) {
+    return new ClientProtocolService(store);
+  }
+
   @Override
   protected void serviceStop() throws Exception {
 

+ 113 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining  client requests metrics
+ * and publishing them through the metrics interfaces.
+ */
+@Private
+@Unstable
+@Metrics(about="Client SCM metrics", context="yarn")
+public class ClientSCMMetrics {
+
+  private static final Log LOG = LogFactory.getLog(ClientSCMMetrics.class);
+  final MetricsRegistry registry;
+
+  ClientSCMMetrics() {
+    registry = new MetricsRegistry("clientRequests");
+    LOG.debug("Initialized " + registry);
+  }
+
+  enum Singleton {
+    INSTANCE;
+
+    ClientSCMMetrics impl;
+
+    synchronized ClientSCMMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create();
+      }
+      return impl;
+    }
+  }
+
+  public static ClientSCMMetrics initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  public static ClientSCMMetrics getInstance() {
+    ClientSCMMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null) {
+      throw new IllegalStateException(
+          "The ClientSCMMetrics singleton instance is not initialized."
+          + " Have you called init first?");
+    }
+    return topMetrics;
+  }
+
+  static ClientSCMMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    ClientSCMMetrics metrics = new ClientSCMMetrics();
+    ms.register("clientRequests", null, metrics);
+    return metrics;
+  }
+
+  @Metric("Number of cache hits") MutableCounterLong cacheHits;
+  @Metric("Number of cache misses") MutableCounterLong cacheMisses;
+  @Metric("Number of cache releases") MutableCounterLong cacheReleases;
+
+  /**
+   * One cache hit event
+   */
+  public void incCacheHitCount() {
+    cacheHits.incr();
+  }
+
+  /**
+   * One cache miss event
+   */
+  public void incCacheMissCount() {
+    cacheMisses.incr();
+  }
+
+  /**
+   * One cache release event
+   */
+  public void incCacheRelease() {
+    cacheReleases.incr();
+  }
+
+  public long getCacheHits() { return cacheHits.value(); }
+  public long getCacheMisses() { return cacheMisses.value(); }
+  public long getCacheReleases() { return cacheReleases.value(); }
+
+}

+ 278 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java

@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Basic unit tests for the Client to SCM Protocol Service.
+ */
+public class TestClientSCMProtocolService {
+  private static File testDir = null;
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testDir = new File("target",
+        TestSharedCacheUploaderService.class.getCanonicalName());
+    testDir.delete();
+    testDir.mkdirs();
+    testDir = testDir.getAbsoluteFile();
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testDir != null) {
+      testDir.delete();
+    }
+  }
+
+
+  private ClientProtocolService service;
+  private ClientSCMProtocol clientSCMProxy;
+  private SCMStore store;
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  @Before
+  public void startUp() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.SCM_STORE_CLASS,
+        InMemorySCMStore.class.getName());
+    conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
+    AppChecker appChecker = mock(AppChecker.class);
+    store = new InMemorySCMStore(appChecker);
+    store.init(conf);
+    store.start();
+
+    service = new ClientProtocolService(store);
+    service.init(conf);
+    service.start();
+
+    YarnRPC rpc = YarnRPC.create(new Configuration());
+
+    InetSocketAddress scmAddress =
+        conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
+
+    clientSCMProxy =
+        (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class, scmAddress,
+            conf);
+  }
+
+  @After
+  public void cleanUp() {
+    if (store != null) {
+      store.stop();
+      store = null;
+    }
+
+    if (service != null) {
+      service.stop();
+      service = null;
+    }
+
+    if (clientSCMProxy != null) {
+      RPC.stopProxy(clientSCMProxy);
+      clientSCMProxy = null;
+    }
+  }
+
+  @Test
+  public void testUse_MissingEntry() throws Exception {
+    long misses = ClientSCMMetrics.getInstance().getCacheMisses();
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(1, 1L));
+    assertNull(clientSCMProxy.use(request).getPath());
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheMisses() - misses);
+  }
+
+  @Test
+  public void testUse_ExistingEntry_NoAppIds() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+
+    long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(2, 2L));
+    // Expecting default depth of 3 and under the shared cache root dir
+    String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+    assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+    assertEquals(1, store.getResourceReferences("key1").size());
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheHits() - hits);
+
+  }
+
+  @Test
+  public void testUse_ExistingEntry_OneId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L), "user"));
+    assertEquals(1, store.getResourceReferences("key1").size());
+    long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+    // Add a new distinct appId
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(2, 2L));
+
+    // Expecting default depth of 3 under the shared cache root dir
+    String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+    assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+    assertEquals(2, store.getResourceReferences("key1").size());
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheHits() - hits);
+  }
+
+  @Test
+  public void testUse_ExistingEntry_DupId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L),
+            testUGI.getShortUserName()));
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+    // Add a new duplicate appId
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(1, 1L));
+
+    // Expecting default depth of 3 under the shared cache root dir
+    String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+    assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheHits() - hits);
+  }
+
+  @Test
+  public void testRelease_ExistingEntry_NonExistantAppId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L), "user"));
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+    ReleaseSharedCacheResourceRequest request =
+        recordFactory
+            .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(2, 2L));
+    clientSCMProxy.release(request);
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    assertEquals(
+        "Client SCM metrics were updated when a release did not happen", 0,
+        ClientSCMMetrics.getInstance().getCacheReleases() - releases);
+
+  }
+
+  @Test
+  public void testRelease_ExistingEntry_WithAppId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L),
+            testUGI.getShortUserName()));
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+    ReleaseSharedCacheResourceRequest request =
+        recordFactory
+            .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(1, 1L));
+    clientSCMProxy.release(request);
+    assertEquals(0, store.getResourceReferences("key1").size());
+
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheReleases() - releases);
+
+  }
+
+  @Test
+  public void testRelease_MissingEntry() throws Exception {
+
+    long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+    ReleaseSharedCacheResourceRequest request =
+        recordFactory
+            .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+    request.setResourceKey("key2");
+    request.setAppId(createAppId(2, 2L));
+    clientSCMProxy.release(request);
+    assertNotNull(store.getResourceReferences("key2"));
+    assertEquals(0, store.getResourceReferences("key2").size());
+    assertEquals(
+        "Client SCM metrics were updated when a release did not happen.", 0,
+        ClientSCMMetrics.getInstance().getCacheReleases() - releases);
+  }
+
+  private ApplicationId createAppId(int id, long timestamp) {
+    return ApplicationId.newInstance(timestamp, id);
+  }
+}