commit 9dd71b62d66a21544c9554c5659123a044621005
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date: Mon Jan 11 20:18:53 2010 +0530

Reverting patch https://issues.apache.org/jira/secure/attachment/12427328/y896.v2.1.patch for MAPREDUCE:896


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077101 13f79535-47bb-0310-9956-ffa450edef68

Committed by Owen O'Malley
Parent: 0dbc93a64f

src/c++/task-controller/main.c (+0, -9)

@@ -23,7 +23,6 @@ int main(int argc, char **argv) {
   int next_option = 0;
   const char * job_id = NULL;
   const char * task_id = NULL;
-  const char * dir_to_be_deleted = NULL;
   const char * tt_root = NULL;
   int exit_code = 0;
   const char * task_pid = NULL;
@@ -32,7 +31,6 @@ int main(int argc, char **argv) {
       NULL, 0 } };
 
   const char* log_file = NULL;
-  char * base_path = NULL;
 
   //Minimum number of arguments required to run the task-controller
   //command-name user command tt-root
@@ -110,13 +108,6 @@ int main(int argc, char **argv) {
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
     break;
-  case ENABLE_TASK_FOR_CLEANUP:
-    base_path = argv[optind++];
-    job_id = argv[optind++];
-    dir_to_be_deleted = argv[optind++];
-    exit_code = enable_task_for_cleanup(base_path, user_detail->pw_name, job_id,
-                                        dir_to_be_deleted);
-    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

src/c++/task-controller/task-controller.c (+2, -285)

@@ -104,17 +104,12 @@ int check_tt_root(const char *tt_root) {
  * path resolve to one and same.
  */
 
-int check_path_for_relative_components(char *path) {
+int check_path(char *path) {
   char * resolved_path = (char *) canonicalize_file_name(path);
   if(resolved_path == NULL) {
-    fprintf(LOGFILE, "Error resolving the path: %s. Passed path: %s\n",
-          strerror(errno), path);
     return ERROR_RESOLVING_FILE_PATH;
   }
   if(strcmp(resolved_path, path) !=0) {
-    fprintf(LOGFILE,
-            "Relative path components in the path: %s. Resolved path: %s\n",
-            path, resolved_path);
     free(resolved_path);
     return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
   }
@@ -169,34 +164,6 @@ void get_task_file_path(const char * jobid, const char * taskid,
   free(mapred_local_dir);
 }
 
-/*
- * Builds the full path of the dir(localTaskDir or localWorkDir)
- * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
- * dir_to_be_deleted : is either taskDir($taskId) OR taskWorkDir($taskId/work)
- *
- * Check TT_LOCAL_TASK_DIR_PATTERN for pattern
- */
-void get_task_dir_path(const char * tt_root, const char * jobid,
-                       const char * dir_to_be_deleted, char **task_dir_path) {
-  *task_dir_path = NULL;
-  int str_len = strlen(TT_LOCAL_TASK_DIR_PATTERN) + strlen(jobid) + strlen(
-      dir_to_be_deleted) + strlen(tt_root);
-
-  *task_dir_path = (char *) malloc(sizeof(char) * (str_len + 1));
-  if (*task_dir_path == NULL) {
-    fprintf(LOGFILE, "Unable to allocate memory for task_dir_path \n");
-    return;
-  }
-
-  memset(*task_dir_path,'\0',str_len+1);
-  snprintf(*task_dir_path, str_len, TT_LOCAL_TASK_DIR_PATTERN, tt_root,
-           jobid, dir_to_be_deleted);
-#ifdef DEBUG
-  fprintf(LOGFILE, "get_task_dir_path : task dir path = %s\n", *task_dir_path);
-  fflush(LOGFILE);
-#endif
-}
-
 //end of private functions
 void display_usage(FILE *stream) {
   fprintf(stream,
@@ -216,200 +183,6 @@ int get_user_details(const char *user) {
   return 0;
 }
 
-/**
- * Compare ownership of a file with the given ids.
- */
-int compare_ownership(uid_t uid, gid_t gid, char *path) {
-  struct stat filestat;
-  if (stat(path, &filestat) != 0) {
-    return UNABLE_TO_STAT_FILE;
-  }
-  if (uid == filestat.st_uid && gid == filestat.st_gid) {
-    return 0;
-  }
-  return 1;
-}
-
-/*
- * Function to check if the TaskTracker actually owns the file.
-  */
-int check_ownership(char *path) {
-  struct stat filestat;
-  if (stat(path, &filestat) != 0) {
-    return UNABLE_TO_STAT_FILE;
-  }
-  // check user/group. User should be TaskTracker user, group can either be
-  // TaskTracker's primary group or the special group to which binary's
-  // permissions are set.
-  if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
-      != filestat.st_gid)) {
-    return FILE_NOT_OWNED_BY_TASKTRACKER;
-  }
-  return 0;
-}
-
-/**
- * Function to change the owner/group of a given path.
- */
-static int change_owner(const char *path, uid_t uid, gid_t gid) {
-  int exit_code = chown(path, uid, gid);
-  if (exit_code != 0) {
-    fprintf(LOGFILE, "chown %d:%d for path %s failed: %s.\n", uid, gid, path,
-        strerror(errno));
-  }
-  return exit_code;
-}
-
-/**
- * Function to change the mode of a given path.
- */
-static int change_mode(const char *path, mode_t mode) {
-  int exit_code = chmod(path, mode);
-  if (exit_code != 0) {
-    fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path,
-        strerror(errno));
-  }
-  return exit_code;
-}
-
-/**
- * Function to change permissions of the given path. It does the following
- * recursively:
- *    1) changes the owner/group of the paths to the passed owner/group
- *    2) changes the file permission to the passed file_mode and directory
- *       permission to the passed dir_mode
- *
- * should_check_ownership : boolean to enable checking of ownership of each path
- */
-static int secure_path(const char *path, uid_t uid, gid_t gid,
-    mode_t file_mode, mode_t dir_mode, int should_check_ownership) {
-  FTS *tree = NULL; // the file hierarchy
-  FTSENT *entry = NULL; // a file in the hierarchy
-  char *paths[] = { (char *) path };
-  int process_path = 0;
-  int dir = 0;
-  int error_code = 0;
-  int done = 0;
-
-  // Get physical locations and don't resolve the symlinks.
-  // Don't change directory while walking the directory.
-  int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR;
-
-  tree = fts_open(paths, ftsoptions, NULL);
-  if (tree == NULL) {
-    fprintf(LOGFILE,
-        "Cannot open file traversal structure for the path %s:%s.\n", path,
-        strerror(errno));
-    return -1;
-  }
-
-  while (((entry = fts_read(tree)) != NULL) && !done) {
-    dir = 0;
-    switch (entry->fts_info) {
-    case FTS_D:
-      // A directory being visited in pre-order.
-      // We change ownership of directories in post-order.
-      // so ignore the pre-order visit.
-      process_path = 0;
-      break;
-    case FTS_DC:
-      // A directory that causes a cycle in the tree
-      // We don't expect cycles, ignore.
-      process_path = 0;
-      break;
-    case FTS_DNR:
-      // A directory which cannot be read
-      // Ignore and set error code.
-      process_path = 0;
-      error_code = -1;
-      break;
-    case FTS_DOT:
-      // "."  or ".."
-      process_path = 0;
-      break;
-    case FTS_F:
-      // A regular file
-      process_path = 1;
-      break;
-    case FTS_DP:
-      // A directory being visited in post-order
-      if (entry->fts_level == 0) {
-        // root directory. Done with traversing.
-        done = 1;
-      }
-      process_path = 1;
-      dir = 1;
-      break;
-    case FTS_SL:
-      // A symbolic link
-      process_path = 1;
-      break;
-    case FTS_SLNONE:
-      // A symbolic link with a nonexistent target
-      process_path = 1;
-      break;
-    case FTS_NS:
-      // A  file for which no stat(2) information was available
-      // Ignore and set error code
-      process_path = 0;
-      error_code = -1;
-      break;
-    case FTS_ERR:
-      // An error return. Ignore and set error code.
-      process_path = 0;
-      error_code = -1;
-      break;
-    case FTS_DEFAULT:
-      // File that doesn't belong to any of the above type. Ignore.
-      process_path = 0;
-      break;
-    default:
-      // None of the above. Ignore and set error code
-      process_path = 0;
-      error_code = -1;
-    }
-
-    if (error_code != 0) {
-      break;
-    }
-    if (!process_path) {
-      continue;
-    }
-    if (should_check_ownership &&
-        (compare_ownership(uid, gid, entry->fts_path) == 0)) {
-      // already set proper permissions.
-      // This might happen with distributed cache.
-#ifdef DEBUG
-      fprintf(
-          LOGFILE,
-          "already has private permissions. Not trying to change again for %s",
-          entry->fts_path);
-#endif
-      continue;
-    }
-
-    if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
-      fprintf(LOGFILE,
-          "Invalid file path. %s not user/group owned by the tasktracker.\n",
-          entry->fts_path);
-      error_code = -1;
-    } else if (change_owner(entry->fts_path, uid, gid) != 0) {
-      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
-          entry->fts_path);
-      error_code = -3;
-    } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
-      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
-          entry->fts_path);
-      error_code = -3;
-    }
-  }
-  if (fts_close(tree) != 0) {
-    fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
-        strerror(errno));
-  }
-  return error_code;
-}
-
 /*
  *Function used to launch a task as the provided user.
  * First the function checks if the tt_root passed is found in
@@ -458,7 +231,7 @@ int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     return INVALID_TASK_SCRIPT_PATH;
   }
   errno = 0;
-  exit_code = check_path_for_relative_components(task_script_path);
+  exit_code = check_path(task_script_path);
   if(exit_code != 0) {
     goto cleanup;
   }
@@ -528,59 +301,3 @@ int kill_user_task(const char *user, const char *task_pid, int sig) {
   return 0;
 }
 
-/**
- * Enables the path for deletion by changing the owner, group and permissions
- * of the specified path and all the files/directories in the path recursively.
- *     *  sudo chown user:mapred -R full_path
- *     *  sudo chmod 2777 -R full_path
- * Before changing permissions, makes sure that the given path doesn't contain
- * any relative components.
- * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
- * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
- */
-int enable_task_for_cleanup(char *tt_root, const char *user,
-           const char *jobid, const char *dir_to_be_deleted) {
-  int exit_code = 0;
-  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-
-  char * full_path = NULL;
-  if (check_tt_root(tt_root) < 0) {
-    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
-    cleanup();
-    return INVALID_TT_ROOT;
-  }
-
-  get_task_dir_path(tt_root, jobid, dir_to_be_deleted, &full_path);
-  if (full_path == NULL) {
-    fprintf(LOGFILE, 
-      "Could not build the full path. Not deleting the dir %s\n",
-      dir_to_be_deleted);
-    exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
-  }
-     // Make sure that the path given is not having any relative components
-  else if ((exit_code = check_path_for_relative_components(full_path)) != 0) {
-    fprintf(LOGFILE, 
-         "Not changing permissions as the path contains relative components.\n",
-         full_path);
-  }
-  else if (get_user_details(user) < 0) {
-    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
-    exit_code = INVALID_USER_NAME;
-  }
-  else if (exit_code = secure_path(full_path, user_detail->pw_uid,
-               tasktracker_gid, S_IRWXU | S_IRWXG | S_IRWXO,
-               S_ISGID | S_IRWXU | S_IRWXG | S_IRWXO, 0) != 0) {
-    // No setgid on files and setgid on dirs, 777.
-    // set 777 permissions for user, TTgroup for all files/directories in
-    // 'full_path' recursively sothat deletion of path by TaskTracker succeeds.
-
-    fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path);
-  }
-
-  if (full_path != NULL) {
-    free(full_path);
-  }
-  // free configurations
-  cleanup();
-  return exit_code;
-}
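The removed secure_path() performs a physical (FTS_PHYSICAL), no-chdir fts walk, fixing file modes as entries are visited and directory modes only on the post-order (FTS_DP) visit, so children are re-permissioned before their parent. Purely as an illustrative analog in Java (java.nio.file, not code from this patch; the chown half done via change_owner() is omitted here since Files.setOwner needs privileges), the same post-order discipline looks like this:

    // Illustrative analog of secure_path()'s traversal, not part of this patch.
    import java.io.IOException;
    import java.nio.file.*;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.nio.file.attribute.PosixFilePermission;
    import java.util.Set;

    class SecurePathSketch {
      static void securePath(Path root, Set<PosixFilePermission> fileMode,
                             Set<PosixFilePermission> dirMode) throws IOException {
        // walkFileTree does not follow symlinks by default, matching FTS_PHYSICAL.
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
          @Override
          public FileVisitResult visitFile(Path f, BasicFileAttributes a)
              throws IOException {
            Files.setPosixFilePermissions(f, fileMode);  // files as visited (FTS_F)
            return FileVisitResult.CONTINUE;
          }
          @Override
          public FileVisitResult postVisitDirectory(Path d, IOException e)
              throws IOException {
            if (e != null) throw e;                      // unreadable dir: bail out
            Files.setPosixFilePermissions(d, dirMode);   // dirs post-order (FTS_DP)
            return FileVisitResult.CONTINUE;
          }
        });
      }
    }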

src/c++/task-controller/task-controller.h (+2, -11)

@@ -29,16 +29,13 @@
 #include <sys/signal.h>
 #include <getopt.h>
 #include<grp.h>
-#include <fts.h>
-
 #include "configuration.h"
 
 //command definitions
 enum command {
   LAUNCH_TASK_JVM,
   TERMINATE_TASK_JVM,
-  KILL_TASK_JVM,
-  ENABLE_TASK_FOR_CLEANUP
+  KILL_TASK_JVM
 };
 
 enum errorcodes {
@@ -56,15 +53,12 @@ enum errorcodes {
   ERROR_RESOLVING_FILE_PATH, //12
   RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
   UNABLE_TO_STAT_FILE, //14
-  FILE_NOT_OWNED_BY_TASKTRACKER, //15
-  UNABLE_TO_BUILD_PATH //16
+  FILE_NOT_OWNED_BY_TASKTRACKER //15
 };
 
 
 #define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
 
-#define TT_LOCAL_TASK_DIR_PATTERN    "%s/taskTracker/jobcache/%s/%s"
-
 #define TT_SYS_DIR_KEY "mapred.local.dir"
 
 #define MAX_ITEMS 10
@@ -84,7 +78,4 @@ int run_task_as_user(const char * user, const char *jobid, const char *taskid, c
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
-int enable_task_for_cleanup(char * base_path, const char *user,
- const char *jobid, const char *dir_to_be_deleted);
-
 int get_user_details(const char *user);

src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (+24, -60)

@@ -39,7 +39,7 @@ class CleanupQueue {
    * paths(directories/files) in a separate thread. This constructor creates a
    * clean-up thread and also starts it as a daemon. Callers can instantiate one
    * CleanupQueue per JVM and can use it for deleting paths. Use
-   * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
+   * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
    * deletion.
    */
   public CleanupQueue() {
@@ -49,53 +49,23 @@ class CleanupQueue {
       }
     }
   }
-
-  /**
-   * Contains info related to the path of the file/dir to be deleted
-   */
-  static class PathDeletionContext {
-    String fullPath;// full path of file or dir
-    FileSystem fs;
-
-    public PathDeletionContext(FileSystem fs, String fullPath) {
-      this.fs = fs;
-      this.fullPath = fullPath;
-    }
-    
-    protected String getPathForCleanup() {
-      return fullPath;
-    }
-
-    /**
-     * Makes the path(and its subdirectories recursively) fully deletable
-     */
-    protected void enablePathForCleanup() throws IOException {
-      // do nothing
-    }
-  }
-
-  /**
-   * Adds the paths to the queue of paths to be deleted by cleanupThread.
-   */
-  void addToQueue(PathDeletionContext... contexts) {
-    cleanupThread.addToQueue(contexts);
-  }
-
-  protected static boolean deletePath(PathDeletionContext context)
-            throws IOException {
-    context.enablePathForCleanup();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to delete " + context.fullPath);
-    }
-    return context.fs.delete(new Path(context.fullPath), true);
+  
+  public void addToQueue(JobConf conf, Path...paths) {
+    cleanupThread.addToQueue(conf,paths);
   }
 
   private static class PathCleanupThread extends Thread {
 
+    static class PathAndConf {
+      JobConf conf;
+      Path path;
+      PathAndConf(JobConf conf, Path path) {
+        this.conf = conf;
+        this.path = path;
+      }
+    }
     // cleanup queue which deletes files/directories of the paths queued up.
-    private LinkedBlockingQueue<PathDeletionContext> queue =
-      new LinkedBlockingQueue<PathDeletionContext>();
+    private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
 
     public PathCleanupThread() {
       setName("Directory/File cleanup thread");
@@ -103,34 +73,28 @@ class CleanupQueue {
       start();
     }
 
-    void addToQueue(PathDeletionContext[] contexts) {
-      for (PathDeletionContext context : contexts) {
+    public void addToQueue(JobConf conf,Path... paths) {
+      for (Path p : paths) {
         try {
-          queue.put(context);
-        } catch(InterruptedException ie) {}
+          queue.put(new PathAndConf(conf,p));
+        } catch (InterruptedException ie) {}
       }
     }
 
     public void run() {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getName() + " started.");
-      }
-      PathDeletionContext context = null;
+      LOG.debug(getName() + " started.");
+      PathAndConf pathAndConf = null;
       while (true) {
         try {
-          context = queue.take();
+          pathAndConf = queue.take();
           // delete the path.
-          if (!deletePath(context)) {
-            LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
-          }
-          else if (LOG.isDebugEnabled()) {
-            LOG.debug("DELETED " + context.fullPath);
-          }
+          FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
+          fs.delete(pathAndConf.path, true);
+          LOG.debug("DELETED " + pathAndConf.path);
         } catch (InterruptedException t) {
-          LOG.warn("Interrupted deletion of " + context.fullPath);
           return;
         } catch (Exception e) {
-          LOG.warn("Error deleting path " + context.fullPath + ": " + e);
+          LOG.warn("Error deleting path" + pathAndConf.path);
         } 
       }
     }
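The reverted CleanupQueue API takes a JobConf plus varargs Paths directly, as the call sites further down in this commit show. A minimal usage sketch under that API (the path below is illustrative, not taken from the patch):

    // Queue a directory for asynchronous, recursive deletion. The daemon
    // thread resolves the FileSystem from the path and conf, then calls
    // fs.delete(path, true), logging a warning if deletion throws.
    JobConf conf = new JobConf();
    Path doomed = new Path("/tmp/mapred/system/job_200912311234_0001"); // illustrative
    new CleanupQueue().addToQueue(conf, doomed);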

src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (+1, -16)

@@ -21,9 +21,8 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.fs.FileUtil;
+
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -135,18 +134,4 @@ class DefaultTaskController extends TaskController {
     }
   }
   
-  /**
-   * Enables the task for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
-  @Override
-  void enableTaskForCleanup(PathDeletionContext context)
-         throws IOException {
-    try {
-      FileUtil.chmod(context.fullPath, "a+rwx", true);
-    } catch(InterruptedException e) {
-      LOG.warn("Interrupted while setting permissions for " + context.fullPath +
-          " for deletion.");
-    }
-  }
 }

src/mapred/org/apache/hadoop/mapred/JobInProgress.java (+1, -3)

@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -2857,8 +2856,7 @@ class JobInProgress {
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(new PathDeletionContext(
-          FileSystem.get(conf), tempDir.toUri().getPath())); 
+      new CleanupQueue().addToQueue(conf,tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }

src/mapred/org/apache/hadoop/mapred/JobTracker.java (+4, -10)

@@ -68,6 +68,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.mapred.JobHistory.Keys;
@@ -75,7 +76,6 @@ import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
@@ -3544,9 +3544,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
-      new CleanupQueue().addToQueue(new PathDeletionContext(
-          FileSystem.get(conf),
-          getSystemDirectoryForJob(jobId).toUri().getPath()));
+      new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
       job.fail();
       if (userFileForJob != null) {
         userFileForJob.delete();
@@ -3564,9 +3562,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       if (userFileForJob != null) {
         userFileForJob.delete();
       }
-      new CleanupQueue().addToQueue(new PathDeletionContext(
-          FileSystem.get(conf),
-          getSystemDirectoryForJob(jobId).toUri().getPath()));
+      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
 
@@ -3575,9 +3571,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     try {
       checkMemoryRequirements(job);
     } catch (IOException ioe) {
-      new CleanupQueue().addToQueue(new PathDeletionContext(
-          FileSystem.get(conf),
-          getSystemDirectoryForJob(jobId).toUri().getPath()));
+      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
 

src/mapred/org/apache/hadoop/mapred/JvmManager.java (+1, -6)

@@ -428,12 +428,7 @@ class JvmManager {
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
             if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              tracker.directoryCleanupThread.addToQueue(
-                  TaskTracker.buildTaskControllerPathDeletionContexts(
-                      tracker.getLocalFileSystem(), tracker.getLocalDirs(),
-                      initalContext.task,
-                      true /* workDir */,
-                      tracker.getTaskController()));
+              FileUtil.fullyDelete(env.workDir);
             }
           } catch (IOException ie){}
         }

src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (+9, -109)

@@ -24,16 +24,13 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -113,8 +110,7 @@ class LinuxTaskController extends TaskController {
   enum TaskCommands {
     LAUNCH_TASK_JVM,
     TERMINATE_TASK_JVM,
-    KILL_TASK_JVM,
-    ENABLE_TASK_FOR_CLEANUP
+    KILL_TASK_JVM
   }
   
   /**
@@ -154,7 +150,7 @@ class LinuxTaskController extends TaskController {
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
-                                    launchTaskJVMArgs, env.workDir, env.env);
+                                    launchTaskJVMArgs, env);
     context.shExec = shExec;
     try {
       shExec.execute();
@@ -170,40 +166,6 @@ class LinuxTaskController extends TaskController {
     }
   }
 
-  /**
-   * Helper method that runs a LinuxTaskController command
-   * 
-   * @param taskCommand
-   * @param user
-   * @param cmdArgs
-   * @param env
-   * @throws IOException
-   */
-  private void runCommand(TaskCommands taskCommand, String user,
-      List<String> cmdArgs, File workDir, Map<String, String> env)
-      throws IOException {
-
-    ShellCommandExecutor shExec =
-        buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
-    try {
-      shExec.execute();
-    } catch (Exception e) {
-      LOG.warn("Exit code from " + taskCommand.toString() + " is : "
-          + shExec.getExitCode());
-      LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
-          + StringUtils.stringifyException(e));
-      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
-          + " follows:");
-      logOutput(shExec.getOutput());
-      throw new IOException(e);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
-          + " follows:");
-      logOutput(shExec.getOutput());
-    }
-  }
-
   /**
    * Returns list of arguments to be passed while launching task VM.
    * See {@code buildTaskControllerExecutor(TaskCommands, 
@@ -229,67 +191,6 @@ class LinuxTaskController extends TaskController {
     return commandArgs;
   }
   
-  private List<String> buildTaskCleanupArgs(
-      TaskControllerPathDeletionContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    commandArgs.add(context.mapredLocalDir.toUri().getPath());
-    commandArgs.add(context.task.getJobID().toString());
-
-    String workDir = "";
-    if (context.isWorkDir) {
-      workDir = "/work";
-    }
-    if (context.task.isTaskCleanupTask()) {
-      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
-                      + workDir);
-    } else {
-      commandArgs.add(context.task.getTaskID() + workDir);
-    }
-
-    return commandArgs;
-  }
-
-  /**
-   * Enables the task for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
-  @Override
-  void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
-                + " for " + context.fullPath);
-    }
-
-    if (context instanceof TaskControllerPathDeletionContext) {
-      TaskControllerPathDeletionContext tContext =
-        (TaskControllerPathDeletionContext) context;
-    
-      if (tContext.task.getUser() != null && tContext.fs instanceof LocalFileSystem) {
-        runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
-                   tContext.task.getUser(),
-                   buildTaskCleanupArgs(tContext), null, null);
-      }
-      else {
-        throw new IllegalArgumentException("Either user is null or the "  +
-                               "file system is not local file system.");
-      }
-    }
-    else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-          + "TaskControllerPathDeletionContext.");
-    }
-  }
-
-  private void logOutput(String output) {
-    String shExecOutput = output;
-    if (shExecOutput != null) {
-      for (String str : shExecOutput.split("\n")) {
-        LOG.info(str);
-      }
-    }
-  }
-
   // get the Job ID from the information in the TaskControllerContext
   private String getJobId(TaskControllerContext context) {
     String taskId = context.task.getTaskID().toString();
@@ -398,10 +299,10 @@ class LinuxTaskController extends TaskController {
    * @return {@link ShellCommandExecutor}
    * @throws IOException
    */
-  private ShellCommandExecutor buildTaskControllerExecutor(
-      TaskCommands command, String userName, List<String> cmdArgs,
-      File workDir, Map<String, String> env) 
-      throws IOException {
+  private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
+                                          String userName, 
+                                          List<String> cmdArgs, JvmEnv env) 
+                                    throws IOException {
     String[] taskControllerCmd = new String[3 + cmdArgs.size()];
     taskControllerCmd[0] = taskControllerExe;
     taskControllerCmd[1] = userName;
@@ -416,9 +317,9 @@ class LinuxTaskController extends TaskController {
       }
     }
     ShellCommandExecutor shExec = null;
-    if(workDir != null && workDir.exists()) {
+    if(env.workDir != null && env.workDir.exists()) {
       shExec = new ShellCommandExecutor(taskControllerCmd,
-          workDir, env);
+          env.workDir, env.env);
     } else {
       shExec = new ShellCommandExecutor(taskControllerCmd);
     }
@@ -566,8 +467,7 @@ class LinuxTaskController extends TaskController {
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
         command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env.workDir,
-        context.env.env);
+        buildKillTaskCommandArgs(context), context.env);
     try {
       shExec.execute();
     } catch (Exception e) {
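The hunk above shows only slots 0 and 1 of the argv handed to the setuid task-controller binary; slot 2 presumably carries the TaskCommands index, but that line sits outside the hunk, so treat it as an assumption. A hedged sketch of the layout being assembled:

    // argv for the setuid task-controller binary (sketch):
    //   [0]  taskControllerExe   path to the compiled task-controller
    //   [1]  userName            user the command runs on behalf of
    //   [2]  command index       assumed: ordinal of the TaskCommands enum
    //   [3+] cmdArgs             command-specific arguments (job id, task id, ...)
    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
    taskControllerCmd[0] = taskControllerExe;
    taskControllerCmd[1] = userName;
    taskControllerCmd[2] = String.valueOf(command.ordinal()); // assumption, not in the hunk
    int i = 3;
    for (String arg : cmdArgs) {
      taskControllerCmd[i++] = arg;
    }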

src/mapred/org/apache/hadoop/mapred/TaskController.java (+0, -68)

@@ -23,9 +23,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -126,63 +123,6 @@ abstract class TaskController implements Configurable {
     long sleeptimeBeforeSigkill;
   }
 
-  /**
-   * Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the file/dir
-   */
-  static class TaskControllerPathDeletionContext extends PathDeletionContext {
-    Task task;
-    boolean isWorkDir;
-    TaskController taskController;
-
-    /**
-     * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
-     * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
-     * is built using mapredLocalDir, jobId, taskId, etc.
-     */
-    Path mapredLocalDir;
-
-    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
-        Task task, boolean isWorkDir, TaskController taskController) {
-      super(fs, null);
-      this.task = task;
-      this.isWorkDir = isWorkDir;
-      this.taskController = taskController;
-      this.mapredLocalDir = mapredLocalDir;
-    }
-
-    @Override
-    protected String getPathForCleanup() {
-      if (fullPath == null) {
-        fullPath = buildPathForDeletion();
-      }
-      return fullPath;
-    }
-
-    /**
-     * Builds the path of taskAttemptDir OR taskWorkDir based on
-     * mapredLocalDir, jobId, taskId, etc
-     */
-    String buildPathForDeletion() {
-      String subDir = TaskTracker.getLocalTaskDir(task.getJobID().toString(),
-          task.getTaskID().toString(), task.isTaskCleanupTask());
-      if (isWorkDir) {
-        subDir = subDir + Path.SEPARATOR + "work";
-      }
-      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
-    }
-
-    /**
-     * Makes the path(and its subdirectories recursively) fully deletable by
-     * setting proper permissions(777) by task-controller
-     */
-    @Override
-    protected void enablePathForCleanup() throws IOException {
-      getPathForCleanup();// allow init of fullPath
-      taskController.enableTaskForCleanup(this);
-    }
-  }
-
   /**
    * Method which is called after the job is localized so that task controllers
    * can implement their own job localization logic.
@@ -206,12 +146,4 @@ abstract class TaskController implements Configurable {
    */
   
   abstract void killTask(TaskControllerContext context);
-  
-  /**
-   * Enable the task for cleanup by changing permissions of the path
-   * @param context   path deletion context
-   * @throws IOException
-   */
-  abstract void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException;
 }

src/mapred/org/apache/hadoop/mapred/TaskRunner.java (+2, -42)

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 
@@ -556,53 +557,12 @@ abstract class TaskRunner extends Thread {
     }
   }
   
-  /**
-   * Sets permissions recursively and then deletes the contents of dir.
-   * Makes dir empty directory(does not delete dir itself).
-   */
-  static void deleteDirContents(JobConf conf, File dir) throws IOException {
-    FileSystem fs = FileSystem.getLocal(conf);
-    if (fs.exists(new Path(dir.getAbsolutePath()))) {
-      File contents[] = dir.listFiles();
-      if (contents != null) {
-        for (int i = 0; i < contents.length; i++) {
-          try {
-            int ret = 0;
-            if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
-                                      "a+rwx", true)) != 0) {
-              LOG.warn("Unable to chmod for " + contents[i] + 
-                  "; chmod exit status = " + ret);
-            }
-          } catch(InterruptedException e) {
-            LOG.warn("Interrupted while setting permissions for contents of " +
-                "workDir. Not deleting the remaining contents of workDir.");
-            return;
-          }
-          if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
-            LOG.warn("Unable to delete "+ contents[i]);
-          }
-        }
-      }
-    }
-    else {
-      LOG.warn(dir + " does not exist.");
-    }
-  }
-  
   //Mostly for setting up the symlinks. Note that when we setup the distributed
   //cache, we didn't create the symlinks. This is done on a per task basis
   //by the currently executing task.
   public static void setupWorkDir(JobConf conf) throws IOException {
     File workDir = new File(".").getAbsoluteFile();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Fully deleting contents of " + workDir);
-    }
-
-    /** delete only the contents of workDir leaving the directory empty. We
-     * can't delete the workDir as it is the current working directory.
-     */
-    deleteDirContents(conf, workDir);
-    
+    FileUtil.fullyDelete(workDir);
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);

src/mapred/org/apache/hadoop/mapred/TaskTracker.java (+16, -67)

@@ -69,8 +69,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
 import org.apache.hadoop.mapred.TaskLog.LogName;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
@@ -230,8 +228,6 @@ public class TaskTracker
   private int maxReduceSlots;
   private int failures;
   
-  private FileSystem localFs;
-  
   // Performance-related config knob to send an out-of-band heartbeat
   // on task completion
   static final String TT_OUTOFBAND_HEARBEAT =
@@ -243,7 +239,7 @@ public class TaskTracker
   
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
-  CleanupQueue directoryCleanupThread;
+  private CleanupQueue directoryCleanupThread;
   volatile JvmManager jvmManager;
   
   private TaskMemoryManagerThread taskMemoryManager;
@@ -391,7 +387,7 @@ public class TaskTracker
   TaskController getTaskController() {
     return taskController;
   }
-
+  
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -504,7 +500,6 @@ public class TaskTracker
   synchronized void initialize() throws IOException {
     // use configured nameserver & interface to get local hostname
     this.fConf = new JobConf(originalConf);
-    localFs = FileSystem.getLocal(fConf);
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
     }
@@ -1488,32 +1483,6 @@ public class TaskTracker
     }
   }
 
-  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
-      Path[] paths) {
-    int i = 0;
-    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
-    }
-    return contexts;
-  }
-
-  static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
-      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
-      TaskController taskController)
-      throws IOException {
-    int i = 0;
-    PathDeletionContext[] contexts =
-                          new TaskControllerPathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
-                          isWorkDir, taskController);
-    }
-    return contexts;
-  }
-
   /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
@@ -1542,9 +1511,8 @@ public class TaskTracker
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
-              getLocalFiles(fConf, getLocalJobDir(rjob.getJobID().toString())));
-          directoryCleanupThread.addToQueue(contexts);
+          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
+            getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -2650,7 +2618,6 @@ public class TaskTracker
           }
           String taskDir = getLocalTaskDir(task.getJobID().toString(),
                              taskId.toString(), task.isTaskCleanupTask());
-
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2662,23 +2629,21 @@ public class TaskTracker
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              PathDeletionContext[] contexts =
-                buildTaskControllerPathDeletionContexts(localFs, getLocalDirs(),
-                  task, false/* not workDir */, taskController);
-              directoryCleanupThread.addToQueue(contexts);
-            }
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
+                  taskDir));
+            }  
+            
             else {
-              PathDeletionContext[] contexts = buildPathDeletionContexts(
-                  localFs, getLocalFiles(defaultJobConf, taskDir+"/job.xml"));
-              directoryCleanupThread.addToQueue(contexts);
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
+                taskDir+"/job.xml"));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              PathDeletionContext[] contexts =
-                buildTaskControllerPathDeletionContexts(localFs, getLocalDirs(),
-                  task, true /* workDir */,
-                  taskController);
-              directoryCleanupThread.addToQueue(contexts);
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
+                  taskDir+"/work"));
             }  
           }
         } catch (Throwable ie) {
@@ -3265,7 +3230,7 @@ public class TaskTracker
   
 
   // get the full paths of the directory in all the local disks.
-  Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+  private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
     String[] localDirs = conf.getLocalDirs();
     Path[] paths = new Path[localDirs.length];
     FileSystem localFs = FileSystem.getLocal(conf);
@@ -3276,22 +3241,6 @@ public class TaskTracker
     return paths;
   }
 
-  // get the paths in all the local disks.
-  Path[] getLocalDirs() throws IOException{
-    String[] localDirs = fConf.getLocalDirs();
-    Path[] paths = new Path[localDirs.length];
-    FileSystem localFs = FileSystem.getLocal(fConf);
-    for (int i = 0; i < localDirs.length; i++) {
-      paths[i] = new Path(localDirs[i]);
-      paths[i] = paths[i].makeQualified(localFs);
-    }
-    return paths;
-  }
-
-  FileSystem getLocalFileSystem(){
-    return localFs;
-  }
-
   int getMaxCurrentMapTasks() {
     return maxMapSlots;
   }

src/test/org/apache/hadoop/mapred/TestChildTaskDirs.java (+0, -206)

@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Validates removal of user-created-files(and set non-writable permissions) in
- * tasks under taskWorkDir by TT with LinuxTaskController.
- */
-public class TestChildTaskDirs extends ClusterWithLinuxTaskController {
-  private static final Log LOG = LogFactory.getLog(TestChildTaskDirs.class);
-  private static final File TEST_DIR = 
-    new File(System.getProperty("test.build.data", "/tmp"), "child-dirs");
-  private static final String MY_DIR = "my-test-dir";
-  private static final String MY_FILE = "my-test-file";
-  private static final LocalDirAllocator LOCAL_DIR_ALLOC = 
-    new LocalDirAllocator("mapred.local.dir");
-
-  public static Test suite() {
-    TestSetup setup = 
-      new TestSetup(new TestSuite(TestChildTaskDirs.class)) {
-      protected void setUp() throws Exception {
-        TEST_DIR.mkdirs();
-      }
-      protected void tearDown() throws Exception {
-        FileUtil.fullyDelete(TEST_DIR);
-      }
-    };
-    return setup;
-  }
-
-  class InlineCleanupQueue extends CleanupQueue {
-    List<String> stalePaths = new ArrayList<String>();
-
-    public InlineCleanupQueue() {
-      // do nothing
-    }
-
-    @Override
-    public void addToQueue(PathDeletionContext... contexts) {
-      // delete paths in-line
-      for (PathDeletionContext context : contexts) {
-        try {
-          if (!deletePath(context)) {
-            LOG.warn("Stale path " + context.fullPath);
-            stalePaths.add(context.fullPath);
-          }
-        } catch (IOException e) {
-          LOG.warn("Caught exception while deleting path "
-              + context.fullPath);
-          LOG.info(StringUtils.stringifyException(e));
-          stalePaths.add(context.fullPath);
-        }
-      }
-    }
-  }
-
-  // Mapper that creates dirs
-  // job-id/
-  //   -attempt-id/
-  //      -work/
-  //         -my-test-dir(555)
-  //            -my-test-file(555)
-  static class CreateDir extends MapReduceBase implements
-      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
-
-    File taskWorkDir = null;
-    public void map(WritableComparable key, Writable value,
-        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
-        throws IOException {
-      File subDir = new File(taskWorkDir, MY_DIR);
-      LOG.info("Child folder : " + subDir);
-      subDir.mkdirs();
-      File newFile = new File(subDir, MY_FILE);
-      LOG.info("Child file : " + newFile);
-      newFile.createNewFile();
-
-      // Set the permissions of my-test-dir and my-test-dir/my-test-file to 555
-      try {
-        FileUtil.chmod(subDir.getAbsolutePath(), "a=rx", true);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-    
-    @Override
-    public void configure(JobConf conf) {
-      String jobId = conf.get("mapred.job.id");
-      String taskId = conf.get("mapred.task.id");
-      String taskDir = TaskTracker.getLocalTaskDir(jobId, taskId);
-      try {
-        Path taskDirPath = 
-          LOCAL_DIR_ALLOC.getLocalPathForWrite(taskDir, conf);
-        taskWorkDir = new File(taskDirPath.toString(), "work");
-        LOG.info("Task work-dir : " + taskWorkDir.toString());
-      } catch (IOException ioe) {
-        throw new RuntimeException(ioe);
-      }
-    }
-  }
-
-  public void testChildDirCleanup() throws Exception {
-    LOG.info("Testing if the dirs created by the child process is cleaned up properly");
-
-    if (!shouldRun()) {
-      return;
-    }
-
-    // start the cluster
-    startCluster();
-
-    // make sure that only one tracker is configured
-    if (mrCluster.getNumTaskTrackers() != 1) {
-      throw new Exception("Cluster started with " 
-        + mrCluster.getNumTaskTrackers() + " instead of 1");
-    }
-    
-    // configure a job
-    JobConf jConf = getClusterConf();
-    jConf.setJobName("Mkdir job");
-    jConf.setMapperClass(CreateDir.class);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
-
-    FileSystem fs = FileSystem.get(jConf);
-    Path inDir = new Path("in");
-    Path outDir = new Path("out");
-    if (fs.exists(outDir)) {
-      fs.delete(outDir, true);
-    }
-    if (!fs.exists(inDir)) {
-      fs.mkdirs(inDir);
-    }
-    String input = "The quick brown fox";
-    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
-    file.writeBytes(input);
-    file.close();
-
-    jConf.setInputFormat(TextInputFormat.class);
-    jConf.setOutputKeyClass(LongWritable.class);
-    jConf.setOutputValueClass(Text.class);
-
-    FileInputFormat.setInputPaths(jConf, inDir);
-    FileOutputFormat.setOutputPath(jConf, outDir);
-
-    // set inline cleanup queue in TT
-    mrCluster.getTaskTrackerRunner(0).getTaskTracker().directoryCleanupThread =
-      new InlineCleanupQueue();
-
-    JobClient jobClient = new JobClient(jConf);
-    RunningJob job = jobClient.submitJob(jConf);
-
-    JobID id = job.getID();
-    
-    // wait for the job to finish
-    job.waitForCompletion();
-    
-    JobInProgress jip = 
-      mrCluster.getJobTrackerRunner().getJobTracker().getJob(id);
-    String attemptId = 
-      jip.getMapTasks()[0].getTaskStatuses()[0].getTaskID().toString();
-    
-    String taskTrackerLocalDir = 
-      mrCluster.getTaskTrackerRunner(0).getLocalDir();
-    
-    String taskDir = TaskTracker.getLocalTaskDir(id.toString(), attemptId);
-    Path taskDirPath = new Path(taskTrackerLocalDir, taskDir);
-    LOG.info("Checking task dir " + taskDirPath);
-    FileSystem localFS = FileSystem.getLocal(jConf);
-    assertFalse("task dir still exists", localFS.exists(taskDirPath));
-  }
-}

src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java (+0, -90)

@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-
-public class TestSetupWorkDir extends TestCase {
-  private static final Log LOG =
-    LogFactory.getLog(TestSetupWorkDir.class);
-
-  /**
-   * Create a file in the given dir and set permissions r_xr_xr_x sothat no one
-   * can delete it directly(without doing chmod).
-   * Creates dir/subDir and dir/subDir/file
-   */
-  static void createFileAndSetPermissions(JobConf jobConf, Path dir)
-       throws IOException {
-    Path subDir = new Path(dir, "subDir");
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    fs.mkdirs(subDir);
-    Path p = new Path(subDir, "file");
-    DataOutputStream out = fs.create(p);
-    out.writeBytes("dummy input");
-    out.close();
-    // no write permission for subDir and subDir/file
-    try {
-      int ret = 0;
-      if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
-        LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
-      }
-    } catch(InterruptedException e) {
-      LOG.warn("Interrupted while doing chmod for " + subDir);
-    }
-  }
-
-  /**
-   * Validates if setupWorkDir is properly cleaning up contents of workDir.
-   * TODO: other things of TaskRunner.setupWorkDir() related to distributed
-   * cache need to be validated.
-   */
-  public void testSetupWorkDir() throws IOException {
-    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
-                            "testSetupWorkDir");
-    Path myWorkDir = new Path(rootDir, "./work");
-    JobConf jConf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(jConf);
-    if (fs.exists(myWorkDir)) {
-      fs.delete(myWorkDir, true);
-    }
-    if (!fs.mkdirs(myWorkDir)) {
-      throw new IOException("Unable to create workDir " + myWorkDir);
-    }
-
-    // create {myWorkDir}/subDir/file and set 555 perms for subDir and file
-    createFileAndSetPermissions(jConf, myWorkDir);
-
-    TaskRunner.deleteDirContents(jConf, new File(myWorkDir.toUri().getPath()));
-    
-    assertTrue("Contents of " + myWorkDir + " are not cleaned up properly.",
-        fs.listStatus(myWorkDir).length == 0);
-    
-    // cleanup
-    fs.delete(rootDir, true);
-  }
-}