Browse Source

YARN-2236. [YARN-1492] Shared Cache uploader service on the Node Manager. (Chris Trezzo and Sangjin Lee via kasha)

Karthik Kambatla 10 năm trước cách đây
mục cha
commit
a04143039e
22 tập tin đã thay đổi với 1158 bổ sung12 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 38 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
  3. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  5. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
  6. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
  7. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java
  8. 84 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java
  9. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
  10. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  11. 6 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
  12. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  13. 68 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  14. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
  15. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
  16. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
  17. 126 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
  18. 289 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java
  19. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
  20. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
  21. 50 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
  22. 241 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -45,6 +45,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. 
     (Chris Trezzo and Sangjin Lee via kasha)
 
+    YARN-2236. [YARN-1492] Shared Cache uploader service on the Node 
+    Manager. (Chris Trezzo and Sangjin Lee via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -48,6 +49,14 @@ public abstract class LocalResource {
   public static LocalResource newInstance(URL url, LocalResourceType type,
       LocalResourceVisibility visibility, long size, long timestamp,
       String pattern) {
+    return newInstance(url, type, visibility, size, timestamp, pattern, false);
+  }
+
+  @Public
+  @Unstable
+  public static LocalResource newInstance(URL url, LocalResourceType type,
+      LocalResourceVisibility visibility, long size, long timestamp,
+      String pattern, boolean shouldBeUploadedToSharedCache) {
     LocalResource resource = Records.newRecord(LocalResource.class);
     resource.setResource(url);
     resource.setType(type);
@@ -55,6 +64,7 @@ public abstract class LocalResource {
     resource.setSize(size);
     resource.setTimestamp(timestamp);
     resource.setPattern(pattern);
+    resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
     return resource;
   }
 
@@ -65,6 +75,15 @@ public abstract class LocalResource {
     return newInstance(url, type, visibility, size, timestamp, null);
   }
 
+  @Public
+  @Unstable
+  public static LocalResource newInstance(URL url, LocalResourceType type,
+      LocalResourceVisibility visibility, long size, long timestamp,
+      boolean shouldBeUploadedToSharedCache) {
+    return newInstance(url, type, visibility, size, timestamp, null,
+        shouldBeUploadedToSharedCache);
+  }
+
   /**
    * Get the <em>location</em> of the resource to be localized.
    * @return <em>location</em> of the resource to be localized
@@ -170,4 +189,23 @@ public abstract class LocalResource {
   @Public
   @Stable
   public abstract void setPattern(String pattern);
+
+  /**
+   * NM uses it to decide whether it is necessary to upload the resource to
+   * the shared cache.
+   */
+  @Public
+  @Unstable
+  public abstract boolean getShouldBeUploadedToSharedCache();
+
+  /**
+   * Inform NM whether upload to SCM is needed.
+   *
+   * @param shouldBeUploadedToSharedCache <em>shouldBeUploadedToSharedCache</em>
+   *          of this request
+   */
+  @Public
+  @Unstable
+  public abstract void setShouldBeUploadedToSharedCache(
+      boolean shouldBeUploadedToSharedCache);
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1472,6 +1472,25 @@ public class YarnConfiguration extends Configuration {
       SHARED_CACHE_PREFIX + "uploader.server.thread-count";
   public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
 
+  /** the checksum algorithm implementation **/
+  public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+      SHARED_CACHE_PREFIX + "checksum.algo.impl";
+  public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+      "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl";
+
+  // node manager (uploader) configs
+  /**
+   * The replication factor for the node manager uploader for the shared cache.
+   */
+  public static final String SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+      SHARED_CACHE_PREFIX + "nm.uploader.replication.factor";
+  public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+      10;
+
+  public static final String SHARED_CACHE_NM_UPLOADER_THREAD_COUNT =
+      SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
+  public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -159,6 +159,7 @@ message LocalResourceProto {
   optional LocalResourceTypeProto type = 4;
   optional LocalResourceVisibilityProto visibility = 5;
   optional string pattern = 6;
+  optional bool should_be_uploaded_to_shared_cache = 7;
 }
 
 message ApplicationResourceUsageReportProto {

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java

@@ -192,6 +192,26 @@ public class LocalResourcePBImpl extends LocalResource {
     builder.setPattern(pattern);
   }
 
+  @Override
+  public synchronized boolean getShouldBeUploadedToSharedCache() {
+    LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasShouldBeUploadedToSharedCache()) {
+      return false;
+    }
+    return p.getShouldBeUploadedToSharedCache();
+  }
+
+  @Override
+  public synchronized void setShouldBeUploadedToSharedCache(
+      boolean shouldBeUploadedToSharedCache) {
+    maybeInitBuilder();
+    if (!shouldBeUploadedToSharedCache) {
+      builder.clearShouldBeUploadedToSharedCache();
+      return;
+    }
+    builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
+  }
+
   private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
     return ProtoUtils.convertToProtoFormat(e);
   }

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Private
+@Evolving
+/**
+ * The SHA-256 implementation of the shared cache checksum interface.
+ */
+public class ChecksumSHA256Impl implements SharedCacheChecksum {
+  public String computeChecksum(InputStream in) throws IOException {
+    return DigestUtils.sha256Hex(in);
+  }
+}

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+/**
+ * An interface to calculate a checksum for a resource in the shared cache. The
+ * checksum implementation should be thread safe.
+ */
+public interface SharedCacheChecksum {
+
+  /**
+   * Calculate the checksum of the passed input stream.
+   *
+   * @param in <code>InputStream</code> to be checksummed
+   * @return the message digest of the input stream
+   * @throws IOException
+   */
+  public String computeChecksum(InputStream in) throws IOException;
+}

+ 84 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+@SuppressWarnings("unchecked")
+@Public
+@Evolving
+/**
+ * A factory class for creating checksum objects based on a configurable
+ * algorithm implementation
+ */
+public class SharedCacheChecksumFactory {
+  private static final
+      ConcurrentMap<Class<? extends SharedCacheChecksum>,SharedCacheChecksum>
+      instances =
+          new ConcurrentHashMap<Class<? extends SharedCacheChecksum>,
+          SharedCacheChecksum>();
+
+  private static final Class<? extends SharedCacheChecksum> defaultAlgorithm;
+
+  static {
+    try {
+      defaultAlgorithm = (Class<? extends SharedCacheChecksum>)
+          Class.forName(
+              YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+    } catch (Exception e) {
+      // cannot happen
+      throw new ExceptionInInitializerError(e);
+    }
+  }
+
+  /**
+   * Get a new <code>SharedCacheChecksum</code> object based on the configurable
+   * algorithm implementation
+   * (see <code>yarn.sharedcache.checksum.algo.impl</code>)
+   *
+   * @return <code>SharedCacheChecksum</code> object
+   */
+  public static SharedCacheChecksum getChecksum(Configuration conf) {
+    Class<? extends SharedCacheChecksum> clazz =
+        conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
+        defaultAlgorithm, SharedCacheChecksum.class);
+    SharedCacheChecksum checksum = instances.get(clazz);
+    if (checksum == null) {
+      try {
+        checksum = ReflectionUtils.newInstance(clazz, conf);
+        SharedCacheChecksum old = instances.putIfAbsent(clazz, checksum);
+        if (old != null) {
+          checksum = old;
+        }
+      } catch (Exception e) {
+        throw new YarnRuntimeException(e);
+      }
+    }
+
+    return checksum;
+  }
+}

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java

@@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -134,8 +135,8 @@ public class FSDownload implements Callable<Path> {
    * @return true if the path in the current path is visible to all, false
    * otherwise
    */
-  @VisibleForTesting
-  static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+  @Private
+  public static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
       LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
     current = fs.makeQualified(current);
     //the leaf level file should be readable by others

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1458,6 +1458,24 @@
     <value>50</value>
   </property>
 
+  <property>
+    <description>The algorithm used to compute checksums of files (SHA-256 by default)</description>
+    <name>yarn.sharedcache.checksum.algo.impl</name>
+    <value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value>
+  </property>
+
+  <property>
+    <description>The replication factor for the node manager uploader for the shared cache (10 by default)</description>
+    <name>yarn.sharedcache.nm.uploader.replication.factor</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>The number of threads used to upload files from a node manager instance (20 by default)</description>
+    <name>yarn.sharedcache.nm.uploader.thread-count</name>
+    <value>20</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java

@@ -70,6 +70,7 @@ import com.google.common.annotations.VisibleForTesting;
  * Builder utilities to construct various objects.
  *
  */
+@Private
 public class BuilderUtils {
 
   private static final RecordFactory recordFactory = RecordFactoryProvider
@@ -94,7 +95,8 @@ public class BuilderUtils {
   }
 
   public static LocalResource newLocalResource(URL url, LocalResourceType type,
-      LocalResourceVisibility visibility, long size, long timestamp) {
+      LocalResourceVisibility visibility, long size, long timestamp,
+      boolean shouldBeUploadedToSharedCache) {
     LocalResource resource =
       recordFactory.newRecordInstance(LocalResource.class);
     resource.setResource(url);
@@ -102,14 +104,15 @@ public class BuilderUtils {
     resource.setVisibility(visibility);
     resource.setSize(size);
     resource.setTimestamp(timestamp);
+    resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
     return resource;
   }
 
   public static LocalResource newLocalResource(URI uri,
       LocalResourceType type, LocalResourceVisibility visibility, long size,
-      long timestamp) {
+      long timestamp, boolean shouldBeUploadedToSharedCache) {
     return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
-        visibility, size, timestamp);
+        visibility, size, timestamp, shouldBeUploadedToSharedCache);
   }
 
   public static ApplicationId newApplicationId(RecordFactory recordFactory,
@@ -245,7 +248,6 @@ public class BuilderUtils {
     return newToken(Token.class, identifier, kind, password, service);
   }
 
-  @Private
   @VisibleForTesting
   public static Token newContainerToken(NodeId nodeId,
       byte[] password, ContainerTokenIdentifier tokenIdentifier) {

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
@@ -227,6 +229,13 @@ public class ContainerManagerImpl extends CompositeService implements
     addIfService(logHandler);
     dispatcher.register(LogHandlerEventType.class, logHandler);
     
+    // add the shared cache upload service (it will do nothing if the shared
+    // cache is disabled)
+    SharedCacheUploadService sharedCacheUploader =
+        createSharedCacheUploaderService();
+    addService(sharedCacheUploader);
+    dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
+
     waitForContainersOnShutdownMillis =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
             YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -367,6 +376,10 @@ public class ContainerManagerImpl extends CompositeService implements
         deletionContext, dirsHandler, context);
   }
 
+  protected SharedCacheUploadService createSharedCacheUploaderService() {
+    return new SharedCacheUploadService();
+  }
+
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);

+ 68 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -27,6 +27,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -59,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@@ -104,6 +107,10 @@ public class ContainerImpl implements Container {
     new ArrayList<LocalResourceRequest>();
   private final List<LocalResourceRequest> appRsrcs =
     new ArrayList<LocalResourceRequest>();
+  private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
+      new ConcurrentHashMap<LocalResourceRequest, Path>();
+  private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
+      new ConcurrentHashMap<LocalResourceRequest, Boolean>();
 
   // whether container has been recovered after a restart
   private RecoveredContainerStatus recoveredStatus =
@@ -637,6 +644,8 @@ public class ContainerImpl implements Container {
                 container.pendingResources.put(req, links);
               }
               links.add(rsrc.getKey());
+              storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
+                  .getShouldBeUploadedToSharedCache());
               switch (rsrc.getValue().getVisibility()) {
               case PUBLIC:
                 container.publicRsrcs.add(req);
@@ -685,31 +694,77 @@ public class ContainerImpl implements Container {
     }
   }
 
+  /**
+   * Store the resource's shared cache upload policies.
+   * Given that a LocalResourceRequest can be shared across containers in
+   * LocalResourcesTrackerImpl, we preserve the upload policies here.
+   * In addition, it is possible for the application to create several
+   * "identical" LocalResources as part of
+   * ContainerLaunchContext.setLocalResources with different symlinks.
+   * There is a corner case where these "identical" local resources have
+   * different upload policies. For that scenario, upload policy will be set to
+   * true as long as there is at least one LocalResource entry with
+   * upload policy set to true.
+   */
+  private static void storeSharedCacheUploadPolicy(ContainerImpl container,
+      LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
+    Boolean storedUploadPolicy =
+        container.resourcesUploadPolicies.get(resourceRequest);
+    if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
+      container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
+    }
+  }
+
   /**
    * Transition when one of the requested resources for this container
    * has been successfully localized.
    */
   static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
+    @SuppressWarnings("unchecked")
     @Override
     public ContainerState transition(ContainerImpl container,
         ContainerEvent event) {
       ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
-      List<String> syms =
-          container.pendingResources.remove(rsrcEvent.getResource());
+      LocalResourceRequest resourceRequest = rsrcEvent.getResource();
+      Path location = rsrcEvent.getLocation();
+      List<String> syms = container.pendingResources.remove(resourceRequest);
       if (null == syms) {
-        LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
+        LOG.warn("Localized unknown resource " + resourceRequest +
                  " for container " + container.containerId);
         assert false;
         // fail container?
         return ContainerState.LOCALIZING;
       }
-      container.localizedResources.put(rsrcEvent.getLocation(), syms);
+      container.localizedResources.put(location, syms);
+
+      // check to see if this resource should be uploaded to the shared cache
+      // as well
+      if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
+        container.resourcesToBeUploaded.put(resourceRequest, location);
+      }
       if (!container.pendingResources.isEmpty()) {
         return ContainerState.LOCALIZING;
       }
 
       container.sendLaunchEvent();
+
+      // If this is a recovered container that has already launched, skip
+      // uploading resources to the shared cache. We do this to avoid uploading
+      // the same resources multiple times. The tradeoff is that in the case of
+      // a recovered container, there is a chance that resources don't get
+      // uploaded into the shared cache. This is OK because resources are not
+      // acknowledged by the SCM until they have been uploaded by the node
+      // manager.
+      if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED
+          && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
+        // kick off uploads to the shared cache
+        container.dispatcher.getEventHandler().handle(
+            new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
+                .getLaunchContext(), container.getUser(),
+                SharedCacheUploadEventType.UPLOAD));
+      }
+
       container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
