
Merge -r 755904:755905 from trunk onto 0.20 branch. Fixes HADOOP-5471.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@755908 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
319db1ac20
2 changed files with 13 additions and 4 deletions
  1. CHANGES.txt (3 additions, 0 deletions)
  2. src/mapred/org/apache/hadoop/mapred/Child.java (10 additions, 4 deletions)

+ 3 - 0
CHANGES.txt

@@ -767,6 +767,9 @@ Release 0.20.0 - Unreleased
 
     HADOOP-5382. Support combiners in the new context object API. (omalley)
 
+    HADOOP-5471. Fixes a problem to do with updating the log.index file in the 
+    case where a cleanup task is run. (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

+ 10 - 4
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -46,7 +46,7 @@ class Child {
   public static final Log LOG =
     LogFactory.getLog(TaskTracker.class);
 
-  static volatile TaskAttemptID taskid;
+  static volatile TaskAttemptID taskid = null;
   static volatile boolean isCleanup;
 
   public static void main(String[] args) throws Throwable {
@@ -58,9 +58,8 @@ class Child {
     InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
     final int SLEEP_LONGER_COUNT = 5;
-    taskid = firstTaskid;
     int jvmIdInt = Integer.parseInt(args[3]);
-    JVMId jvmId = new JVMId(taskid.getJobID(),taskid.isMap(),jvmIdInt);
+    JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
     TaskUmbilicalProtocol umbilical =
       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
           TaskUmbilicalProtocol.versionID,
@@ -99,6 +98,7 @@ class Child {
     Task task = null;
     try {
       while (true) {
+        taskid = null;
         JvmTask myTask = umbilical.getTask(jvmId);
         if (myTask.shouldDie()) {
           break;
@@ -182,7 +182,9 @@ class Child {
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       throwable.printStackTrace(new PrintStream(baos));
-      umbilical.reportDiagnosticInfo(taskid, baos.toString());
+      if (taskid != null) {
+        umbilical.reportDiagnosticInfo(taskid, baos.toString());
+      }
     } finally {
       RPC.stopProxy(umbilical);
       MetricsContext metricsContext = MetricsUtil.getContext("mapred");
@@ -191,6 +193,10 @@ class Child {
       // This assumes that on return from Task.run()
       // there is no more logging done.
       LogManager.shutdown();
+      // do synclogs
+      if (taskid != null) {
+        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+      }
     }
   }
 }
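
For context, the change follows a simple guard pattern: the shared volatile taskid is cleared at the top of every loop iteration and null-checked before any per-task reporting or log syncing, so a failure that happens before a task has actually been handed to this JVM (for example, while waiting for a cleanup task) never uses a stale or unset task id. Below is a minimal, standalone Java sketch of that pattern; the names TaskLoop, fetchTask, reportFailure, and syncLogs are hypothetical stand-ins for the umbilical and TaskLog calls, not Hadoop APIs.

// Minimal sketch of the null-guard pattern from the Child.java change.
// All identifiers here are hypothetical illustrations, not Hadoop APIs.
class TaskLoop {
  // Shared with other threads, hence volatile; starts unset.
  static volatile String taskId = null;

  public static void main(String[] args) {
    try {
      while (true) {
        taskId = null;              // clear before asking for the next task
        String next = fetchTask();  // may return null when told to exit
        if (next == null) {
          break;                    // no task was ever assigned in this pass
        }
        taskId = next;
        run(taskId);
      }
    } catch (Throwable t) {
      // Only report a failure against a task that was actually assigned.
      if (taskId != null) {
        reportFailure(taskId, t);
      }
    } finally {
      // Likewise, only sync per-task logs if some task ran in this JVM.
      if (taskId != null) {
        syncLogs(taskId);
      }
    }
  }

  static String fetchTask() { return null; }
  static void run(String id) { }
  static void reportFailure(String id, Throwable t) { t.printStackTrace(); }
  static void syncLogs(String id) { }
}

Resetting taskid inside the loop (rather than only initializing the field to null) is what keeps the guards meaningful across JVM reuse: each iteration starts with no task attributed to it until getTask actually returns one.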