Browse Source

commit f4c85e6b0345142d773647da75bba7c528c201bc
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Thu Jan 21 22:10:58 2010 +0530

MAPREDUCE-856 from https://issues.apache.org/jira/secure/attachment/12431040/MAPREDUCE-856-20090908-y20.txt

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-856. Setup secure permissions for distributed cache files.
+ (Vinod Kumar Vavilapalli via yhemanth)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077116 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
d7273158ac
28 changed files with 1914 additions and 695 deletions
  1. 6 0
      src/c++/task-controller/main.c
  2. 253 24
      src/c++/task-controller/task-controller.c
  3. 13 1
      src/c++/task-controller/task-controller.h
  4. 43 12
      src/c++/task-controller/tests/test-task-controller.c
  5. 5 4
      src/docs/src/documentation/content/xdocs/cluster_setup.xml
  6. 15 4
      src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
  7. 0 1
      src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
  8. 15 1
      src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
  9. 2 1
      src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
  10. 2 2
      src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
  11. 12 4
      src/mapred/org/apache/hadoop/mapred/JobClient.java
  12. 9 4
      src/mapred/org/apache/hadoop/mapred/JobConf.java
  13. 24 2
      src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
  14. 2 2
      src/mapred/org/apache/hadoop/mapred/MapTask.java
  15. 2 2
      src/mapred/org/apache/hadoop/mapred/Task.java
  16. 41 33
      src/mapred/org/apache/hadoop/mapred/TaskController.java
  17. 20 12
      src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  18. 171 308
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  19. 5 1
      src/mapred/org/apache/hadoop/mapred/pipes/Application.java
  20. 361 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
  21. 101 20
      src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
  22. 11 4
      src/test/org/apache/hadoop/mapred/TestIsolationRunner.java
  23. 119 95
      src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
  24. 117 41
      src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
  25. 4 2
      src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
  26. 6 4
      src/test/org/apache/hadoop/mapred/TestQueueManager.java
  27. 369 111
      src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
  28. 186 0
      src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java

+ 6 - 0
src/c++/task-controller/main.c

@@ -105,10 +105,16 @@ int main(int argc, char **argv) {
   fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
 
   switch (command) {
+  case INITIALIZE_USER:
+    exit_code = initialize_user(user_detail->pw_name);
+    break;
   case INITIALIZE_JOB:
     job_id = argv[optind++];
     exit_code = initialize_job(job_id, user_detail->pw_name);
     break;
+  case INITIALIZE_DISTRIBUTEDCACHE:
+    exit_code = initialize_distributed_cache(user_detail->pw_name);
+    break;
   case LAUNCH_TASK_JVM:
     tt_root = argv[optind++];
     job_id = argv[optind++];

+ 253 - 24
src/c++/task-controller/task-controller.c

@@ -120,16 +120,26 @@ int check_variable_against_config(const char *config_key,
 /**
  * Utility function to concatenate argB to argA using the concat_pattern
  */
-char *concatenate(const char *argA, const char *argB, char *concat_pattern,
-    char *return_path_name) {
-  if (argA == NULL || argB == NULL) {
-    fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
-        return_path_name);
-    return NULL;
+char *concatenate(char *concat_pattern, char *return_path_name, int numArgs,
+    ...) {
+  va_list ap;
+  va_start(ap, numArgs);
+  int strlen_args = 0;
+  char *arg = NULL;
+  int j;
+  for (j = 0; j < numArgs; j++) {
+    arg = va_arg(ap, char*);
+    if (arg == NULL) {
+      fprintf(LOGFILE, "One of the arguments passed for %s is null.\n",
+          return_path_name);
+      return NULL;
+    }
+    strlen_args += strlen(arg);
   }
+  va_end(ap);
 
   char *return_path = NULL;
-  int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB);
+  int str_len = strlen(concat_pattern) + strlen_args;
 
   return_path = (char *) malloc(sizeof(char) * (str_len + 1));
   if (return_path == NULL) {
@@ -137,43 +147,62 @@ char *concatenate(const char *argA, const char *argB, char *concat_pattern,
     return NULL;
   }
   memset(return_path, '\0', str_len + 1);
-  snprintf(return_path, str_len, concat_pattern, argA, argB);
+  va_start(ap, numArgs);
+  vsnprintf(return_path, str_len, concat_pattern, ap);
+  va_end(ap);
   return return_path;
 }
 
 /**
- * Get the job-directory path from tt_root and job-id
+ * Get the job-directory path from tt_root, user name and job-id
+ */
+char *get_job_directory(const char * tt_root, const char *user,
+    const char *jobid) {
+  return concatenate(TT_JOB_DIR_PATTERN, "job_dir_path", 3, tt_root, user,
+      jobid);
+}
+
+/**
+ * Get the user directory of a particular user
+ */
+char *get_user_directory(const char *tt_root, const char *user) {
+  return concatenate(USER_DIR_PATTERN, "user_dir_path", 2, tt_root, user);
+}
+
+/**
+ * Get the distributed cache directory for a particular user
  */
-char *get_job_directory(const char * tt_root, const char *jobid) {
-  return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path");
+char *get_distributed_cache_directory(const char *tt_root, const char *user) {
+  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, "dist_cache_path", 2,
+      tt_root, user);
 }
 
 char *get_job_work_directory(const char *job_dir) {
-  return concatenate(job_dir, "", JOB_DIR_TO_JOB_WORK_PATTERN,
-      "job_work_dir_path");
+  return concatenate(JOB_DIR_TO_JOB_WORK_PATTERN, "job_work_dir_path", 2,
+      job_dir, "");
 }
 /**
  * Get the attempt directory for the given attempt_id
  */
 char *get_attempt_directory(const char *job_dir, const char *attempt_id) {
-  return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN,
-      "attempt_dir_path");
+  return concatenate(JOB_DIR_TO_ATTEMPT_DIR_PATTERN, "attempt_dir_path", 2,
+      job_dir, attempt_id);
 }
 
 /*
  * Get the path to the task launcher file which is created by the TT
  */
 char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) {
-  return concatenate(job_dir, attempt_dir, TASK_SCRIPT_PATTERN,
-      "task_script_path");
+  return concatenate(TASK_SCRIPT_PATTERN, "task_script_path", 2, job_dir,
+      attempt_dir);
 }
 
 /**
  * Get the log directory for the given attempt.
  */
 char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
-  return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN,
-      "task_log_dir");
+  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
+      attempt_id);
 }
 
 /**
@@ -332,6 +361,17 @@ static int secure_path(const char *path, uid_t uid, gid_t gid,
     if (!process_path) {
       continue;
     }
+    if (compare_ownership(uid, gid, entry->fts_path) == 0) {
+      // already set proper permissions.
+      // This might happen with distributed cache.
+#ifdef DEBUG
+      fprintf(
+          LOGFILE,
+          "already has private permissions. Not trying to change again for %s\n",
+          entry->fts_path);
+#endif
+      continue;
+    }
 
     if (check_ownership(entry->fts_path) != 0) {
       fprintf(LOGFILE,
@@ -359,8 +399,8 @@ static int secure_path(const char *path, uid_t uid, gid_t gid,
  * Function to prepare the attempt directories for the task JVM.
  * This is done by changing the ownership of the attempt directory recursively
  * to the job owner. We do the following:
- *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid/$attemptid/
- *     *  sudo chmod 2770 -R taskTracker/jobcache/$jobid/$attemptid/
+ *  *  sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid/$attemptid/
+ *  *  sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid/$attemptid/
  */
 int prepare_attempt_directories(const char *job_id, const char *attempt_id,
     const char *user) {
@@ -395,7 +435,7 @@ int prepare_attempt_directories(const char *job_id, const char *attempt_id,
   char **local_dir_ptr = local_dir;
   int failed = 0;
   while (*local_dir_ptr != NULL) {
-    job_dir = get_job_directory(*local_dir_ptr, job_id);
+    job_dir = get_job_directory(*local_dir_ptr, user, job_id);
     if (job_dir == NULL) {
       fprintf(LOGFILE, "Couldn't get job directory for %s.\n", job_id);
       failed = 1;
@@ -508,6 +548,20 @@ int get_user_details(const char *user) {
   return 0;
 }
 
+/**
+ * Compare ownership of a file with the given ids.
+ */
+int compare_ownership(uid_t uid, gid_t gid, char *path) {
+  struct stat filestat;
+  if (stat(path, &filestat) != 0) {
+    return UNABLE_TO_STAT_FILE;
+  }
+  if (uid == filestat.st_uid && gid == filestat.st_gid) {
+    return 0;
+  }
+  return 1;
+}
+
 /*
  * Function to check if the TaskTracker actually owns the file.
   */
@@ -526,6 +580,89 @@ int check_ownership(char *path) {
   return 0;
 }
 
+/**
+ * Function to initialize the user directories of a user.
+ * It does the following:
+ *     *  sudo chown user:mapred -R taskTracker/$user
+ *     *  sudo chmod 2570 -R taskTracker/$user
+ * This is done once per every user on the TaskTracker.
+ */
+int initialize_user(const char *user) {
+
+  if (user == NULL) {
+    fprintf(LOGFILE, "user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s\n", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+#endif
+
+  char *user_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) {
+    user_dir = get_user_directory(*local_dir_ptr, user);
+    if (user_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get user directory for %s.\n", user);
+      failed = 1;
+      break;
+    }
+
+    struct stat filestat;
+    if (stat(user_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE, "user_dir %s doesn't exist. Not doing anything.\n",
+            user_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the user_dir %s\n",
+            user_dir);
+        failed = 1;
+        free(user_dir);
+        break;
+      }
+    } else if (secure_path(user_dir, user_detail->pw_uid, tasktracker_gid,
+        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+        != 0) {
+      // No setgid on files and setgid on dirs, 570
+      fprintf(LOGFILE, "Failed to secure the user_dir %s\n",
+          user_dir);
+      failed = 1;
+      free(user_dir);
+      break;
+    }
+
+    local_dir_ptr++;
+    free(user_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failed) {
+    return INITIALIZE_USER_FAILED;
+  }
+  return 0;
+}
+
 /**
  * Function to prepare the job directories for the task JVM.
  * We do the following:
@@ -563,7 +700,7 @@ int initialize_job(const char *jobid, const char *user) {
   char **local_dir_ptr = local_dir;
   int failed = 0;
   while (*local_dir_ptr != NULL) {
-    job_dir = get_job_directory(*local_dir_ptr, jobid);
+    job_dir = get_job_directory(*local_dir_ptr, user, jobid);
     if (job_dir == NULL) {
       fprintf(LOGFILE, "Couldn't get job directory for %s.\n", jobid);
       failed = 1;
@@ -608,6 +745,7 @@ int initialize_job(const char *jobid, const char *user) {
               "job_work_dir %s doesn't exist. Not doing anything.\n",
               job_work_dir);
 #endif
+          free(job_work_dir);
         } else {
           // stat failed because of something else!
           fprintf(LOGFILE, "Failed to stat the job_work_dir %s\n",
@@ -640,6 +778,97 @@ int initialize_job(const char *jobid, const char *user) {
   return 0;
 }
 
+/**
+ * Function to initialize the distributed cache files of a user.
+ * It does the following:
+ *     *  sudo chown user:mapred -R taskTracker/$user/distcache
+ *     *  sudo chmod 2570 -R taskTracker/$user/distcache
+ * This is done once per every JVM launch. Tasks reusing JVMs just create
+ * symbolic links themselves and so there isn't anything specific to do in
+ * that case.
+ * Sometimes, it happens that a task uses the whole or part of a directory
+ * structure in taskTracker/$user/distcache. In this case, some paths are
+ * already set proper private permissions by this same function called during
+ * a previous JVM launch. In the current invocation, we only do the
+ * chown/chmod operation of files/directories that are newly created by the
+ * TaskTracker (i.e. those that still are not owned by user:mapred)
+ */
+int initialize_distributed_cache(const char *user) {
+
+  if (user == NULL) {
+    fprintf(LOGFILE, "user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s\n", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+#endif
+
+  char *distcache_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) {
+    distcache_dir = get_distributed_cache_directory(*local_dir_ptr, user);
+    if (distcache_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get distcache directory for %s.\n", user);
+      failed = 1;
+      break;
+    }
+
+    struct stat filestat;
+    if (stat(distcache_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE, "distcache_dir %s doesn't exist. Not doing anything.\n",
+            distcache_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the distcache_dir %s\n",
+            distcache_dir);
+        failed = 1;
+        free(distcache_dir);
+        break;
+      }
+    } else if (secure_path(distcache_dir, user_detail->pw_uid,
+        tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
+            | S_IXUSR | S_IRWXG) != 0) {
+      // No setgid on files and setgid on dirs, 570
+      fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
+          distcache_dir);
+      failed = 1;
+      free(distcache_dir);
+      break;
+    }
+
+    local_dir_ptr++;
+    free(distcache_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failed) {
+    return INITIALIZE_DISTCACHE_FAILED;
+  }
+  return 0;
+}
+
 /**
  * Function used to initialize task. Prepares attempt_dir, jars_dir and
  * log_dir to be accessible by the child
@@ -719,7 +948,7 @@ int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     goto cleanup;
   }
 
-  job_dir = get_job_directory(tt_root, jobid);
+  job_dir = get_job_directory(tt_root, user, jobid);
   if (job_dir == NULL) {
     fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root);
     exit_code = OUT_OF_MEMORY;

+ 13 - 1
src/c++/task-controller/task-controller.h

@@ -37,7 +37,9 @@
 
 //command definitions
 enum command {
+  INITIALIZE_USER,
   INITIALIZE_JOB,
+  INITIALIZE_DISTRIBUTEDCACHE,
   LAUNCH_TASK_JVM,
   INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
@@ -63,9 +65,15 @@ enum errorcodes {
   PREPARE_TASK_LOGS_FAILED, //16
   INVALID_TT_LOG_DIR, //17
   OUT_OF_MEMORY, //18
+  INITIALIZE_DISTCACHE_FAILED, //19
+  INITIALIZE_USER_FAILED, //20
 };
 
-#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s"
+#define USER_DIR_PATTERN "%s/taskTracker/%s"
+
+#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"
+
+#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache"
 
 #define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
 
@@ -91,10 +99,14 @@ extern FILE *LOGFILE;
 int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     const char *tt_root);
 
+int initialize_user(const char *user);
+
 int initialize_task(const char *jobid, const char *taskid, const char *user);
 
 int initialize_job(const char *jobid, const char *user);
 
+int initialize_distributed_cache(const char *user);
+
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
 int prepare_attempt_directory(const char *attempt_dir, const char *user);

+ 43 - 12
src/c++/task-controller/tests/test-task-controller.c

@@ -75,7 +75,7 @@ void test_check_variable_against_config() {
   }
 
   // Test the parsing of a multiple valued key from the config
-  char **values = (char **)get_values("mapred.local.dir");
+  char **values = (char **) get_values("mapred.local.dir");
   char **values_ptr = values;
   int i = 0;
   while (*values_ptr != NULL) {
@@ -111,11 +111,24 @@ void test_check_variable_against_config() {
   rmdir(hadoop_conf_dir);
 }
 
+void test_get_user_directory() {
+  char *user_dir = (char *) get_user_directory("/tmp", "user");
+  printf("user_dir obtained is %s\n", user_dir);
+  int ret = 0;
+  if (strcmp(user_dir, "/tmp/taskTracker/user") != 0) {
+    ret = -1;
+  }
+  free(user_dir);
+  assert(ret == 0);
+}
+
 void test_get_job_directory() {
-  char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001");
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
   printf("job_dir obtained is %s\n", job_dir);
   int ret = 0;
-  if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) {
+  if (strcmp(job_dir, "/tmp/taskTracker/user/jobcache/job_200906101234_0001")
+      != 0) {
     ret = -1;
   }
   free(job_dir);
@@ -123,30 +136,34 @@ void test_get_job_directory() {
 }
 
 void test_get_attempt_directory() {
-  char *attempt_dir = (char *) get_attempt_directory(
-      "/tmp/taskTracker/jobcache/job_200906101234_0001",
-      "attempt_200906112028_0001_m_000000_0");
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  printf("job_dir obtained is %s\n", job_dir);
+  char *attempt_dir = (char *) get_attempt_directory(job_dir,
+      "attempt_200906101234_0001_m_000000_0");
   printf("attempt_dir obtained is %s\n", attempt_dir);
   int ret = 0;
   if (strcmp(
       attempt_dir,
-      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+      "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906101234_0001_m_000000_0")
       != 0) {
     ret = -1;
   }
+  free(job_dir);
   free(attempt_dir);
   assert(ret == 0);
 }
 
 void test_get_task_launcher_file() {
-  char *task_file = (char *) get_task_launcher_file(
-      "/tmp/taskTracker/jobcache/job_200906101234_0001",
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  char *task_file = (char *) get_task_launcher_file(job_dir,
       "attempt_200906112028_0001_m_000000_0");
   printf("task_file obtained is %s\n", task_file);
   int ret = 0;
   if (strcmp(
       task_file,
-      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+      "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
       != 0) {
     ret = -1;
   }
@@ -168,13 +185,27 @@ void test_get_task_log_dir() {
 }
 
 int main(int argc, char **argv) {
-  printf("Starting tests\n");
+  printf("\nStarting tests\n");
   LOGFILE = stdout;
+
+  printf("\nTesting check_variable_against_config()\n");
   test_check_variable_against_config();
+
+  printf("\nTesting get_user_directory()\n");
+  test_get_user_directory();
+
+  printf("\nTesting get_job_directory()\n");
   test_get_job_directory();
+
+  printf("\nTesting get_attempt_directory()\n");
   test_get_attempt_directory();
+
+  printf("\nTesting get_task_launcher_file()\n");
   test_get_task_launcher_file();
+
+  printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
-  printf("Finished tests\n");
+
+  printf("\nFinished tests\n");
   return 0;
 }

+ 5 - 4
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -528,10 +528,11 @@
             the tasks. For maximum security, this task controller 
             sets up restricted permissions and user/group ownership of
             local files and directories used by the tasks such as the
-            job jar files, intermediate files and task log files. Currently
-            permissions on distributed cache files are opened up to be
-            accessible by all users. In future, it is expected that stricter
-            file permissions are set for these files too.
+            job jar files, intermediate files, task log files and distributed
+            cache files. Particularly note that, because of this, except the
+            job owner and tasktracker, no other user can access any of the
+            local files/directories including those localized as part of the
+            distributed cache.
             </td>
             </tr>
             </table>

+ 15 - 4
src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java

@@ -33,6 +33,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -153,8 +154,18 @@ public class TaskDistributedCacheManager {
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
       String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
       String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
-      Path localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                fileStatus.getLen(), taskConf);
+
+      // Get the local path if the cacheFile is already localized or create one
+      // if it doesn't
+      Path localPath;
+      try {
+        localPath = lDirAlloc.getLocalPathToRead(cachePath, taskConf);
+      } catch (DiskErrorException de) {
+        localPath =
+            lDirAlloc.getLocalPathForWrite(cachePath, fileStatus.getLen(),
+                taskConf);
+      }
+
       String baseDir = localPath.toString().replace(cacheId, "");
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
           new Path(baseDir), fileStatus, 
@@ -221,7 +232,7 @@ public class TaskDistributedCacheManager {
    * Creates a class loader that includes the designated
    * files and archives.
    */
-  public ClassLoader makeClassLoader(final ClassLoader parent) 
+  public ClassLoader makeClassLoader(final ClassLoader parent)
       throws MalformedURLException {
     final URL[] urls = new URL[classPaths.size()];
     for (int i = 0; i < classPaths.size(); ++i) {
@@ -231,7 +242,7 @@ public class TaskDistributedCacheManager {
       @Override
       public ClassLoader run() {
         return new URLClassLoader(urls, parent);
-      }     
+      }
     });
   }
 }

+ 0 - 1
src/mapred/org/apache/hadoop/mapred/CleanupQueue.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;

+ 15 - 1
src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java

@@ -36,8 +36,12 @@ import org.apache.commons.logging.LogFactory;
  * This class provides an implementation for launching and killing 
  * tasks that need to be run as the tasktracker itself. Hence,
  * many of the initializing or cleanup methods are not required here.
+ * 
+ * <br/>
+ * 
+ *  NOTE: This class is internal only class and not intended for users!!
  */
-class DefaultTaskController extends TaskController {
+public class DefaultTaskController extends TaskController {
 
   private static final Log LOG = 
       LogFactory.getLog(DefaultTaskController.class);
@@ -128,5 +132,15 @@ class DefaultTaskController extends TaskController {
       }
     }
   }
+
+  @Override
+  public void initializeDistributedCache(InitializationContext context) {
+    // Do nothing.
+  }
+
+  @Override
+  public void initializeUser(InitializationContext context) {
+    // Do nothing.
+  }
   
 }

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -63,8 +63,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
    * Version 25: JobIDs are passed in response to JobTracker restart 
    * Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 27: Adding node health status to TaskStatus for MAPREDUCE-211
+   * Version 28: Adding user name to the serialized Task for use by TT.
    */
-  public static final long versionID = 27L;
+  public static final long versionID = 28L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/IsolationRunner.java

@@ -183,8 +183,8 @@ public class IsolationRunner {
     // where it is.
     Path localMetaSplit = 
         new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
-            TaskTracker.getLocalSplitFile(taskId.getJobID().toString(), taskId
-                .toString()), conf);
+            TaskTracker.getLocalSplitFile(conf.getUser(), taskId.getJobID()
+                .toString(), taskId.toString()), conf);
     DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
     TaskSplitIndex splitIndex = new TaskSplitIndex();
     splitIndex.readFields(splitFile);

+ 12 - 4
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -571,8 +571,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
      */
-    UnixUserGroupInformation ugi = getUGI(job);
-      
+    setUGIAndUserGroupNames(job);
+
     //
     // Figure out what fs the JobTracker is using.  Copy the
     // job to it, under a temporary name.  This allows DFS to work,
@@ -665,15 +665,23 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       LOG.warn("No job jar file set.  User classes may not be found. "+
                "See JobConf(Class) or JobConf#setJar(String).");
     }
+  }
 