@@ -1018,4 +1073,13 @@ public class ContainerImpl implements Container {
   private boolean hasDefaultExitCode() {
     return (this.exitCode == ContainerExitStatus.INVALID);
   }
+
+  /**
+   * Returns whether the specific resource should be uploaded to the shared
+   * cache.
+   */
+  private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
+      LocalResourceRequest resource) {
+    return container.resourcesUploadPolicies.get(resource);
+  }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java

@@ -151,6 +151,17 @@ public class LocalResourceRequest
     return pattern;
   }
   
+  @Override
+  public boolean getShouldBeUploadedToSharedCache() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setShouldBeUploadedToSharedCache(
+      boolean shouldBeUploadedToSharedCache) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public void setResource(URL resource) {
     throw new UnsupportedOperationException();

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java

@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+@Private
+@Unstable
+public class SharedCacheUploadEvent extends
+    AbstractEvent<SharedCacheUploadEventType> {
+  private final Map<LocalResourceRequest,Path> resources;
+  private final ContainerLaunchContext context;
+  private final String user;
+
+  public SharedCacheUploadEvent(Map<LocalResourceRequest,Path> resources,
+      ContainerLaunchContext context, String user,
+      SharedCacheUploadEventType eventType) {
+    super(eventType);
+    this.resources = resources;
+    this.context = context;
+    this.user = user;
+  }
+
+  public Map<LocalResourceRequest,Path> getResources() {
+    return resources;
+  }
+
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return context;
+  }
+
+  public String getUser() {
+    return user;
+  }
+}

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java

