Forráskód Böngészése

commit 3d86e34d8f754bbda13cbaa6d18fe223d708a20a
Author: Vinod Kumar <vinodkv@yahoo-inc.com>
Date: Wed Mar 17 11:37:43 2010 +0530

MAPREDUCE-927 from https://issues.apache.org/jira/secure/attachment/12439009/patch-927-5-dist.txt


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077322 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 éve
szülő
commit
a6b1decef7
25 módosított fájl, 1354 hozzáadás és 386 törlés
  1. 101 15
      src/c++/task-controller/task-controller.c
  2. 5 2
      src/c++/task-controller/task-controller.h
  3. 18 2
      src/c++/task-controller/tests/test-task-controller.c
  4. 1 1
      src/mapred/mapred-default.xml
  5. 0 1
      src/mapred/org/apache/hadoop/mapred/Child.java
  6. 8 7
      src/mapred/org/apache/hadoop/mapred/JvmManager.java
  7. 22 44
      src/mapred/org/apache/hadoop/mapred/TaskLog.java
  8. 1 1
      src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  9. 48 114
      src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java
  10. 41 44
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  11. 214 0
      src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
  12. 2 0
      src/mapred/org/apache/hadoop/mapreduce/JobContext.java
  13. 41 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java
  14. 20 1
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
  15. 47 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java
  16. 74 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java
  17. 47 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java
  18. 47 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java
  19. 49 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java
  20. 145 0
      src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java
  21. 5 3
      src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
  22. 50 58
      src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java
  23. 48 93
      src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
  24. 296 0
      src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
  25. 24 0
      src/test/org/apache/hadoop/mapred/UtilsForTests.java

+ 101 - 15
src/c++/task-controller/task-controller.c

@@ -212,9 +212,17 @@ char *get_task_dir_path(const char *tt_root, const char *user,
 /**
  * Get the log directory for the given attempt.
  */
-char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
-  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
-      attempt_id);
+char *get_task_log_dir(const char *log_dir, const char *job_id, 
+    const char *attempt_id) {
+  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 3, log_dir,
+      job_id, attempt_id);
+}
+
+/**
+ * Get the log directory for the given job.
+ */
+char *get_job_log_dir(const char *log_dir, const char *job_id) {
+  return concatenate(JOB_LOG_DIR_PATTERN, "job_log_dir", 2, log_dir, job_id);
 }
 
 /**
@@ -379,7 +387,8 @@ static int secure_path(const char *path, uid_t uid, gid_t gid,
       continue;
     }
 
-    if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
+    if (should_check_ownership && 
+        (check_ownership(entry->fts_path, uid, gid) != 0)) {
       fprintf(LOGFILE,
           "Invalid file path. %s not user/group owned by the tasktracker.\n",
           entry->fts_path);
@@ -497,16 +506,62 @@ int prepare_attempt_directories(const char *job_id, const char *attempt_id,
   return 0;
 }
 
+/**
+ * Function to prepare the job log dir (log-dir/userlogs/$jobid) for the
+ * child. It gives user ownership of the job's log-dir to the user and group
+ * ownership to the effective group of this binary (see getegid() below).
+ *     *  sudo chown user:mapred log-dir/userlogs/$jobid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
+ *     *  sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ *
+ * @param log_dir     the configured log directory
+ * @param job_id      id of the job whose log directory is to be secured
+ * @param permissions mode passed to secure_path(); the same mode with
+ *                    S_ISGID added is passed as the second mode argument
+ * @return 0 on success or when the directory does not exist, -1 on failure
+ */
+int prepare_job_logs(const char *log_dir, const char *job_id,
+    mode_t permissions) {
+
+  char *job_log_dir = get_job_log_dir(log_dir, job_id);
+  if (job_log_dir == NULL) {
+    // job_log_dir is NULL here, so it must not be handed to a %s conversion;
+    // report the job id we failed to build the path for instead.
+    fprintf(LOGFILE, "Couldn't get job log directory for job %s.\n", job_id);
+    return -1;
+  }
+
+  struct stat filestat;
+  if (stat(job_log_dir, &filestat) != 0) {
+    if (errno == ENOENT) {
+      // A missing directory is not an error: there is nothing to secure.
+#ifdef DEBUG
+      fprintf(LOGFILE, "job_log_dir %s doesn't exist. Not doing anything.\n",
+          job_log_dir);
+#endif
+      free(job_log_dir);
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the job log dir %s\n", job_log_dir);
+      free(job_log_dir);
+      return -1;
+    }
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+  if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+      permissions, S_ISGID | permissions, 1) != 0) {
+    fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
+    free(job_log_dir);
+    return -1;
+  }
+  free(job_log_dir);
+  return 0;
+}
+
 /**
  * Function to prepare the task logs for the child. It gives the user
  * ownership of the attempt's log-dir to the user and group ownership to the
  * user running tasktracker.
- *     *  sudo chown user:mapred log-dir/userlogs/$attemptid
- *     *  sudo chmod -R 2770 log-dir/userlogs/$attemptid
+ *     *  sudo chown user:mapred log-dir/userlogs/$jobid/$attemptid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid/$attemptid
  */
-int prepare_task_logs(const char *log_dir, const char *task_id) {
+int prepare_task_logs(const char *log_dir, const char *job_id, 
+    const char *task_id) {
 
-  char *task_log_dir = get_task_log_dir(log_dir, task_id);
+  char *task_log_dir = get_task_log_dir(log_dir, job_id, task_id);
   if (task_log_dir == NULL) {
     fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir);
     return -1;
@@ -524,10 +579,12 @@ int prepare_task_logs(const char *log_dir, const char *task_id) {
       fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
           task_log_dir);
 #endif
+      free(task_log_dir);
       return 0;
     } else {
       // stat failed because of something else!
       fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir);
+      free(task_log_dir);
       return -1;
     }
   }
@@ -537,8 +594,10 @@ int prepare_task_logs(const char *log_dir, const char *task_id) {
       S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
     // setgid on dirs but not files, 770. As of now, there are no files though
     fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+    free(task_log_dir);
     return -1;
   }
+  free(task_log_dir);
   return 0;
 }
 
