浏览代码

YARN-1771. Reduce the number of NameNode operations during localization of
public resources using a cache. Contributed by Sangjin Lee



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1577392 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 11 年之前
父节点
当前提交
9695bc7af6

+ 5 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java

@@ -339,11 +339,11 @@ public class FileUtil {
   }
 
   /** Copy files between FileSystems. */
-  private static boolean copy(FileSystem srcFS, FileStatus srcStatus,
-                              FileSystem dstFS, Path dst,
-                              boolean deleteSource,
-                              boolean overwrite,
-                              Configuration conf) throws IOException {
+  public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
+                             FileSystem dstFS, Path dst,
+                             boolean deleteSource,
+                             boolean overwrite,
+                             Configuration conf) throws IOException {
     Path src = srcStatus.getPath();
     dst = checkDest(src.getName(), dstFS, dst, overwrite);
     if (srcStatus.isDirectory()) {

+ 62 - 52
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java

@@ -18,19 +18,26 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -38,7 +45,6 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -89,8 +95,26 @@ public class TestLocalDistributedCacheManager {
   public void cleanup() throws Exception {
     delete(localDir);
   }
-  
-  @SuppressWarnings("rawtypes")
+
+  /**
+   * Mock input stream based on a byte array so that it can be used by a
+   * FSDataInputStream.
+   */
+  private static class MockInputStream extends ByteArrayInputStream
+      implements Seekable, PositionedReadable {
+    public MockInputStream(byte[] buf) {
+      super(buf);
+    }
+
+    // empty implementation for unused methods
+    public int read(long position, byte[] buffer, int offset, int length) { return -1; }
+    public void readFully(long position, byte[] buffer, int offset, int length) {}
+    public void readFully(long position, byte[] buffer) {}
+    public void seek(long position) {}
+    public long getPos() { return 0; }
+    public boolean seekToNewSource(long targetPos) { return false; }
+  }
+
   @Test
   public void testDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -123,28 +147,22 @@ public class TestLocalDistributedCacheManager {
         }
       }
     });
-    
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    final FSDataInputStream in =
+        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
-        Path dst = (Path)args.getArguments()[2];
-        if("file.txt".equals(src.getName())) {
-          File f = new File(dst.toUri().getPath());
-          FileWriter writer = new FileWriter(f);
-          try {
-            writer.append("This is a test file\n");
-          } finally {
-            if(writer != null) writer.close();
-          }
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
+        if ("file.txt".equals(src.getName())) {
+          return in;
         } else {
           throw new FileNotFoundException(src+" not supported by mocking");
         }
-        return null;
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-    
+    });
+
     DistributedCache.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
@@ -159,8 +177,7 @@ public class TestLocalDistributedCacheManager {
     }
     assertFalse(link.exists());
   }
-  
-  @SuppressWarnings("rawtypes")
+
   @Test
   public void testEmptyDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -184,16 +201,16 @@ public class TestLocalDistributedCacheManager {
         throw new FileNotFoundException(p+" not supported by mocking");
       }
     });
-    
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
         throw new FileNotFoundException(src+" not supported by mocking");
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-    
+    });
+
     conf.set(MRJobConfig.CACHE_FILES, "");
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
@@ -203,9 +220,8 @@ public class TestLocalDistributedCacheManager {
       manager.close();
     }
   }
-  
-  
-  @SuppressWarnings("rawtypes")
+
+
   @Test
   public void testDuplicateDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -238,28 +254,22 @@ public class TestLocalDistributedCacheManager {
         }
       }
     });
-    
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    final FSDataInputStream in =
+        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
-        Path dst = (Path)args.getArguments()[2];
-        if("file.txt".equals(src.getName())) {
-          File f = new File(dst.toUri().getPath());
-          FileWriter writer = new FileWriter(f);
-          try {
-            writer.append("This is a test file\n");
-          } finally {
-            if(writer != null) writer.close();
-          }
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
+        if ("file.txt".equals(src.getName())) {
+          return in;
         } else {
           throw new FileNotFoundException(src+" not supported by mocking");
         }
-        return null;
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-    
+    });
+
     DistributedCache.addCacheFile(file, conf);
     DistributedCache.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -273,6 +273,9 @@ Release 2.4.0 - UNRELEASED
     expose analogous getApplication(s)/Attempt(s)/Container(s) APIs. (Mayank
     Bansal via zjshen)
 