@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+@Private
+@Unstable
+public enum SharedCacheUploadEventType {
+  UPLOAD // the only constant; event type of SharedCacheUploadEvent
+}

+ 126 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java

@@ -0,0 +1,126 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Private
+@Unstable
+/**
+ * Service that uploads localized files to the shared cache. Uploads are
+ * non-critical and best-effort: each one runs as a task on a thread pool, and
+ * a failed upload is not fatal. Does nothing unless the cache is enabled.
+ */
+public class SharedCacheUploadService extends AbstractService implements
+    EventHandler<SharedCacheUploadEvent> {
+  private static final Log LOG =
+      LogFactory.getLog(SharedCacheUploadService.class);
+
+  private boolean enabled; // SHARED_CACHE_ENABLED, read in serviceInit
+  private FileSystem fs; // FileSystem.get(conf); target of the uploads
+  private FileSystem localFs; // FileSystem.getLocal(conf)
+  private ExecutorService uploaderPool; // fixed-size pool running uploads
+  private SCMUploaderProtocol scmClient; // RPC proxy to the SCM uploader
+  /** Names the service after this class. */
+  public SharedCacheUploadService() {
+    super(SharedCacheUploadService.class.getName());
+  }
+  /** If enabled, sets up the thread pool, SCM proxy and filesystems. */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    enabled = conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
+        YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED);
+    if (enabled) {
+      int threadCount =
+          conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
+      uploaderPool = Executors.newFixedThreadPool(threadCount,
+          new ThreadFactoryBuilder().
+            setNameFormat("Shared cache uploader #%d").
+            build());
+      scmClient = createSCMClient(conf);
+      try {
+        fs = FileSystem.get(conf);
+        localFs = FileSystem.getLocal(conf);
+      } catch (IOException e) {
+        LOG.error("Unexpected exception in getting the filesystem", e);
+        throw new RuntimeException(e);
+      }
+    }
+    super.serviceInit(conf);
+  }
+  /** Creates the RPC proxy to the configured SCM uploader address. */
+  private SCMUploaderProtocol createSCMClient(Configuration conf) {
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress scmAddress =
+        conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+    return (SCMUploaderProtocol)rpc.getProxy(
+        SCMUploaderProtocol.class, scmAddress, conf);
+  }
+  /** If enabled, shuts down the pool and stops the SCM proxy. */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (enabled) { // NOTE(review): pool/proxy null if init failed midway?
+      uploaderPool.shutdown();
+      RPC.stopProxy(scmClient);
+    }
+    super.serviceStop();
+  }
+  /** If enabled, submits one upload task per resource in the event. */
+  @Override
+  public void handle(SharedCacheUploadEvent event) {
+    if (enabled) {
+      Map<LocalResourceRequest,Path> resources = event.getResources();
+      for (Map.Entry<LocalResourceRequest,Path> e: resources.entrySet()) {
+        SharedCacheUploader uploader =
+            new SharedCacheUploader(e.getKey(), e.getValue(), event.getUser(),
+                getConfig(), scmClient, fs, localFs);
+        // fire off the upload; the returned Future is not tracked
+        uploaderPool.submit(uploader);
+      }
+    }
+  }
+  /** Returns the enabled flag as read in serviceInit. */
+  public boolean isEnabled() {
+    return enabled;
+  }
+}

