Browse Source

MAPREDUCE-3424. Some LinuxTaskController cleanup. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1205279 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins 13 years ago
parent
commit
ccfe239c78

+ 2 - 0
CHANGES.txt

@@ -93,6 +93,8 @@ Release 0.20.206.0 - unreleased
     HDFS-2246. Shortcut a local client reads to a Datanodes files directly. 
     HDFS-2246. Shortcut a local client reads to a Datanodes files directly. 
     (Andrew Purtell, Suresh, Jitendra)
     (Andrew Purtell, Suresh, Jitendra)
 
 
+    MAPREDUCE-3424. Some LinuxTaskController cleanup (eli)
+
 Release 0.20.205.1 - unreleased
 Release 0.20.205.1 - unreleased
 
 
   NEW FEATURES
   NEW FEATURES

+ 22 - 10
src/c++/task-controller/impl/task-controller.c

@@ -40,9 +40,13 @@
 
 
 #define ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN "/%s/work"
 #define ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN "/%s/work"
 
 
+#define USER_LOG_PATTERN "%s/userlogs/%s"
+
+#define TASK_DIR_PATTERN USER_LOG_PATTERN "/%s"
+
 #define TASK_SCRIPT "taskjvm.sh"
 #define TASK_SCRIPT "taskjvm.sh"
 
 
-#define TT_LOCAL_TASK_DIR_PATTERN    "%s/taskTracker/%s/jobcache/%s/%s"
+#define TT_LOCAL_TASK_DIR_PATTERN "%s/taskTracker/%s/jobcache/%s/%s"
 
 
 #define TT_LOG_DIR_KEY "hadoop.log.dir"
 #define TT_LOG_DIR_KEY "hadoop.log.dir"
 
 
