Browse Source

MAPREDUCE-4490. Fixed LinuxTaskController to re-initialize user log
directory when JVM reuse option is enabled. (Sam Liu via eyang)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1602528 13f79535-47bb-0310-9956-ffa450edef68

Eric Yang 11 years ago
parent
commit
484c4fd413

+ 3 - 0
CHANGES.txt

@@ -63,6 +63,9 @@ Release 1.3.0 - unreleased
 
   BUG FIXES
 
+    MAPREDUCE-4490. Fixed LinuxTaskController to re-initialize user log
+    directory when JVM reuse option is enabled.  (Sam Liu via eyang)
+
     HADOOP-9863. Backport HADOOP-8686 to support BigEndian on ppc64. 
     (Yu Li via eyang)
 

+ 11 - 0
src/c++/task-controller/impl/main.c

@@ -51,6 +51,8 @@ void display_usage(FILE *stream) {
 	  DELETE_LOG_AS_USER);
   fprintf(stream, "   run command as user:  %2d cmd args\n",
 	  RUN_COMMAND_AS_USER);
+  fprintf(stream, "   initialize task:      %2d user relative-path jobid taskid\n",
+	  INITIALIZE_TASK);
 }
 
 int main(int argc, char **argv) {
@@ -197,6 +199,15 @@ int main(int argc, char **argv) {
   case RUN_COMMAND_AS_USER:
     exit_code = run_command_as_user(user_detail->pw_name, argv + optind);
     break;
+  case INITIALIZE_TASK:
+    if (argc < 6) {
+      fprintf(LOGFILE, "Too few arguments (%d vs 6) for initializing task\n", argc);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    job_id = argv[optind++];
+    task_id = argv[optind++];
+    exit_code = initialize_task(user_detail->pw_name, good_local_dirs, job_id, task_id);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

+ 7 - 3
src/c++/task-controller/impl/task-controller.c

@@ -507,6 +507,13 @@ int create_attempt_directories(const char* user,
   return result;
 }
 
+int initialize_task(const char* user,
+  const char * good_local_dirs, const char *job_id, const char *task_id) {
+  // Handler for the INITIALIZE_TASK command: forwards all four arguments to create_attempt_directories() and returns its result unchanged.
+  int result = create_attempt_directories(user, good_local_dirs, job_id, task_id);
+  return result;
+}
+
 /**
  * Load the user information for a given user name.
  */
@@ -883,9 +890,6 @@ int run_task_as_user(const char *user, const char * good_local_dirs,
                      const char *work_dir, const char *script_name) {
   int exit_code = -1;
   char *task_script_path = NULL;
-  if (create_attempt_directories(user, good_local_dirs, job_id, task_id) != 0) {
-    goto cleanup;
-  }
   int task_file_source = open_file_as_task_tracker(script_name);
   if (task_file_source == -1) {
     goto cleanup;

+ 8 - 1
src/c++/task-controller/impl/task-controller.h

@@ -26,7 +26,8 @@ enum command {
   SIGNAL_TASK = 2,
   DELETE_AS_USER = 3,
   DELETE_LOG_AS_USER = 4,
-  RUN_COMMAND_AS_USER = 5
+  RUN_COMMAND_AS_USER = 5,
+  INITIALIZE_TASK = 6
 };
 
 enum errorcodes {
@@ -159,3 +160,9 @@ int change_user(uid_t user, gid_t group);
  */
 int create_attempt_directories(const char* user,
 	const char * good_local_dirs, const char *job_id, const char *task_id);
+
+/**
+ * Initialize a task by delegating to create_attempt_directories() with the same arguments; returns that call's result.
+ */
+int initialize_task(const char* user,
+ 	const char * good_local_dirs, const char *job_id, const char *task_id);

+ 21 - 1
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,6 +72,9 @@ class LinuxTaskController extends TaskController {
   private static final String TASK_CONTROLLER_EXEC_KEY =
     "mapreduce.tasktracker.task-controller.exe";
   
+  private static Map<String, String> jobUserMap = new HashMap<String, String>(); 
+  private static File currentWorkDirectory; 
+  
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
@@ -93,7 +98,8 @@ class LinuxTaskController extends TaskController {
     SIGNAL_TASK(2),
     DELETE_AS_USER(3),
     DELETE_LOG_AS_USER(4),
-    RUN_COMMAND_AS_USER(5);
+    RUN_COMMAND_AS_USER(5),
+    INITIALIZE_TASK(6);
 
     private int value;
     Commands(int value) {
@@ -154,6 +160,8 @@ class LinuxTaskController extends TaskController {
                             Path jobConf, TaskUmbilicalProtocol taskTracker,
                             InetSocketAddress ttAddr
                             ) throws IOException {
+    jobUserMap.put(jobid, user);
+
     List<String> command = new ArrayList<String>(
       Arrays.asList(taskControllerExe, 
                     user,
@@ -264,6 +272,18 @@ class LinuxTaskController extends TaskController {
   public void createLogDir(TaskAttemptID taskID,
                            boolean isCleanup) throws IOException { // isCleanup is not read in this method
     // Log dirs are created during attempt dir creation, which is triggered here by running the task-controller binary with INITIALIZE_TASK
+    String[] command = // argv order: executable, user, local dirs, command code, job id, attempt id
+      new String[]{taskControllerExe, 
+          jobUserMap.get(taskID.getJobID().toString()), // user stored via jobUserMap.put(jobid, user); null if the job has no entry — NOTE(review): confirm an entry always exists
+                   localStorage.getDirsString(),
+                   Integer.toString(Commands.INITIALIZE_TASK.getValue()), // numeric command code (6) sent as a string
+                   taskID.getJobID().toString(),
+                   taskID.toString()};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("createLogDir: " + Arrays.toString(command));
+    }
+    shExec.execute(); // NOTE(review): exit status is not inspected here — presumably execute() throws on failure; confirm
   }
 
   @Override