Explorar o código

MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus calls (Sandy Ryza via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1434807 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves %!s(int64=12) %!d(string=hai) anos
pai
achega
4a2eabbb61

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -8,6 +8,9 @@ Release 0.23.7 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4907. TrackerDistributedCacheManager issues too many 
+    getFileStatus calls  (Sandy Ryza via tgraves)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 1 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -249,9 +249,8 @@ class JobSubmitter {
     }
 
     //  set the timestamps of the archives and files
-    ClientDistributedCacheManager.determineTimestamps(conf);
     //  set the public/private visibility of the archives and files
-    ClientDistributedCacheManager.determineCacheVisibilities(conf);
+    ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
     // get DelegationToken for each cached file
     ClientDistributedCacheManager.getDelegationTokens(conf, job
         .getCredentials());

+ 58 - 31
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.mapreduce.filecache;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +39,25 @@ import org.apache.hadoop.security.Credentials;
 @InterfaceAudience.Private
 public class ClientDistributedCacheManager {
 
+  /**
+   * Determines timestamps of files to be cached, and stores those
+   * in the configuration. Determines the visibilities of the distributed cache
+   * files and archives. The visibility of a cache path is "public" if the leaf
+   * component has READ permissions for others, and the parent subdirs have 
+   * EXECUTE permissions for others.
+   * 
+   * This is an internal method!
+   * 
+   * @param job
+   * @throws IOException
+   */
+  public static void determineTimestampsAndCacheVisibilities(Configuration job)
+  throws IOException {
+    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+    determineTimestamps(job, statCache);
+    determineCacheVisibilities(job, statCache);
+  }
+  
   /**
    * Determines timestamps of files to be cached, and stores those
    * in the configuration.  This is intended to be used internally by JobClient
@@ -47,16 +68,17 @@ public class ClientDistributedCacheManager {
    * @param job Configuration of a job.
    * @throws IOException
    */
-  public static void determineTimestamps(Configuration job) throws IOException {
+  public static void determineTimestamps(Configuration job,
+      Map<URI, FileStatus> statCache) throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
-      FileStatus status = getFileStatus(job, tarchives[0]);
+      FileStatus status = getFileStatus(job, tarchives[0], statCache);
       StringBuilder archiveFileSizes =
         new StringBuilder(String.valueOf(status.getLen()));
       StringBuilder archiveTimestamps =
         new StringBuilder(String.valueOf(status.getModificationTime()));
       for (int i = 1; i < tarchives.length; i++) {
-        status = getFileStatus(job, tarchives[i]);
+        status = getFileStatus(job, tarchives[i], statCache);
         archiveFileSizes.append(",");
         archiveFileSizes.append(String.valueOf(status.getLen()));
         archiveTimestamps.append(",");
@@ -68,13 +90,13 @@ public class ClientDistributedCacheManager {
   
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
-      FileStatus status = getFileStatus(job, tfiles[0]);
+      FileStatus status = getFileStatus(job, tfiles[0], statCache);
       StringBuilder fileSizes =
         new StringBuilder(String.valueOf(status.getLen()));
       StringBuilder fileTimestamps = new StringBuilder(String.valueOf(
         status.getModificationTime()));
       for (int i = 1; i < tfiles.length; i++) {
-        status = getFileStatus(job, tfiles[i]);
+        status = getFileStatus(job, tfiles[i], statCache);
         fileSizes.append(",");
         fileSizes.append(String.valueOf(status.getLen()));
         fileTimestamps.append(",");
@@ -123,25 +145,25 @@ public class ClientDistributedCacheManager {
    * @param job
    * @throws IOException
    */
-  public static void determineCacheVisibilities(Configuration job) 
-  throws IOException {
+  public static void determineCacheVisibilities(Configuration job,
+      Map<URI, FileStatus> statCache) throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
       StringBuilder archiveVisibilities =
-        new StringBuilder(String.valueOf(isPublic(job, tarchives[0])));
+        new StringBuilder(String.valueOf(isPublic(job, tarchives[0], statCache)));
       for (int i = 1; i < tarchives.length; i++) {
         archiveVisibilities.append(",");
-        archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i])));
+        archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i], statCache)));
       }
       setArchiveVisibilities(job, archiveVisibilities.toString());
     }
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
       StringBuilder fileVisibilities =
-        new StringBuilder(String.valueOf(isPublic(job, tfiles[0])));
+        new StringBuilder(String.valueOf(isPublic(job, tfiles[0], statCache)));
       for (int i = 1; i < tfiles.length; i++) {
         fileVisibilities.append(",");
-        fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));
+        fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i], statCache)));
       }
       setFileVisibilities(job, fileVisibilities.toString());
     }
@@ -193,19 +215,13 @@ public class ClientDistributedCacheManager {
   }
 
   /**
-   * Returns {@link FileStatus} of a given cache file on hdfs.
-   * 
-   * @param conf configuration
-   * @param cache cache file 
-   * @return {@link FileStatus} of a given cache file on hdfs
-   * @throws IOException
+   * Gets the file status for the given URI.  If the URI is in the cache,
+   * returns it.  Otherwise, fetches it and adds it to the cache.
    */
