Browse Source

commit 7502c42f5739df691f08b91914b3455d0345329e
Author: Devaraj Das <ddas@yahoo-inc.com>
Date: Tue Jul 27 14:58:11 2010 -0700

MAPREDUCE:1288 from https://issues.apache.org/jira/secure/attachment/12450631/MR-1288-bp20-3.patch

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1288. Fixes TrackerDistributedCacheManager to take into account
+ the owner of the localized file in the mapping from cache URIs to
+ CacheStatus objects. (ddas)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077604 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
bb4f61e34f

+ 3 - 1
src/mapred/org/apache/hadoop/filecache/DistributedCache.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.filecache;
 import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
@@ -325,7 +326,8 @@ public class DistributedCache {
       throw new IOException("TimeStamp of the uri couldnot be found");
     }
     new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .releaseCache(cache, conf, Long.parseLong(timestamp));
+        .releaseCache(cache, conf, Long.parseLong(timestamp), 
+            TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
   }
   
   /**

+ 9 - 3
src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java

@@ -74,14 +74,18 @@ public class TaskDistributedCacheManager {
     /** Whether this is to be added to the classpath */
     final boolean shouldBeAddedToClassPath;
     boolean localized = false;
+    /** The owner of the localized file. Relevant only on the tasktrackers */
+    final String owner;
 
     private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
-        boolean classPath) {
+        boolean classPath) throws IOException {
       this.uri = uri;
       this.type = type;
       this.isPublic = isPublic;
       this.timestamp = timestamp;
       this.shouldBeAddedToClassPath = classPath;
+      this.owner = 
+          TrackerDistributedCacheManager.getLocalizedCacheOwner(isPublic);
     }
 
     /**
@@ -90,7 +94,8 @@ public class TaskDistributedCacheManager {
      * files.
      */
     private static List<CacheFile> makeCacheFiles(URI[] uris, 
-        String[] timestamps, String cacheVisibilities[], Path[] paths, FileType type) {
+        String[] timestamps, String cacheVisibilities[], Path[] paths, 
+        FileType type) throws IOException {
       List<CacheFile> ret = new ArrayList<CacheFile>();
       if (uris != null) {
         if (uris.length != timestamps.length) {
@@ -235,7 +240,8 @@ public class TaskDistributedCacheManager {
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
       if (c.getLocalized()) {
-        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp, 
+            c.owner);
       }
     }
   }

+ 32 - 9
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 
@@ -141,7 +142,8 @@ public class TrackerDistributedCacheManager {
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
       throws IOException {
-    String key = getKey(cache, conf, confFileStamp);
+    String key;
+    key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
     CacheStatus lcacheStatus;
     Path localizedPath = null;
     synchronized (cachedArchives) {
@@ -218,11 +220,13 @@ public class TrackerDistributedCacheManager {
    * @param cache The cache URI to be released
    * @param conf configuration which contains the filesystem the cache
    * is contained in.
+   * @param timeStamp the timestamp on the file represented by the cache URI
+   * @param owner the owner of the localized file
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf, long timeStamp)
-    throws IOException {
-    String key = getKey(cache, conf, timeStamp);
+  void releaseCache(URI cache, Configuration conf, long timeStamp,
+      String owner) throws IOException {
+    String key = getKey(cache, conf, timeStamp, owner);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -239,9 +243,9 @@ public class TrackerDistributedCacheManager {
   /*
    * This method is called from unit tests. 
    */
-  int getReferenceCount(URI cache, Configuration conf, long timeStamp) 
-    throws IOException {
-    String key = getKey(cache, conf, timeStamp);
+  int getReferenceCount(URI cache, Configuration conf, long timeStamp,
+      String owner) throws IOException {
+    String key = getKey(cache, conf, timeStamp, owner);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -251,6 +255,25 @@ public class TrackerDistributedCacheManager {
     }
   }
 
+  /**
+   * Get the user who should "own" the localized distributed cache file.
+   * If the cache is public, the tasktracker user is the owner. If private,
+   * the user that the task is running as, is the owner.
+   * @param isPublic
+   * @return the owner as a shortname string
+   * @throws IOException
+   */
+  static String getLocalizedCacheOwner(boolean isPublic) throws IOException {  
+    String user;
+    if (isPublic) {
+      user = UserGroupInformation.getLoginUser().getShortUserName();
+    } else {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    }
+    return user;
+  }
+
+
   // To delete the caches which have a refcount of zero
 
   private void compactCache(Configuration conf) throws IOException {
@@ -520,9 +543,9 @@ public class TrackerDistributedCacheManager {
     return true;
   }
 
-  String getKey(URI cache, Configuration conf, long timeStamp) 
+  String getKey(URI cache, Configuration conf, long timeStamp, String user) 
       throws IOException {
-    return makeRelative(cache, conf) + String.valueOf(timeStamp);
+    return makeRelative(cache, conf) + String.valueOf(timeStamp) + user;
   }
   
   /**

+ 8 - 4
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

@@ -240,7 +240,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
           TaskTracker.getPublicDistributedCacheDir());
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
-      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
+      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp, 
+          c.owner));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -278,7 +279,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     th = null;
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       try {
-        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp));
+        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp, 
+            c.owner));
       } catch (IOException ie) {
         th = ie;
         LOG.info("Exception getting reference count for " + c.uri, ie);
@@ -496,7 +498,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
         now, new Path(TEST_ROOT_DIR), false, false);
-    manager.releaseCache(firstCacheFile.toUri(), conf2, now);
+    manager.releaseCache(firstCacheFile.toUri(), conf2, now, 
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
@@ -549,7 +552,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
         fs.getFileStatus(thirdCacheFile), false,
         now, new Path(TEST_ROOT_DIR), false, false);
     // Release the third cache so that it can be deleted while sweeping
-    manager.releaseCache(thirdCacheFile.toUri(), conf2, now);
+    manager.releaseCache(thirdCacheFile.toUri(), conf2, now, 
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
     // Getting the fourth cache will make the number of sub directories becomes
     // 3 which is greater than 2. So the released cache will be deleted.
     Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2,