+ 289 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java

@@ -0,0 +1,289 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.URISyntaxException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The callable class that handles the actual upload to the shared cache.
+ */
+class SharedCacheUploader implements Callable<Boolean> {
+  // rwxr-xr-x
+  static final FsPermission DIRECTORY_PERMISSION =
+      new FsPermission((short)00755);
+  // r-xr-xr-x
+  static final FsPermission FILE_PERMISSION =
+      new FsPermission((short)00555);
+
+  private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class);
+  private static final ThreadLocal<Random> randomTl = // per-thread RNG
+      new ThreadLocal<Random>() {
+        @Override
+        protected Random initialValue() {
+          return new Random(System.nanoTime());
+        }
+      };
+
+  private final LocalResource resource;
+  private final Path localPath;
+  private final String user;
+  private final Configuration conf;
+  private final SCMUploaderProtocol scmClient;
+  private final FileSystem fs;
+  private final FileSystem localFs;
+  private final String sharedCacheRootDir; // SHARED_CACHE_ROOT from conf
+  private final int nestedLevel; // SharedCacheUtil.getCacheDepth(conf)
+  private final SharedCacheChecksum checksum;
+  private final RecordFactory recordFactory;
+  /** Uses FileSystem.get(conf) and the filesystem of localPath. */
+  public SharedCacheUploader(LocalResource resource, Path localPath,
+      String user, Configuration conf, SCMUploaderProtocol scmClient)
+          throws IOException {
+    this(resource, localPath, user, conf, scmClient,
+        FileSystem.get(conf), localPath.getFileSystem(conf));
+  }
+
+  /**
+   * @param resource the local resource that contains the original remote path
+   * @param localPath local filesystem path where the resource is localized
+   * @param user the user compared against the remote file's owner
+   * @param fs the filesystem of the shared cache
+   * @param localFs the local filesystem
+   */
+  public SharedCacheUploader(LocalResource resource, Path localPath,
+      String user, Configuration conf, SCMUploaderProtocol scmClient,
+      FileSystem fs, FileSystem localFs) {
+    this.resource = resource;
+    this.localPath = localPath;
+    this.user = user;
+    this.conf = conf;
+    this.scmClient = scmClient;
+    this.fs = fs;
+    this.sharedCacheRootDir =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    this.nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+    this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
+    this.localFs = localFs;
+    this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  }
+
+  /**
+   * Uploads the file under the shared cache, and notifies the shared cache
+   * manager. Returns false if access is denied, the copy or rename fails
+   * (file likely exists), or the SCM rejects it; rethrows any IOException.
+   */
+  @Override
+  public Boolean call() throws Exception {
+    Path tempPath = null;
+    try {
+      if (!verifyAccess()) {
+        LOG.warn("User " + user + " is not authorized to upload file " +
+            localPath.getName());
+        return false;
+      }
+
+      // first determine the actual local path that will be used for upload
+      Path actualPath = getActualPath();
+      // compute the checksum
+      String checksumVal = computeChecksum(actualPath);
+      // create the directory (if it doesn't exist)
+      Path directoryPath =
+          new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel,
+              sharedCacheRootDir, checksumVal));
+      // let's not check if the directory already exists: in the vast majority
+      // of the cases, the directory does not exist; as long as mkdirs does not
+      // error out if it exists, we should be fine
+      fs.mkdirs(directoryPath, DIRECTORY_PERMISSION);
+      // create the temporary file
+      tempPath = new Path(directoryPath, getTemporaryFileName(actualPath));
+      if (!uploadFile(actualPath, tempPath)) {
+        LOG.warn("Could not copy the file to the shared cache at " + tempPath);
+        return false;
+      }
+
+      // set the permission so that it is readable but not writable
+      fs.setPermission(tempPath, FILE_PERMISSION);
+      // rename it to the final filename
+      Path finalPath = new Path(directoryPath, actualPath.getName());
+      if (!fs.rename(tempPath, finalPath)) {
+        LOG.warn("The file already exists under " + finalPath +
+            ". Ignoring this attempt.");
+        deleteTempFile(tempPath);
+        return false;
+      }
+
+      // notify the SCM
+      if (!notifySharedCacheManager(checksumVal, actualPath.getName())) {
+        // the shared cache manager rejected the upload (it was likely
+        // already uploaded under a different name), so
+        // clean up this file and exit
+        fs.delete(finalPath, false);
+        return false;
+      }
+
+      // set the replication factor
+      short replication =
+          (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR);
+      fs.setReplication(finalPath, replication);
+      LOG.info("File " + actualPath.getName() +
+          " was uploaded to the shared cache at " + finalPath);
+      return true;
+    } catch (IOException e) {
+      LOG.warn("Exception while uploading the file " + localPath.getName(), e);
+      // in case an exception is thrown, delete the temp file
+      deleteTempFile(tempPath);
+      throw e;
+    }
+  }
+  /** Returns localPath, or localPath/<name> if localPath is a directory. */
+  @VisibleForTesting
+  Path getActualPath() throws IOException {
+    Path path = localPath;
+    FileStatus status = localFs.getFileStatus(path);
+    if (status != null && status.isDirectory()) {
+      // for certain types of resources that get unpacked, the original file may
+      // be found under the directory with the same name (see
+      // FSDownload.unpack); check if the path is a directory and if so look
+      // under it
+      path = new Path(path, path.getName());
+    }
+    return path;
+  }
+  /** Best-effort delete of the temp file; IOExceptions are ignored. */
+  private void deleteTempFile(Path tempPath) {
+    try {
+      if (tempPath != null && fs.exists(tempPath)) {
+        fs.delete(tempPath, false);
+      }
+    } catch (IOException ignore) {}
+  }
+
+  /**
+   * Accepts PUBLIC resources outright; otherwise the remote file must be
+   * unmodified (same timestamp) and either owned by the user or public.
+   */
+  @VisibleForTesting
+  boolean verifyAccess() throws IOException {
+    // if it is in the public cache, it's trivially OK
+    if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
+      return true;
+    }
+
+    final Path remotePath;
+    try {
+      remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid resource", e);
+    }
+
+    // get the file status of the remote file
+    FileSystem remoteFs = remotePath.getFileSystem(conf);
+    FileStatus status = remoteFs.getFileStatus(remotePath);
+    // compare its modification time with the timestamp in the resource
+    if (status.getModificationTime() != resource.getTimestamp()) {
+      LOG.warn("The remote file " + remotePath +
+          " has changed since it's localized; will not consider it for upload");
+      return false;
+    }
+
+    // check for the user ownership
+    if (status.getOwner().equals(user)) {
+      return true; // the user owns the file
+    }
+    // check if the file is publicly readable otherwise
+    return fileIsPublic(remotePath, remoteFs, status);
+  }
+  /** Delegates to FSDownload.isPublic with a null last argument. */
+  @VisibleForTesting
+  boolean fileIsPublic(final Path remotePath, FileSystem remoteFs,
+      FileStatus status) throws IOException {
+    return FSDownload.isPublic(remoteFs, remotePath, status, null);
+  }
+
+  /**
+   * Uploads the file to the shared cache under a temporary name, and returns
+   * the result.
+   */
+  @VisibleForTesting
+  boolean uploadFile(Path sourcePath, Path tempPath) throws IOException {
+    return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf);
+  }
+  /** Opens path on localFs and returns its checksum; closes the stream. */
+  @VisibleForTesting
+  String computeChecksum(Path path) throws IOException {
+    InputStream is = localFs.open(path);
+    try {
+      return checksum.computeChecksum(is);
+    } finally {
+      try { is.close(); } catch (IOException ignore) {}
+    }
+  }
+  /** Returns "<file name>-<random long>" (the long may be negative). */
+  private String getTemporaryFileName(Path path) {
+    return path.getName() + "-" + randomTl.get().nextLong();
+  }
+  /** Sends the checksum and file name to the SCM; returns its verdict. */
+  @VisibleForTesting
+  boolean notifySharedCacheManager(String checksumVal, String fileName)
+      throws IOException {
+    try {
+      SCMUploaderNotifyRequest request =
+          recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+      request.setResourceKey(checksumVal);
+      request.setFilename(fileName);
+      return scmClient.notify(request).getAccepted();
+    } catch (YarnException e) {
+      throw new IOException(e);
+    } catch (UndeclaredThrowableException e) {
+      // retrieve the cause of the exception and throw it as an IOException
+      throw new IOException(e.getCause() == null ? e : e.getCause());
+    }
+  }
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