-    // Set the user's name and working directory
+  /**
+   * Set the UGI, user name and the group name for the job.
+   * 
+   * @param job
+   * @throws IOException
+   */
+  void setUGIAndUserGroupNames(JobConf job)
+      throws IOException {
+    UnixUserGroupInformation ugi = getUGI(job);
     job.setUser(ugi.getUserName());
     if (ugi.getGroupNames().length > 0) {
       job.set("group.name", ugi.getGroupNames()[0]);
     }
   }
 
-
   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
     UnixUserGroupInformation ugi = null;
     try {

+ 9 - 4
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -145,7 +145,12 @@ public class JobConf extends Configuration {
    * indicates that the options are turned off.
    */
   public static final long DISABLED_MEMORY_LIMIT = -1L;
-  
+
+  /**
+   * Property name for the configuration property mapred.local.dir
+   */
+  public static final String MAPRED_LOCAL_DIR_PROPERTY = "mapred.local.dir";
+
   /**
    * Name of the queue to which jobs will be submitted, if no queue
    * name is mentioned.
@@ -402,7 +407,7 @@ public class JobConf extends Configuration {
   }
 
   public String[] getLocalDirs() throws IOException {
-    return getStrings("mapred.local.dir");
+    return getStrings(MAPRED_LOCAL_DIR_PROPERTY);
   }
 
   public void deleteLocalFiles() throws IOException {
@@ -424,7 +429,7 @@ public class JobConf extends Configuration {
    * local directories.
    */
   public Path getLocalPath(String pathString) throws IOException {
-    return getLocalPath("mapred.local.dir", pathString);
+    return getLocalPath(MAPRED_LOCAL_DIR_PROPERTY, pathString);
   }
 
   /**
@@ -1562,7 +1567,7 @@ public class JobConf extends Configuration {
    * <p>
    * When a job starts, a shared directory is created at location
    * <code>
-   * ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+   * ${mapred.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
    * This directory is exposed to the users through 
    * <code>job.local.dir </code>.
    * So, the tasks can use this space 

+ 24 - 2
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -80,7 +80,9 @@ class LinuxTaskController extends TaskController {
    * List of commands that the setuid script will execute.
    */
   enum TaskCommands {
+    INITIALIZE_USER,
     INITIALIZE_JOB,
+    INITIALIZE_DISTRIBUTEDCACHE,
     LAUNCH_TASK_JVM,
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
@@ -258,8 +260,10 @@ class LinuxTaskController extends TaskController {
     String taskId = context.task.getTaskID().toString();
     for (String dir : mapredLocalDirs) {
       File mapredDir = new File(dir);
-      File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
-          jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
+      File taskDir =
+          new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
+              .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
+              .getParentFile();
       if (directory.equals(taskDir)) {
         return dir;
       }
@@ -270,6 +274,24 @@ class LinuxTaskController extends TaskController {
                 + directory.getAbsolutePath());
   }
 
+  @Override
+  public void initializeDistributedCache(InitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize distributed cache for " + context.user
+        + " on the TT");
+    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE, context.user,
+        new ArrayList<String>(), context.workDir, null);
+  }
+
+  @Override
+  public void initializeUser(InitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize user directories for " + context.user
+        + " on the TT");
+    runCommand(TaskCommands.INITIALIZE_USER, context.user,
+        new ArrayList<String>(), context.workDir, null);
+  }
+
   /**
    * Builds the command line for launching/terminating/killing task JVM.
    * Following is the format for launching/terminating/killing task JVM

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -118,8 +118,8 @@ class MapTask extends Task {
       // localize the split meta-information
       Path localSplitMeta =
         new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
-            TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
-                .toString()), conf);
+            TaskTracker.getLocalSplitFile(conf.getUser(), getJobID()
+                .toString(), getTaskID().toString()), conf);
       LOG.debug("Writing local split to " + localSplitMeta);
       DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
       splitMetaInfo.write(out);

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -120,7 +120,7 @@ abstract public class Task implements Writable, Configurable {
   ////////////////////////////////////////////
 
   private String jobFile;                         // job configuration file
-  private String user;
+  private String user;                            // user running the job
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
@@ -364,7 +364,7 @@ abstract public class Task implements Writable, Configurable {
   boolean isMapOrReduce() {
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
-  
+
   /**
    * Get the name of the user running the job/task. TaskTracker needs task's
    * user name even before it's JobConf is localized. So we explicitly serialize

+ 41 - 33
src/mapred/org/apache/hadoop/mapred/TaskController.java

@@ -19,15 +19,13 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -38,9 +36,13 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
  * This class defines the API for initializing, finalizing and cleaning
  * up of tasks, as also the launching and killing task JVMs.
  * Subclasses of this class will implement the logic required for
- * performing the actual actions. 
+ * performing the actual actions.
+ * 
+ * <br/>
+ * 
+ * NOTE: This is an internal-only class and is not intended for users!
  */
-abstract class TaskController implements Configurable {
+public abstract class TaskController implements Configurable {
   
   private Configuration conf;
   
@@ -78,29 +80,8 @@ abstract class TaskController implements Configurable {
         LOG.warn("Unable to create mapred-local directory : "
             + mapredlocalDir.getPath());
       } else {
-        PermissionsHandler.setPermissions(mapredlocalDir,
-            PermissionsHandler.sevenFiveFive);
-      }
-
-      // Set up the cache directory used for distributed cache files
-      File distributedCacheDir =
-          new File(localDir, TaskTracker.getDistributedCacheDir());
-      if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
-        LOG.warn("Unable to create cache directory : "
-            + distributedCacheDir.getPath());
-      } else {
-        PermissionsHandler.setPermissions(distributedCacheDir,
-            PermissionsHandler.sevenFiveFive);
-      }
-
-      // Set up the jobcache directory
-      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
-      if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
-        LOG.warn("Unable to create job cache directory : "
-            + jobCacheDir.getPath());
-      } else {
-        PermissionsHandler.setPermissions(jobCacheDir,
-            PermissionsHandler.sevenFiveFive);
+        Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
+            Localizer.PermissionsHandler.sevenFiveFive);
       }
     }
 