@@ -557,8 +616,9 @@ int get_user_details(const char *user) {
 
 /*
  * Function to check if the TaskTracker actually owns the file.
+ * Or whether it already has the right ownership (the uid and gid passed in).
   */
-int check_ownership(char *path) {
+int check_ownership(char *path, uid_t uid, gid_t gid) {
   struct stat filestat;
   if (stat(path, &filestat) != 0) {
     return UNABLE_TO_STAT_FILE;
@@ -566,8 +626,10 @@ int check_ownership(char *path) {
   // check user/group. User should be TaskTracker user, group can either be
   // TaskTracker's primary group or the special group to which binary's
   // permissions are set.
-  if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
-      != filestat.st_gid)) {
+  // Or the file may already be owned by the uid and gid passed in.
+  if ((getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
+      != filestat.st_gid)) &&
+      ((uid != filestat.st_uid) || (gid != filestat.st_gid))) {
     return FILE_NOT_OWNED_BY_TASKTRACKER;
   }
   return 0;
@@ -668,10 +730,13 @@ int initialize_user(const char *user) {
  * Function to prepare the job directories for the task JVM.
  * We do the following:
  *     *  sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid
+ *     *  sudo chown user:mapred -R logs/userlogs/$jobid 
  *     *  if user is not $tt_user,
  *     *    sudo chmod 2570 -R taskTracker/$user/jobcache/$jobid
+ *     *    sudo chmod 2570 -R logs/userlogs/$jobid
  *     *  else // user is tt_user
  *     *    sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid
+ *     *    sudo chmod 2770 -R logs/userlogs/$jobid
  *     *
  *     *  For any user, sudo chmod 2770 taskTracker/$user/jobcache/$jobid/work
  */
@@ -783,11 +848,32 @@ int initialize_job(const char *jobid, const char *user) {
   }
   free(local_dir);
   free(full_local_dir_str);
-  cleanup();
+  int exit_code = 0;
   if (failed) {
-    return INITIALIZE_JOB_FAILED;
+    exit_code = INITIALIZE_JOB_FAILED;
+    goto cleanup;
   }
-  return 0;
+
+  char *log_dir = (char *) get_value(TT_LOG_DIR_KEY);
+  if (log_dir == NULL) {
+    fprintf(LOGFILE, "Log directory is not configured.\n");
+    exit_code = INVALID_TT_LOG_DIR;
+    goto cleanup;
+  }
+
+  if (prepare_job_logs(log_dir, jobid, permissions) != 0) {
+    fprintf(LOGFILE, "Couldn't prepare job logs directory %s for %s.\n",
+        log_dir, jobid);
+    exit_code = PREPARE_JOB_LOGS_FAILED;
+  }
+
+  cleanup:
+  // free configurations
+  cleanup();
+  if (log_dir != NULL) {
+    free(log_dir);
+  }
+  return exit_code;
 }
 
 /**
@@ -891,7 +977,7 @@ int initialize_task(const char *jobid, const char *taskid, const char *user) {
     goto cleanup;
   }
 
-  if (prepare_task_logs(log_dir, taskid) != 0) {
+  if (prepare_task_logs(log_dir, jobid, taskid) != 0) {
     fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
         log_dir, taskid);
     exit_code = PREPARE_TASK_LOGS_FAILED;

+ 5 - 2
src/c++/task-controller/task-controller.h

@@ -70,7 +70,8 @@ enum errorcodes {
   INITIALIZE_DISTCACHEFILE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
   UNABLE_TO_BUILD_PATH, //21
-  INVALID_TASKCONTROLLER_PERMISSIONS //22
+  INVALID_TASKCONTROLLER_PERMISSIONS, //22
+  PREPARE_JOB_LOGS_FAILED, //23
 };
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -83,7 +84,9 @@ enum errorcodes {
 
 #define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
 
-#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 

+ 18 - 2
src/c++/task-controller/tests/test-task-controller.c

@@ -171,13 +171,26 @@ void test_get_task_launcher_file() {
   assert(ret == 0);
 }
 
+void test_get_job_log_dir() { // checks the path built by get_job_log_dir()
+  char *logdir = (char *) get_job_log_dir("/tmp/testing",
+    "job_200906101234_0001");
+  printf("logdir obtained is %s\n", logdir);
+  int ret = 0; // set to -1 below if the path differs from the expected one
+  if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) {
+    ret = -1;
+  }
+  free(logdir); // release the returned string before asserting
+  assert(ret == 0);
+}
+
 void test_get_task_log_dir() {
   char *logdir = (char *) get_task_log_dir("/tmp/testing",
-      "attempt_200906112028_0001_m_000000_0");
+    "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
   printf("logdir obtained is %s\n", logdir);
   int ret = 0;
   if (strcmp(logdir,
-      "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+      "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+      != 0) {
     ret = -1;
   }
   free(logdir);
@@ -203,6 +216,9 @@ int main(int argc, char **argv) {
   printf("\nTesting get_task_launcher_file()\n");
   test_get_task_launcher_file();
 
+  printf("\nTesting get_job_log_dir()\n");
+  test_get_job_log_dir();
+
   printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
 

+ 1 - 1
src/mapred/mapred-default.xml

@@ -631,7 +631,7 @@
   <name>mapred.userlog.retain.hours</name>
   <value>24</value>
   <description>The maximum time, in hours, for which the user-logs are to be 
-          retained.
+               retained after the job completion.
   </description>
 </property>
 

+ 0 - 1
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -193,7 +193,6 @@ class Child {
 
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);
-        TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
 
         task.setConf(job);
 

+ 8 - 7
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -32,6 +32,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.JvmFinishedEvent;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -419,9 +421,7 @@ class JvmManager {
       }
       public void run() {
         runChild(env);
-
-        // Post-JVM-exit logs processing. Truncate the logs.
-        truncateJVMLogs();
+        jvmFinished();
       }
 
       public void runChild(JvmEnv env) {
@@ -481,11 +481,12 @@ class JvmManager {
         }
       }
 
-      // Post-JVM-exit logs processing. Truncate the logs.
-      private void truncateJVMLogs() {
+      // Post-JVM-exit logs processing. Inform the user log manager.
+      private void jvmFinished() {
         Task firstTask = initalContext.task;
-        tracker.getTaskLogsMonitor().addProcessForLogTruncation(
-            firstTask.getTaskID(), tasksGiven);
+        JvmFinishedEvent jfe = new JvmFinishedEvent(new JVMInfo(firstTask
+            .getTaskID(), tasksGiven));
+        tracker.getUserLogManager().addLogEvent(jfe);
       }
 
       public void taskRan() {

+ 22 - 44
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -42,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.log4j.Appender;
@@ -71,7 +71,7 @@ public class TaskLog {
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(getBaseDir(taskid.toString()), filter.toString());
+    return new File(getAttemptDir(taskid.toString()), filter.toString());
   }
 
   /**
@@ -94,7 +94,7 @@ public class TaskLog {
           + ie);
       return null;
     }
-    return new File(getBaseDir(l.location), filter.toString());
+    return new File(getAttemptDir(l.location), filter.toString());
   }
 
   /**
@@ -108,7 +108,7 @@ public class TaskLog {
    */
   static String getRealTaskLogFilePath(String location, LogName filter)
       throws IOException {
-    return FileUtil.makeShellPath(new File(getBaseDir(location),
+    return FileUtil.makeShellPath(new File(getAttemptDir(location),
         filter.toString()));
   }
 
@@ -144,7 +144,7 @@ public class TaskLog {
     for (LogName filter : new LogName[] { LogName.DEBUGOUT, LogName.PROFILE }) {
       LogFileDetail l = new LogFileDetail();
       l.location = loc;
-      l.length = new File(getBaseDir(l.location), filter.toString()).length();
+      l.length = new File(getAttemptDir(l.location), filter.toString()).length();
       l.start = 0;
       allLogsFileDetails.put(filter, l);
     }
@@ -166,7 +166,7 @@ public class TaskLog {
   }
   
   private static File getTmpIndexFile(String taskid) {
-    return new File(getBaseDir(taskid), "log.tmp");
+    return new File(getAttemptDir(taskid), "log.tmp");
   }
   public static File getIndexFile(String taskid) {
     return getIndexFile(taskid, false);
@@ -174,9 +174,9 @@ public class TaskLog {
   
   public static File getIndexFile(String taskid, boolean isCleanup) {
     if (isCleanup) {
-      return new File(getBaseDir(taskid), "log.index.cleanup");
+      return new File(getAttemptDir(taskid), "log.index.cleanup");
     } else {
-      return new File(getBaseDir(taskid), "log.index");
+      return new File(getAttemptDir(taskid), "log.index");
     }
   }
 
@@ -184,8 +184,9 @@ public class TaskLog {
     return System.getProperty("hadoop.log.dir");
   }
 
-  static File getBaseDir(String taskid) {
-    return new File(LOG_DIR, taskid);
+  static File getAttemptDir(String taskid) {
+    return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
+        taskid);
   }
 
   static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
@@ -317,39 +318,6 @@ public class TaskLog {
     }
   }
 
-  private static class TaskLogsPurgeFilter implements FileFilter {
-    long purgeTimeStamp;
-  
-    TaskLogsPurgeFilter(long purgeTimeStamp) {
-      this.purgeTimeStamp = purgeTimeStamp;
-    }
-
-    public boolean accept(File file) {
-      LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
-      return file.lastModified() < purgeTimeStamp;
-    }
-  }
-
-  /**
-   * Purge old user logs.
-   * 
-   * @throws IOException
-   */
-  public static synchronized void cleanup(int logsRetainHours
-                                          ) throws IOException {
-    // Purge logs of tasks on this tasktracker if their  
-    // mtime has exceeded "mapred.task.log.retain" hours
-    long purgeTimeStamp = System.currentTimeMillis() - 
-                            (logsRetainHours*60L*60*1000);
-    File[] oldTaskLogs = LOG_DIR.listFiles
-                           (new TaskLogsPurgeFilter(purgeTimeStamp));
-    if (oldTaskLogs != null) {
-      for (int i=0; i < oldTaskLogs.length; ++i) {
-        FileUtil.fullyDelete(oldTaskLogs[i]);
-      }
-    }
-  }
-
   static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
@@ -390,7 +358,7 @@ public class TaskLog {
       start += fileDetail.start;
       end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(new File(getBaseDir(fileDetail.location), 
+      file = new FileInputStream(new File(getAttemptDir(fileDetail.location), 
           kind.toString()));
       // skip upto start
       long pos = 0;
@@ -696,4 +664,14 @@ public class TaskLog {
     return LOG_DIR;
   }
   
+  /**
+   * Get the user log directory for the job jobid: the entry named after
+   * the job ID (its {@code toString()}) under {@code getUserLogDir()}.
+   * 
+   * @param jobid ID of the job whose log directory is wanted
+   * @return user log directory for the job
+   */
+  public static File getJobDir(JobID jobid) {
+    return new File(getUserLogDir(), jobid.toString());
+  }
+
 } // TaskLog

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java

@@ -141,7 +141,7 @@ public class TaskLogServlet extends HttpServlet {
    */
   static Configuration getConfFromJobACLsFile(String attemptIdStr) {
     Configuration conf = new Configuration(false);
-    conf.addResource(new Path(TaskLog.getBaseDir(attemptIdStr).toString(),
+    conf.addResource(new Path(TaskLog.getAttemptDir(attemptIdStr).toString(),
         TaskRunner.jobACLsFile));
     return conf;
   }

+ 48 - 114
src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java → src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java

@@ -31,74 +31,54 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
-import org.apache.hadoop.util.StringUtils;
-
-class TaskLogsMonitor extends Thread {
-  static final Log LOG = LogFactory.getLog(TaskLogsMonitor.class);
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
 
+/**
+ * The class for truncating the user logs. 
+ * Should be used only by {@link UserLogManager}. 
+ *
+ */
+public class TaskLogsTruncater {
+  static final Log LOG = LogFactory.getLog(TaskLogsTruncater.class);
+
+  static final String MAP_USERLOG_RETAIN_SIZE =
+    "mapreduce.cluster.map.userlog.retain-size";
+  static final String REDUCE_USERLOG_RETAIN_SIZE =
+    "mapreduce.cluster.reduce.userlog.retain-size";
+  static final int DEFAULT_RETAIN_SIZE = -1;
+  
   long mapRetainSize, reduceRetainSize;
 
-  public TaskLogsMonitor(long mapRetSize, long reduceRetSize) {
-    mapRetainSize = mapRetSize;
-    reduceRetainSize = reduceRetSize;
-    LOG.info("Starting logs' monitor with mapRetainSize=" + mapRetainSize
-        + " and reduceRetainSize=" + reduceRetSize);
-  }
+  public TaskLogsTruncater(Configuration conf) {
+    mapRetainSize = conf.getLong(MAP_USERLOG_RETAIN_SIZE, DEFAULT_RETAIN_SIZE);
+    reduceRetainSize = conf.getLong(REDUCE_USERLOG_RETAIN_SIZE,
+        DEFAULT_RETAIN_SIZE);
+    LOG.info("Initializing logs' truncater with mapRetainSize=" + mapRetainSize
+        + " and reduceRetainSize=" + reduceRetainSize);
 
-  /**
-   * The list of tasks that have finished and so need their logs to be
-   * truncated.
-   */
-  private Map<TaskAttemptID, PerJVMInfo> finishedJVMs =
-      new HashMap<TaskAttemptID, PerJVMInfo>();
+  }
 
   private static final int DEFAULT_BUFFER_SIZE = 4 * 1024;
 
   static final int MINIMUM_RETAIN_SIZE_FOR_TRUNCATION = 0;
 
-  private static class PerJVMInfo {
-
-    List<Task> allAttempts;
-
-    public PerJVMInfo(List<Task> allAtmpts) {
-      this.allAttempts = allAtmpts;
-    }
-  }
-
-  /**
-   * Process(JVM/debug script) has finished. Asynchronously truncate the logs of
-   * all the corresponding tasks to the configured limit. In case of JVM, both
-   * the firstAttempt as well as the list of all attempts that ran in the same
-   * JVM have to be passed. For debug script, the (only) attempt itself should
-   * be passed as both the firstAttempt as well as the list of attempts.
-   * 
-   * @param firstAttempt
-   * @param isTaskCleanup
-   */
-  void addProcessForLogTruncation(TaskAttemptID firstAttempt,
-      List<Task> allAttempts) {
-    LOG.info("Adding the jvm with first-attempt " + firstAttempt
-        + " for logs' truncation");
-    PerJVMInfo lInfo = new PerJVMInfo(allAttempts);
-    synchronized (finishedJVMs) {
-      finishedJVMs.put(firstAttempt, lInfo);
-      finishedJVMs.notify();
-    }
-  }
-
   /**
    * Process the removed task's logs. This involves truncating them to
    * retainSize.
    */
-  void truncateLogs(TaskAttemptID firstAttempt, PerJVMInfo lInfo) {
+  public void truncateLogs(JVMInfo lInfo) {
+    TaskAttemptID firstAttempt = TaskAttemptID.downgrade(lInfo
+        .getFirstAttemptID());
 
     // Read the log-file details for all the attempts that ran in this JVM
     Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
     try {
-      taskLogFileDetails = getAllLogsFileDetails(lInfo.allAttempts);
+      taskLogFileDetails = getAllLogsFileDetails(lInfo.getAllAttempts());
     } catch (IOException e) {
       LOG.warn(
           "Exception in truncateLogs while getting allLogsFileDetails()."
@@ -107,7 +87,7 @@ class TaskLogsMonitor extends Thread {
     }
 
     // set this boolean to true if any of the log files is truncated
-    boolean truncated = false;
+    boolean indexModified = false;
 
     Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails =
         new HashMap<Task, Map<LogName, LogFileDetail>>();
@@ -117,7 +97,7 @@ class TaskLogsMonitor extends Thread {
           updatedTaskLogFileDetails, logName);
     }
 
-    File attemptLogDir = TaskLog.getBaseDir(firstAttempt.toString());
+    File attemptLogDir = TaskLog.getAttemptDir(firstAttempt.toString());
 
     FileWriter tmpFileWriter;
     FileReader logFileReader;
@@ -162,19 +142,19 @@ class TaskLogsMonitor extends Thread {
 
       long newCurrentOffset = 0;
       // Process each attempt from the ordered list passed.
-      for (Task task : lInfo.allAttempts) {
+      for (Task task : lInfo.getAllAttempts()) {
 
         // Truncate the log files of this task-attempt so that only the last
         // retainSize many bytes of this log file is retained and the log
         // file is reduced in size saving disk space.
         long retainSize =
             (task.isMapTask() ? mapRetainSize : reduceRetainSize);
-        LogFileDetail newLogFileDetail = new LogFileDetail();
+        LogFileDetail newLogFileDetail = null;
         try {
           newLogFileDetail =
               truncateALogFileOfAnAttempt(task.getTaskID(),
                   taskLogFileDetails.get(task).get(logName), retainSize,
-                  tmpFileWriter, logFileReader);
+                  tmpFileWriter, logFileReader, logName);
         } catch (IOException ioe) {
           LOG.warn("Cannot truncate the log file "
               + logFile.getAbsolutePath()
@@ -201,7 +181,7 @@ class TaskLogsMonitor extends Thread {
           newLogFileDetail.start = newCurrentOffset;
           updatedTaskLogFileDetails.get(task).put(logName, newLogFileDetail);
           newCurrentOffset += newLogFileDetail.length;
-          truncated = true; // set the flag truncated
+          indexModified = true; // set the flag
         }
       }
 
@@ -230,7 +210,7 @@ class TaskLogsMonitor extends Thread {
       }
     }
 
-    if (truncated) {
+    if (indexModified) {
       // Update the index files
       updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
     }
@@ -242,12 +222,12 @@ class TaskLogsMonitor extends Thread {
    * @param updatedTaskLogFileDetails
    * @param logName
    */
-  private void copyOriginalIndexFileInfo(PerJVMInfo lInfo,
+  private void copyOriginalIndexFileInfo(JVMInfo lInfo,
       Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
       Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
       LogName logName) {
     if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
-      for (Task task : lInfo.allAttempts) {
+      for (Task task : lInfo.getAllAttempts()) {
         if (!updatedTaskLogFileDetails.containsKey(task)) {
           updatedTaskLogFileDetails.put(task,
               new HashMap<LogName, LogFileDetail>());
@@ -289,12 +269,12 @@ class TaskLogsMonitor extends Thread {
    * @param logName
    * @return true if truncation is needed, false otherwise
    */
-  private boolean isTruncationNeeded(PerJVMInfo lInfo,
+  private boolean isTruncationNeeded(JVMInfo lInfo,
       Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
       LogName logName) {
     boolean truncationNeeded = false;
     LogFileDetail logFileDetail = null;
-    for (Task task : lInfo.allAttempts) {
+    for (Task task : lInfo.getAllAttempts()) {
       long taskRetainSize =
           (task.isMapTask() ? mapRetainSize : reduceRetainSize);
       Map<LogName, LogFileDetail> allLogsFileDetails =
@@ -326,7 +306,8 @@ class TaskLogsMonitor extends Thread {
   private LogFileDetail truncateALogFileOfAnAttempt(
       final TaskAttemptID taskID, final LogFileDetail oldLogFileDetail,
       final long taskRetainSize, final FileWriter tmpFileWriter,
-      final FileReader logFileReader) throws IOException {
+      final FileReader logFileReader,
+      final LogName logName) throws IOException {
     LogFileDetail newLogFileDetail = new LogFileDetail();
 
     // ///////////// Truncate log file ///////////////////////
@@ -335,14 +316,14 @@ class TaskLogsMonitor extends Thread {
     newLogFileDetail.location = oldLogFileDetail.location;
     if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
         && oldLogFileDetail.length > taskRetainSize) {
-      LOG.info("Truncating logs for " + taskID + " from "
+      LOG.info("Truncating " + logName + " logs for " + taskID + " from "
           + oldLogFileDetail.length + "bytes to " + taskRetainSize
           + "bytes.");
       newLogFileDetail.length = taskRetainSize;
     } else {
-      LOG.info("No truncation needed for " + taskID + " length is "
-          + oldLogFileDetail.length + " retain size " + taskRetainSize
-          + "bytes.");
+      LOG.debug("No truncation needed for " + logName + " logs for " + taskID
+          + " length is " + oldLogFileDetail.length + " retain size "
+          + taskRetainSize + "bytes.");
       newLogFileDetail.length = oldLogFileDetail.length;
     }
     long charsSkipped =
@@ -351,7 +332,8 @@ class TaskLogsMonitor extends Thread {
     if (charsSkipped != oldLogFileDetail.length - newLogFileDetail.length) {
       throw new IOException("Erroneously skipped " + charsSkipped
           + " instead of the expected "
-          + (oldLogFileDetail.length - newLogFileDetail.length));
+          + (oldLogFileDetail.length - newLogFileDetail.length)
+          + " while truncating " + logName + " logs for " + taskID );
     }
     long alreadyRead = 0;
     while (alreadyRead < newLogFileDetail.length) {
@@ -404,59 +386,11 @@ class TaskLogsMonitor extends Thread {
         TaskLog.writeToIndexFile(firstAttempt, task.getTaskID(),
             task.isTaskCleanupTask(), logLengths);
       } catch (IOException ioe) {
-        LOG.warn("Exception in updateIndicesAfterLogTruncation : "
-            + StringUtils.stringifyException(ioe));
         LOG.warn("Exception encountered while updating index file of task "
             + task.getTaskID()
-            + ". Ignoring and continuing with other tasks.");
+            + ". Ignoring and continuing with other tasks.", ioe);
       }
     }
   }
 
-  /**
-   * 
-   * @throws IOException
-   */
-  void monitorTaskLogs() throws IOException {
-
-    Map<TaskAttemptID, PerJVMInfo> tasksBeingTruncated =
-      new HashMap<TaskAttemptID, PerJVMInfo>();
-
-    // Start monitoring newly added finishedJVMs
-    synchronized (finishedJVMs) {
-      tasksBeingTruncated.clear();
-      tasksBeingTruncated.putAll(finishedJVMs);
-      finishedJVMs.clear();
-    }
-
-    for (Entry<TaskAttemptID, PerJVMInfo> entry : 
-                tasksBeingTruncated.entrySet()) {
-      truncateLogs(entry.getKey(), entry.getValue());
-    }
-  }
-
-  @Override
-  public void run() {
-
-    while (true) {
-      try {
-        monitorTaskLogs();
-        try {
-          synchronized (finishedJVMs) {
-            while (finishedJVMs.isEmpty()) {
-              finishedJVMs.wait();
-            }
-          }
-        } catch (InterruptedException e) {
-          LOG.warn(getName() + " is interrupted. Returning");
-          return;
-        }
-      } catch (Throwable e) {
-        LOG.warn(getName()
-            + " encountered an exception while monitoring : "
-            + StringUtils.stringifyException(e));
-        LOG.info("Ingoring the exception and continuing monitoring.");
-      }
-    }
-  }
 }

+ 41 - 44
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -54,7 +54,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.*;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -128,11 +129,6 @@ public class TaskTracker
   static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
      "mapred.tasktracker.pmem.reserved";
 
-  static final String MAP_USERLOG_RETAIN_SIZE =
-      "mapreduce.cluster.map.userlog.retain-size";
-  static final String REDUCE_USERLOG_RETAIN_SIZE =
-      "mapreduce.cluster.reduce.userlog.retain-size";
-
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
@@ -276,7 +272,7 @@ public class TaskTracker
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
-  private TaskLogsMonitor taskLogsMonitor;
+  private UserLogManager userLogManager;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
@@ -456,12 +452,12 @@ public class TaskTracker
     }
   }
 
-  TaskLogsMonitor getTaskLogsMonitor() {
-    return this.taskLogsMonitor;
+  UserLogManager getUserLogManager() {
+    return this.userLogManager;
   }
 
-  void setTaskLogsMonitor(TaskLogsMonitor t) {
-    this.taskLogsMonitor = t;
+  void setUserLogManager(UserLogManager u) {
+    this.userLogManager = u;
   }
 
   public static String getUserDir(String user) {
@@ -717,9 +713,7 @@ public class TaskTracker
 
     initializeMemoryManagement();
 
-    setTaskLogsMonitor(new TaskLogsMonitor(getMapUserLogRetainSize(),
-        getReduceUserLogRetainSize()));
-    getTaskLogsMonitor().start();
+    getUserLogManager().clearOldUserLogs(fConf);
 
     setIndexCache(new IndexCache(this.fConf));
 
@@ -953,10 +947,8 @@ public class TaskTracker
                               new LocalDirAllocator("mapred.local.dir");
 
   // intialize the job directory
-  @SuppressWarnings("unchecked")
-  private void localizeJob(TaskInProgress tip) 
+  RunningJob localizeJob(TaskInProgress tip) 
   throws IOException, InterruptedException {
-    Path localJarFile = null;
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
@@ -967,7 +959,8 @@ public class TaskTracker
     synchronized (rjob) {
       if (!rjob.localized) {
         JobConf localJobConf = localizeJobFiles(t, rjob);
-        
+        // initialize job log directory
+        initializeJobLogDir(jobId);
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
         // should be the last call after every other directory/file to be
@@ -985,7 +978,7 @@ public class TaskTracker
         rjob.localized = true;
       }
     }
-    launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
+    return rjob;
   }
 
   /**
@@ -1062,6 +1055,15 @@ public class TaskTracker
     return localJobConf;
   }
 
+  /**
+   * Create the user-log directory of the given job.
+   *
+   * A {@link JobStartedEvent} is queued with the user log manager before the
+   * directory is initialized through the localizer.
+   *
+   * @param jobId the job whose log directory is initialized
+   */
+  void initializeJobLogDir(JobID jobId) {
+    // The job might already be registered with the tasklog cleanup thread
+    // because of a tasktracker reinit or restart, so announce the job start
+    // first to get it removed from there.
+    getUserLogManager().addLogEvent(new JobStartedEvent(jobId));
+    localizer.initializeJobLogDir(jobId);
+  }
+
   /**
    * Download the job configuration file from the FS.
    *
@@ -1183,10 +1185,6 @@ public class TaskTracker
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
 
-    // All tasks are killed. So, they are removed from TaskLog monitoring also.
-    // Interrupt the monitor.
-    getTaskLogsMonitor().interrupt();
-
     jvmManager.stop();
     
     // shutdown RPC connections
@@ -1261,7 +1259,8 @@ public class TaskTracker
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
-    
+    // create user log manager
+    setUserLogManager(new UserLogManager(conf));
     // Initialize the jobACLSManager
     jobACLsManager = new TaskTrackerJobACLsManager(this);
     initialize();
@@ -1620,22 +1619,6 @@ public class TaskTracker
     return heartbeatResponse;
   }
 
-  long getMapUserLogRetainSize() {
-    return fConf.getLong(MAP_USERLOG_RETAIN_SIZE, -1);
-  }
-
-  void setMapUserLogRetainSize(long retainSize) {
-    fConf.setLong(MAP_USERLOG_RETAIN_SIZE, retainSize);
-  }
-
-  long getReduceUserLogRetainSize() {
-    return fConf.getLong(REDUCE_USERLOG_RETAIN_SIZE, -1);
-  }
-
-  void setReduceUserLogRetainSize(long retainSize) {
-    fConf.setLong(REDUCE_USERLOG_RETAIN_SIZE, retainSize);
-  }
-
   /**
    * Return the total virtual memory available on this TaskTracker.
    * @return total size of virtual memory.
@@ -1778,7 +1761,7 @@ public class TaskTracker
    * @param action The action with the job
    * @throws IOException
    */
-  private synchronized void purgeJob(KillJobAction action) throws IOException {
+  synchronized void purgeJob(KillJobAction action) throws IOException {
     JobID jobId = action.getJobID();
     LOG.info("Received 'KillJobAction' for job: " + jobId);
     RunningJob rjob = null;
@@ -1803,6 +1786,13 @@ public class TaskTracker
         if (!rjob.keepJobFiles) {
           removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
         }
+        // add job to user log manager
+        long now = System.currentTimeMillis();
+        JobCompletedEvent jca = new JobCompletedEvent(rjob
+            .getJobID(), now, UserLogCleaner.getUserlogRetainHours(rjob
+            .getJobConf()));
+        getUserLogManager().addLogEvent(jca);
+
         // Remove this job 
         rjob.tasks.clear();
       }
@@ -2157,7 +2147,8 @@ public class TaskTracker
    */
   void startNewTask(TaskInProgress tip) {
     try {
-      localizeJob(tip);
+      RunningJob rjob = localizeJob(tip);
+      launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -2217,6 +2208,7 @@ public class TaskTracker
    */
   public void run() {
     try {
+      getUserLogManager().start();
       startCleanupThreads();
       boolean denied = false;
       while (running && !shuttingDown && !denied) {
@@ -2711,8 +2703,9 @@ public class TaskTracker
 
               // Debug-command is run. Do the post-debug-script-exit debug-logs
               // processing. Truncate the logs.
-              getTaskLogsMonitor().addProcessForLogTruncation(
-                  task.getTaskID(), Arrays.asList(task));
+              JvmFinishedEvent jvmFinished = new JvmFinishedEvent(
+                  new JVMInfo(task.getTaskID(), Arrays.asList(task)));
+              getUserLogManager().addLogEvent(jvmFinished);
             }
           }
           taskStatus.setProgress(0.0f);
@@ -3259,6 +3252,10 @@ public class TaskTracker
     FetchStatus getFetchStatus() {
       return f;
     }
+
+    /**
+     * @return the {@code jobConf} field of this object
+     */
+    JobConf getJobConf() {
+      return this.jobConf;
+    }
   }
 
   /**

+ 214 - 0
src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java

@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+
+/**
+ * This is used only in UserLogManager, to manage cleanup of user logs.
+ */
+public class UserLogCleaner extends Thread {
+  private static final Log LOG = LogFactory.getLog(UserLogCleaner.class);
+  static final String USERLOGCLEANUP_SLEEPTIME = 
+    "mapreduce.tasktracker.userlogcleanup.sleeptime";
+  static final int DEFAULT_USER_LOG_RETAIN_HOURS = 24; // 1 day
+  static final long DEFAULT_THREAD_SLEEP_TIME = 1000 * 60 * 60; // 1 hour
+
+  private UserLogManager userLogManager;
+  private Map<JobID, Long> completedJobs = Collections
+      .synchronizedMap(new HashMap<JobID, Long>());
+  private final long threadSleepTime;
+  private CleanupQueue cleanupQueue;
+
+  private Clock clock;
+  private FileSystem localFs;
+
+  public UserLogCleaner(UserLogManager userLogManager, Configuration conf)
+      throws IOException {
+    this.userLogManager = userLogManager;
+    threadSleepTime = conf.getLong(USERLOGCLEANUP_SLEEPTIME,
+        DEFAULT_THREAD_SLEEP_TIME);
+    cleanupQueue = new CleanupQueue();
+    localFs = FileSystem.getLocal(conf);
+    setClock(new Clock());
+    setDaemon(true);
+  }
+
+  void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  Clock getClock() {
+    return this.clock;
+  }
+
+  CleanupQueue getCleanupQueue() {
+    return cleanupQueue;
+  }
+
+  void setCleanupQueue(CleanupQueue cleanupQueue) {
+    this.cleanupQueue = cleanupQueue;
+  }
+
+  @Override
+  public void run() {
+    // This thread wakes up after every threadSleepTime interval
+    // and deletes if there are any old logs.
+    while (true) {
+      try {
+        // sleep
+        Thread.sleep(threadSleepTime);
+        processCompletedJobs();
+      } catch (Throwable e) {
+        LOG.warn(getClass().getSimpleName()
+            + " encountered an exception while monitoring :", e);
+        LOG.info("Ingoring the exception and continuing monitoring.");
+      }
+    }
+  }
+
+  void processCompletedJobs() throws IOException {
+    long now = clock.getTime();
+    // iterate through completedJobs and remove old logs.
+    synchronized (completedJobs) {
+      Iterator<Entry<JobID, Long>> completedJobIter = completedJobs.entrySet()
+          .iterator();
+      while (completedJobIter.hasNext()) {
+        Entry<JobID, Long> entry = completedJobIter.next();
+        // see if the job is old enough
+        if (entry.getValue().longValue() <= now) {
+          // add the job for deletion
+          userLogManager.addLogEvent(new DeleteJobEvent(entry.getKey()));
+          completedJobIter.remove();
+        }
+      }
+    }
+  }
+
+  public void deleteJobLogs(JobID jobid) throws IOException {
+    deleteLogPath(TaskLog.getJobDir(jobid).getAbsolutePath());
+  }
+
+  /**
+   * Clears all the logs in userlog directory.
+   * 
+   * Adds the job directories for deletion with default retain hours. Deletes
+   * all other directories, if any. This is usually called on reinit/restart of
+   * the TaskTracker
+   * 
+   * @param conf
+   * @throws IOException
+   */
+  public void clearOldUserLogs(Configuration conf) throws IOException {
+    File userLogDir = TaskLog.getUserLogDir();
+    if (userLogDir.exists()) {
+      String[] logDirs = userLogDir.list();
+      if (logDirs.length > 0) {
+        // add all the log dirs to taskLogsMnonitor.
+        long now = clock.getTime();
+        for (String logDir : logDirs) {
+          JobID jobid = null;
+          try {
+            jobid = JobID.forName(logDir);
+          } catch (IllegalArgumentException ie) {
+            // if the directory is not a jobid, delete it immediately
+            deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+            continue;
+          }
+          // add the job log directory for deletion with default retain hours,
+          // if it is not already added
+          if (!completedJobs.containsKey(jobid)) {
+            JobCompletedEvent jce = new JobCompletedEvent(jobid, now,
+                getUserlogRetainHours(conf));
+            userLogManager.addLogEvent(jce);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * If the configuration is null or user-log retain hours is not configured,
+   * the retain hours are {@value UserLogCleaner#DEFAULT_USER_LOG_RETAIN_HOURS}
+   */
+  static int getUserlogRetainHours(Configuration conf) {
+    return (conf == null ? DEFAULT_USER_LOG_RETAIN_HOURS : conf.getInt(
+        JobContext.USER_LOG_RETAIN_HOURS, DEFAULT_USER_LOG_RETAIN_HOURS));
+  }
+
+  /**
+   * Adds job user-log directory to cleanup thread to delete logs after user-log
+   * retain hours.
+   * 
+   * @param jobCompletionTime
+   *          job completion time in millis
+   * @param retainHours
+   *          the user-log retain hours for the job
+   * @param jobid
+   *          JobID for which user logs should be deleted
+   */
+  public void markJobLogsForDeletion(long jobCompletionTime, int retainHours,
+      org.apache.hadoop.mapreduce.JobID jobid) {
+    long retainTimeStamp = jobCompletionTime + (retainHours * 1000L * 60L * 60L);
+    LOG.info("Adding " + jobid + " for user-log deletion with retainTimeStamp:"
+        + retainTimeStamp);
+    completedJobs.put(jobid, Long.valueOf(retainTimeStamp));
+  }
+
+  /**
+   * Remove job from user log deletion.
+   * 
+   * @param jobid
+   */
+  public void unmarkJobFromLogDeletion(JobID jobid) {
+    if (completedJobs.remove(jobid) != null) {
+      LOG.info("Removing " + jobid + " from user-log deletion");
+    }
+  }
+
+  /**
+   * Deletes the log path.
+   * 
+   * This path will be removed through {@link CleanupQueue}
+   * 
+   * @param logPath
+   * @throws IOException
+   */
+  private void deleteLogPath(String logPath) throws IOException {
+    LOG.info("Deleting user log path " + logPath);
+    PathDeletionContext context = new PathDeletionContext(localFs, logPath);
+    cleanupQueue.addToQueue(context);
+  }
+}

+ 2 - 0
src/mapred/org/apache/hadoop/mapreduce/JobContext.java

@@ -64,6 +64,8 @@ public class JobContext {
   
   public static final String JOB_CANCEL_DELEGATION_TOKEN = 
     "mapreduce.job.complete.cancel.delegation.tokens";
+  public static final String USER_LOG_RETAIN_HOURS = 
+    "mapred.userlog.retain.hours";
   
   protected UserGroupInformation ugi;
   

+ 41 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker;
+
+import java.util.List;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Holder pairing a {@link TaskAttemptID} with a list of {@link Task}s.
+ *
+ * NOTE(review): judging by the names, these are the first attempt run in a JVM
+ * and all the attempts that JVM ran -- confirm against the callers.
+ */
+public class JVMInfo {
+  // Both references are set once at construction. The list itself is stored
+  // as passed in, without copying.
+  private final TaskAttemptID firstAttemptID;
+  private final List<Task> allAttempts;
+
+  /**
+   * @param firstAttemptID
+   *          value returned by {@link #getFirstAttemptID()}
+   * @param allAttempts
+   *          list returned by {@link #getAllAttempts()}
+   */
+  public JVMInfo(TaskAttemptID firstAttemptID, List<Task> allAttempts) {
+    this.firstAttemptID = firstAttemptID;
+    this.allAttempts = allAttempts;
+  }
+
+  /**
+   * @return the attempt id given at construction
+   */
+  public TaskAttemptID getFirstAttemptID() {
+    return firstAttemptID;
+  }
+
+  /**
+   * @return the same list instance that was given at construction
+   */
+  public List<Task> getAllAttempts() {
+    return allAttempts;
+  }
+}

+ 20 - 1
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java

@@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.JobID;
 
 /**
  * 
@@ -358,4 +359,22 @@ public class Localizer {
           + attemptId.toString());
     }
   }
+
+  /**
+   * Make sure the user-log directory of a job exists and apply the
+   * {@code sevenZeroZero} permission setting to it.
+   *
+   * If the directory is missing and cannot be created, a warning is logged and
+   * the permissions are left untouched.
+   *
+   * @param jobId
+   *          the job whose log directory is set up
+   */
+  public void initializeJobLogDir(JobID jobId) {
+    File logDir = TaskLog.getJobDir(jobId);
+    // mkdirs() is only attempted when the directory does not exist yet
+    boolean available = logDir.exists() || logDir.mkdirs();
+    if (!available) {
+      LOG.warn("Could not create job user log directory: " + logDir);
+      return;
+    }
+    Localizer.PermissionsHandler.setPermissions(logDir,
+        Localizer.PermissionsHandler.sevenZeroZero);
+  }
 }

+ 47 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is an {@link UserLogEvent} sent when job logs should be deleted.
+ */
+public class DeleteJobEvent extends UserLogEvent {
+  // Set once at construction and never modified.
+  private final JobID jobid;
+
+  /**
+   * Create the event to delete job log directory.
+   * 
+   * @param jobid
+   *          The {@link JobID} whose logs should be deleted.
+   */
+  public DeleteJobEvent(JobID jobid) {
+    super(EventType.DELETE_JOB);
+    this.jobid = jobid;
+  }
+
+  /**
+   * Get the jobid.
+   * 
+   * @return object of {@link JobID}
+   */
+  public JobID getJobID() {
+    return jobid;
+  }
+}

+ 74 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is an {@link UserLogEvent} sent when the job completes
+ */
+public class JobCompletedEvent extends UserLogEvent {
+  // All fields are set once at construction and never modified.
+  private final JobID jobid;
+  private final long jobCompletionTime;
+  private final int retainHours;
+
+  /**
+   * Create the event for job completion.
+   * 
+   * @param jobid
+   *          The completed {@link JobID} .
+   * @param jobCompletionTime
+   *          The job completion time.
+   * @param retainHours
+   *          The number of hours for which the job logs should be retained
+   */
+  public JobCompletedEvent(JobID jobid, long jobCompletionTime,
+      int retainHours) {
+    super(EventType.JOB_COMPLETED);
+    this.jobid = jobid;
+    this.jobCompletionTime = jobCompletionTime;
+    this.retainHours = retainHours;
+  }
+
+  /**
+   * Get the job id.
+   * 
+   * @return object of {@link JobID}
+   */
+  public JobID getJobID() {
+    return jobid;
+  }
+
+  /**
+   * Get the job completion time-stamp in milli-seconds.
+   * 
+   * @return job completion time.
+   */
+  public long getJobCompletionTime() {
+    return jobCompletionTime;
+  }
+
+  /**
+   * Get the number of hours for which job logs should be retained.
+   * 
+   * @return retainHours
+   */
+  public int getRetainHours() {
+    return retainHours;
+  }
+}

+ 47 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is an {@link UserLogEvent} sent when the job starts.
+ */
+public class JobStartedEvent extends UserLogEvent {
+  // Set once at construction and never modified.
+  private final JobID jobid;
+
+  /**
+   * Create the event to inform the job has started.
+   * 
+   * @param jobid
+   *          The {@link JobID} which started
+   */
+  public JobStartedEvent(JobID jobid) {
+    super(EventType.JOB_STARTED);
+    this.jobid = jobid;
+  }
+
+  /**
+   * Get the job id.
+   * 
+   * @return object of {@link JobID}
+   */
+  public JobID getJobID() {
+    return jobid;
+  }
+}

+ 47 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+
+/**
+ * This is an {@link UserLogEvent} sent when the jvm finishes.
+ */
+public class JvmFinishedEvent extends UserLogEvent {
+  // Set once at construction and never modified.
+  private final JVMInfo jvmInfo;
+
+  /**
+   * Create the event to inform that the jvm has finished.
+   * 
+   * @param jvmInfo
+   *          The finished {@link JVMInfo}
+   */
+  public JvmFinishedEvent(JVMInfo jvmInfo) {
+    super(EventType.JVM_FINISHED);
+    this.jvmInfo = jvmInfo;
+  }
+
+  /**
+   * Get the jvm info.
+   * 
+   * @return object of {@link JVMInfo}
+   */
+  public JVMInfo getJvmInfo() {
+    return jvmInfo;
+  }
+}

+ 49 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * A directive from the various components of {@link TaskTracker} to the
+ * {@link UserLogManager} to inform about an event.
+ */
+public abstract class UserLogEvent {
+
+  /** The kinds of events; each concrete event passes one to the constructor. */
+  public enum EventType {
+    JVM_FINISHED,
+    JOB_STARTED,
+    JOB_COMPLETED,
+    DELETE_JOB,
+  }
+
+  // Set once at construction and never modified.
+  private final EventType eventType;
+
+  /**
+   * @param eventType
+   *          the type reported by {@link #getEventType()}
+   */
+  protected UserLogEvent(EventType eventType) {
+    this.eventType = eventType;
+  }
+
+  /**
+   * Return the {@link EventType}.
+   * @return the {@link EventType}.
+   */
+  public EventType getEventType() {
+    return eventType;
+  }
+
+}

+ 145 - 0
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java

@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskLogsTruncater;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.UserLogCleaner;
+
+/**
+ * This manages user logs on the {@link TaskTracker}.
+ */
+public class UserLogManager {
+  private static final Log LOG = LogFactory.getLog(UserLogManager.class);
+  private BlockingQueue<UserLogEvent> userLogEvents = 
+    new LinkedBlockingQueue<UserLogEvent>();
+  private TaskLogsTruncater taskLogsTruncater;
+  private UserLogCleaner userLogCleaner;
+
+  private Thread monitorLogEvents = new Thread() {
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          monitor();
+        } catch (Exception e) {
+          LOG.warn("Exception while monitoring user log events", e);
+        }
+      }
+    }
+  };
+
+  /**
+   * Create the user log manager to manage user logs on {@link TaskTracker}.
+   * 
+   * It should be explicitly started using {@link #start()} to start functioning
+   * 
+   * @param conf
+   *          The {@link Configuration}
+   * 
+   * @throws IOException
+   */
+  public UserLogManager(Configuration conf) throws IOException {
+    taskLogsTruncater = new TaskLogsTruncater(conf);
+    userLogCleaner = new UserLogCleaner(this, conf);
+    monitorLogEvents.setDaemon(true);
+  }
+
+  /**
+   * Starts managing the logs
+   */
+  public void start() {
+    userLogCleaner.start();
+    monitorLogEvents.start();
+  }
+
+  protected void monitor() throws Exception {
+    UserLogEvent event = userLogEvents.take();
+    processEvent(event);
+  }
+
+  protected void processEvent(UserLogEvent event) throws IOException {
+    if (event instanceof JvmFinishedEvent) {
+      doJvmFinishedAction((JvmFinishedEvent) event);
+    } else if (event instanceof JobCompletedEvent) {
+      doJobCompletedAction((JobCompletedEvent) event);
+    } else if (event instanceof JobStartedEvent) {
+      doJobStartedAction((JobStartedEvent) event);
+    } else if (event instanceof DeleteJobEvent) {
+      doDeleteJobAction((DeleteJobEvent) event);
+    } else { 
+      LOG.warn("Unknown event " + event.getEventType() + " passed.");
+    }
+  }
+
+  /**
+   * Called during TaskTracker restart/re-init.
+   * 
+   * @param conf
+   *          TT's conf
+   * @throws IOException
+   */
+  public void clearOldUserLogs(Configuration conf)
+      throws IOException {
+    userLogCleaner.clearOldUserLogs(conf);
+  }
+
+  private void doJvmFinishedAction(JvmFinishedEvent event) {
+    taskLogsTruncater.truncateLogs(event.getJvmInfo());
+  }
+
+  private void doJobStartedAction(JobStartedEvent event) {
+    userLogCleaner.unmarkJobFromLogDeletion(event.getJobID());
+  }
+
+  private void doJobCompletedAction(JobCompletedEvent event) {
+    userLogCleaner.markJobLogsForDeletion(event.getJobCompletionTime(), event
+        .getRetainHours(), event.getJobID());
+  }
+
+  private void doDeleteJobAction(DeleteJobEvent event) throws IOException {
+    userLogCleaner.deleteJobLogs(event.getJobID());
+  }
+
+  /**
+   * Add the {@link UserLogEvent} for processing.
+   * 
+   * @param event
+   */
+  public void addLogEvent(UserLogEvent event) {
+    userLogEvents.add(event);
+  }
+
+  /**
+   * Get {@link UserLogCleaner}.
+   * 
+   * This method is called only from unit tests.
+   * 
+   * @return {@link UserLogCleaner}
+   */
+  public UserLogCleaner getUserLogCleaner() {
+    return userLogCleaner;
+  }
+}

+ 5 - 3
src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java

@@ -77,9 +77,6 @@ public class TestLocalizationWithLinuxTaskController extends
     String user = ugi.split(",")[0];
     jobConf.setUser(user);
     File jobConfFile = uploadJobConf(jobConf);
-    // Create the task again to change the job-user
-    task =
-      new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
     task.setConf(jobConf);
     task.setUser(user);
     taskTrackerUserName = UserGroupInformation.getLoginUser()
@@ -208,6 +205,11 @@ public class TestLocalizationWithLinuxTaskController extends
       checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task
           .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
     }
+
+    // check job user-log directory permissions
+    File jobLogDir = TaskLog.getJobDir(jobId);
+    checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
   }
 
   @Override

+ 50 - 58
src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java → src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java

@@ -31,6 +31,7 @@ import java.util.Map;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -40,6 +41,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.JvmFinishedEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 
 import org.junit.After;
@@ -49,11 +53,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Verify the logs' monitoring functionality.
+ * Verify the logs' truncation functionality.
  */
-public class TestTaskLogsMonitor {
+public class TestTaskLogsTruncater {
 
-  static final Log LOG = LogFactory.getLog(TestTaskLogsMonitor.class);
+  static final Log LOG = LogFactory.getLog(TestTaskLogsTruncater.class);
 
   /**
    * clean-up any stale directories after enabling writable permissions for all
@@ -85,7 +89,7 @@ public class TestTaskLogsMonitor {
           + logFile);
     }
 
-    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
     if (!attemptDir.exists() && !attemptDir.mkdirs()) {
       throw new IOException("Couldn't create all ancestor dirs for "
           + logFile);
@@ -126,6 +130,14 @@ public class TestTaskLogsMonitor {
     return allLogsFileLengths;
   }
 
+  private Configuration setRetainSizes(long mapRetainSize,
+      long reduceRetainSize) {
+    Configuration conf = new Configuration();
+    conf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, mapRetainSize);
+    conf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, reduceRetainSize);
+    return conf;
+  }
+
   /**
    * Test cases which don't need any truncation of log-files. Without JVM-reuse.
    * 
@@ -133,9 +145,8 @@ public class TestTaskLogsMonitor {
    */
   @Test
   public void testNoTruncationNeeded() throws IOException {
-    TaskTracker taskTracker = new TaskTracker();
-    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(1000L, 1000L);
-    taskTracker.setTaskLogsMonitor(logsMonitor);
+    Configuration conf = setRetainSizes(1000L, 1000L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
 
     TaskID baseId = new TaskID();
     int taskcount = 0;
@@ -151,17 +162,16 @@ public class TestTaskLogsMonitor {
     File logIndex = TaskLog.getIndexFile(attemptID.toString(), false);
     long indexModificationTimeStamp = logIndex.lastModified();
 
-    logsMonitor.monitorTaskLogs();
-    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
     assertEquals("index file got modified", indexModificationTimeStamp,
         logIndex.lastModified());
 
     // Finish the task and the JVM too.
-    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // There should be no truncation of the log-file.
-    logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
     assertEquals("index file got modified", indexModificationTimeStamp,
         logIndex.lastModified());
@@ -175,7 +185,7 @@ public class TestTaskLogsMonitor {
     }
 
     // truncate it once again
-    logsMonitor.monitorTaskLogs();
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
     assertEquals("index file got modified", indexModificationTimeStamp,
         logIndex.lastModified());
     
@@ -195,10 +205,9 @@ public class TestTaskLogsMonitor {
    */
   @Test
   public void testDisabledLogTruncation() throws IOException {
-    TaskTracker taskTracker = new TaskTracker();
     // Anything less than 0 disables the truncation.
-    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(-1L, -1L);
-    taskTracker.setTaskLogsMonitor(logsMonitor);
+    Configuration conf = setRetainSizes(-1L, -1L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
 
     TaskID baseId = new TaskID();
     int taskcount = 0;
@@ -212,15 +221,14 @@ public class TestTaskLogsMonitor {
       writeRealBytes(attemptID, attemptID, log, 1500, 'H');
     }
 
-    logsMonitor.monitorTaskLogs();
-    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Finish the task and the JVM too.
-    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should not be truncated.
-    logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
     Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
     for (LogName log : LogName.values()) {
@@ -238,9 +246,8 @@ public class TestTaskLogsMonitor {
    */
   @Test
   public void testLogTruncationOnFinishing() throws IOException {
-    TaskTracker taskTracker = new TaskTracker();
-    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(1000L, 1000L);
-    taskTracker.setTaskLogsMonitor(logsMonitor);
+    Configuration conf = setRetainSizes(1000L, 1000L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
 
     TaskID baseId = new TaskID();
     int taskcount = 0;
@@ -254,15 +261,14 @@ public class TestTaskLogsMonitor {
       writeRealBytes(attemptID, attemptID, log, 1500, 'H');
     }
 
-    logsMonitor.monitorTaskLogs();
-    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Finish the task and the JVM too.
-    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should now be truncated.
-    logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
 
     Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
@@ -274,7 +280,6 @@ public class TestTaskLogsMonitor {
     }
 
     // truncate once again
-    logsMonitor.monitorTaskLogs();
     logLengths = getAllLogsFileLengths(attemptID, false);
     for (LogName log : LogName.values()) {
       File logFile = TaskLog.getTaskLogFile(attemptID, log);
@@ -293,9 +298,8 @@ public class TestTaskLogsMonitor {
    */
   @Test
   public void testLogTruncation() throws IOException {
-    TaskTracker taskTracker = new TaskTracker();
-    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(1000L, 1000L);
-    taskTracker.setTaskLogsMonitor(logsMonitor);
+    Configuration conf = setRetainSizes(1000L, 1000L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
 
     TaskID baseId = new TaskID();
     int taskcount = 0;
@@ -308,15 +312,14 @@ public class TestTaskLogsMonitor {
     writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
     writeRealBytes(attemptID, attemptID, LogName.STDERR, 500, 'H');
 
-    logsMonitor.monitorTaskLogs();
-    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Finish the task and the JVM too.
-    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should now be truncated.
-    logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
 
     Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
@@ -330,7 +333,7 @@ public class TestTaskLogsMonitor {
     assertEquals(500, logLengths.get(LogName.STDERR).longValue());
 
     // truncate once again
-    logsMonitor.monitorTaskLogs();
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
     logLengths = getAllLogsFileLengths(attemptID, false);
     logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
     assertEquals(1000, logFile.length());
@@ -349,9 +352,8 @@ public class TestTaskLogsMonitor {
    */
   @Test
   public void testLogTruncationOnFinishingWithJVMReuse() throws IOException {
-    TaskTracker taskTracker = new TaskTracker();
-    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(150L, 150L);
-    taskTracker.setTaskLogsMonitor(logsMonitor);
+    Configuration conf = setRetainSizes(150L, 150L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
 
     TaskID baseTaskID = new TaskID();
     int attemptsCount = 0;
@@ -364,37 +366,27 @@ public class TestTaskLogsMonitor {
     // Let the tasks write logs more than retain-size
     writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
 
-    logsMonitor.monitorTaskLogs();
-
-    File attemptDir = TaskLog.getBaseDir(attempt1.toString());
+    File attemptDir = TaskLog.getAttemptDir(attempt1.toString());
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Start another attempt in the same JVM
     TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
     Task task2 = new MapTask(null, attempt2, 0, new JobSplit.TaskSplitIndex(),
                              0);
-    logsMonitor.monitorTaskLogs();
-
     // Let attempt2 also write some logs
     writeRealBytes(attempt1, attempt2, LogName.SYSLOG, 100, 'B');
-    logsMonitor.monitorTaskLogs();
-
     // Start yet another attempt in the same JVM
     TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
     Task task3 = new MapTask(null, attempt3, 0, new JobSplit.TaskSplitIndex(),
                              0);
-    logsMonitor.monitorTaskLogs();
-
     // Let attempt3 also write some logs
     writeRealBytes(attempt1, attempt3, LogName.SYSLOG, 225, 'C');
-    logsMonitor.monitorTaskLogs();
-
     // Finish the JVM.
-    logsMonitor.addProcessForLogTruncation(attempt1,
-        Arrays.asList((new Task[] { task1, task2, task3 })));
+    JVMInfo jvmInfo = new JVMInfo(attempt1, Arrays.asList((new Task[] { task1,
+        task2, task3 })));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should now be truncated.
-    logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
     File logFile = TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG);
     assertEquals(400, logFile.length());
@@ -435,7 +427,7 @@ public class TestTaskLogsMonitor {
     }
     assertTrue("Log-truncation didn't happen properly!", dataValid);
 
-    logsMonitor.monitorTaskLogs();
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
     assertEquals(400, logFile.length());
   }
 
@@ -470,8 +462,8 @@ public class TestTaskLogsMonitor {
     MiniMRCluster mr = null;
     try {
       JobConf clusterConf = new JobConf();
-      clusterConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, 10000L);
-      clusterConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
       mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
 
       JobConf conf = mr.createJobConf();
@@ -525,7 +517,7 @@ public class TestTaskLogsMonitor {
   }
 
   /**
-   * Test the truncation of DEBUGOUT file by {@link TaskLogsMonitor}
+   * Test the truncation of DEBUGOUT file by {@link TaskLogsTruncater}
    * @throws IOException 
    */
   @Test
@@ -534,8 +526,8 @@ public class TestTaskLogsMonitor {
     MiniMRCluster mr = null;
     try {
       JobConf clusterConf = new JobConf();
-      clusterConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, 10000L);
-      clusterConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
       mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
 
       JobConf conf = mr.createJobConf();

+ 48 - 93
src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

@@ -38,12 +38,11 @@ import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 
 /**
@@ -75,6 +74,7 @@ public class TestTaskTrackerLocalization extends TestCase {
   protected Path attemptWorkDir;
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
+  private TaskInProgress tip;
 
   /**
    * Dummy method in this base class. Only derived classes will define this
@@ -113,8 +113,14 @@ public class TestTaskTrackerLocalization extends TestCase {
     trackerFConf.setStrings("mapred.local.dir", localDirs);
 
     // Create the job configuration file. Same as trackerConf in this test.
-    JobConf jobConf = trackerFConf;
-
+    JobConf jobConf = new JobConf(trackerFConf);
+    // Set job view ACLs in conf so that validation of contents of jobACLsFile
+    // can be done against this value. Have both users and groups
+    String jobViewACLs = "user1,user2, group1,group2";
+    jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+    jobConf.setInt("mapred.userlog.retain.hours", 0);
+    String jtIdentifier = "200907202331";
+    jobId = new JobID(jtIdentifier, 1);
 
     // JobClient uploads the job jar to the file system and sets it in the
     // jobConf.
@@ -123,9 +129,16 @@ public class TestTaskTrackerLocalization extends TestCase {
     // JobClient uploads the jobConf to the file system.
     File jobConfFile = uploadJobConf(jobConf);
 
+    // create jobTokens file
+    uploadJobTokensFile();
+
     // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
+    tracker.setIndexCache(new IndexCache(trackerFConf));
+    tracker.setUserLogManager(new UtilsForTests.InLineUserLogManager(
+        trackerFConf));
+    tracker.setTaskMemoryManagerEnabledFlag();
 
     // for test case system FS is the local FS
 
@@ -136,13 +149,6 @@ public class TestTaskTrackerLocalization extends TestCase {
     taskTrackerUGI = UserGroupInformation.getCurrentUser();
 
     // Set up the task to be localized
-    String jtIdentifier = "200907202331";
-    jobId = new JobID(jtIdentifier, 1);
-    
-    TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
-    rjob.ugi = UserGroupInformation.getCurrentUser();
-    tracker.runningJobs.put(jobId, rjob);
-    
     taskId =
         new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
     task =
@@ -150,8 +156,6 @@ public class TestTaskTrackerLocalization extends TestCase {
     task.setConf(jobConf); // Set conf. Set user name in particular.
     task.setUser(UserGroupInformation.getCurrentUser().getUserName());
 
-    // create jobTokens file
-    uploadJobTokensFile();
 
     taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
@@ -160,6 +164,10 @@ public class TestTaskTrackerLocalization extends TestCase {
     tracker.setTaskController(taskController);
     tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
                                        taskController));
+
+    // mimic register task
+    // create the tip
+    tip = tracker.new TaskInProgress(task, trackerFConf);
   }
 
   /**
@@ -356,24 +364,8 @@ public class TestTaskTrackerLocalization extends TestCase {
     if (!canRun()) {
       return;
     }
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-
-    // /////////// The main method being tested
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
-    // ///////////
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext context = new JobInitializationContext();
-    context.jobid = jobId;
-    context.user = task.getUser();
-    context.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-
-    // /////////// The method being tested
-    taskController.initializeJob(context);
-    // ///////////
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
 
     checkJobLocalization();
   }
@@ -441,6 +433,13 @@ public class TestTaskTrackerLocalization extends TestCase {
     assertTrue(
         "mapred.jar is not set properly to the target users directory : "
             + localizedJobJar, mapredJarFlag);
+
+    // check job user-log directory permissions
+    File jobLogDir = TaskLog.getJobDir(jobId);
+    assertTrue("job log directory " + jobLogDir + " does not exist!", jobLogDir
+        .exists());
+    checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
+        taskTrackerUGI.getGroupNames()[0]);
   }
 
   /**
@@ -453,25 +452,9 @@ public class TestTaskTrackerLocalization extends TestCase {
     if (!canRun()) {
       return;
     }
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
-
-    // Set job view ACLs in conf sothat validation of contents of jobACLsFile
-    // can be done against this value. Have both users and groups
-    String jobViewACLs = "user1,user2, group1,group2";
-    localizedJobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = task.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
 
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
 
     // ////////// The central method being tested
@@ -552,9 +535,7 @@ public class TestTaskTrackerLocalization extends TestCase {
         .getPath(), "tmp").exists());
 
     // Make sure that the logs are setup properly
-    File logDir =
-        new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
-            + task.getTaskID().toString());
+    File logDir = TaskLog.getAttemptDir(taskId.toString());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
     checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -709,21 +690,12 @@ public class TestTaskTrackerLocalization extends TestCase {
   private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
       throws Exception {
     // Localize job and localize task.
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
     if (jvmReuse) {
       localizedJobConf.setNumTasksToExecutePerJvm(2);
     }
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = localizedJobConf.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
     tip.localizeTask(task);
     Path workDir =
@@ -777,22 +749,17 @@ public class TestTaskTrackerLocalization extends TestCase {
    */
   private void verifyUserLogsCleanup()
       throws IOException {
-    Path logDir =
-        new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
-            + Path.SEPARATOR + task.getTaskID().toString());
-
+    // verify user logs cleanup
+    File jobUserLogDir = TaskLog.getJobDir(jobId);
     // Logs should be there before cleanup.
-    assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
-        tracker.getLocalFileSystem().exists(logDir));
-
-    // ////////// Another being tested
-    TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
-    // modification time behind retainTimeStatmp
-    // //////////
-
+    assertTrue("Userlogs dir " + jobUserLogDir + " is not present as expected!!",
+          jobUserLogDir.exists());
+    tracker.purgeJob(new KillJobAction(jobId));
+    tracker.getUserLogManager().getUserLogCleaner().processCompletedJobs();
+    
     // Logs should be gone after cleanup.
-    assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
-        tracker.getLocalFileSystem().exists(logDir));
+    assertFalse("Userlogs dir " + jobUserLogDir + " is not deleted as expected!!",
+        jobUserLogDir.exists());
   }
   
   /**
@@ -806,24 +773,12 @@ public class TestTaskTrackerLocalization extends TestCase {
     }
     
     LOG.info("Running testJobCleanup()");
-    // Localize job and localize task.
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = 
-      tracker.localizeJobFiles(task, 
-                               new TaskTracker.RunningJob(task.getJobID()));
-    
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = localizedJobConf.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
-    
     // Set an inline cleanup queue
     InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
     tracker.setCleanupThread(cleanupQueue);
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
     
     // Create a file in job's work-dir with 555
     String jobWorkDir = 

+ 296 - 0
src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java

@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+public class TestUserLogCleanup {
+  private static String jtid = "test";
+  private static long ONE_HOUR = 1000 * 60 * 60;
+  private Localizer localizer;
+  private UserLogManager userLogManager;
+  private UserLogCleaner userLogCleaner;
+  private TaskTracker tt;
+  private FakeClock myClock;
+  private JobID jobid1 = new JobID(jtid, 1);
+  private JobID jobid2 = new JobID(jtid, 2);
+  private JobID jobid3 = new JobID(jtid, 3);
+  private JobID jobid4 = new JobID(jtid, 4);
+  private File foo = new File(TaskLog.getUserLogDir(), "foo");
+  private File bar = new File(TaskLog.getUserLogDir(), "bar");
+
+  public TestUserLogCleanup() throws IOException {
+    Configuration conf = new Configuration();
+    startTT(conf);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtil.fullyDelete(TaskLog.getUserLogDir());
+  }
+
+  private File localizeJob(JobID jobid) throws IOException {
+    File jobUserlog = TaskLog.getJobDir(jobid);
+    // localize job log directory
+    tt.initializeJobLogDir(jobid);
+    assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
+    return jobUserlog;
+  }
+
+  private void jobFinished(JobID jobid, int logRetainHours) {
+    JobCompletedEvent jce = new JobCompletedEvent(jobid, myClock.getTime(),
+        logRetainHours);
+    userLogManager.addLogEvent(jce);
+  }
+
+  private void startTT(Configuration conf) throws IOException {
+    myClock = new FakeClock(); // clock is reset.
+    tt = new TaskTracker();
+    localizer = new Localizer(FileSystem.get(conf), conf
+        .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+        new DefaultTaskController());
+    tt.setLocalizer(localizer);
+    userLogManager = new UtilsForTests.InLineUserLogManager(conf);
+    userLogCleaner = userLogManager.getUserLogCleaner();
+    userLogCleaner.setClock(myClock);
+    tt.setUserLogManager(userLogManager);
+    userLogManager.clearOldUserLogs(conf);
+  }
+
+  private void ttReinited() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+    userLogManager.clearOldUserLogs(conf);
+  }
+
+  private void ttRestarted() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+    startTT(conf);
+  }
+
+  /**
+   * Tests job user-log directory deletion.
+   * 
+   * Adds two jobs for log deletion: one with a retain time of one hour, the
+   * other with a retain time of two hours. After an hour, calls
+   * UserLogCleaner.processCompletedJobs() and makes sure the job with the 1hr
+   * retain time is removed and the other is retained. After one more hour,
+   * the job with the 2hr retain time is also removed.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testJobLogCleanup() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+
+    // add job user log directory for deletion, with 2 hours for deletion
+    jobFinished(jobid1, 2);
+
+    // add the job for deletion with one hour as retain hours
+    jobFinished(jobid2, 1);
+
+    // remove old logs and see jobid1 is not removed and jobid2 is removed
+    myClock.advance(ONE_HOUR);
+    userLogCleaner.processCompletedJobs();
+    assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists());
+    assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists());
+
+    myClock.advance(ONE_HOUR);
+    // remove old logs and see jobid1 is removed now
+    userLogCleaner.processCompletedJobs();
+    assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists());
+  }
+
+  /**
+   * Tests user-log directory cleanup on a TT re-init with 3 hours as log retain
+   * hours for tracker.
+   * 
+   * Adds job1 deletion before the re-init with 2 hour retain hours. Adds job2
+   * for which there are no tasks/killJobAction after the re-init. Adds job3 for
+   * which there is localizeJob followed by killJobAction with 3 hours as retain
+   * hours. Adds job4 for which there are some tasks after the re-init.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testUserLogCleanup() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+    File jobUserlog3 = localizeJob(jobid3);
+    File jobUserlog4 = localizeJob(jobid4);
+    // create some files/dirs in the userlog directory
+    foo.mkdirs();
+    bar.createNewFile();
+
+    // add the jobid1 for deletion with retainhours = 2
+    jobFinished(jobid1, 2);
+
+    // time is now 1.
+    myClock.advance(ONE_HOUR);
+
+    // mimic TaskTracker reinit
+    // re-init the tt with 3 hours as user log retain hours.
+    // This re-init clears the user log directory
+    // job directories will be added with 3 hours as retain hours.
+    // i.e. They will be deleted at time 4.
+    ttReinited();
+
+    assertFalse(foo.exists());
+    assertFalse(bar.exists());
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 2.
+    userLogCleaner.processCompletedJobs();
+    assertFalse(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    // mimic localizeJob followed by KillJobAction for jobid3
+    // add the job for deletion with retainhours = 3.
+    // jobid3 should be deleted at time 5.
+    jobUserlog3 = localizeJob(jobid3);
+    jobFinished(jobid3, 3);
+
+    // mimic localizeJob for jobid4
+    jobUserlog4 = localizeJob(jobid4);
+
+    // do cleanup
+    myClock.advance(2 * ONE_HOUR);
+    // time is now 4.
+    userLogCleaner.processCompletedJobs();
+
+    // jobid2 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 5.
+    // do cleanup again
+    userLogCleaner.processCompletedJobs();
+
+    // jobid3 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertFalse(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+  }
+
+  /**
+   * Tests user-log directory cleanup on a TT restart.
+   * 
+   * Adds job1 deletion before the restart with 2 hour retain hours. Adds job2
+   * for which there are no tasks/killJobAction after the restart. Adds job3 for
+   * which there is localizeJob followed by killJobAction after the restart with
+   * 3 hours retain hours. Adds job4 for which there are some tasks after the
+   * restart.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testUserLogCleanupAfterRestart() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+    File jobUserlog3 = localizeJob(jobid3);
+    File jobUserlog4 = localizeJob(jobid4);
+    // create some files/dirs in the userlog directory
+    foo.mkdirs();
+    bar.createNewFile();
+
+    // add the jobid1 for deletion with retain hours = 2
+    jobFinished(jobid1, 2);
+
+    // time is now 1.
+    myClock.advance(ONE_HOUR);
+
+    // Mimic the TaskTracker restart
+    // Restart the tt with 3 hours as user log retain hours.
+    // This restart clears the user log directory
+    // job directories will be added with 3 hours as retain hours.
+    // i.e. They will be deleted at time 3 as clock will reset after the restart
+    ttRestarted();
+
+    assertFalse(foo.exists());
+    assertFalse(bar.exists());
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 1.
+    userLogCleaner.processCompletedJobs();
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    // mimic localizeJob followed by KillJobAction for jobid3
+    // add the job for deletion with retainhours = 3.
+    // jobid3 should be deleted at time 4.
+    jobUserlog3 = localizeJob(jobid3);
+    jobFinished(jobid3, 3);
+
+    // mimic localizeJob for jobid4
+    jobUserlog4 = localizeJob(jobid4);
+
+    // do cleanup
+    myClock.advance(2 * ONE_HOUR);
+    // time is now 3.
+    userLogCleaner.processCompletedJobs();
+
+    // jobid1 and jobid2 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 4.
+    // do cleanup again
+    userLogCleaner.processCompletedJobs();
+
+    // jobid3 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertFalse(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+  }
+}

+ 24 - 0
src/test/org/apache/hadoop/mapred/UtilsForTests.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.fail;
+
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.io.*;
@@ -47,6 +49,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
 import org.apache.hadoop.util.StringUtils;
 
 /** 
@@ -719,6 +723,26 @@ public class UtilsForTests {
     }
   }
 
+  /**
+   * In-line {@link UserLogManager}: handles each event as soon as it is added.
+   */
+  static class InLineUserLogManager extends UserLogManager {
+    public InLineUserLogManager(Configuration conf) throws IOException {
+      super(conf);
+      getUserLogCleaner().setCleanupQueue(new InlineCleanupQueue());
+    }
+
+    // do the action in-line
+    public void addLogEvent(UserLogEvent event) {
+      try {
+        super.addLogEvent(event);
+        super.monitor();
+      } catch (Exception e) {
+        fail("failed to process action " + event.getEventType());
+      }
+    }
+  }
+
   static void setUpConfigFile(Properties confProps, File configFile)
     throws IOException {
     Configuration config = new Configuration(false);