@@ -100,7 +104,7 @@ char* get_executable() {
  *    * be user-owned by root
  *    * be user-owned by root
  *    * be group-owned by a configured special group.
  *    * be group-owned by a configured special group.
  *    * others do not have any permissions
  *    * others do not have any permissions
- *    * be setuid/setgid
+ *    * be setuid
  */
  */
 int check_taskcontroller_permissions(char *executable_file) {
 int check_taskcontroller_permissions(char *executable_file) {
 
 
@@ -138,15 +142,16 @@ int check_taskcontroller_permissions(char *executable_file) {
   }
   }
 
 
   // check others do not have read/write/execute permissions
   // check others do not have read/write/execute permissions
-  if ((filestat.st_mode & S_IROTH) == S_IROTH || (filestat.st_mode & S_IWOTH)
-      == S_IWOTH || (filestat.st_mode & S_IXOTH) == S_IXOTH) {
+  if ((filestat.st_mode & S_IROTH) == S_IROTH ||
+      (filestat.st_mode & S_IWOTH) == S_IWOTH ||
+      (filestat.st_mode & S_IXOTH) == S_IXOTH) {
     fprintf(LOGFILE,
     fprintf(LOGFILE,
             "The task-controller binary should not have read or write or"
             "The task-controller binary should not have read or write or"
             " execute for others.\n");
             " execute for others.\n");
     return -1;
     return -1;
   }
   }
 
 
-  // Binary should be setuid/setgid executable
+  // Binary should be setuid executable
   if ((filestat.st_mode & S_ISUID) == 0) {
   if ((filestat.st_mode & S_ISUID) == 0) {
     fprintf(LOGFILE, "The task-controller binary should be set setuid.\n");
     fprintf(LOGFILE, "The task-controller binary should be set setuid.\n");
     return -1;
     return -1;
@@ -298,7 +303,7 @@ char* get_job_log_directory(const char* jobid) {
     fprintf(LOGFILE, "Log directory %s is not configured.\n", TT_LOG_DIR_KEY);
     fprintf(LOGFILE, "Log directory %s is not configured.\n", TT_LOG_DIR_KEY);
     return NULL;
     return NULL;
   }
   }
-  char *result = concatenate("%s/userlogs/%s", "job log dir", 2, log_dir, 
+  char *result = concatenate(USER_LOG_PATTERN, "job log dir", 2, log_dir, 
                              jobid);
                              jobid);
   if (result == NULL) {
   if (result == NULL) {
     fprintf(LOGFILE, "failed to get memory in get_job_log_directory for %s"
     fprintf(LOGFILE, "failed to get memory in get_job_log_directory for %s"
@@ -468,16 +473,18 @@ int create_attempt_directories(const char* user,
       result = -1;
       result = -1;
       goto cleanup;
       goto cleanup;
     }
     }
-    sprintf(real_job_dir, "%s/userlogs/%s", random_local_dir, job_id);
+    sprintf(real_job_dir, USER_LOG_PATTERN, random_local_dir, job_id);
     result = create_directory_for_user(real_job_dir);
     result = create_directory_for_user(real_job_dir);
     if( result != 0) {
     if( result != 0) {
       result = -1;
       result = -1;
       goto cleanup;
       goto cleanup;
     }
     }
-    sprintf(real_task_dir, "%s/userlogs/%s/%s",
+    sprintf(real_task_dir, TASK_DIR_PATTERN,
             random_local_dir, job_id, task_id);
             random_local_dir, job_id, task_id);
     result = mkdirs(real_task_dir, perms); 
     result = mkdirs(real_task_dir, perms); 
     if( result != 0) {
     if( result != 0) {
+      fprintf(LOGFILE, "Failed to create real task dir %s - %s\n",
+              real_task_dir, strerror(errno));
       result = -1; 
       result = -1; 
       goto cleanup;
       goto cleanup;
     }
     }
@@ -486,6 +493,7 @@ int create_attempt_directories(const char* user,
       fprintf(LOGFILE, "Failed to create symlink %s to %s - %s\n",
       fprintf(LOGFILE, "Failed to create symlink %s to %s - %s\n",
               link_task_log_dir, real_task_dir, strerror(errno));
               link_task_log_dir, real_task_dir, strerror(errno));
       result = -1;
       result = -1;
+      goto cleanup;
     }
     }
   }
   }
 
 
@@ -841,7 +849,11 @@ int initialize_job(const char *user, const char * good_local_dirs,
     fclose(stdout);
     fclose(stdout);
   }
   }
   fclose(stderr);
   fclose(stderr);
-  chdir(primary_job_dir);
+  if (chdir(primary_job_dir)) {
+    fprintf(LOGFILE, "Failure to chdir to job dir - %s\n",
+            strerror(errno));
+    return -1;
+  }
   execvp(args[0], args);
   execvp(args[0], args);
   fprintf(LOGFILE, "Failure to exec job initialization process - %s\n",
   fprintf(LOGFILE, "Failure to exec job initialization process - %s\n",
 	  strerror(errno));
 	  strerror(errno));
@@ -1132,7 +1144,7 @@ int delete_log_directory(const char *subdir, const char * good_local_dirs) {
 
 
   char **local_dir_ptr;
   char **local_dir_ptr;
   for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
   for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
-     char *mapred_local_log_dir = concatenate("%s/userlogs/%s", 
+     char *mapred_local_log_dir = concatenate(USER_LOG_PATTERN, 
 				      "mapred local job log dir", 
 				      "mapred local job log dir", 
 			      	      2, *local_dir_ptr, subdir);
 			      	      2, *local_dir_ptr, subdir);
      if (mapred_local_log_dir != NULL) {
      if (mapred_local_log_dir != NULL) {

+ 0 - 7
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -627,13 +627,6 @@
             </p>
             </p>
             <table><tr><th>Name</th><th>Description</th></tr>
             <table><tr><th>Name</th><th>Description</th></tr>
             <tr>
             <tr>
-            <td>mapred.local.dir</td>
-            <td>Path to mapred local directories. Should be same as the value 
-            which was provided to key in mapred-site.xml. This is required to
-            validate paths passed to the setuid executable in order to prevent
-            arbitrary paths being passed to it.</td>
-            </tr>
-            <tr>
             <td>hadoop.log.dir</td>
             <td>hadoop.log.dir</td>
             <td>Path to hadoop log directory. Should be same as the value which
             <td>Path to hadoop log directory. Should be same as the value which
             the TaskTracker is started with. This is required to set proper
             the TaskTracker is started with. This is required to set proper

+ 15 - 16
src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java

@@ -238,23 +238,22 @@ public class DefaultTaskController extends TaskController {
   public void deleteLogAsUser(String user, 
   public void deleteLogAsUser(String user, 
                               String subDir) throws IOException {
                               String subDir) throws IOException {
     Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
     Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
-    //Delete the subDir in <hadoop.log.dir>/userlogs
+    // Delete the subDir in <hadoop.log.dir>/userlogs
     File subDirPath = new File(dir.toString());
     File subDirPath = new File(dir.toString());
-    FileUtil.fullyDelete( subDirPath );
-    
-    //Delete the subDir in all good <mapred.local.dirs>/userlogs 
-    String [] localDirs = localStorage.getDirs();
-    for(String localdir : localDirs) {
-    	String dirPath = localdir + File.separatorChar + 
-    					TaskLog.USERLOGS_DIR_NAME + File.separatorChar +
-    					subDir;
-    	try {
-    		FileUtil.fullyDelete( new File(dirPath) );
-        } catch(Exception e){
-        	//Skip bad dir for later deletion
-            LOG.warn("Could not delete dir: " + dirPath + 
-                " , Reason : " + e.getMessage());
-        }
+    FileUtil.fullyDelete(subDirPath);
+
+    // Delete the subDir in all good <mapred.local.dirs>/userlogs
+    for (String localdir : localStorage.getDirs()) {
+      String dirPath = localdir + File.separatorChar +
+        TaskLog.USERLOGS_DIR_NAME + File.separatorChar + subDir;
+
+      try {
+        FileUtil.fullyDelete(new File(dirPath));
+      } catch(Exception e){
+        // Skip bad dir for later deletion
+        LOG.warn("Could not delete dir: " + dirPath +
+                 " , Reason : " + e.getMessage());
+      }
     }
     }
   }
   }
   
   

+ 10 - 3
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -127,9 +127,10 @@ class LinuxTaskController extends TaskController {
   public void setup(LocalDirAllocator allocator, LocalStorage localStorage)
   public void setup(LocalDirAllocator allocator, LocalStorage localStorage)
       throws IOException {
       throws IOException {
 
 
-    // Check the permissions of the task-controller binary by running it plainly.
-    // If permissions are correct, it returns an error code 1, else it returns
-    // 24 or something else if some other bugs are also present.
+    // Check the permissions of the task-controller binary by running
+    // it plainly.  If permissions are correct, it returns an error
+    // code 1, else it returns 24 or something else if some other bugs
+    // are also present.
     String[] taskControllerCmd =
     String[] taskControllerCmd =
         new String[] { taskControllerExe };
         new String[] { taskControllerExe };
     ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
     ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
@@ -258,6 +259,12 @@ class LinuxTaskController extends TaskController {
     return 0;
     return 0;
   }
   }
 
 
+  @Override
+  public void createLogDir(TaskAttemptID taskID,
+                           boolean isCleanup) throws IOException {
+    // Log dirs are created during attempt dir creation when running the task
+  }
+
   @Override
   @Override
   public void deleteAsUser(String user, String subDir) throws IOException {
   public void deleteAsUser(String user, String subDir) throws IOException {
     String[] command = 
     String[] command = 

+ 2 - 4
src/mapred/org/apache/hadoop/mapred/TaskController.java

@@ -150,10 +150,8 @@ public abstract class TaskController implements Configurable {
    * @param isCleanup If the task is cleanup task or not
    * @param isCleanup If the task is cleanup task or not
    * @throws IOException
    * @throws IOException
    */
    */
-  public void createLogDir(TaskAttemptID taskID, 
-			boolean isCleanup) throws IOException {
-	  
-  }
+  public abstract void createLogDir(TaskAttemptID taskID, 
+                                    boolean isCleanup) throws IOException;
   
   
   /**
   /**
    * Delete the user's files under the userlogs directory.
    * Delete the user's files under the userlogs directory.

+ 1 - 3
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -710,8 +710,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
     fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
     LOG.info("Good mapred local directories are: " + dirs);
     LOG.info("Good mapred local directories are: " + dirs);
     taskController.setConf(fConf);
     taskController.setConf(fConf);
-    // Setup task controller so that deletion of user dirs happens properly
-    taskController.setup(localDirAllocator, localStorage);
     server.setAttribute("conf", fConf);
     server.setAttribute("conf", fConf);
 
 
     deleteUserDirectories(fConf);
     deleteUserDirectories(fConf);
@@ -1451,7 +1449,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     localStorage = new LocalStorage(fConf.getLocalDirs());
     localStorage = new LocalStorage(fConf.getLocalDirs());
     localStorage.checkDirs();
     localStorage.checkDirs();
     taskController = 
     taskController = 
-      (TaskController) ReflectionUtils.newInstance(taskControllerClass, fConf);
+      (TaskController)ReflectionUtils.newInstance(taskControllerClass, fConf);
     taskController.setup(localDirAllocator, localStorage);
     taskController.setup(localDirAllocator, localStorage);
     lastNumFailures = localStorage.numFailures();
     lastNumFailures = localStorage.numFailures();
 
 

+ 57 - 54
src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java

@@ -131,27 +131,28 @@ public class UserLogCleaner extends Thread {
    * @throws IOException
    * @throws IOException
    */
    */
   public void addOldUserLogsForDeletion(File loc, Configuration conf)  
   public void addOldUserLogsForDeletion(File loc, Configuration conf)  
-  			throws IOException  {
-    if (loc.exists()) {
-        long now = clock.getTime();
-        for(String logDir: loc.list()) {
-          // add all the log dirs to taskLogsMnonitor.
-          JobID jobid = null;
-          try {
-            jobid = JobID.forName(logDir);
-          } catch (IllegalArgumentException ie) {
-            deleteLogPath(logDir);
-            continue;
-          }
-          // add the job log directory for deletion with 
-          // default retain hours, if it is not already added
-          if (!completedJobs.containsKey(jobid)) {
-            JobCompletedEvent jce = 
-              new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
-            userLogManager.addLogEvent(jce);
-          }
-        }
+      throws IOException  {
+    if (!loc.exists()) {
+      return;
+    }
+    long now = clock.getTime();
+    for (String logDir : loc.list()) {
+      // add all the log dirs to taskLogsMnonitor.
+      JobID jobid = null;
+      try {
+        jobid = JobID.forName(logDir);
+      } catch (IllegalArgumentException ie) {
+        deleteLogPath(logDir);
+        continue;
       }
       }
+      // add the job log directory for deletion with 
+      // default retain hours, if it is not already added
+      if (!completedJobs.containsKey(jobid)) {
+        JobCompletedEvent jce = 
+          new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
+        userLogManager.addLogEvent(jce);
+      }
+    }
   }
   }
   
   
   /**
   /**
@@ -165,10 +166,10 @@ public class UserLogCleaner extends Thread {
     File userLogDir = TaskLog.getUserLogDir();
     File userLogDir = TaskLog.getUserLogDir();
     addOldUserLogsForDeletion(userLogDir, conf);
     addOldUserLogsForDeletion(userLogDir, conf);
     String[] localDirs = conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
     String[] localDirs = conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
-    for(String localDir : localDirs) {
-    	File mapredLocalUserLogDir = new File(localDir + 
-    			File.separatorChar + TaskLog.USERLOGS_DIR_NAME);
-    	addOldUserLogsForDeletion(mapredLocalUserLogDir, conf);
+    for (String localDir : localDirs) {
+      File mapredLocalUserLogDir = new File(localDir + 
+        File.separatorChar + TaskLog.USERLOGS_DIR_NAME);
+      addOldUserLogsForDeletion(mapredLocalUserLogDir, conf);
     }
     }
   }
   }
 
 
@@ -218,39 +219,41 @@ public class UserLogCleaner extends Thread {
    * @throws IOException
    * @throws IOException
    */
    */
   private String getLogUser(String logPath) throws IOException{
   private String getLogUser(String logPath) throws IOException{
-	//Get user from <hadoop.log.dir>/userlogs/jobid path
-	String logRoot = TaskLog.getUserLogDir().toString();
-	String user = null;
-	try{
-		user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
-	}catch(Exception e){
-		//Ignore this exception since this path might have been deleted.
-	}
+    // Get user from <hadoop.log.dir>/userlogs/jobid path
+    String logRoot = TaskLog.getUserLogDir().toString();
+    String user = null;
+    try {
+      user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+    } catch (Exception e) {
+      // Ignore this exception since this path might have been deleted.
+    }
 	
 	
-	//If we found the user for this logPath, then return this user
-	if(user != null) return user; 
+    // If we found the user for this logPath, then return this user
+    if (user != null) {
+      return user;
+    }
 
 
-	//If <hadoop.log.dir>/userlogs/jobid not found, then get user from 
-	//any one of existing <mapred.local.dir>/userlogs/jobid path(s)
-	String[] localDirs = 
-	   userLogManager.getTaskController().getLocalDirs();
-	for(String localDir : localDirs) {
-		try{
-		   logRoot = localDir + File.separator + TaskLog.USERLOGS_DIR_NAME;
-		   user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
-		   //If we found the user for this logPath, then break this loop
-		   if(user != null) break; 
-			
-		}catch(Exception e){
-			//Ignore this exception since this path might have been deleted.
-		}
-	}
-	
-	if(user == null) {
-		throw new IOException("Userlog path not found for " + logPath);
-	}
+    // If <hadoop.log.dir>/userlogs/jobid not found, then get user from 
+    // any one of existing <mapred.local.dir>/userlogs/jobid path(s)
+    String[] localDirs =
+      userLogManager.getTaskController().getLocalDirs();
+    for (String localDir : localDirs) {
+      try {
+        logRoot = localDir + File.separator + TaskLog.USERLOGS_DIR_NAME;
+        user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+        // If we found the user for this logPath, then break this loop
+        if (user != null) {
+          break;
+        }
+      } catch (Exception e) {
+        // Ignore this exception since this path might have been deleted.
+      }
+    }
 	
 	
-	return user;
+    if (user == null) {
+      throw new IOException("Userlog path not found for " + logPath);
+    }
+    return user;
   }
   }
   
   
   /**
   /**