@@ -109,8 +90,8 @@ abstract class TaskController implements Configurable {
     if (!taskLog.exists() && !taskLog.mkdirs()) {
       LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
     } else {
-      PermissionsHandler.setPermissions(taskLog,
-          PermissionsHandler.sevenFiveFive);
+      Localizer.PermissionsHandler.setPermissions(taskLog,
+          Localizer.PermissionsHandler.sevenFiveFive);
     }
   }
 
@@ -123,6 +104,17 @@ abstract class TaskController implements Configurable {
    */
   abstract void initializeJob(JobInitializationContext context) throws IOException;
 
+  /**
+   * Take task-controller specific actions to initialize the distributed cache
+   * files. This involves setting appropriate permissions for these files so as
+   * to secure them so that they are accessible only to their owners.
+   * 
+   * @param context
+   * @throws IOException
+   */
+  public abstract void initializeDistributedCache(InitializationContext context)
+      throws IOException;
+
   /**
    * Launch a task JVM
    * 
@@ -194,10 +186,17 @@ abstract class TaskController implements Configurable {
     long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
   }
 
-  static class JobInitializationContext {
+  /**
+   * NOTE: This is an internal-only class and is not intended for users!
+   * 
+   */
+  public static class InitializationContext {
+    public File workDir;
+    public String user;
+  }
+
+  static class JobInitializationContext extends InitializationContext {
     JobID jobid;
-    File workDir;
-    String user;
   }
 
   /**
@@ -214,4 +213,13 @@ abstract class TaskController implements Configurable {
    * @param context task context
    */
   abstract void killTask(TaskControllerContext context);
+
+  /**
+   * Initialize user on this TaskTracker in a TaskController-specific manner.
+   * 
+   * @param context
+   * @throws IOException
+   */
+  public abstract void initializeUser(InitializationContext context)
+      throws IOException;
 }

+ 20 - 12
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -35,13 +35,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
+import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -161,13 +162,18 @@ abstract class TaskRunner extends Thread {
       taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
                                     .newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(lDirAlloc, workDir,
-                                        TaskTracker.getDistributedCacheDir());
+                          TaskTracker.getDistributedCacheDir(conf.getUser()));
       
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
       // the conf object after this will NOT be reflected to the child.
       setupChildTaskConfiguration(lDirAlloc);
       
+      InitializationContext context = new InitializationContext();
+      context.user = conf.getUser();
+      context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
+      tracker.getTaskController().initializeDistributedCache(context);
+
       if (!prepare()) {
         return;
       }
@@ -262,8 +268,8 @@ abstract class TaskRunner extends Thread {
     if (!b) {
       LOG.warn("mkdirs failed. Ignoring");
     } else {
-      PermissionsHandler.setPermissions(logDir,
-          PermissionsHandler.sevenZeroZero);
+      Localizer.PermissionsHandler.setPermissions(logDir,
+          Localizer.PermissionsHandler.sevenZeroZero);
     }
     return logFiles;
   }
@@ -279,9 +285,9 @@ abstract class TaskRunner extends Thread {
       throws IOException {
 
     Path localTaskFile =
-        lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
-            .getJobID().toString(), t.getTaskID().toString(), t
-            .isTaskCleanupTask()), conf);
+        lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(
+            t.getUser(), t.getJobID().toString(), t.getTaskID().toString(), t
+                .isTaskCleanupTask()), conf);
 
     // write the child's task configuration file to the local disk
     writeLocalTaskFile(localTaskFile.toString(), conf);
@@ -569,16 +575,17 @@ abstract class TaskRunner extends Thread {
    * process space.
    */
   static void setupChildMapredLocalDirs(Task t, JobConf conf) {
-    String[] localDirs = conf.getStrings("mapred.local.dir");
+    String[] localDirs = conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
     String jobId = t.getJobID().toString();
     String taskId = t.getTaskID().toString();
     boolean isCleanup = t.isTaskCleanupTask();
+    String user = t.getUser();
     StringBuffer childMapredLocalDir =
         new StringBuffer(localDirs[0] + Path.SEPARATOR
-            + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+            + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
     for (int i = 1; i < localDirs.length; i++) {
       childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
-          + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+          + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
     }
     LOG.debug("mapred.local.dir for child : " + childMapredLocalDir);
     conf.set("mapred.local.dir", childMapredLocalDir.toString());
@@ -589,8 +596,9 @@ abstract class TaskRunner extends Thread {
       TaskAttemptID task, boolean isCleanup, JobConf conf) 
       throws IOException {
     Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
-            .getJobID().toString(), task.toString(), isCleanup), conf);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+            conf.getUser(), task.getJobID().toString(), task.toString(),
+            isCleanup), conf);
 
     return new File(workDir.toString());
   }

+ 171 - 308
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -54,6 +54,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -184,7 +185,7 @@ public class TaskTracker
   
   // The filesystem where job files are stored
   FileSystem systemFS = null;
-  private FileSystem localFs = null;
+  FileSystem localFs = null;
   private final HttpServer server;
     
   volatile boolean shuttingDown = false;
@@ -222,8 +223,8 @@ public class TaskTracker
   //for serving map output to the other nodes
 
   static Random r = new Random();
-  static final String SUBDIR = "taskTracker";
-  private static final String DISTCACHEDIR = "distcache";
+  public static final String SUBDIR = "taskTracker";
+  static final String DISTCACHEDIR = "distcache";
   static final String JOBCACHE = "jobcache";
   static final String OUTPUT = "output";
   private static final String JARSDIR = "jars";
@@ -235,6 +236,7 @@ public class TaskTracker
 
   private JobConf fConf;
   private JobConf originalConf;
+  private Localizer localizer;
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
@@ -250,7 +252,7 @@ public class TaskTracker
   
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
-  private CleanupQueue directoryCleanupThread;
+  CleanupQueue directoryCleanupThread;
   volatile JvmManager jvmManager;
   
   private TaskMemoryManagerThread taskMemoryManager;
@@ -347,7 +349,6 @@ public class TaskTracker
       shuffleMetricsRecord.update();
     }
   }
-  
 
   
   
@@ -395,7 +396,7 @@ public class TaskTracker
         }
       }, "taskCleanup");
 
-  TaskController getTaskController() {
+  public TaskController getTaskController() {
     return taskController;
   }
   
@@ -444,72 +445,75 @@ public class TaskTracker
     return TaskTracker.SUBDIR + Path.SEPARATOR + user;
   } 
 
-  static String getDistributedCacheDir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+  Localizer getLocalizer() {
+    return localizer;
   }
 
-  static String getJobCacheSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
+  void setLocalizer(Localizer l) {
+    localizer = l;
   }
 
-  public static String getLocalJobDir(String user, String jobid) {
-    return getUserDir(user) + Path.SEPARATOR + getJobCacheSubdir() 
-        + Path.SEPARATOR + jobid;
-  } 
+  public static String getDistributedCacheDir(String user) {
+    return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+  }
 