-  static FileStatus getFileStatus(Configuration conf, URI cache)
-    throws IOException {
-    FileSystem fileSystem = FileSystem.get(cache, conf);
-    Path filePath = new Path(cache.getPath());
-
-    return fileSystem.getFileStatus(filePath);
+  private static FileStatus getFileStatus(Configuration job, URI uri,
+      Map<URI, FileStatus> statCache) throws IOException {
+    FileSystem fileSystem = FileSystem.get(uri, job);
+    return getFileStatus(fileSystem, uri, statCache);
   }
 
   /**
@@ -216,14 +232,15 @@ public class ClientDistributedCacheManager {
    * @return true if the path in the uri is visible to all, false otherwise
    * @throws IOException
    */
-  static boolean isPublic(Configuration conf, URI uri) throws IOException {
+  static boolean isPublic(Configuration conf, URI uri,
+      Map<URI, FileStatus> statCache) throws IOException {
     FileSystem fs = FileSystem.get(uri, conf);
     Path current = new Path(uri.getPath());
     //the leaf level file should be readable by others
-    if (!checkPermissionOfOther(fs, current, FsAction.READ)) {
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
       return false;
     }
-    return ancestorsHaveExecutePermissions(fs, current.getParent());
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
   }
 
   /**
@@ -231,12 +248,12 @@ public class ClientDistributedCacheManager {
    * permission set for all users (i.e. that other users can traverse
    * the directory heirarchy to the given path)
    */
-  static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
-    throws IOException {
+  static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path,
+      Map<URI, FileStatus> statCache) throws IOException {
     Path current = path;
     while (current != null) {
       //the subdirs in the path should have execute permissions for others
-      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
         return false;
       }
       current = current.getParent();
@@ -254,8 +271,8 @@ public class ClientDistributedCacheManager {
    * @throws IOException
    */
   private static boolean checkPermissionOfOther(FileSystem fs, Path path,
-      FsAction action) throws IOException {
-    FileStatus status = fs.getFileStatus(path);
+      FsAction action, Map<URI, FileStatus> statCache) throws IOException {
+    FileStatus status = getFileStatus(fs, path.toUri(), statCache);
     FsPermission perms = status.getPermission();
     FsAction otherAction = perms.getOtherAction();
     if (otherAction.implies(action)) {
@@ -263,4 +280,14 @@ public class ClientDistributedCacheManager {
     }
     return false;
   }
+
+  private static FileStatus getFileStatus(FileSystem fs, URI uri,
+      Map<URI, FileStatus> statCache) throws IOException {
+    FileStatus stat = statCache.get(uri);
+    if (stat == null) {
+      stat = fs.getFileStatus(new Path(uri));
+      statCache.put(uri, stat);
+    }
+    return stat;
+  }
 }

+ 114 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java

@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestClientDistributedCacheManager {
+  private static final Log LOG = LogFactory.getLog(
+      TestClientDistributedCacheManager.class);
+  
+  private static final String TEST_ROOT_DIR = 
+      new File(System.getProperty("test.build.data", "/tmp")).toURI()
+      .toString().replace(' ', '+');
+  
+  private FileSystem fs;
+  private Path firstCacheFile;
+  private Path secondCacheFile;
+  private Configuration conf;
+  
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    fs = FileSystem.get(conf);
+    firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
+    secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
+    createTempFile(firstCacheFile, conf);
+    createTempFile(secondCacheFile, conf);
+  }
+  
+  @After
+  public void tearDown() throws IOException {
+    if (!fs.delete(firstCacheFile, false)) {
+      LOG.warn("Failed to delete firstcachefile");
+    }
+    if (!fs.delete(secondCacheFile, false)) {
+      LOG.warn("Failed to delete secondcachefile");
+    }
+  }
+  
+  @Test
+  public void testDetermineTimestamps() throws IOException {
+    Job job = Job.getInstance(conf);
+    job.addCacheFile(firstCacheFile.toUri());
+    job.addCacheFile(secondCacheFile.toUri());
+    Configuration jobConf = job.getConfiguration();
+    
+    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+    ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);
+    
+    FileStatus firstStatus = statCache.get(firstCacheFile.toUri());
+    FileStatus secondStatus = statCache.get(secondCacheFile.toUri());
+    
+    Assert.assertNotNull(firstStatus);
+    Assert.assertNotNull(secondStatus);
+    Assert.assertEquals(2, statCache.size());
+    String expected = firstStatus.getModificationTime() + ","
+        + secondStatus.getModificationTime();
+    Assert.assertEquals(expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));
+  }
+  
+  @SuppressWarnings("deprecation")
+  void createTempFile(Path p, Configuration conf) throws IOException {
+    SequenceFile.Writer writer = null;
+    try {
+      writer = SequenceFile.createWriter(fs, conf, p,
+                                         Text.class, Text.class,
+                                         CompressionType.NONE);
+      writer.append(new Text("text"), new Text("moretext"));
+    } catch(Exception e) {
+      throw new IOException(e.getLocalizedMessage());
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      writer = null;
+    }
+    LOG.info("created: " + p);
+  }
+}