+    YARN-1771. Reduce the number of NameNode operations during localization of
+    public resources using a cache. (Sangjin Lee via cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 87 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java

@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +45,11 @@ import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Futures;
+
 /**
  * Download a single URL to the local disk.
  *
@@ -56,6 +63,7 @@ public class FSDownload implements Callable<Path> {
   private final UserGroupInformation userUgi;
   private Configuration conf;
   private LocalResource resource;
+  private final LoadingCache<Path,Future<FileStatus>> statCache;
   
   /** The local FS dir path under which this resource is to be localized to */
   private Path destDirPath;
@@ -71,11 +79,18 @@ public class FSDownload implements Callable<Path> {
 
   public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
       Path destDirPath, LocalResource resource) {
+    this(files, ugi, conf, destDirPath, resource, null);
+  }
+
+  public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
+      Path destDirPath, LocalResource resource,
+      LoadingCache<Path,Future<FileStatus>> statCache) {
     this.conf = conf;
     this.destDirPath = destDirPath;
     this.files = files;
     this.userUgi = ugi;
     this.resource = resource;
+    this.statCache = statCache;
   }
 
   LocalResource getResource() {
@@ -90,28 +105,43 @@ public class FSDownload implements Callable<Path> {
   }
 
   /**
-   * Returns a boolean to denote whether a cache file is visible to all(public)
+   * Creates the cache loader for the status loading cache. This should be used
+   * to create an instance of the status cache that is passed into the
+   * FSDownload constructor.
+   */
+  public static CacheLoader<Path,Future<FileStatus>>
+      createStatusCacheLoader(final Configuration conf) {
+    return new CacheLoader<Path,Future<FileStatus>>() {
+      public Future<FileStatus> load(Path path) {
+        try {
+          FileSystem fs = path.getFileSystem(conf);
+          return Futures.immediateFuture(fs.getFileStatus(path));
+        } catch (Throwable th) {
+          // report failures so it can be memoized
+          return Futures.immediateFailedFuture(th);
+        }
+      }
+    };
+  }
+
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
    * or not
-   * @param conf
-   * @param uri
-   * @return true if the path in the uri is visible to all, false otherwise
-   * @throws IOException
+   *
+   * @return true if the path in the current path is visible to all, false
+   * otherwise
    */
-  private static boolean isPublic(FileSystem fs, Path current) throws IOException {
+  @VisibleForTesting
+  static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+      LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
     current = fs.makeQualified(current);
     //the leaf level file should be readable by others
-    if (!checkPublicPermsForAll(fs, current, FsAction.READ_EXECUTE, FsAction.READ)) {
+    if (!checkPublicPermsForAll(fs, sStat, FsAction.READ_EXECUTE, FsAction.READ)) {
       return false;
     }
-    return ancestorsHaveExecutePermissions(fs, current.getParent());
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
   }
 
-  private static boolean checkPublicPermsForAll(FileSystem fs, Path current, 
-      FsAction dir, FsAction file) 
-    throws IOException {
-    return checkPublicPermsForAll(fs, fs.getFileStatus(current), dir, file);
-  }
-    
   private static boolean checkPublicPermsForAll(FileSystem fs, 
         FileStatus status, FsAction dir, FsAction file) 
     throws IOException {
@@ -137,12 +167,13 @@ public class FSDownload implements Callable<Path> {
    * permission set for all users (i.e. that other users can traverse
    * the directory heirarchy to the given path)
    */
-  private static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
-    throws IOException {
+  private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
+      Path path, LoadingCache<Path,Future<FileStatus>> statCache)
+      throws IOException {
     Path current = path;
     while (current != null) {
       //the subdirs in the path should have execute permissions for others
-      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
         return false;
       }
       current = current.getParent();
@@ -160,14 +191,46 @@ public class FSDownload implements Callable<Path> {
    * @throws IOException
    */
   private static boolean checkPermissionOfOther(FileSystem fs, Path path,
-      FsAction action) throws IOException {
-    FileStatus status = fs.getFileStatus(path);
+      FsAction action, LoadingCache<Path,Future<FileStatus>> statCache)
+      throws IOException {
+    FileStatus status = getFileStatus(fs, path, statCache);
     FsPermission perms = status.getPermission();
     FsAction otherAction = perms.getOtherAction();
     return otherAction.implies(action);
   }
 
-  
+  /**
+   * Obtains the file status, first by checking the stat cache if it is
+   * available, and then by getting it explicitly from the filesystem. If we got
+   * the file status from the filesystem, it is added to the stat cache.
+   *
+   * The stat cache is expected to be managed by callers who provided it to
+   * FSDownload.
+   */
+  private static FileStatus getFileStatus(final FileSystem fs, final Path path,
+      LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
+    // if the stat cache does not exist, simply query the filesystem
+    if (statCache == null) {
+      return fs.getFileStatus(path);
+    }
+
+    try {
+      // get or load it from the cache
+      return statCache.get(path).get();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      // the underlying exception should normally be IOException
+      if (cause instanceof IOException) {
+        throw (IOException)cause;
+      } else {
+        throw new IOException(cause);
+      }
+    } catch (InterruptedException e) { // should not happen
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
   private Path copy(Path sCopy, Path dstdir) throws IOException {
     FileSystem sourceFs = sCopy.getFileSystem(conf);
     Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
@@ -178,14 +241,15 @@ public class FSDownload implements Callable<Path> {
           ", was " + sStat.getModificationTime());
     }
     if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
-      if (!isPublic(sourceFs, sCopy)) {
+      if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
         throw new IOException("Resource " + sCopy +
             " is not publicly accessable and as such cannot be part of the" +
             " public cache.");
       }
     }
-    
-    sourceFs.copyToLocalFile(sCopy, dCopy);
+
+    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
+        true, conf);
     return dCopy;
   }
 

+ 95 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java

@@ -21,27 +21,35 @@ package org.apache.hadoop.yarn.util;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
+import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
-import java.util.zip.GZIPOutputStream;
 
 import junit.framework.Assert;
 
@@ -64,10 +72,13 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 public class TestFSDownload {
 
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -88,6 +99,18 @@ public class TestFSDownload {
 
   static LocalResource createFile(FileContext files, Path p, int len,
       Random r, LocalResourceVisibility vis) throws IOException {
+    createFile(files, p, len, r);
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.FILE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(p).getModificationTime());
+    return ret;
+  }
+
+  static void createFile(FileContext files, Path p, int len, Random r)
+      throws IOException {
     FSDataOutputStream out = null;
     try {
       byte[] bytes = new byte[len];
@@ -97,13 +120,6 @@ public class TestFSDownload {
     } finally {
       if (out != null) out.close();
     }
-    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
-    ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
-    ret.setSize(len);
-    ret.setType(LocalResourceType.FILE);
-    ret.setVisibility(vis);
-    ret.setTimestamp(files.getFileStatus(p).getModificationTime());
-    return ret;
   }
 
   static LocalResource createJar(FileContext files, Path p,
@@ -285,6 +301,76 @@ public class TestFSDownload {
     }
   }
 
+  @Test (timeout=60000)
+  public void testDownloadPublicWithStatCache() throws IOException,
+      URISyntaxException, InterruptedException, ExecutionException {
+    final Configuration conf = new Configuration();
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    Path basedir = files.makeQualified(new Path("target",
+      TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    int size = 512;
+
+    final ConcurrentMap<Path,AtomicInteger> counts =
+        new ConcurrentHashMap<Path,AtomicInteger>();
+    final CacheLoader<Path,Future<FileStatus>> loader =
+        FSDownload.createStatusCacheLoader(conf);
+    final LoadingCache<Path,Future<FileStatus>> statCache =
+        CacheBuilder.newBuilder().build(new CacheLoader<Path,Future<FileStatus>>() {
+      public Future<FileStatus> load(Path path) throws Exception {
+        // increment the count
+        AtomicInteger count = counts.get(path);
+        if (count == null) {
+          count = new AtomicInteger(0);
+          AtomicInteger existing = counts.putIfAbsent(path, count);
+          if (existing != null) {
+            count = existing;
+          }
+        }
+        count.incrementAndGet();
+
+        // use the default loader
+        return loader.load(path);
+      }
+    });
+
+    // test FSDownload.isPublic() concurrently
+    final int fileCount = 3;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < fileCount; i++) {
+      Random rand = new Random();
+      long sharedSeed = rand.nextLong();
+      rand.setSeed(sharedSeed);
+      System.out.println("SEED: " + sharedSeed);
+      final Path path = new Path(basedir, "test-file-" + i);
+      createFile(files, path, size, rand);
+      final FileSystem fs = path.getFileSystem(conf);
+      final FileStatus sStat = fs.getFileStatus(path);
+      tasks.add(new Callable<Boolean>() {
+        public Boolean call() throws IOException {
+          return FSDownload.isPublic(fs, path, sStat, statCache);
+        }
+      });
+    }
+
+    ExecutorService exec = Executors.newFixedThreadPool(fileCount);
+    try {
+      List<Future<Boolean>> futures = exec.invokeAll(tasks);
+      // files should be public
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+      // for each path exactly one file status call should be made
+      for (AtomicInteger count: counts.values()) {
+        assertSame(count.get(), 1);
+      }
+    } finally {
+      exec.shutdown();
+    }
+  }
+
   @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java

@@ -18,20 +18,34 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
+import com.google.common.cache.LoadingCache;
+
 public class LocalizerContext {
 
   private final String user;
   private final ContainerId containerId;
   private final Credentials credentials;
+  private final LoadingCache<Path,Future<FileStatus>> statCache;
 
   public LocalizerContext(String user, ContainerId containerId,
       Credentials credentials) {
+    this(user, containerId, credentials, null);
+  }
+
+  public LocalizerContext(String user, ContainerId containerId,
+      Credentials credentials,
+      LoadingCache<Path,Future<FileStatus>> statCache) {
     this.user = user;
     this.containerId = containerId;
     this.credentials = credentials;
+    this.statCache = statCache;
   }
 
   public String getUser() {
@@ -46,4 +60,7 @@ public class LocalizerContext {
     return credentials;
   }
 
+  public LoadingCache<Path,Future<FileStatus>> getStatCache() {
+    return statCache;
+  }
 }

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -83,8 +83,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ResourceLocalizationService extends CompositeService
@@ -362,8 +364,11 @@ public class ResourceLocalizationService extends CompositeService
   private void handleInitContainerResources(
       ContainerLocalizationRequestEvent rsrcReqs) {
     Container c = rsrcReqs.getContainer();
+    // create a loading cache for the file statuses
+    LoadingCache<Path,Future<FileStatus>> statCache =
+        CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
     LocalizerContext ctxt = new LocalizerContext(
-        c.getUser(), c.getContainerId(), c.getCredentials());
+        c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
       rsrcReqs.getRequestedResources();
     for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
@@ -680,7 +685,8 @@ public class ResourceLocalizationService extends CompositeService
             // completing and being dequeued before pending updated
             synchronized (pending) {
               pending.put(queue.submit(new FSDownload(lfs, null, conf,
-                  publicDirDestPath, resource)), request);
+                  publicDirDestPath, resource, request.getContext().getStatCache())),
+                  request);
             }
           } catch (IOException e) {
             rsrc.unlock();