-  static String getLocalJobDir(String jobid) {
-    return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+  public static String getJobCacheSubdir(String user) {
+    return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
 
-  static String getLocalJobConfFile(String jobid) {
-    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+  public static String getLocalJobDir(String user, String jobid) {
+    return getJobCacheSubdir(user) + Path.SEPARATOR + jobid;
   }
 
-  static String getTaskConfFile(String jobid, String taskid,
-      boolean isCleanupAttempt) {
-    return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
-        + TaskTracker.JOBFILE;
+  static String getLocalJobConfFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
 
-  static String getJobJarsDir(String jobid) {
-    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+  static String getTaskConfFile(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+    + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
 
-  static String getJobJarFile(String jobid) {
-    return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+  static String getJobJarsDir(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
   }
 
-  static String getJobWorkDir(String jobid) {
-    return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+  static String getJobJarFile(String user, String jobid) {
+    return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
+  }
+  
+  static String getJobWorkDir(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
-  static String getLocalSplitFile(String jobid, String taskid) {
-    return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+  static String getLocalSplitFile(String user, String jobid, String taskid) {
+    return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
     + TaskTracker.LOCAL_SPLIT_FILE;
   }
 
-  static String getIntermediateOutputDir(String jobid, String taskid) {
-    return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
-        + TaskTracker.OUTPUT;
+  static String getIntermediateOutputDir(String user, String jobid,
+      String taskid) {
+    return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+    + TaskTracker.OUTPUT;
   }
 
-  static String getLocalTaskDir(String jobid, String taskid) {
-    return getLocalTaskDir(jobid, taskid, false);
+  static String getLocalTaskDir(String user, String jobid, String taskid) {
+    return getLocalTaskDir(user, jobid, taskid, false);
   }
-
-  static String getLocalTaskDir(String jobid, String taskid,
+  
+  public static String getLocalTaskDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+    String taskDir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
     if (isCleanupAttempt) {
       taskDir = taskDir + TASK_CLEANUP_SUFFIX;
     }
     return taskDir;
   }
-
-  static String getTaskWorkDir(String jobid, String taskid,
+  
+  static String getTaskWorkDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String dir =
-      getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+    String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
     if (isCleanupAttempt) {
       dir = dir + TASK_CLEANUP_SUFFIX;
     }
@@ -677,7 +681,10 @@ public class TaskTracker
     
     //setup and create jobcache directory with appropriate permissions
     taskController.setup();
-    
+
+    // create a localizer instance
+    setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+
     //Start up node health checker service.
     if (shouldStartHealthMonitor(this.fConf)) {
       startHealthMonitor(this.fConf);
@@ -888,8 +895,11 @@ public class TaskTracker
     Path localJarFile = null;
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
-
     RunningJob rjob = addTaskToJob(jobId, tip);
+
+    // Initialize the user directories if needed.
+    getLocalizer().initializeUserDirs(t.getUser());
+
     synchronized (rjob) {
       if (!rjob.localized) {
         JobConf localJobConf = localizeJobFiles(t);
@@ -949,19 +959,19 @@ public class TaskTracker
 
     // Initialize the job directories first
     FileSystem localFs = FileSystem.getLocal(fConf);
-    initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+    getLocalizer().initializeJobDirs(userName, jobId);
 
     // Download the job.xml for this job from the system FS
     Path localJobFile =
-        localizeJobConfFile(new Path(t.getJobFile()), userFs, jobId);
+        localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
 
     JobConf localJobConf = new JobConf(localJobFile);
 
     // create the 'job-work' directory: job-specific shared directory for use as
     // scratch space by all tasks of the same job running on this TaskTracker.
     Path workDir =
-        lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
-          fConf);
+        lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName,
+            jobId.toString()), fConf);
     if (!localFs.mkdirs(workDir)) {
       throw new IOException("Mkdirs failed to create "
           + workDir.toString());
@@ -970,154 +980,11 @@ public class TaskTracker
     localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
 
     // Download the job.jar for this job from the system FS
-    localizeJobJarFile(jobId, userFs, localJobConf);
+    localizeJobJarFile(userName, jobId, userFs, localJobConf);
 
     return localJobConf;
   }
 
-  static class PermissionsHandler {
-    /**
-     * Permission information useful for setting permissions for a given path.
-     * Using this, one can set all possible combinations of permissions for the
-     * owner of the file. But permissions for the group and all others can only
-     * be set together, i.e. permissions for group cannot be set different from
-     * those for others and vice versa.
-     */
-    static class PermissionsInfo {
-      public boolean readPermissions;
-      public boolean writePermissions;
-      public boolean executablePermissions;
-      public boolean readPermsOwnerOnly;
-      public boolean writePermsOwnerOnly;
-      public boolean executePermsOwnerOnly;
-
-      /**
-       * Create a permissions-info object with the given attributes
-       *
-       * @param readPerms
-       * @param writePerms
-       * @param executePerms
-       * @param readOwnerOnly
-       * @param writeOwnerOnly
-       * @param executeOwnerOnly
-       */
-      public PermissionsInfo(boolean readPerms, boolean writePerms,
-          boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
-          boolean executeOwnerOnly) {
-        readPermissions = readPerms;
-        writePermissions = writePerms;
-        executablePermissions = executePerms;
-        readPermsOwnerOnly = readOwnerOnly;
-        writePermsOwnerOnly = writeOwnerOnly;
-        executePermsOwnerOnly = executeOwnerOnly;
-      }
-    }
-
-    /**
-     * Set permission on the given file path using the specified permissions
-     * information. We use java api to set permission instead of spawning chmod
-     * processes. This saves a lot of time. Using this, one can set all possible
-     * combinations of permissions for the owner of the file. But permissions
-     * for the group and all others can only be set together, i.e. permissions
-     * for group cannot be set different from those for others and vice versa.
-     *
-     * This method should satisfy the needs of most of the applications. For
-     * those it doesn't, {@link FileUtil#chmod} can be used.
-     *
-     * @param f file path
-     * @param pInfo permissions information
-     * @return true if success, false otherwise
-     */
-    static boolean setPermissions(File f, PermissionsInfo pInfo) {
-      if (pInfo == null) {
-        LOG.debug(" PermissionsInfo is null, returning.");
-        return true;
-      }
-
-      LOG.debug("Setting permission for " + f.getAbsolutePath());
-
-      boolean ret = true;
-
-      // Clear all the flags
-      ret = f.setReadable(false, false) && ret;
-      ret = f.setWritable(false, false) && ret;
-      ret = f.setExecutable(false, false) && ret;
-
-      ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
-      LOG.debug("Readable status for " + f + " set to " + ret);
-      ret =
-        f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
-        && ret;
-      LOG.debug("Writable status for " + f + " set to " + ret);
-      ret =
-        f.setExecutable(pInfo.executablePermissions,
-            pInfo.executePermsOwnerOnly)
-            && ret;
-
-      LOG.debug("Executable status for " + f + " set to " + ret);
-      return ret;
-    }
-
-    /**
-     * Permissions rwxr_xr_x
-     */
-    static PermissionsInfo sevenFiveFive =
-      new PermissionsInfo(true, true, true, false, true, false);
-    /**
-     * Completely private permissions
-     */
-    static PermissionsInfo sevenZeroZero =
-      new PermissionsInfo(true, true, true, true, true, true);
-  }
-
-  /**
-   * Prepare the job directories for a given job. To be called by the job
-   * localization code, only if the job is not already localized.
-   *
-   * <br>
-   * Here, we set 700 permissions on the job directories created on all disks.
-   * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
-   * later time to set proper private permissions on the job directories. <br>
-   *
-   * @param jobId
-   * @param fs
-   * @param localDirs
-   * @throws IOException
-   */
-  private static void initializeJobDirs(JobID jobId, FileSystem fs,
-      String[] localDirs)
-  throws IOException {
-    boolean initJobDirStatus = false;
-    String jobDirPath = getLocalJobDir(jobId.toString());
-    for (String localDir : localDirs) {
-      Path jobDir = new Path(localDir, jobDirPath);
-      if (fs.exists(jobDir)) {
-        // this will happen on a partial execution of localizeJob. Sometimes
-        // copying job.xml to the local disk succeeds but copying job.jar might
-        // throw out an exception. We should clean up and then try again.
-        fs.delete(jobDir, true);
-      }
-
-      boolean jobDirStatus = fs.mkdirs(jobDir);
-      if (!jobDirStatus) {
-        LOG.warn("Not able to create job directory " + jobDir.toString());
-      }
-
-      initJobDirStatus = initJobDirStatus || jobDirStatus;
-
-      // job-dir has to be private to the TT
-      PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
-          PermissionsHandler.sevenZeroZero);
-    }
-
-    if (!initJobDirStatus) {
-      throw new IOException("Not able to initialize job directories "
-          + "in any of the configured local directories for job "
-          + jobId.toString());
-    }
-  }
-
   /**
    * Download the job configuration file from the FS.
    *
@@ -1126,7 +993,7 @@ public class TaskTracker
    * @return the local file system path of the downloaded file.
    * @throws IOException
    */
-  private Path localizeJobConfFile(Path jobFile, FileSystem userFs, JobID jobId)
+  private Path localizeJobConfFile(Path jobFile, String user, FileSystem userFs, JobID jobId)
   throws IOException {
     // Get sizes of JobFile and JarFile
     // sizes are -1 if they are not present.
@@ -1139,8 +1006,8 @@ public class TaskTracker
       jobFileSize = -1;
     }
     Path localJobFile =
-      lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
-          jobFileSize, fConf);
+      lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user,
+          jobId.toString()), jobFileSize, fConf);
 
     // Download job.xml
     userFs.copyToLocalFile(jobFile, localJobFile);
@@ -1156,7 +1023,7 @@ public class TaskTracker
    * @param localJobConf
    * @throws IOException
    */
-  private void localizeJobJarFile(JobID jobId, FileSystem userFs,
+  private void localizeJobJarFile(String user, JobID jobId, FileSystem userFs,
       JobConf localJobConf)
   throws IOException {
     // copy Jar file to the local FS and unjar it.
@@ -1171,11 +1038,11 @@ public class TaskTracker
       } catch (FileNotFoundException fe) {
         jarFileSize = -1;
       }
-      // Here we check for and we check five times the size of jarFileSize
-      // to accommodate for unjarring the jar file in userfiles directory
+      // Here we check for five times the size of jarFileSize to accommodate for
+      // unjarring the jar file in the jars directory
       Path localJarFile =
-        lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
-            5 * jarFileSize, fConf);
+        lDirAlloc.getLocalPathForWrite(
+            getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
 
       //Download job.jar
       userFs.copyToLocalFile(jarFilePath, localJarFile);
@@ -1189,44 +1056,6 @@ public class TaskTracker
     }
   }
 
-  /**
-   * Create taskDirs on all the disks. Otherwise, in some cases, like when
-   * LinuxTaskController is in use, child might wish to balance load across
-   * disks but cannot itself create attempt directory because of the fact that
-   * job directory is writable only by the TT.
-   *
-   * @param jobId
-   * @param attemptId
-   * @param isCleanupAttempt
-   * @param fs
-   * @param localDirs
-   * @throws IOException
-   */
-  private static void initializeAttemptDirs(String jobId, String attemptId,
-      boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
-  throws IOException {
-
-    boolean initStatus = false;
-    String attemptDirPath =
-      getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
-
-    for (String localDir : localDirs) {
-      Path localAttemptDir = new Path(localDir, attemptDirPath);
-
-      boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
-      if (!attemptDirStatus) {
-        LOG.warn("localAttemptDir " + localAttemptDir.toString()
-            + " couldn't be created.");
-      }
-      initStatus = initStatus || attemptDirStatus;
-    }
-
-    if (!initStatus) {
-      throw new IOException("Not able to initialize attempt directories "
-          + "in any of the configured local directories for the attempt "
-          + attemptId);
-    }
-  }
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf)
       throws IOException{
     synchronized (tip) {
@@ -1266,7 +1095,7 @@ public class TaskTracker
     }
     
     this.running = false;
-        
+
     // Clear local storage
     cleanupStorage();
         
@@ -1814,9 +1643,8 @@ public class TaskTracker
         }
         // Delete the job directory for this  
         // task if the job is done/failed
-        if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf, 
-            getLocalJobDir(rjob.getJobID().toString())));
+        if (!rjob.keepJobFiles) {
+          removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString());
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1829,7 +1657,18 @@ public class TaskTracker
     getJobTokenSecretManager().removeTokenForJob(jobId.toString());  
   }      
     
-    
+  /**
+   * This job's files are no longer needed on this TT, remove them.
+   *
+   * @param rjob
+   * @throws IOException
+   */
+  void removeJobFiles(String user, String jobId)
+  throws IOException {
+    directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
+        getLocalJobDir(user, jobId)));
+  }
+
   /**
    * Remove the tip and update all relevant state.
    * 
@@ -2282,14 +2121,14 @@ public class TaskTracker
       FileSystem localFs = FileSystem.getLocal(fConf);
 
       // create taskDirs on all the disks.
-      initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
-          .toString(), task.isTaskCleanupTask(), localFs, fConf
-          .getStrings("mapred.local.dir"));
+      getLocalizer().initializeAttemptDirs(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask());
 
       // create the working-directory of the task 
       Path cwd =
-          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
-              .toString(), task.getTaskID().toString(), task
+          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
+              .getJobID().toString(), task.getTaskID().toString(), task
               .isTaskCleanupTask()), defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
@@ -2351,10 +2190,14 @@ public class TaskTracker
       return task;
     }
     
-    public TaskRunner getTaskRunner() {
+    TaskRunner getTaskRunner() {
       return runner;
     }
 
+    void setTaskRunner(TaskRunner rnr) {
+      this.runner = rnr;
+    }
+
     public synchronized void setJobConf(JobConf lconf){
       this.localJobConf = lconf;
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
@@ -2388,7 +2231,7 @@ public class TaskTracker
         if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
           this.taskStatus.setRunState(TaskStatus.State.RUNNING);
         }
-        this.runner = task.createRunner(TaskTracker.this, this);
+        setTaskRunner(task.createRunner(TaskTracker.this, this));
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
       } else {
@@ -2605,13 +2448,13 @@ public class TaskTracker
               }
               File workDir = null;
               try {
-                workDir = new File(lDirAlloc.getLocalPathToRead(
-                                     TaskTracker.getLocalTaskDir( 
-                                       task.getJobID().toString(), 
-                                       task.getTaskID().toString(),
-                                       task.isTaskCleanupTask())
-                                     + Path.SEPARATOR + MRConstants.WORKDIR,
-                                     localJobConf). toString());
+                workDir =
+                    new File(lDirAlloc.getLocalPathToRead(
+                        TaskTracker.getLocalTaskDir(task.getUser(), task
+                            .getJobID().toString(), task.getTaskID()
+                            .toString(), task.isTaskCleanupTask())
+                            + Path.SEPARATOR + MRConstants.WORKDIR,
+                        localJobConf).toString());
               } catch (IOException e) {
                 LOG.warn("Working Directory of the task " + task.getTaskID() +
                                 " doesnt exist. Caught exception " +
@@ -2889,50 +2732,60 @@ public class TaskTracker
         }
       }
       synchronized (this) {
+        // localJobConf could be null if localization has not happened
+        // then no cleanup will be required.
+        if (localJobConf == null) {
+          return;
+        }
         try {
-          // localJobConf could be null if localization has not happened
-          // then no cleanup will be required.
-          if (localJobConf == null) {
-            return;
-          }
-          String localTaskDir =
-              getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
-                  task.isTaskCleanupTask());
-          String taskWorkDir =
-              getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
-                  task.isTaskCleanupTask());
-          if (needCleanup) {
-            if (runner != null) {
-              //cleans up the output directory of the task (where map outputs 
-              //and reduce inputs get stored)
-              runner.close();
-            }
-
-            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              // No jvm reuse, remove everything
-              directoryCleanupThread.addToQueue(localFs,
-                  getLocalFiles(defaultJobConf,
-                  localTaskDir));
-            }  
-            else {
-              // Jvm reuse. We don't delete the workdir since some other task
-              // (running in the same JVM) might be using the dir. The JVM
-              // running the tasks would clean the workdir per a task in the
-              // task process itself.
-              directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-                  defaultJobConf, localTaskDir + Path.SEPARATOR
-                      + TaskTracker.JOBFILE));
-            }
-          } else {
-            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(localFs,
-                  getLocalFiles(defaultJobConf,
-                  taskWorkDir));
-            }  
-          }
+          removeTaskFiles(needCleanup, taskId);
         } catch (Throwable ie) {
-          LOG.info("Error cleaning up task runner: " + 
-                   StringUtils.stringifyException(ie));
+          LOG.info("Error cleaning up task runner: "
+              + StringUtils.stringifyException(ie));
+        }
+      }
+    }
+
+    /**
+     * Some or all of the files from this task are no longer required. Remove
+     * them via CleanupQueue.
+     * 
+     * @param needCleanup
+     * @param taskId
+     * @throws IOException 
+     */
+    void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
+        throws IOException {
+      if (needCleanup) {
+        if (runner != null) {
+          // cleans up the output directory of the task (where map outputs
+          // and reduce inputs get stored)
+          runner.close();
+        }
+
+        String localTaskDir =
+            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
+                .toString(), task.isTaskCleanupTask());
+        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+          // No jvm reuse, remove everything
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+              defaultJobConf, localTaskDir));
+        } else {
+          // Jvm reuse. We don't delete the workdir since some other task
+          // (running in the same JVM) might be using the dir. The JVM
+          // running the tasks would clean the workdir per a task in the
+          // task process itself.
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+              defaultJobConf, localTaskDir + Path.SEPARATOR
+                  + TaskTracker.JOBFILE));
+        }
+      } else {
+        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+          String taskWorkDir =
+              getTaskWorkDir(task.getUser(), task.getJobID().toString(),
+                  taskId.toString(), task.isTaskCleanupTask());
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+              defaultJobConf, taskWorkDir));
         }
       }
     }
@@ -3371,15 +3224,25 @@ public class TaskTracker
         FileSystem rfs = ((LocalFileSystem)
             context.getAttribute("local.file.system")).getRaw();
 
