Browse source code

commit e146ebba5589b29f1b4c6f2266e98e0d29be1cf9
Author: Devaraj Das <ddas@yahoo-inc.com>
Date: Thu Sep 30 23:26:50 2010 -0700

Fixes truncation issues for userlogs. Makes the TaskController implement a new interface for truncate-logs-as-user. UserLogs will not be deleted by the TT but instead truncated if the Task fails to truncate for any reason.

+++ b/YAHOO-CHANGES.txt
+ Fixes truncation issues for userlogs. Makes the TaskController
+ implement a new interface for truncate-logs-as-user. UserLogs will not be
+ deleted by the TT but instead truncated if the Task fails to truncate
+ for any reason. (ddas)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077729 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
7aa362fd90
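
Taken together, the diffs below add one flow to the TaskTracker: when a task
JVM exits, the UserLogManager first checks whether any of that JVM's logs
exceed the retain limit, and only then asks the TaskController to truncate
them as the owning user (in-process for DefaultTaskController, via the setuid
task-controller binary for LinuxTaskController). A minimal sketch of that
flow, reconstructed from the UserLogManager and TaskController changes below:

    // Reconstructed from UserLogManager.doJvmFinishedAction (last diff on
    // this page); taskLogsTruncater and taskController are the manager's
    // existing fields.
    private void doJvmFinishedAction(JvmFinishedEvent event) throws IOException {
      JVMInfo info = event.getJvmInfo();
      if (taskLogsTruncater.shouldTruncateLogs(info)) {   // cheap size check
        String user = info.getAllAttempts().get(0).getUser();
        taskController.truncateLogsAsUser(user, info.getAllAttempts());
      }
    }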

+ 10 - 5
src/c++/task-controller/impl/main.c

@@ -37,16 +37,18 @@ void display_usage(FILE *stream) {
   fprintf(stream,
       "Usage: task-controller user command command-args\n");
   fprintf(stream, "Commands:\n");
-  fprintf(stream, "   initialize job: %2d jobid credentials cmd args\n",
+  fprintf(stream, "   initialize job:       %2d jobid credentials cmd args\n",
 	  INITIALIZE_JOB);
-  fprintf(stream, "   launch task:    %2d jobid taskid task-script\n",
+  fprintf(stream, "   launch task:          %2d jobid taskid task-script\n",
 	  LAUNCH_TASK_JVM);
-  fprintf(stream, "   signal task:    %2d task-pid signal\n",
+  fprintf(stream, "   signal task:          %2d task-pid signal\n",
 	  SIGNAL_TASK);
-  fprintf(stream, "   delete as user: %2d relative-path\n",
+  fprintf(stream, "   delete as user:       %2d relative-path\n",
 	  DELETE_AS_USER);
-  fprintf(stream, "   delete log:     %2d relative-path\n",
+  fprintf(stream, "   delete log:           %2d relative-path\n",
 	  DELETE_LOG_AS_USER);
+  fprintf(stream, "   run command as user:  %2d cmd args\n",
+	  RUN_COMMAND_AS_USER);
 }
 
 int main(int argc, char **argv) {
@@ -183,6 +185,9 @@ int main(int argc, char **argv) {
     dir_to_be_deleted = argv[optind++];
     exit_code= delete_log_directory(dir_to_be_deleted);
     break;
+  case RUN_COMMAND_AS_USER:
+    exit_code = run_command_as_user(user_detail->pw_name, argv + optind);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }
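
For reference, the argv that reaches this dispatch for the new command is
assembled in LinuxTaskController.truncateLogsAsUser (further down this page):
the user, the command number, then the java command line that main() hands to
run_command_as_user. The concrete paths and user name here are illustrative
only; the rest mirrors the Java side shown below:

    // Illustrative argv for RUN_COMMAND_AS_USER (= 5); the binary path,
    // user and handoff-file name are made-up examples.
    String[] taskControllerCmd = {
        "/usr/local/hadoop/bin/task-controller",  // setuid helper
        "alice",                                  // user to run as
        "5",                                      // RUN_COMMAND_AS_USER
        "/usr/lib/jvm/java-6-sun/bin/java",       // argv handed to execvp()
        "-classpath", System.getProperty("java.class.path"),
        "org.apache.hadoop.mapred.TaskLogsTruncater",
        "/local/dir/tt_log_tmp/attempt_201009302326_0001_m_000000_0"
    };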

+ 18 - 0
src/c++/task-controller/impl/task-controller.c

@@ -1042,3 +1042,21 @@ int delete_log_directory(const char *subdir) {
   free(log_subdir);
   return ret;
 }
+
+/**
+ * run command as user
+ */
+int run_command_as_user(const char *user, char* const* args) {
+  if (user == NULL) {
+    fprintf(LOGFILE, "The user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+  // give up root privs
+  if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+    return -1;
+  }
+  execvp(args[0], args);
+  fprintf(LOGFILE, "Failure to exec command - %s\n",
+	  strerror(errno));
+  return -1;
+} 
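
On success run_command_as_user never returns, since execvp replaces the
process image; every failure path surfaces to the TaskTracker as a nonzero
exit status. A sketch of how the Java side can observe that, using the same
Shell utilities this patch uses (LinuxTaskController below catches Exception
more broadly):

    // Sketch: a nonzero exit from the setuid helper is raised by
    // ShellCommandExecutor.execute() as a Shell.ExitCodeException.
    void runAsUser(String[] taskControllerCmd) throws IOException {
      ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
      try {
        shExec.execute();
      } catch (ExitCodeException e) {
        throw new IOException("task-controller exited with "
            + shExec.getExitCode(), e);
      }
    }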

+ 6 - 1
src/c++/task-controller/impl/task-controller.h

@@ -25,7 +25,8 @@ enum command {
   LAUNCH_TASK_JVM = 1,
   SIGNAL_TASK = 2,
   DELETE_AS_USER = 3,
-  DELETE_LOG_AS_USER = 4
+  DELETE_LOG_AS_USER = 4,
+  RUN_COMMAND_AS_USER = 5
 };
 
 enum errorcodes {
@@ -88,6 +89,10 @@ int signal_user_task(const char *user, int pid, int sig);
 int delete_as_user(const char *user,
                    const char *dir_to_be_deleted);
 
+// run a command as the user
+int run_command_as_user(const char *user,
+                        char* const* args); 
+
 // set the task tracker's uid and gid
 void set_tasktracker_uid(uid_t user, gid_t group);
 

+ 5 - 0
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -190,6 +190,11 @@ class Child {
         // Create the job-conf and set credentials
         final JobConf job = new JobConf(task.getJobFile());
         job.setCredentials(defaultConf.getCredentials());
+        //forcefully turn off caching for localfs. All cached FileSystems
+        //are closed during the JVM shutdown. We do certain
+        //localfs operations in the shutdown hook, and we don't
+        //want the localfs to be "closed"
+        job.setBoolean("fs.file.impl.disable.cache", false);
 
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.
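
For reference, FileSystem instance caching is controlled per scheme by
fs.<scheme>.impl.disable.cache, and it is the value true that disables the
cache: with caching off, each FileSystem.get call returns a fresh instance,
which FileSystem.closeAll() in the JVM shutdown sequence will not close
underneath a shutdown hook. A minimal standalone sketch of those semantics in
this branch:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class CacheSwitchDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.file.impl.disable.cache", true); // true = no cache
        FileSystem a = FileSystem.get(URI.create("file:///"), conf);
        FileSystem b = FileSystem.get(URI.create("file:///"), conf);
        System.out.println(a == b); // false: distinct, uncached instances
      }
    }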

+ 15 - 0
src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java

@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,9 +30,11 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 import org.apache.commons.logging.Log;
@@ -231,6 +234,18 @@ public class DefaultTaskController extends TaskController {
     Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
     fs.delete(dir, true);
   }
+  
+  @Override
+  public void truncateLogsAsUser(String user, List<Task> allAttempts)
+    throws IOException {
+    Task firstTask = allAttempts.get(0);
+    TaskLogsTruncater trunc = new TaskLogsTruncater(getConf());
+
+    trunc.truncateLogs(new JVMInfo(
+            TaskLog.getAttemptDir(firstTask.getTaskID(), 
+                                  firstTask.isTaskCleanupTask()),
+                       allAttempts));
+  }
 
   @Override
   public void setup(LocalDirAllocator allocator) {

+ 0 - 20
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -181,25 +181,6 @@ class JvmManager {
                                            
   }
   
-  static void checkAndDeleteTaskLogs(TaskTracker tracker, Task firstTask) {
-    File logdir = TaskLog.getAttemptDir(firstTask.getTaskID(), 
-        firstTask.isTaskCleanupTask());
-    //allow for 10% over the limit
-    final long retainSize = tracker.getRetainSize(firstTask.getTaskID());
-    if (retainSize >= 0 && FileUtil.getDU(logdir) > (1.1 * retainSize)){
-      LOG.info("Deleting user log path since the amount of data in the logs" +
-      		" exceeded the allowed " +
-      		"log limits " + logdir);
-      String user = firstTask.getUser();
-      String jobLogDir = firstTask.getJobID().toString() + 
-                         Path.SEPARATOR + logdir.getName();
-      PathDeletionContext item = 
-        new TaskController.DeletionContext(tracker.getTaskController(),
-            true, user, jobLogDir);
-      tracker.getCleanupThread().addToQueue(item);
-    }
-  }
-
   static class JvmManagerForType {
     //Mapping from the JVM IDs to running Tasks
     Map <JVMId,TaskRunner> jvmToRunningTask = 
@@ -503,7 +484,6 @@ class JvmManager {
           LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
               + ". Number of tasks it ran: " + numTasksRan);
           deleteWorkDir(tracker, firstTask);
-          checkAndDeleteTaskLogs(tracker, firstTask);
         }
       }
 

+ 82 - 1
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -23,13 +23,17 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -86,7 +90,8 @@ class LinuxTaskController extends TaskController {
     LAUNCH_TASK_JVM(1),
     SIGNAL_TASK(2),
     DELETE_AS_USER(3),
-    DELETE_LOG_AS_USER(4);
+    DELETE_LOG_AS_USER(4),
+    RUN_COMMAND_AS_USER(5);
 
     private int value;
     Commands(int value) {
@@ -330,5 +335,81 @@ class LinuxTaskController extends TaskController {
   public String getRunAsUser(JobConf conf) {
     return conf.getUser();
   }
+  
+  @Override
+  public void truncateLogsAsUser(String user, List<Task> allAttempts)
+    throws IOException {
+    
+    Task firstTask = allAttempts.get(0);
+    String taskid = firstTask.getTaskID().toString();
+    
+    LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
+    String taskRanFile = TaskTracker.TT_LOG_TMP_DIR + Path.SEPARATOR + taskid;
+    Configuration conf = new Configuration();
+    
+    //write the serialized task information to a file to pass to the truncater
+    Path taskRanFilePath = 
+      ldirAlloc.getLocalPathForWrite(taskRanFile, conf);
+    LocalFileSystem lfs = FileSystem.getLocal(conf);
+    FSDataOutputStream out = lfs.create(taskRanFilePath);
+    out.writeInt(allAttempts.size());
+    for (Task t : allAttempts) {
+      out.writeBoolean(t.isMapTask());
+      t.write(out);
+    }
+    out.close();
+    lfs.setPermission(taskRanFilePath, 
+                      FsPermission.createImmutable((short)0755));
+    
+    List<String> command = new ArrayList<String>();
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+    command.add(jvm.toString());
+    command.add("-Djava.library.path=" + 
+                System.getProperty("java.library.path"));
+    command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
+    command.add("-Dhadoop.root.logger=INFO,console");
+    command.add("-classpath");
+    command.add(System.getProperty("java.class.path"));
+    // main of TaskLogsTruncater
+    command.add(TaskLogsTruncater.class.getName()); 
+    command.add(taskRanFilePath.toString());
+
+    String[] taskControllerCmd = new String[3 + command.size()];
+    taskControllerCmd[0] = taskControllerExe;
+    taskControllerCmd[1] = user;
+    taskControllerCmd[2] = Integer.toString(
+        Commands.RUN_COMMAND_AS_USER.getValue());
+    int i = 3;
+    for (String cmdArg : command) {
+      taskControllerCmd[i++] = cmdArg;
+    }
+    if (LOG.isDebugEnabled()) {
+      for (String cmd : taskControllerCmd) {
+        LOG.debug("taskctrl command = " + cmd);
+      }
+    }
+    ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
+    
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Exit code from " + taskControllerExe.toString() + " is : "
+          + shExec.getExitCode() + " for truncateLogs");
+      LOG.warn("Exception thrown by " + taskControllerExe.toString() + " : "
+          + StringUtils.stringifyException(e));
+      LOG.info("Output from LinuxTaskController's "
+               + taskControllerExe.toString() + " follows:");
+      logOutput(shExec.getOutput());
+      lfs.delete(taskRanFilePath, false);
+      throw new IOException(e);
+    }
+    lfs.delete(taskRanFilePath, false);
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's "
+               + taskControllerExe.toString() + " follows:");
+      logOutput(shExec.getOutput());
+    }
+  }
 }
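
The handoff file created above uses a simple length-prefixed record format,
and its 0755 permission is what lets the job owner's JVM, rather than the TT
user, read it back. A sketch of the symmetric writer/reader pair, with names
as in the diffs:

    // Writer (TT side, as in truncateLogsAsUser above):
    //   writeInt(numAttempts)
    //   per attempt: writeBoolean(isMapTask), then Task.write(out)
    out.writeInt(allAttempts.size());
    for (Task t : allAttempts) {
      out.writeBoolean(t.isMapTask()); // records the concrete Task type
      t.write(out);                    // then the Writable fields
    }
    // Reader (TaskLogsTruncater.main, further down) mirrors this exactly:
    // readInt, then per record readBoolean -> new MapTask()/new ReduceTask(),
    // followed by readFields.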
 

+ 6 - 1
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -139,7 +139,12 @@ class MapTask extends Task {
   public void write(DataOutput out) throws IOException {
     super.write(out);
     if (isMapOrReduce()) {
-      splitMetaInfo.write(out);
+      if (splitMetaInfo != null) {
+        splitMetaInfo.write(out);
+      } else {
+        new TaskSplitIndex().write(out);
+      }
+      //TODO do we really need to set this to null?
       splitMetaInfo = null;
     }
   }
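
Context for the guard: Task.write is also what the TT uses when serializing a
task to its child JVM, and this method nulls splitMetaInfo after writing it
(the existing line under the TODO). The TT's retained copy therefore has
splitMetaInfo == null by the time LinuxTaskController.truncateLogsAsUser
re-serializes it into the handoff file, so without the guard that second
write would throw a NullPointerException; writing an empty TaskSplitIndex
instead keeps the serialized shape fixed for TaskLogsTruncater.main to read
back:

    // Sketch of the second serialization that motivates the null guard
    // (TT-side task object, after it was already written to the child JVM):
    if (splitMetaInfo != null) {
      splitMetaInfo.write(out);
    } else {
      new TaskSplitIndex().write(out); // placeholder with default fields
    }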

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -376,7 +376,7 @@ abstract public class Task implements Writable, Configurable {
    * 
    * @return user
    */
-  String getUser() {
+  public String getUser() {
     return user;
   }
   

+ 10 - 0
src/mapred/org/apache/hadoop/mapred/TaskController.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -149,6 +150,15 @@ public abstract class TaskController implements Configurable {
   public abstract void deleteLogAsUser(String user, 
                                        String subDir) throws IOException;
   
+  /**
+   * Run the passed command as the user
+   * @param user 
+   * @param allAttempts the list of attempts that the JVM ran
+   * @throws IOException
+   */
+  public abstract void truncateLogsAsUser(String user, List<Task> allAttempts) 
+  throws IOException;
+  
   static class DeletionContext extends CleanupQueue.PathDeletionContext {
     private TaskController controller;
     private boolean isLog;

+ 7 - 6
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -74,6 +74,11 @@ public class TaskLog {
     if (!LOG_DIR.exists()) {
       LOG_DIR.mkdirs();
     }
+    try {
+      localFS = FileSystem.getLocal(new Configuration());
+    } catch (IOException ie) {
+      throw new RuntimeException(ie);
+    }
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
@@ -162,14 +167,13 @@ public class TaskLog {
    * determined by checking the job's log directory.
    */
   static String obtainLogDirOwner(TaskAttemptID taskid) throws IOException {
-    Configuration conf = new Configuration();
-    FileSystem raw = FileSystem.getLocal(conf).getRaw();
+    FileSystem raw = localFS.getRaw();
     Path jobLogDir = new Path(getJobDir(taskid.getJobID()).getAbsolutePath());
     FileStatus jobStat = raw.getFileStatus(jobLogDir);
     return jobStat.getOwner();
   }
 
-  static String getBaseLogDir() {
+  public static String getBaseLogDir() {
     return System.getProperty("hadoop.log.dir");
   }
 
@@ -233,9 +237,6 @@ public class TaskLog {
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
 
-    if (localFS == null) {// set localFS once
-      localFS = FileSystem.getLocal(new Configuration());
-    }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
 

+ 74 - 0
src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java

@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +36,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskLog.LogName;
@@ -69,6 +78,41 @@ public class TaskLogsTruncater {
 
   static final int MINIMUM_RETAIN_SIZE_FOR_TRUNCATION = 0;
 
+  /**
+   * Check the log file sizes generated by the attempts that ran in a
+   * particular JVM
+   * @param lInfo
+   * @return
+   * @throws IOException
+   */
+  public boolean shouldTruncateLogs(JVMInfo lInfo) throws IOException {
+    // Read the log-file details for all the attempts that ran in this JVM
+    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
+    try {
+      taskLogFileDetails = getAllLogsFileDetails(lInfo.getAllAttempts());
+    } catch (IOException e) {
+      LOG.warn(
+          "Exception in truncateLogs while getting allLogsFileDetails()."
+              + " Ignoring the truncation of logs of this process.", e);
+      return false;
+    }
+
+    File attemptLogDir = lInfo.getLogLocation();
+
+    for (LogName logName : LogName.values()) {
+
+      File logFile = new File(attemptLogDir, logName.toString());
+
+      if (logFile.exists()) {
+        if(!isTruncationNeeded(lInfo, taskLogFileDetails, logName)) {
+          LOG.debug("Truncation is not needed for "
+              + logFile.getAbsolutePath());
+        } else return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Process the removed task's logs. This involves truncating them to
    * retainSize.
@@ -441,5 +485,35 @@ public class TaskLogsTruncater {
       }
     }
   }
+  
+  public static void main(String args[]) throws IOException {
+    String taskRanFile = args[0];
+    Configuration conf = new Configuration();
+    
+    //read the Task objects from the file
+    LocalFileSystem lfs = FileSystem.getLocal(conf);
+    FSDataInputStream din = lfs.open(new Path(taskRanFile));
+    
+    int numTasksRan = din.readInt();
+    List<Task> taskAttemptsRan = new ArrayList<Task>();
+    for (int i = 0; i < numTasksRan; i++) {
+      Task t;
+      if (din.readBoolean()) {
+        t = new MapTask(); 
+      } else {
+        t = new ReduceTask();
+      }
+      t.readFields(din);
+      taskAttemptsRan.add(t);
+    }
+    Task firstTask = taskAttemptsRan.get(0);
+    TaskLogsTruncater trunc = new TaskLogsTruncater(conf);
+
+    trunc.truncateLogs(new JVMInfo(
+            TaskLog.getAttemptDir(firstTask.getTaskID(), 
+                                  firstTask.isTaskCleanupTask()),
+                      taskAttemptsRan));
+    System.exit(0);
+  }
 
 }
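
Because the truncater now has a main() that exits 0 explicitly (and lets an
IOException kill the JVM otherwise), the setuid helper's exit status reflects
truncation success directly. It can also be driven by hand for debugging; a
hypothetical invocation, assuming a handoff file in the format above already
exists at the given example path:

    // Hypothetical standalone run; the path is an example only.
    public class TruncateOnce {
      public static void main(String[] args) throws Exception {
        org.apache.hadoop.mapred.TaskLogsTruncater.main(new String[] {
            "/local/dir/tt_log_tmp/attempt_201009302326_0001_m_000000_0" });
      }
    }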

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -262,7 +262,7 @@ abstract class TaskRunner extends Thread {
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       try {
-        tracker.fsError(t.getTaskID(), e.getMessage());
+        tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
       } catch (IOException ie) {
         LOG.fatal(t.getTaskID()+" reporting FSError", ie);
       }
@@ -272,7 +272,7 @@ abstract class TaskRunner extends Thread {
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       causeThrowable.printStackTrace(new PrintStream(baos));
       try {
-        tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
+        tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
       } catch (IOException e) {
         LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
       }

+ 26 - 0
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -248,6 +248,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   static final String LOCAL_SPLIT_FILE = "split.info";
   static final String JOBFILE = "job.xml";
   static final String TT_PRIVATE_DIR = "ttprivate";
+  public static final String TT_LOG_TMP_DIR = "tt_log_tmp";
   static final String JVM_EXTRA_ENV_FILE = "jvm.extra.env";
 
   static final String JOB_LOCAL_DIR = "job.local.dir";
@@ -601,6 +602,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     for (String s : localdirs) {
       localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
     }
+    fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
+    final FsPermission pub = FsPermission.createImmutable((short) 0755);
+    for (String s : localdirs) {
+      localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
+    }
 
     // Clear out state tables
     this.tasks.clear();
@@ -2994,6 +3000,16 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
    */
   public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
     authorizeJVM(taskid.getJobID());
+    reportDiagnosticInfoInternal(taskid, info);
+  }
+  /**
+   * Meant to be used internally
+   * @param taskid
+   * @param info
+   * @throws IOException
+   */
+  synchronized void reportDiagnosticInfoInternal(TaskAttemptID taskid, 
+      String info) throws IOException {
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
       tip.reportDiagnosticInfo(info);
@@ -3077,6 +3093,16 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   public synchronized void fsError(TaskAttemptID taskId, String message) 
   throws IOException {
     authorizeJVM(taskId.getJobID());
+    fsErrorInternal(taskId, message);  
+  }
+  /**
+   * Meant to be used internally
+   * @param taskId
+   * @param message
+   * @throws IOException
+   */
+  synchronized void fsErrorInternal(TaskAttemptID taskId, String message) 
+  throws IOException {
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
     tip.reportDiagnosticInfo("FSError: " + message);
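
The motivation for the new *Internal variants: reportDiagnosticInfo and
fsError are TaskUmbilicalProtocol entry points, and authorizeJVM checks the
identity of the calling task JVM, which only makes sense for RPC callers. The
TaskRunner changes above invoke these code paths from inside the TaskTracker
process itself, so they go through the internal methods and skip that check:

    // The split in brief (as in the diff above):
    public synchronized void fsError(TaskAttemptID taskId, String message)
        throws IOException {
      authorizeJVM(taskId.getJobID()); // meaningful only for RPC callers
      fsErrorInternal(taskId, message);
    }
    // In-process callers such as TaskRunner start here instead.
    synchronized void fsErrorInternal(TaskAttemptID taskId, String message)
        throws IOException {
      // ... body as in the diff above ...
    }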

+ 24 - 2
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java

@@ -17,15 +17,31 @@
  */
 package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
 
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobLocalizer;
+import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskLogsTruncater;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.UserLogCleaner;
@@ -141,8 +157,14 @@ public class UserLogManager {
     userLogCleaner.clearOldUserLogs(conf);
   }
 
-  private void doJvmFinishedAction(JvmFinishedEvent event) {
-    // do nothing
+  private void doJvmFinishedAction(JvmFinishedEvent event) throws IOException {
+    //check whether any of the logs are over the limit, and if so
+    //invoke the truncator to run as the user
+    if (taskLogsTruncater.shouldTruncateLogs(event.getJvmInfo())) {
+      String user = event.getJvmInfo().getAllAttempts().get(0).getUser();
+      taskController.truncateLogsAsUser(user, 
+                                        event.getJvmInfo().getAllAttempts());
+    }
   }
 
   private void doJobStartedAction(JobStartedEvent event) {