Browse Source

MAPREDUCE-5693. Restore MRv1 behavior for log flush. Contributed by Gera Shegalov

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1560148 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 11 years ago
parent
commit
d1963ad4bd

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -285,6 +285,9 @@ Release 2.4.0 - UNRELEASED
 
     MAPREDUCE-5729. mapred job -list throws NPE (kasha)
 
+    MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via
+    jlowe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 5 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java

@@ -27,6 +27,7 @@ import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.log4j.LogManager;
 
 /**
  * The main() for MapReduce task processes.
@@ -123,6 +123,7 @@ class YarnChild {
     LOG.debug("PID: " + System.getenv().get("JVM_PID"));
     Task task = null;
     UserGroupInformation childUGI = null;
+    ScheduledExecutorService logSyncer = null;
 
     try {
       int idleLoopCount = 0;
@@ -161,6 +162,8 @@ class YarnChild {
       // set job classloader if configured before invoking the task
       MRApps.setJobClassLoader(job);
 
+      logSyncer = TaskLog.createLogSyncer();
+
       // Create a final reference to the task for the doAs block
       final Task taskFinal = task;
       childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@@ -214,10 +217,7 @@ class YarnChild {
     } finally {
       RPC.stopProxy(umbilical);
       DefaultMetricsSystem.shutdown();
-      // Shutting down log4j of the child-vm...
-      // This assumes that on return from Task.run()
-      // there is no more logging done.
-      LogManager.shutdown();
+      TaskLog.syncLogsShutdown(logSyncer);
     }
   }
 

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
@@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -212,6 +214,7 @@ public class MRAppMaster extends CompositeService {
   boolean errorHappenedShutDown = false;
   private String shutDownMessage = null;
   JobStateInternal forcedState = null;
+  private final ScheduledExecutorService logSyncer;
 
   private long recoveredJobStartTime = 0;
 
@@ -240,6 +243,7 @@ public class MRAppMaster extends CompositeService {
     this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
     this.maxAppAttempts = maxAppAttempts;
+    logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
 
@@ -1078,6 +1082,12 @@ public class MRAppMaster extends CompositeService {
     // All components have started, start the job.
     startJobs();
   }
+  
+  @Override
+  public void stop() {
+    super.stop();
+    TaskLog.syncLogsShutdown(logSyncer);
+  }
 
   private void processRecovery() {
     if (appAttemptID.getAttemptId() == 1) {

+ 87 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java

@@ -23,12 +23,17 @@ import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.Flushable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +49,8 @@ import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
@@ -262,7 +269,86 @@ public class TaskLog {
     }
     writeToIndexFile(logLocation, isCleanup);
   }
-  
+
+  public static synchronized void syncLogsShutdown(
+    ScheduledExecutorService scheduler) 
+  {
+    // flush standard streams
+    //
+    System.out.flush();
+    System.err.flush();
+
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+    }
+
+    // flush & close all appenders
+    LogManager.shutdown(); 
+  }
+
+  @SuppressWarnings("unchecked")
+  public static synchronized void syncLogs() {
+    // flush standard streams
+    //
+    System.out.flush();
+    System.err.flush();
+
+    // flush flushable appenders
+    //
+    final Logger rootLogger = Logger.getRootLogger();
+    flushAppenders(rootLogger);
+    final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository().
+      getCurrentLoggers();
+    while (allLoggers.hasMoreElements()) {
+      final Logger l = allLoggers.nextElement();
+      flushAppenders(l);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void flushAppenders(Logger l) {
+    final Enumeration<Appender> allAppenders = l.getAllAppenders();
+    while (allAppenders.hasMoreElements()) {
+      final Appender a = allAppenders.nextElement();
+      if (a instanceof Flushable) {
+        try {
+          ((Flushable) a).flush();
+        } catch (IOException ioe) {
+          System.err.println(a + ": Failed to flush!"
+            + StringUtils.stringifyException(ioe));
+        }
+      }
+    }
+  }
+
+  public static ScheduledExecutorService createLogSyncer() {
+    final ScheduledExecutorService scheduler =
+      Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            final Thread t = Executors.defaultThreadFactory().newThread(r);
+            t.setDaemon(true);
+            t.setName("Thread for syncLogs");
+            return t;
+          }
+        });
+    ShutdownHookManager.get().addShutdownHook(new Runnable() {
+        @Override
+        public void run() {
+          TaskLog.syncLogsShutdown(scheduler);
+        }
+      }, 50);
+    scheduler.scheduleWithFixedDelay(
+        new Runnable() {
+          @Override
+          public void run() {
+            TaskLog.syncLogs();
+          }
+        }, 0L, 5L, TimeUnit.SECONDS);
+    return scheduler;
+  }
+
   /**
    * The filter for userlogs.
    */

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.Flushable;
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -31,7 +32,7 @@ import org.apache.log4j.spi.LoggingEvent;
  * 
  */
 @InterfaceStability.Unstable
-public class TaskLogAppender extends FileAppender {
+public class TaskLogAppender extends FileAppender implements Flushable {
   private String taskId; //taskId should be managed as String rather than TaskID object
   //so that log4j can configure it from the configuration(log4j.properties). 
   private Integer maxEvents;
@@ -92,6 +93,7 @@ public class TaskLogAppender extends FileAppender {
     }
   }
   
+  @Override
   public void flush() {
     if (qw != null) {
       qw.flush();

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn;
 
 import java.io.File;
+import java.io.Flushable;
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -33,7 +34,9 @@ import org.apache.log4j.spi.LoggingEvent;
  */
 @Public
 @Unstable
-public class ContainerLogAppender extends FileAppender {
+public class ContainerLogAppender extends FileAppender
+  implements Flushable
+{
   private String containerLogDir;
   //so that log4j can configure it from the configuration(log4j.properties). 
   private int maxEvents;
@@ -65,6 +68,7 @@ public class ContainerLogAppender extends FileAppender {
     }
   }
   
+  @Override
   public void flush() {
     if (qw != null) {
       qw.flush();