-        // Index file
-        Path indexFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getIntermediateOutputDir(jobId, mapId)
-            + "/file.out.index", conf);
-        
-        // Map-output file
-        Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getIntermediateOutputDir(jobId, mapId)
-            + "/file.out", conf);
+      String userName = null;
+      synchronized (tracker.runningJobs) {
+        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+        if (rjob == null) {
+          throw new IOException("Unknown job " + jobId + "!!");
+        }
+        userName = rjob.jobConf.getUser();
+      }
+      // Index file
+      Path indexFileName =
+          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+              userName, jobId, mapId)
+              + "/file.out.index", conf);
+
+      // Map-output file
+      Path mapOutputFileName =
+          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+              userName, jobId, mapId)
+              + "/file.out", conf);
 
         /**
          * Read the index file to get the information about where

+ 5 - 1
src/mapred/org/apache/hadoop/mapred/pipes/Application.java

@@ -89,7 +89,11 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     }
 
     String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
-    FileUtil.chmod(executable, "a+x");
+    if (!new File(executable).canExecute()) {
+      // LinuxTaskController sets +x permissions on all distcache files already.
+      // In case of DefaultTaskController, set permissions here.
+      FileUtil.chmod(executable, "u+x");
+    }
     cmd.add(executable);
     // wrap the command in a stdout/stderr capture
     TaskAttemptID taskid = TaskAttemptID.forName(conf.get("mapred.task.id"));

+ 361 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java

@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskController.InitializationContext;
+
+/**
+ * 
+ * NOTE: This class is internal only and not intended for users!!
+ */
+public class Localizer {
+
+  static final Log LOG = LogFactory.getLog(Localizer.class);
+
+  private FileSystem fs;
+  private String[] localDirs;
+  private TaskController taskController;
+
+  /**
+   * Create a Localizer instance
+   * 
+   * @param fileSys
+   * @param lDirs
+   * @param tc
+   */
+  public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) {
+    fs = fileSys;
+    localDirs = lDirs;
+    taskController = tc;
+  }
+
+  /**
+   * NOTE: This class is internal only and not intended for users!!
+   * 
+   */
+  public static class PermissionsHandler {
+    /**
+     * Permission information useful for setting permissions for a given path.
+     * Using this, one can set all possible combinations of permissions for the
+     * owner of the file. But permissions for the group and all others can only
+     * be set together, i.e. permissions for group cannot be set different from
+     * those for others and vice versa.
+     */
+    public static class PermissionsInfo {
+      public boolean readPermissions;
+      public boolean writePermissions;
+      public boolean executablePermissions;
+      public boolean readPermsOwnerOnly;
+      public boolean writePermsOwnerOnly;
+      public boolean executePermsOwnerOnly;
+
+      /**
+       * Create a permissions-info object with the given attributes
+       * 
+       * @param readPerms
+       * @param writePerms
+       * @param executePerms
+       * @param readOwnerOnly
+       * @param writeOwnerOnly
+       * @param executeOwnerOnly
+       */
+      public PermissionsInfo(boolean readPerms, boolean writePerms,
+          boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
+          boolean executeOwnerOnly) {
+        readPermissions = readPerms;
+        writePermissions = writePerms;
+        executablePermissions = executePerms;
+        readPermsOwnerOnly = readOwnerOnly;
+        writePermsOwnerOnly = writeOwnerOnly;
+        executePermsOwnerOnly = executeOwnerOnly;
+      }
+    }
+
+    /**
+     * Set permission on the given file path using the specified permissions
+     * information. We use java api to set permission instead of spawning chmod
+     * processes. This saves a lot of time. Using this, one can set all possible
+     * combinations of permissions for the owner of the file. But permissions
+     * for the group and all others can only be set together, i.e. permissions
+     * for group cannot be set different from those for others and vice versa.
+     * 
+     * This method should satisfy the needs of most of the applications. For
+     * those it doesn't, {@link FileUtil#chmod} can be used.
+     * 
+     * @param f file path
+     * @param pInfo permissions information
+     * @return true if success, false otherwise
+     */
+    public static boolean setPermissions(File f, PermissionsInfo pInfo) {
+      if (pInfo == null) {
+        LOG.debug(" PermissionsInfo is null, returning.");
+        return true;
+      }
+
+      LOG.debug("Setting permission for " + f.getAbsolutePath());
+
+      boolean ret = true;
+
+      // Clear all the flags
+      ret = f.setReadable(false, false) && ret;
+      ret = f.setWritable(false, false) && ret;
+      ret = f.setExecutable(false, false) && ret;
+
+      ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
+      LOG.debug("Readable status for " + f + " set to " + ret);
+      ret =
+          f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
+              && ret;
+      LOG.debug("Writable status for " + f + " set to " + ret);
+      ret =
+          f.setExecutable(pInfo.executablePermissions,
+              pInfo.executePermsOwnerOnly)
+              && ret;
+
+      LOG.debug("Executable status for " + f + " set to " + ret);
+      return ret;
+    }
+
+    /**
+     * Permissions rwxr-xr-x (755)
+     */
+    public static final PermissionsInfo sevenFiveFive =
+        new PermissionsInfo(true, true, true, false, true, false);
+    /**
+     * Completely private permissions
+     */
+    public static final PermissionsInfo sevenZeroZero =
+        new PermissionsInfo(true, true, true, true, true, true);
+  }
+
+  // Data-structure for synchronizing localization of user directories.
+  private Map<String, AtomicBoolean> localizedUsers =
+      new HashMap<String, AtomicBoolean>();
+
+  /**
+   * Initialize the local directories for a particular user on this TT. This
+   * involves creation and setting permissions of the following directories
+   * <ul>
+   * <li>$mapred.local.dir/taskTracker/$user</li>
+   * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+   * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+   * </ul>
+   * 
+   * @param user
+   * @throws IOException
+   */
+  public void initializeUserDirs(String user)
+      throws IOException {
+
+    if (user == null) {
+      // This shouldn't happen in general
+      throw new IOException(
+          "User is null. Cannot initialize user-directories.");
+    }
+
+    AtomicBoolean localizedUser;
+    synchronized (localizedUsers) {
+      if (!localizedUsers.containsKey(user)) {
+        localizedUsers.put(user, new AtomicBoolean(false));
+      }
+      localizedUser = localizedUsers.get(user);
+    }
+
+    synchronized (localizedUser) {
+
+      if (localizedUser.get()) {
+        // User-directories are already localized for this user.
+        LOG.info("User-directories for the user " + user
+            + " are already initialized on this TT. Not doing anything.");
+        return;
+      }
+
+      LOG.info("Initializing user " + user + " on this TT.");
+
+      boolean userDirStatus = false;
+      boolean jobCacheDirStatus = false;
+      boolean distributedCacheDirStatus = false;
+
+      for (String localDir : localDirs) {
+
+        Path userDir = new Path(localDir, TaskTracker.getUserDir(user));
+
+        // Set up the user-directory.
+        if (fs.exists(userDir) || fs.mkdirs(userDir)) {
+
+          // Set permissions on the user-directory
+          PermissionsHandler.setPermissions(
+              new File(userDir.toUri().getPath()),
+              PermissionsHandler.sevenZeroZero);
+          userDirStatus = true;
+
+          // Set up the jobcache directory
+          File jobCacheDir =
+              new File(localDir, TaskTracker.getJobCacheSubdir(user));
+          if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
+            // Set permissions on the jobcache-directory
+            PermissionsHandler.setPermissions(jobCacheDir,
+                PermissionsHandler.sevenZeroZero);
+            jobCacheDirStatus = true;
+          } else {
+            LOG.warn("Unable to create job cache directory : "
+                + jobCacheDir.getPath());
+          }
+
+          // Set up the cache directory used for distributed cache files
+          File distributedCacheDir =
+              new File(localDir, TaskTracker.getDistributedCacheDir(user));
+          if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
+            // Set permissions on the distcache-directory
+            PermissionsHandler.setPermissions(distributedCacheDir,
+                PermissionsHandler.sevenZeroZero);
+            distributedCacheDirStatus = true;
+          } else {
+            LOG.warn("Unable to create distributed-cache directory : "
+                + distributedCacheDir.getPath());
+          }
+        } else {
+          LOG.warn("Unable to create the user directory : " + userDir);
+        }
+      }
+
+      if (!userDirStatus) {
+        throw new IOException("Not able to initialize user directories "
+            + "in any of the configured local directories for user " + user);
+      }
+      if (!jobCacheDirStatus) {
+        throw new IOException("Not able to initialize job-cache directories "
+            + "in any of the configured local directories for user " + user);
+      }
+      if (!distributedCacheDirStatus) {
+        throw new IOException(
+            "Not able to initialize distributed-cache directories "
+                + "in any of the configured local directories for user "
+                + user);
+      }
+
+      // Now, run the task-controller specific code to initialize the
+      // user-directories.
+      InitializationContext context = new InitializationContext();
+      context.user = user;
+      context.workDir = null;
+      taskController.initializeUser(context);
+
+      // Localization of the user is done
+      localizedUser.set(true);
+    }
+  }
+
+  /**
+   * Prepare the job directories for a given job. To be called by the job
+   * localization code, only if the job is not already localized.
+   * 
+   * <br>
+   * Here, we set 700 permissions on the job directories created on all disks.
+   * This we do so as to avoid any misuse by other users till the time
+   * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+   * later time to set proper private permissions on the job directories. <br>
+   * 
+   * @param user
+   * @param jobId
+   * @throws IOException
+   */
+  public void initializeJobDirs(String user, JobID jobId)
+      throws IOException {
+    boolean initJobDirStatus = false;
+    String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
+    for (String localDir : localDirs) {
+      Path jobDir = new Path(localDir, jobDirPath);
+      if (fs.exists(jobDir)) {
+        // this will happen on a partial execution of localizeJob. Sometimes
+        // copying job.xml to the local disk succeeds but copying job.jar might
+        // throw out an exception. We should clean up and then try again.
+        fs.delete(jobDir, true);
+      }
+
+      boolean jobDirStatus = fs.mkdirs(jobDir);
+      if (!jobDirStatus) {
+        LOG.warn("Not able to create job directory " + jobDir.toString());
+      }
+
+      initJobDirStatus = initJobDirStatus || jobDirStatus;
+
+      // job-dir has to be private to the TT
+      Localizer.PermissionsHandler.setPermissions(new File(jobDir.toUri()
+          .getPath()), Localizer.PermissionsHandler.sevenZeroZero);
+    }
+
+    if (!initJobDirStatus) {
+      throw new IOException("Not able to initialize job directories "
+          + "in any of the configured local directories for job "
+          + jobId.toString());
+    }
+  }
+
+  /**
+   * Create taskDirs on all the disks. Otherwise, in some cases, like when
+   * LinuxTaskController is in use, child might wish to balance load across
+   * disks but cannot itself create attempt directory because of the fact that
+   * job directory is writable only by the TT.
+   * 
+   * @param user
+   * @param jobId
+   * @param attemptId
+   * @param isCleanupAttempt
+   * @throws IOException
+   */
+  public void initializeAttemptDirs(String user, String jobId,
+      String attemptId, boolean isCleanupAttempt)
+      throws IOException {
+
+    boolean initStatus = false;
+    String attemptDirPath =
+        TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
+
+    for (String localDir : localDirs) {
+      Path localAttemptDir = new Path(localDir, attemptDirPath);
+
+      boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
+      if (!attemptDirStatus) {
+        LOG.warn("localAttemptDir " + localAttemptDir.toString()
+            + " couldn't be created.");
+      }
+      initStatus = initStatus || attemptDirStatus;
+    }
+
+    if (!initStatus) {
+      throw new IOException("Not able to initialize attempt directories "
+          + "in any of the configured local directories for the attempt "
+          + attemptId.toString());
+    }
+  }
+}

+ 101 - 20
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

@@ -24,33 +24,72 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Random;
 
+import javax.security.auth.login.LoginException;
+
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
-  private static final String TEST_LOCAL_DIR_PROP = "test.local.dir";
-  private static String TEST_CACHE_BASE_DIR =
-    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
-    .toString().replace(' ', '+');
-  private static String TEST_ROOT_DIR =
-    System.getProperty("test.build.data", "/tmp/distributedcache");
+
+  protected String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp"),
+          TestTrackerDistributedCacheManager.class.getSimpleName())
+          .getAbsolutePath();
+
+  protected File ROOT_MAPRED_LOCAL_DIR;
+  private static String TEST_CACHE_BASE_DIR;
+  protected int numLocalDirs = 6;
+
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
-  private Configuration conf;
-  private Path firstCacheFile;
-  private Path secondCacheFile;
+  protected Configuration conf;
+  protected Path firstCacheFile;
+  protected Path secondCacheFile;
+
+  protected LocalDirAllocator localDirAllocator = 
+    new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
 
   @Override
   protected void setUp() throws IOException {
+
+    // Prepare the tests' root dir
+    File TEST_ROOT = new File(TEST_ROOT_DIR);
+    if (!TEST_ROOT.exists()) {
+      TEST_ROOT.mkdirs();
+    }
+
+    // Prepare the tests' mapred-local-dir
+    ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
+    ROOT_MAPRED_LOCAL_DIR.mkdirs();
+    String []localDirs = new String[numLocalDirs];
+    for (int i = 0; i < numLocalDirs; i++) {
+      localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
+    }
+
+    TEST_CACHE_BASE_DIR =
+        new File(TEST_ROOT_DIR, "cachebasedir").getAbsolutePath();
+
     conf = new Configuration();
     conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
-    conf.set(TEST_LOCAL_DIR_PROP, TEST_ROOT_DIR);
+    conf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localDirs);
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+
+    // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
     createTempFile(firstCacheFile);
@@ -59,29 +98,43 @@ public class TestTrackerDistributedCacheManager extends TestCase {
 
   /**
    * This is the typical flow for using the DistributedCache classes.
+   * 
+   * @throws IOException
+   * @throws LoginException
    */
-  public void testManagerFlow() throws IOException {
-    TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf);
-    LocalDirAllocator localDirAllocator = 
-        new LocalDirAllocator(TEST_LOCAL_DIR_PROP);
+  public void testManagerFlow() throws IOException, LoginException {
 
+    // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(conf);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
+    // ****** End of imitating JobClient code
 
     Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
     FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
     subConf.writeXml(os);
     os.close();
 
+    String userName = getJobOwnerName();
+
+    // ****** Imitate TaskRunner code.
+    TrackerDistributedCacheManager manager = 
+      new TrackerDistributedCacheManager(conf);
     TaskDistributedCacheManager handle =
       manager.newTaskDistributedCacheManager(subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
-    handle.setup(localDirAllocator, 
-        new File(new Path(TEST_ROOT_DIR, "workdir").toString()), "distcache");
+    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+    handle.setup(localDirAllocator, workDir, TaskTracker
+        .getDistributedCacheDir(userName));
+
+    InitializationContext context = new InitializationContext();
+    context.user = userName;
+    context.workDir = workDir;
+    getTaskController().initializeDistributedCache(context);
+    // ****** End of imitating TaskRunner code
+
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
     assertNotNull(null, localCacheFiles);
     assertEquals(2, localCacheFiles.length);
@@ -94,12 +147,39 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     assertEquals(1, handle.getClassPaths().size());
     assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));
 
+    checkFilePermissions(localCacheFiles);
+
     // Cleanup
     handle.release();
     manager.purgeCache();
     assertFalse(pathToFile(cachedFirstFile).exists());
   }
 