@@ -642,7 +642,7 @@ public class TestContainer {
     URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
     LocalResource rsrc =
         BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
-            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
     return new SimpleEntry<String, LocalResource>(name, rsrc);
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -1760,7 +1760,7 @@ public class TestResourceLocalizationService {
     URL url = getPath("/local/PRIVATE/" + name);
     LocalResource rsrc =
         BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
-            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
     return rsrc;
   }
   

+ 50 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java

@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertSame;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestSharedCacheUploadService {
+  /** init() with the shared cache disabled. */
+  @Test
+  public void testInitDisabled() {
+    testInit(false);
+  }
+  /** init() with the shared cache enabled. */
+  @Test
+  public void testInitEnabled() {
+    testInit(true);
+  }
+  /** Inits the service with the flag, checks isEnabled(), stops it. */
+  public void testInit(boolean enabled) {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, enabled);
+
+    SharedCacheUploadService service = new SharedCacheUploadService();
+    service.init(conf);
+    assertSame(enabled, service.isEnabled()); // boxed identity compare
+
+    service.stop();
+  }
+
+}

+ 241 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java

@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.junit.Test;
+
+public class TestSharedCacheUploader {
+
+  /**
+   * If verifyAccess fails, the upload should fail
+   */
+  @Test
+  public void testFailVerifyAccess() throws Exception {
+    SharedCacheUploader spied = createSpiedUploader();
+    doReturn(false).when(spied).verifyAccess();
+
+    assertFalse(spied.call());
+  }
+
+  /**
+   * If rename fails, the upload should fail
+   */
+  @Test
+  public void testRenameFail() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
+    when(response.getAccepted()).thenReturn(true);
+    when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
+        thenReturn(response);
+    FileSystem fs = mock(FileSystem.class);
+    // return false when rename is called
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(false);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+    // stub verifyAccess() to return true
+    doReturn(true).when(spied).verifyAccess();
+    // stub getActualPath()
+    doReturn(localPath).when(spied).getActualPath();
+    // stub computeChecksum()
+    doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+    // stub uploadFile() to return true
+    doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+
+    assertFalse(spied.call());
+  }
+
+  /**
+   * If verifyAccess, uploadFile, rename, and notification succeed, the upload
+   * should succeed
+   */
+  @Test
+  public void testSuccess() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
+    when(response.getAccepted()).thenReturn(true);
+    when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
+        thenReturn(response);
+    FileSystem fs = mock(FileSystem.class);
+    // return true when rename is called
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+    // stub verifyAccess() to return true
+    doReturn(true).when(spied).verifyAccess();
+    // stub getActualPath()
+    doReturn(localPath).when(spied).getActualPath();
+    // stub computeChecksum()
+    doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+    // stub uploadFile() to return true
+    doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+    // stub notifySharedCacheManager to return true
+    doReturn(true).when(spied).notifySharedCacheManager(isA(String.class),
+        isA(String.class));
+
+    assertTrue(spied.call());
+  }
+
+  /**
+   * If verifyAccess, uploadFile, and rename succeed, but it receives a nay
+   * from SCM, the call should fail and the file should be deleted
+   */
+  @Test
+  public void testNotifySCMFail() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    FileSystem fs = mock(FileSystem.class);
+    // return true when rename is called
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, null, fs,
+            localFs);
+    // stub verifyAccess() to return true
+    doReturn(true).when(spied).verifyAccess();
+    // stub getActualPath()
+    doReturn(localPath).when(spied).getActualPath();
+    // stub computeChecksum()
+    doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+    // stub uploadFile() to return true
+    doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+    // stub notifySharedCacheManager to return false
+    doReturn(false).when(spied).notifySharedCacheManager(isA(String.class),
+        isA(String.class));
+
+    assertFalse(spied.call());
+    verify(fs).delete(isA(Path.class), anyBoolean());
+  }
+
+  /**
+   * If resource is public, verifyAccess should succeed
+   */
+  @Test
+  public void testVerifyAccessPublicResource() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    // give public visibility
+    when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    FileSystem fs = mock(FileSystem.class);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+
+    assertTrue(spied.verifyAccess());
+  }
+
+  /**
+   * If the localPath is a directory, getActualPath should go one level down
+   * to the entry of the same name inside it
+   */
+  @Test
+  public void testGetActualPath() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    // give public visibility
+    when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+    Path localPath = new Path("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    FileSystem fs = mock(FileSystem.class);
+    FileSystem localFs = mock(FileSystem.class);
+    // stub it to return a status that indicates a directory
+    FileStatus status = mock(FileStatus.class);
+    when(status.isDirectory()).thenReturn(true);
+    when(localFs.getFileStatus(localPath)).thenReturn(status);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+
+    Path actualPath = spied.getActualPath();
+    assertEquals(actualPath.getName(), localPath.getName());
+    assertEquals(actualPath.getParent().getName(), localPath.getName());
+  }
+  /** Builds a spied uploader with mocks and real default/local FS. */
+  private SharedCacheUploader createSpiedUploader() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    String user = "foo";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    return createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+        localFs);
+  }
+  /** Wraps a new SharedCacheUploader in a Mockito spy. */
+  private SharedCacheUploader createSpiedUploader(LocalResource resource, Path localPath,
+      String user, Configuration conf, SCMUploaderProtocol scmClient,
+      FileSystem fs, FileSystem localFs)
+          throws IOException {
+    SharedCacheUploader uploader = new SharedCacheUploader(resource, localPath, user, conf, scmClient,
+        fs, localFs);
+    return spy(uploader);
+  }
+}