+  /**
+   * Check proper permissions on the cache files
+   * 
+   * @param localCacheFiles
+   * @throws IOException
+   */
+  protected void checkFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    Path cachedFirstFile = localCacheFiles[0];
+    Path cachedSecondFile = localCacheFiles[1];
+    // Both the files should have executable permissions on them.
+    assertTrue("First cache file is not executable!", new File(cachedFirstFile
+        .toUri().getPath()).canExecute());
+    assertTrue("Second cache file is not executable!", new File(
+        cachedSecondFile.toUri().getPath()).canExecute());
+  }
+
+  protected TaskController getTaskController() {
+    return new DefaultTaskController();
+  }
+
+  protected String getJobOwnerName() throws LoginException {
+    UserGroupInformation ugi = UserGroupInformation.login(conf);
+    return ugi.getUserName();
+  }
 
   /** test delete cache */
   public void testDeleteCache() throws Exception {
@@ -122,7 +202,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
         new Path(TEST_CACHE_BASE_DIR));
     assertTrue("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
-        dirStatuses.length > 1);
+        dirStatuses.length == 1);
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
@@ -152,15 +232,16 @@ public class TestTrackerDistributedCacheManager extends TestCase {
   protected void tearDown() throws IOException {
     new File(firstCacheFile.toString()).delete();
     new File(secondCacheFile.toString()).delete();
+    FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
   }
 
-  private void assertFileLengthEquals(Path a, Path b) 
+  protected void assertFileLengthEquals(Path a, Path b) 
       throws FileNotFoundException {
     assertEquals("File sizes mismatch.", 
        pathToFile(a).length(), pathToFile(b).length());
   }
 
-  private File pathToFile(Path p) {
+  protected File pathToFile(Path p) {
     return new File(p.toString());
   }
 }

+ 11 - 4
src/test/org/apache/hadoop/mapred/TestIsolationRunner.java

@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
 
+import javax.security.auth.login.LoginException;
+
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /** 
  * Re-runs a map task using the IsolationRunner. 
@@ -99,15 +102,19 @@ public class TestIsolationRunner extends TestCase {
   }
   
   private Path getAttemptJobXml(JobConf conf, JobID jobId, boolean isMap)
-      throws IOException {
+      throws IOException, LoginException {
     String taskid =
         new TaskAttemptID(new TaskID(jobId, isMap, 0), 0).toString();
     return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
-        TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
+        TaskTracker.getTaskConfFile(UserGroupInformation.login(conf)
+            .getUserName(), jobId.toString(), taskid, false), conf);
   }
 
-  public void testIsolationRunOfMapTask() throws 
-      IOException, InterruptedException, ClassNotFoundException {
+  public void testIsolationRunOfMapTask()
+      throws IOException,
+      InterruptedException,
+      ClassNotFoundException,
+      LoginException {
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(1, "file:///", 4);

+ 119 - 95
src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java

@@ -22,16 +22,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
-import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 
 /**
  * Test to verify localization of a job and localization of a task on a
@@ -45,7 +40,6 @@ public class TestLocalizationWithLinuxTaskController extends
       LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
 
   private File configFile;
-  private MyLinuxTaskController taskController;
 
   private static String taskTrackerSpecialGroup;
 
@@ -66,10 +60,24 @@ public class TestLocalizationWithLinuxTaskController extends
         ClusterWithLinuxTaskController.createTaskControllerConf(path,
             localDirs);
     String execPath = path + "/task-controller";
-    taskController.setTaskControllerExe(execPath);
+    ((MyLinuxTaskController) taskController).setTaskControllerExe(execPath);
     taskTrackerSpecialGroup = getFilePermissionAttrs(execPath)[2];
     taskController.setConf(trackerFConf);
     taskController.setup();
+
+    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+        taskController));
+
+    // Rewrite conf so as to reflect task's correct user name.
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    JobConf jobConf = new JobConf(task.getConf());
+    jobConf.setUser(ugi.split(",")[0]);
+    File jobConfFile = uploadJobConf(jobConf);
+    // Create the task again to change the job-user
+    task =
+      new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+    task.setConf(jobConf);
   }
 
   @Override
@@ -91,75 +99,114 @@ public class TestLocalizationWithLinuxTaskController extends
   }
 
   /**
-   * Test job localization with {@link LinuxTaskController}. Also check the
-   * permissions and file ownership of the job related files.
+   * Test the localization of a user on the TT when {@link LinuxTaskController}
+   * is in use.
    */
   @Override
-  public void testJobLocalization()
-      throws IOException,
-      LoginException {
+  public void testUserLocalization()
+      throws IOException {
 
     if (!ClusterWithLinuxTaskController.shouldRun()) {
       return;
     }
 
-    // Do job localization
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    super.testJobLocalization();
+  }
 
-    String ugi =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
-    localizedJobConf.setUser(ugi.split(",")[0]);
+  @Override
+  protected void checkUserLocalization()
+      throws IOException {
+    // Check the directory structure and permissions
+    for (String dir : localDirs) {
+
+      File localDir = new File(dir);
+      assertTrue("mapred.local.dir " + localDir + " isn'task created!",
+          localDir.exists());
+
+      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      assertTrue("taskTracker sub-dir in the local-dir " + localDir
+          + "is not created!", taskTrackerSubDir.exists());
+
+      File userDir = new File(taskTrackerSubDir, task.getUser());
+      assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+          + "is not created!", userDir.exists());
+      checkFilePermissions(userDir.getAbsolutePath(), "dr-xrws---", task
+          .getUser(), taskTrackerSpecialGroup);
+
+      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+      assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+          jobCache.exists());
+      checkFilePermissions(jobCache.getAbsolutePath(), "dr-xrws---", task
+          .getUser(), taskTrackerSpecialGroup);
+
+      // Verify the distributed cache dir.
+      File distributedCacheDir =
+          new File(localDir, TaskTracker
+              .getDistributedCacheDir(task.getUser()));
+      assertTrue("distributed cache dir " + distributedCacheDir
+          + " doesn't exists!", distributedCacheDir.exists());
+      checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+          "dr-xrws---", task.getUser(), taskTrackerSpecialGroup);
+    }
+  }
+
+  /**
+   * Test job localization with {@link LinuxTaskController}. Also check the
+   * permissions and file ownership of the job related files.
+   */
+  @Override
+  public void testJobLocalization()
+      throws IOException {
 
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext context = new JobInitializationContext();
-    context.jobid = jobId;
-    context.user = localizedJobConf.getUser();
-    context.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
 
-    // /////////// The method being tested
-    taskController.initializeJob(context);
-    // ///////////
+    super.testJobLocalization();
+  }
 
+  @Override
+  protected void checkJobLocalization()
+      throws IOException {
     for (String localDir : trackerFConf.getStrings("mapred.local.dir")) {
       File jobDir =
-          new File(localDir, TaskTracker.getLocalJobDir(jobId.toString()));
+          new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId
+              .toString()));
       // check the private permissions on the job directory
-      checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---",
-          localizedJobConf.getUser(), taskTrackerSpecialGroup);
+      checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---", task
+          .getUser(), taskTrackerSpecialGroup);
     }
 
     // check the private permissions of various directories
     List<Path> dirs = new ArrayList<Path>();
     Path jarsDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId
-            .toString()), trackerFConf);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(),
+            jobId.toString()), trackerFConf);
     dirs.add(jarsDir);
     dirs.add(new Path(jarsDir, "lib"));
     for (Path dir : dirs) {
       checkFilePermissions(dir.toUri().getPath(), "dr-xrws---",
-          localizedJobConf.getUser(), taskTrackerSpecialGroup);
+          task.getUser(), taskTrackerSpecialGroup);
     }
 
     // job-work dir needs user writable permissions
     Path jobWorkDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
-            .toString()), trackerFConf);
-    checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---",
-        localizedJobConf.getUser(), taskTrackerSpecialGroup);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(),
+            jobId.toString()), trackerFConf);
+    checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", task
+        .getUser(), taskTrackerSpecialGroup);
 
     // check the private permissions of various files
     List<Path> files = new ArrayList<Path>();
-    files.add(lDirAlloc.getLocalPathToRead(TaskTracker
-        .getLocalJobConfFile(jobId.toString()), trackerFConf));
-    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
-        .toString()), trackerFConf));
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile(
+        task.getUser(), jobId.toString()), trackerFConf));
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task
+        .getUser(), jobId.toString()), trackerFConf));
     files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
     files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
     for (Path file : files) {
-      checkFilePermissions(file.toUri().getPath(), "-r-xrwx---",
-          localizedJobConf.getUser(), taskTrackerSpecialGroup);
+      checkFilePermissions(file.toUri().getPath(), "-r-xrwx---", task
+          .getUser(), taskTrackerSpecialGroup);
     }
   }
 
@@ -169,73 +216,50 @@ public class TestLocalizationWithLinuxTaskController extends
    */
   @Override
   public void testTaskLocalization()
-      throws IOException,
-      LoginException {
+      throws IOException {
 
     if (!ClusterWithLinuxTaskController.shouldRun()) {
       return;
     }
 
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
-    String ugi =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
-    localizedJobConf.setUser(ugi.split(",")[0]);
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = localizedJobConf.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
-
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
-    tip.setJobConf(localizedJobConf);
-
-    // localize the task.
-    tip.localizeTask(task);
-    TaskRunner runner = task.createRunner(tracker, tip);
-    runner.setupChildTaskConfiguration(lDirAlloc);
-    Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
-            .getJobID().toString(), task.getTaskID().toString(), task
-            .isTaskCleanupTask()), trackerFConf);
-    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
-        localizedJobConf);
-    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
-
-    // Initialize task
-    TaskControllerContext taskContext =
-        new TaskController.TaskControllerContext();
-    taskContext.env =
-        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
-            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
-    taskContext.task = task;
-    // /////////// The method being tested
-    taskController.initializeTask(taskContext);
-    // ///////////
+    super.testTaskLocalization();
+  }
 
+  @Override
+  protected void checkTaskLocalization()
+      throws IOException {
     // check the private permissions of various directories
     List<Path> dirs = new ArrayList<Path>();
-    dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId
-        .toString(), taskId.toString()), trackerFConf));
-    dirs.add(workDir);
-    dirs.add(new Path(workDir, "tmp"));
-    dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath()));
+    dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
+        .getUser(), jobId.toString(), taskId.toString()), trackerFConf));
+    dirs.add(attemptWorkDir);
+    dirs.add(new Path(attemptWorkDir, "tmp"));
+    dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));
     for (Path dir : dirs) {
       checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
-          localizedJobConf.getUser(), taskTrackerSpecialGroup);
+          task.getUser(), taskTrackerSpecialGroup);
     }
 
     // check the private permissions of various files
     List<Path> files = new ArrayList<Path>();
     files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
-        .getJobID().toString(), task.getTaskID().toString(), task
-        .isTaskCleanupTask()), trackerFConf));
+        .getUser(), task.getJobID().toString(), task.getTaskID().toString(),
+        task.isTaskCleanupTask()), trackerFConf));
     for (Path file : files) {
-      checkFilePermissions(file.toUri().getPath(), "-rwxrwx---",
-          localizedJobConf.getUser(), taskTrackerSpecialGroup);
+      checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", task
+          .getUser(), taskTrackerSpecialGroup);
+    }
+  }
+
+  /**
+   * Test cleanup of task files with {@link LinuxTaskController}.
+   */
+  @Override
+  public void testTaskCleanup()
+      throws IOException {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
     }
+    super.testTaskCleanup();
   }
 }

+ 117 - 41
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -27,6 +27,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import javax.security.auth.login.LoginException;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
@@ -39,6 +41,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
@@ -118,55 +122,108 @@ public class TestMiniMRWithDFS extends TestCase {
     }
     return result.toString();
   }
-  
+
   /**
    * Make sure that there are exactly the directories that we expect to find.
+   * 
+   * <br/>
+   * <br/>
+   * 
+   * E.g., if we want to check the existence of *only* the directories for
+   * user1's tasks job1-attempt1, job1-attempt2 and job2-attempt1, we pass
+   * user1 as user, {job1, job1, job2} as jobIds and {attempt1, attempt2,
+   * attempt1} as taskDirs.
+   * 
    * @param mr the map-reduce cluster
+   * @param user the name of the job-owner
+   * @param jobIds the list of jobs
    * @param taskDirs the task ids that should be present
    */
-  static void checkTaskDirectories(MiniMRCluster mr,
-                                           String[] jobIds,
-                                           String[] taskDirs) {
+  static void checkTaskDirectories(MiniMRCluster mr, String user,
+      String[] jobIds, String[] taskDirs) {
+
     mr.waitUntilIdle();
     int trackers = mr.getNumTaskTrackers();
-    List<String> neededDirs = new ArrayList<String>(Arrays.asList(taskDirs));
-    boolean[] found = new boolean[taskDirs.length];
-    for(int i=0; i < trackers; ++i) {
-      int numNotDel = 0;
+
+    List<String> observedJobDirs = new ArrayList<String>();
+    List<String> observedFilesInsideJobDir = new ArrayList<String>();
+
+    for (int i = 0; i < trackers; ++i) {
+
+      // Verify that mapred-local-dir and its direct contents are valid
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
-      LOG.debug("Tracker directory: " + localDir);
-      File trackerDir = new File(localDir, TaskTracker.SUBDIR);
-      assertTrue("local dir " + localDir + " does not exist.", 
-                 localDir.isDirectory());
-      assertTrue("task tracker dir " + trackerDir + " does not exist.", 
-                 trackerDir.isDirectory());
-      String contents[] = localDir.list();
-      String trackerContents[] = trackerDir.list();
-      for(int j=0; j < contents.length; ++j) {
-        System.out.println("Local " + localDir + ": " + contents[j]);
-      }
-      for(int j=0; j < trackerContents.length; ++j) {
-        System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
-      }
-      for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
-        String name = contents[fileIdx];
-        if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
-          LOG.debug("Looking at " + name);
-          assertTrue("Spurious directory " + name + " found in " +
-                     localDir, false);
+      assertTrue("Local dir " + localDir + " does not exist.", localDir
+          .isDirectory());
+      LOG.info("Verifying contents of mapred.local.dir "
+          + localDir.getAbsolutePath());
+
+      // Verify contents (user-dir) of tracker-sub-dir
+      File trackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      if (trackerSubDir.isDirectory()) {
+
+        // Verify contents of user-dir and populate the job-dirs/attempt-dirs
+        // lists
+        File userDir = new File(trackerSubDir, user);
+        if (userDir.isDirectory()) {
+          LOG.info("Verifying contents of user-dir "
+              + userDir.getAbsolutePath());
+          verifyContents(new String[] { TaskTracker.JOBCACHE,
+              TaskTracker.DISTCACHEDIR }, userDir.list());
+
+          File jobCacheDir =
+              new File(localDir, TaskTracker.getJobCacheSubdir(user));
+          String[] jobDirs = jobCacheDir.list();
+          observedJobDirs.addAll(Arrays.asList(jobDirs));
+
+          for (String jobDir : jobDirs) {
+            String[] attemptDirs = new File(jobCacheDir, jobDir).list();
+            observedFilesInsideJobDir.addAll(Arrays.asList(attemptDirs));
+          }
         }
       }
-      for (int idx = 0; idx < neededDirs.size(); ++idx) {
-        String name = neededDirs.get(idx);
-        if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
-                              jobIds[idx]), name).isDirectory()) {
-          found[idx] = true;
-          numNotDel++;
-        }  
+    }
+
+    // Now verify that only expected job-dirs and attempt-dirs are present.
+    LOG.info("Verifying the list of job directories");
+    verifyContents(jobIds, observedJobDirs.toArray(new String[observedJobDirs
+        .size()]));
+    LOG.info("Verifying the list of task directories");
+    // All taskDirs should be present in the observed list. Other files like
+    // job.xml etc may be present too, we are not checking them here.
+    for (int j = 0; j < taskDirs.length; j++) {
+      assertTrue(
+          "Expected task-directory " + taskDirs[j] + " is not present!",
+          observedFilesInsideJobDir.contains(taskDirs[j]));
+    }
+  }
+
+  /**
+   * Check the list of expectedFiles against the list of observedFiles and make
+   * sure they contain exactly the same entries. Duplicates may be present in
+   * either list; all duplicate entries are treated as a single entity.
+   * 
+   * @param expectedFiles
+   * @param observedFiles
+   */
+  private static void verifyContents(String[] expectedFiles,
+      String[] observedFiles) {
+    boolean[] foundExpectedFiles = new boolean[expectedFiles.length];
+    boolean[] validObservedFiles = new boolean[observedFiles.length];
+    for (int j = 0; j < observedFiles.length; ++j) {
+      for (int k = 0; k < expectedFiles.length; ++k) {
+        if (expectedFiles[k].equals(observedFiles[j])) {
+          foundExpectedFiles[k] = true;
+          validObservedFiles[j] = true;
+        }
       }
     }
-    for(int i=0; i< found.length; i++) {
-      assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
+    for (int j = 0; j < foundExpectedFiles.length; j++) {
+      assertTrue("Expected file " + expectedFiles[j] + " not found",
+          foundExpectedFiles[j]);
+    }
+    for (int j = 0; j < validObservedFiles.length; j++) {
+      assertTrue("Unexpected file " + observedFiles[j] + " found",
+          validObservedFiles[j]);
     }
   }
 
@@ -176,7 +233,16 @@ public class TestMiniMRWithDFS extends TestCase {
         NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
     double error = Math.abs(Math.PI - estimate);
     assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-    checkTaskDirectories(mr, new String[]{}, new String[]{});
+    String userName = jobconf.getUser();
+    if (userName == null) {
+      try {
+        userName = UnixUserGroupInformation.login(jobconf).getUserName();
+      } catch (LoginException le) {
+        throw new IOException("Cannot get the login username : "
+            + StringUtils.stringifyException(le));
+      }
+    }
+    checkTaskDirectories(mr, userName, new String[] {}, new String[] {});
   }
 
   public static void runWordCount(MiniMRCluster mr, JobConf jobConf) 
@@ -195,9 +261,19 @@ public class TestMiniMRWithDFS extends TestCase {
     assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                  "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
     JobID jobid = result.job.getID();
-    TaskAttemptID taskid = new TaskAttemptID(new TaskID(jobid, true, 1),0);
-    checkTaskDirectories(mr, new String[]{jobid.toString()}, 
-                         new String[]{taskid.toString()});
+    TaskAttemptID taskid = new TaskAttemptID(
+        new TaskID(jobid, true, 1),0);
+    String userName = jobConf.getUser();
+    if (userName == null) {
+      try {
+        userName = UnixUserGroupInformation.login(jobConf).getUserName();
+      } catch (LoginException le) {
+        throw new IOException("Cannot get the login username : "
+            + StringUtils.stringifyException(le));
+      }
+    }
+    checkTaskDirectories(mr, userName, new String[] { jobid.toString() },
+        new String[] { taskid.toString() });
     // test with maps=0
     jobConf = mr.createJobConf();
     input = "owen is oom";

+ 4 - 2
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java

@@ -128,6 +128,10 @@ public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
       Path outDir = new Path("/testing/distinct/output");
       TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
                                              input, 2, 1, inDir, outDir);
+
+      job1 = createJobConf(job1, PI_UGI);
+      runJobAsUser(job1, PI_UGI);
+
       JobConf job2 = mr.createJobConf();
       Path inDir2 = new Path("/testing/distinct/input2");
       Path outDir2 = new Path("/testing/distinct/output2");
@@ -135,8 +139,6 @@ public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
                                              input, 2, 1, inDir2, outDir2);
       job2 = createJobConf(job2, WC_UGI);
       runJobAsUser(job2, WC_UGI);
-      JobConf wc = createJobConf(mr, WC_UGI);
-      TestMiniMRWithDFS.runWordCount(mr, wc);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();}

+ 6 - 4
src/test/org/apache/hadoop/mapred/TestQueueManager.java

@@ -482,8 +482,8 @@ public class TestQueueManager extends TestCase {
       try {
         conf.set("mapred.job.tracker", "localhost:"
             + miniMRCluster.getJobTrackerPort());
-        JobClient jc = new JobClient(conf);
-        jc.getJob(rjob.getJobID()).killJob();
+        JobClient client = new JobClient(miniMRCluster.createJobConf());
+        client.getJob(rjob.getID()).killJob();
         if (!shouldSucceed) {
           fail("should fail kill operation");  
         }
@@ -524,8 +524,8 @@ public class TestQueueManager extends TestCase {
       try {
         conf.set("mapred.job.tracker", "localhost:"
             + miniMRCluster.getJobTrackerPort());
-        JobClient jc = new JobClient(conf);
-        jc.getJob(rjob.getJobID()).setJobPriority("VERY_LOW");
+        JobClient client = new JobClient(miniMRCluster.createJobConf());
+        client.getJob(rjob.getID()).setJobPriority("VERY_LOW");
         if (!shouldSucceed) {
           fail("changing priority should fail.");
         }
@@ -605,6 +605,8 @@ public class TestQueueManager extends TestCase {
     if (shouldComplete) {
       rJob = JobClient.runJob(jc);  
     } else {
+      // Job should be submitted as 'userInfo'. So both the client as well as
+      // the configuration should point to the same UGI.
       rJob = new JobClient(jc).submitJob(jc);
     }
     return rJob;

+ 369 - 111
src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

@@ -18,21 +18,27 @@
 package org.apache.hadoop.mapred;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 
 import junit.framework.TestCase;
@@ -53,20 +59,53 @@ public class TestTaskTrackerLocalization extends TestCase {
       LogFactory.getLog(TestTaskTrackerLocalization.class);
 
   protected TaskTracker tracker;
+  protected UserGroupInformation taskTrackerUGI;
+  protected TaskController taskController;
   protected JobConf trackerFConf;
+  private JobConf localizedJobConf;
   protected JobID jobId;
   protected TaskAttemptID taskId;
   protected Task task;
   protected String[] localDirs;
   protected static LocalDirAllocator lDirAlloc =
       new LocalDirAllocator("mapred.local.dir");
+  protected Path attemptWorkDir;
+  protected File[] attemptLogFiles;
+  protected JobConf localizedTaskConf;
+
+  class InlineCleanupQueue extends CleanupQueue {
+    List<Path> stalePaths = new ArrayList<Path>();
+
+    public InlineCleanupQueue() {
+      // do nothing
+    }
+
+    @Override
+    public void addToQueue(FileSystem fs, Path... paths) {
+      // delete in-line
+      for (Path p : paths) {
+        try {
+          LOG.info("Trying to delete the path " + p);
+          if (!fs.delete(p, true)) {
+            LOG.warn("Stale path " + p.toUri().getPath());
+            stalePaths.add(p);
+          }
+        } catch (IOException e) {
+          LOG.warn("Caught exception while deleting path "
+              + p.toUri().getPath());
+          LOG.info(StringUtils.stringifyException(e));
+          stalePaths.add(p);
+        }
+      }
+    }
+  }
 
   @Override
   protected void setUp()
       throws Exception {
     TEST_ROOT_DIR =
-        new File(System.getProperty("test.build.data", "/tmp"),
-            "testTaskTrackerLocalization");
+        new File(System.getProperty("test.build.data", "/tmp"), getClass()
+            .getSimpleName());
     if (!TEST_ROOT_DIR.exists()) {
       TEST_ROOT_DIR.mkdirs();
     }
@@ -86,30 +125,27 @@ public class TestTaskTrackerLocalization extends TestCase {
     }
     trackerFConf.setStrings("mapred.local.dir", localDirs);
 
-    // Create the job jar file
-    File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
-    JarOutputStream jstream =
-        new JarOutputStream(new FileOutputStream(jobJarFile));
-    ZipEntry ze = new ZipEntry("lib/lib1.jar");
-    jstream.putNextEntry(ze);
-    jstream.closeEntry();
-    ze = new ZipEntry("lib/lib2.jar");
-    jstream.putNextEntry(ze);
-    jstream.closeEntry();
-    jstream.finish();
-    jstream.close();
-    trackerFConf.setJar(jobJarFile.toURI().toString());
+    // Create the job configuration file. Same as trackerConf in this test.
+    JobConf jobConf = trackerFConf;
 
-    // Create the job configuration file
-    File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
-    FileOutputStream out = new FileOutputStream(jobConfFile);
-    trackerFConf.writeXml(out);
-    out.close();
+    // JobClient sets the job credentials.
+    new JobClient().setUGIAndUserGroupNames(jobConf);
+
+    // JobClient uploads the job jar to the file system and sets it in the
+    // jobConf.
+    uploadJobJar(jobConf);
+
+    // JobClient uploads the jobConf to the file system.
+    File jobConfFile = uploadJobConf(jobConf);
 
     // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
-    tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+    // for test case system FS is the local FS
+    tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
+
+    taskTrackerUGI = UserGroupInformation.login(trackerFConf);
 
     // Set up the task to be localized
     String jtIdentifier = "200907202331";
@@ -118,12 +154,53 @@ public class TestTaskTrackerLocalization extends TestCase {
         new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
     task =
         new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+    task.setConf(jobConf); // Set conf. Set user name in particular.
 
-    TaskController taskController = new DefaultTaskController();
+    taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
     taskController.setup();
+    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+    taskController));
+  }
+
+  /**
+   * @param jobConf
+   * @throws IOException
+   * @throws FileNotFoundException
+   */
+  private void uploadJobJar(JobConf jobConf)
+      throws IOException,
+      FileNotFoundException {
+    File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+    JarOutputStream jstream =
+        new JarOutputStream(new FileOutputStream(jobJarFile));
+    ZipEntry ze = new ZipEntry("lib/lib1.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    ze = new ZipEntry("lib/lib2.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    jstream.finish();
+    jstream.close();
+    jobConf.setJar(jobJarFile.toURI().toString());
   }
 
+  /**
+   * @param jobConf
+   * @return
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  protected File uploadJobConf(JobConf jobConf)
+      throws FileNotFoundException,
+      IOException {
+    File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+    FileOutputStream out = new FileOutputStream(jobConfFile);
+    jobConf.writeXml(out);
+    out.close();
+    return jobConfFile;
+   }
+  
   @Override
   protected void tearDown()
       throws Exception {
@@ -145,71 +222,71 @@ public class TestTaskTrackerLocalization extends TestCase {
     assertTrue("Path " + path + " has the permissions " + attrs[0]
         + " instead of the expected " + expectedPermissions, attrs[0]
         .equals(expectedPermissions));
-    assertTrue("Path " + path + " is not user owned not by "
-        + expectedOwnerUser + " but by " + attrs[1], attrs[1]
-        .equals(expectedOwnerUser));
-    assertTrue("Path " + path + " is not group owned not by "
-        + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
-        .equals(expectedOwnerGroup));
+    assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser
+        + " but by " + attrs[1], attrs[1].equals(expectedOwnerUser));
+    assertTrue("Path " + path + " is group owned not by " + expectedOwnerGroup
+        + " but by " + attrs[2], attrs[2].equals(expectedOwnerGroup));
   }
 
   /**
    * Verify the task-controller's setup functionality
    * 
    * @throws IOException
-   * @throws LoginException
    */
   public void testTaskControllerSetup()
-      throws IOException,
-      LoginException {
+      throws IOException {
     // Task-controller is already set up in the test's setup method. Now verify.
-    UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
     for (String localDir : localDirs) {
 
       // Verify the local-dir itself.
       File lDir = new File(localDir);
       assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
-      checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
-          .getUserName(), ugi.getGroupNames()[0]);
-
-      // Verify the distributed cache dir.
-      File distributedCacheDir =
-          new File(localDir, TaskTracker.getDistributedCacheDir());
-      assertTrue("distributed cache dir " + distributedCacheDir
-          + " doesn't exists!", distributedCacheDir.exists());
-      checkFilePermissions(distributedCacheDir.getAbsolutePath(),
-          "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
-
-      // Verify the job cache dir.
-      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
-      assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!",
-          jobCacheDir.exists());
-      checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
-          .getUserName(), ugi.getGroupNames()[0]);
+      checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
     }
 
     // Verify the pemissions on the userlogs dir
     File taskLog = TaskLog.getUserLogDir();
-    checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
-        .getUserName(), ugi.getGroupNames()[0]);
+    checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", task
+        .getUser(), taskTrackerUGI.getGroupNames()[0]);
   }
 
   /**
-   * Test job localization on a TT. Tests localization of job.xml, job.jar and
-   * corresponding setting of configuration.
+   * Test the localization of a user on the TT.
    * 
    * @throws IOException
-   * @throws LoginException
    */
-  public void testJobLocalization()
-      throws IOException,
-      LoginException {
+  public void testUserLocalization()
+      throws IOException {
 
     // /////////// The main method being tested
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
     // ///////////
 
-    // Check the directory structure
+    // Check the directory structure and permissions
+    checkUserLocalization();
+
+    // For the sake of testing idempotency of initializeUserDirs(), we remove
+    // the user directories now and make sure that further calls of the method
+    // don't create directories any more.
+    for (String dir : localDirs) {
+      File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+      FileUtil.fullyDelete(userDir);
+    }
+
+    // Now call the method again.
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+
+    // Files should not be created now and so shouldn't be there anymore.
+    for (String dir : localDirs) {
+      File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+      assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath()
+          + " exists!", userDir.exists());
+    }
+  }
+
+  protected void checkUserLocalization()
+      throws IOException {
     for (String dir : localDirs) {
 
       File localDir = new File(dir);
@@ -220,31 +297,87 @@ public class TestTaskTrackerLocalization extends TestCase {
       assertTrue("taskTracker sub-dir in the local-dir " + localDir
           + "is not created!", taskTrackerSubDir.exists());
 
-      File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
-      assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
-          + " isn'task created!", jobCache.exists());
+      File userDir = new File(taskTrackerSubDir, task.getUser());
+      assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+          + "is not created!", userDir.exists());
+      checkFilePermissions(userDir.getAbsolutePath(), "drwx------", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
+
+      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+      assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+          jobCache.exists());
+      checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
+
+      // Verify the distributed cache dir.
+      File distributedCacheDir =
+          new File(localDir, TaskTracker
+              .getDistributedCacheDir(task.getUser()));
+      assertTrue("distributed cache dir " + distributedCacheDir
+          + " doesn't exists!", distributedCacheDir.exists());
+      checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+          "drwx------", task.getUser(), taskTrackerUGI.getGroupNames()[0]);
+    }
+  }
+
+  /**
+   * Test job localization on a TT. Tests localization of job.xml, job.jar and
+   * corresponding setting of configuration. Also test
+   * {@link TaskController#initializeJob(JobInitializationContext)}
+   * 
+   * @throws IOException
+   */
+  public void testJobLocalization()
+      throws IOException {
+
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+
+    // /////////// The main method being tested
+    localizedJobConf = tracker.localizeJobFiles(task);
+    // ///////////
+
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext context = new JobInitializationContext();
+    context.jobid = jobId;
+    context.user = task.getUser();
+    context.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+    // /////////// The method being tested
+    taskController.initializeJob(context);
+    // ///////////
+
+    checkJobLocalization();
+  }
+
+  protected void checkJobLocalization()
+      throws IOException {
+    // Check the directory structure
+    for (String dir : localDirs) {
+
+      File localDir = new File(dir);
+      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      File userDir = new File(taskTrackerSubDir, task.getUser());
+      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
 
       File jobDir = new File(jobCache, jobId.toString());
-      assertTrue("job-dir in " + jobCache + " isn'task created!", jobDir
-          .exists());
+      assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists());
 
       // check the private permissions on the job directory
-      UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
-      checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
-          .getUserName(), ugi.getGroupNames()[0]);
+      checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
     }
 
     // check the localization of job.xml
-    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-
     assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
-        .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
-            trackerFConf) != null);
+        .getLocalPathToRead(TaskTracker.getLocalJobConfFile(task.getUser(),
+            jobId.toString()), trackerFConf) != null);
 
     // check the localization of job.jar
     Path jarFileLocalized =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
-            .toString()), trackerFConf);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task.getUser(),
+            jobId.toString()), trackerFConf);
     assertTrue("job.jar is not localized on this TaskTracker!!",
         jarFileLocalized != null);
     assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
@@ -256,8 +389,8 @@ public class TestTaskTrackerLocalization extends TestCase {
 
     // check the creation of job work directory
     assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
-        .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
-            trackerFConf) != null);
+        .getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), jobId
+            .toString()), trackerFConf) != null);
 
     // Check the setting of job.local.dir and job.jar which will eventually be
     // used by the user's task
@@ -267,11 +400,11 @@ public class TestTaskTrackerLocalization extends TestCase {
     String localizedJobJar = localizedJobConf.getJar();
     for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
       if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
-          + TaskTracker.getJobWorkDir(jobId.toString()))) {
+          + TaskTracker.getJobWorkDir(task.getUser(), jobId.toString()))) {
         jobLocalDirFlag = true;
       }
       if (localizedJobJar.equals(localDir + Path.SEPARATOR
-          + TaskTracker.getJobJarFile(jobId.toString()))) {
+          + TaskTracker.getJobJarFile(task.getUser(), jobId.toString()))) {
         mapredJarFlag = true;
       }
     }
@@ -287,13 +420,21 @@ public class TestTaskTrackerLocalization extends TestCase {
    * Test task localization on a TT.
    * 
    * @throws IOException
-   * @throws LoginException
    */
   public void testTaskLocalization()
-      throws IOException,
-      LoginException {
+      throws IOException {
 
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+    localizedJobConf = tracker.localizeJobFiles(task);
+
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext jobContext = new JobInitializationContext();
+    jobContext.jobid = jobId;
+    jobContext.user = task.getUser();
+    jobContext.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+    taskController.initializeJob(jobContext);
 
     TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
@@ -304,77 +445,194 @@ public class TestTaskTrackerLocalization extends TestCase {
 
     // check the functionality of localizeTask
     for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
-      assertTrue("attempt-dir in localDir " + dir + " is not created!!",
-          new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
-              .toString())).exists());
+      File attemptDir =
+          new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
+              .toString(), taskId.toString()));
+      assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
+          + " is not created!!", attemptDir.exists());
     }
 
-    Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
-            .getJobID().toString(), task.getTaskID().toString(), task
-            .isTaskCleanupTask()), trackerFConf);
+    attemptWorkDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+            task.getUser(), task.getJobID().toString(), task.getTaskID()
+                .toString(), task.isTaskCleanupTask()), trackerFConf);
     assertTrue("atttempt work dir for " + taskId.toString()
-        + " is not created in any of the configured dirs!!", workDir != null);
+        + " is not created in any of the configured dirs!!",
+        attemptWorkDir != null);
 
     TaskRunner runner = task.createRunner(tracker, tip);
 
     // /////// Few more methods being tested
     runner.setupChildTaskConfiguration(lDirAlloc);
-    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+    TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
-    // ///////
+    attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID());
 
     // Make sure the task-conf file is created
     Path localTaskFile =
         lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
-            .getJobID().toString(), task.getTaskID().toString(), task
-            .isTaskCleanupTask()), trackerFConf);
+            .getUser(), task.getJobID().toString(), task.getTaskID()
+            .toString(), task.isTaskCleanupTask()), trackerFConf);
     assertTrue("Task conf file " + localTaskFile.toString()
         + " is not created!!", new File(localTaskFile.toUri().getPath())
         .exists());
 
     // /////// One more method being tested. This happens in child space.
-    JobConf localizedTaskConf = new JobConf(localTaskFile);
+    localizedTaskConf = new JobConf(localTaskFile);
     TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
     // ///////
 
+    // Initialize task via TaskController
+    TaskControllerContext taskContext =
+        new TaskController.TaskControllerContext();
+    taskContext.env =
+        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+    taskContext.task = task;
+    // /////////// The method being tested
+    taskController.initializeTask(taskContext);
+    // ///////////
+
+    checkTaskLocalization();
+  }
+
+  protected void checkTaskLocalization()
+      throws IOException {
     // Make sure that the mapred.local.dir is sandboxed
     for (String childMapredLocalDir : localizedTaskConf
         .getStrings("mapred.local.dir")) {
       assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
-          childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
-              .toString(), taskId.toString(), false)));
+          childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
+              .getUser(), jobId.toString(), taskId.toString(), false)));
     }
 
     // Make sure task task.getJobFile is changed and pointed correctly.
     assertTrue(task.getJobFile().endsWith(
-        TaskTracker
-            .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+        TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
+            .toString(), false)));
 
     // Make sure that the tmp directories are created
     assertTrue("tmp dir is not created in workDir "
-        + workDir.toUri().getPath(),
-        new File(workDir.toUri().getPath(), "tmp").exists());
+        + attemptWorkDir.toUri().getPath(), new File(attemptWorkDir.toUri()
+        .getPath(), "tmp").exists());
 
-    // Make sure that the log are setup properly
+    // Make sure that the logs are setup properly
     File logDir =
         new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
             + task.getTaskID().toString());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
-    UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
-    checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
-        .getUserName(), ugi.getGroupNames()[0]);
+    checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
+        .getUser(), taskTrackerUGI.getGroupNames()[0]);
 
     File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
     assertTrue("stdout log file is improper. Expected : "
-        + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
-        expectedStdout.toString().equals(logFiles[0].toString()));
+        + expectedStdout.toString() + " Observed : "
+        + attemptLogFiles[0].toString(), expectedStdout.toString().equals(
+        attemptLogFiles[0].toString()));
     File expectedStderr =
         new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
     assertTrue("stderr log file is improper. Expected : "
-        + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
-        expectedStderr.toString().equals(logFiles[1].toString()));
+        + expectedStderr.toString() + " Observed : "
+        + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
+        attemptLogFiles[1].toString()));
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testTaskCleanup()
+      throws IOException {
+
+    // Localize job and localize task.
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+    localizedJobConf = tracker.localizeJobFiles(task);
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext jobContext = new JobInitializationContext();
+    jobContext.jobid = jobId;
+    jobContext.user = localizedJobConf.getUser();
+    jobContext.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+    taskController.initializeJob(jobContext);
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    tip.setJobConf(localizedJobConf);
+    tip.localizeTask(task);
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+            task.getUser(), task.getJobID().toString(), task.getTaskID()
+                .toString(), task.isTaskCleanupTask()), trackerFConf);
+    TaskRunner runner = task.createRunner(tracker, tip);
+    tip.setTaskRunner(runner);
+    runner.setupChildTaskConfiguration(lDirAlloc);
+    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+        localizedJobConf);
+    TaskRunner.prepareLogFiles(task.getTaskID());
+    Path localTaskFile =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+            .getUser(), task.getJobID().toString(), task.getTaskID()
+            .toString(), task.isTaskCleanupTask()), trackerFConf);
+    JobConf localizedTaskConf = new JobConf(localTaskFile);
+    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+    TaskControllerContext taskContext =
+        new TaskController.TaskControllerContext();
+    taskContext.env =
+        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+    taskContext.task = task;
+    // /////////// The method being tested
+    taskController.initializeTask(taskContext);
+
+    // TODO: Let the task run and create files.
+
+    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+    tracker.directoryCleanupThread = cleanupQueue;
+
+    // ////////// The central methods being tested
+    tip.removeTaskFiles(true, taskId);
+    tracker.removeJobFiles(task.getUser(), jobId.toString());
+    // //////////
+
+    // TODO: make sure that all files intended to be deleted are deleted.
+
+    assertTrue("Some task files are not deleted!! Number of stale paths is "
+        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+
+    // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
+    // there.
+    for (String localDir : localDirs) {
+      Path userDir =
+          new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+      assertTrue("User directory " + userDir + " is not present!!",
+          tracker.localFs.exists(userDir));
+    }
+
+    // Test userlogs cleanup.
+    verifyUserLogsCleanup();
+  }
+
+  /**
+   * Test userlogs cleanup.
+   * 
+   * @throws IOException
+   */
+  private void verifyUserLogsCleanup()
+      throws IOException {
+    Path logDir =
+        new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
+            + Path.SEPARATOR + task.getTaskID().toString());
+
+    // Logs should be there before cleanup.
+    assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
+        tracker.localFs.exists(logDir));
+
+    // ////////// Another being tested
+    TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to the future and
+    // put file modification time behind retainTimeStamp
+    // //////////
+
+    // Logs should be gone after cleanup.
+    assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
+        tracker.localFs.exists(logDir));
   }
 }

+ 186 - 0
src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java

@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
+
+/**
+ * Test the DistributedCacheManager when LinuxTaskController is used.
+ * 
+ */
+public class TestTrackerDistributedCacheManagerWithLinuxTaskController extends
+    TestTrackerDistributedCacheManager {
+
+  private File configFile;
+  private MyLinuxTaskController taskController;
+  private String taskTrackerSpecialGroup;
+
+  private static final Log LOG =
+      LogFactory
+          .getLog(TestTrackerDistributedCacheManagerWithLinuxTaskController.class);
+
+  @Override
+  protected void setUp()
+      throws IOException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    TEST_ROOT_DIR =
+        new File(System.getProperty("test.build.data", "/tmp"),
+            TestTrackerDistributedCacheManagerWithLinuxTaskController.class
+                .getSimpleName()).getAbsolutePath();
+
+    super.setUp();
+
+    taskController = new MyLinuxTaskController();
+    String path =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+        ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+            .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+    String execPath = path + "/task-controller";
+    taskController.setTaskControllerExe(execPath);
+    taskController.setConf(conf);
+    taskController.setup();
+
+    taskTrackerSpecialGroup =
+        TestTaskTrackerLocalization.getFilePermissionAttrs(execPath)[2];
+  }
+
+  @Override
+  protected void tearDown()
+      throws IOException {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    if (configFile != null) {
+      configFile.delete();
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Test the control flow of distributed cache manager when LinuxTaskController
+   * is used.
+   */
+  @Override
+  public void testManagerFlow()
+      throws IOException,
+      LoginException {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    super.testManagerFlow();
+  }
+
+  @Override
+  protected String getJobOwnerName() {
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    String userName = ugi.split(",")[0];
+    return userName;
+  }
+
+  @Override
+  protected TaskController getTaskController() {
+    return taskController;
+  }
+
+  @Override
+  protected void checkFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    String cachedFirstFile = localCacheFiles[0].toUri().getPath();
+    String cachedSecondFile = localCacheFiles[1].toUri().getPath();
+    String userName = getJobOwnerName();
+
+    // First make sure that the cache files have proper permissions.
+    TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile,
+        "-r-xrwx---", userName, taskTrackerSpecialGroup);
+    TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile,
+        "-r-xrwx---", userName, taskTrackerSpecialGroup);
+
+    // Now. make sure that all the path components also have proper
+    // permissions.
+    checkPermissionOnPathComponents(cachedFirstFile, userName);
+    checkPermissionOnPathComponents(cachedSecondFile, userName);
+  }
+
+  /**
+   * @param cachedFilePath
+   * @param userName
+   * @throws IOException
+   */
+  private void checkPermissionOnPathComponents(String cachedFilePath,
+      String userName)
+      throws IOException {
+    // The trailing distcache/file/... string
+    String trailingStringForFirstFile =
+        cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
+            + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
+            + Path.SEPARATOR + TaskTracker.getDistributedCacheDir(userName),
+            "");
+    LOG.info("Leading path for cacheFirstFile is : "
+        + trailingStringForFirstFile);
+    // The leading mapred.local.dir/0_[0-n]/taskTracker/$user string.
+    String leadingStringForFirstFile =
+        cachedFilePath.substring(0, cachedFilePath
+            .lastIndexOf(trailingStringForFirstFile));
+    LOG.info("Leading path for cacheFirstFile is : "
+        + leadingStringForFirstFile);
+
+    // Now check path permissions, starting with cache file's parent dir.
+    File path = new File(cachedFilePath).getParentFile();
+    while (!path.getAbsolutePath().equals(leadingStringForFirstFile)) {
+      TestTaskTrackerLocalization.checkFilePermissions(path.getAbsolutePath(),
+          "dr-xrws---", userName, taskTrackerSpecialGroup);
+      path = path.getParentFile();
+    }
+  }
+
+  @Override
+  public void testDeleteCache()
+      throws Exception {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.testDeleteCache();
+  }
+
+  @Override
+  public void testFileSystemOtherThanDefault()
+      throws Exception {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.testFileSystemOtherThanDefault();
+  }
+}