Преглед на файлове

MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. Contributed by Vinod Kumar Vavilapalli

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1401738 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe преди 12 години
родител
ревизия
1e45b1f1fd

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -607,6 +607,9 @@ Release 0.23.5 - UNRELEASED
 
 
     MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn)
     MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn)
 
 
+    MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
+    (Vinod Kumar Vavilapalli via jlowe)
+
 Release 0.23.4 - UNRELEASED
 Release 0.23.4 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 11 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -81,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements
   protected BlockingQueue<ContainerLauncherEvent> eventQueue =
   protected BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
       new LinkedBlockingQueue<ContainerLauncherEvent>();
   YarnRPC rpc;
   YarnRPC rpc;
+  private final AtomicBoolean stopped;
 
 
   private Container getContainer(ContainerLauncherEvent event) {
   private Container getContainer(ContainerLauncherEvent event) {
     ContainerId id = event.getContainerID();
     ContainerId id = event.getContainerID();
@@ -237,6 +239,7 @@ public class ContainerLauncherImpl extends AbstractService implements
   public ContainerLauncherImpl(AppContext context) {
   public ContainerLauncherImpl(AppContext context) {
     super(ContainerLauncherImpl.class.getName());
     super(ContainerLauncherImpl.class.getName());
     this.context = context;
     this.context = context;
+    this.stopped = new AtomicBoolean(false);
   }
   }
 
 
   @Override
   @Override
@@ -271,11 +274,13 @@ public class ContainerLauncherImpl extends AbstractService implements
       @Override
       @Override
       public void run() {
       public void run() {
         ContainerLauncherEvent event = null;
         ContainerLauncherEvent event = null;
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           try {
           try {
             event = eventQueue.take();
             event = eventQueue.take();
           } catch (InterruptedException e) {
           } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
+            if (!stopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
             return;
             return;
           }
           }
           int poolSize = launcherPool.getCorePoolSize();
           int poolSize = launcherPool.getCorePoolSize();
@@ -324,6 +329,10 @@ public class ContainerLauncherImpl extends AbstractService implements
   }
   }
 
 
   public void stop() {
   public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
     // shutdown any containers that might be left running
     // shutdown any containers that might be left running
     shutdownAllContainers();
     shutdownAllContainers();
     eventHandlingThread.interrupt();
     eventHandlingThread.interrupt();

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -67,7 +67,7 @@ public abstract class RMCommunicator extends AbstractService  {
   private int rmPollInterval;//millis
   private int rmPollInterval;//millis
   protected ApplicationId applicationId;
   protected ApplicationId applicationId;
   protected ApplicationAttemptId applicationAttemptId;
   protected ApplicationAttemptId applicationAttemptId;
-  private AtomicBoolean stopped;
+  private final AtomicBoolean stopped;
   protected Thread allocatorThread;
   protected Thread allocatorThread;
   @SuppressWarnings("rawtypes")
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
   protected EventHandler eventHandler;
@@ -239,7 +239,9 @@ public abstract class RMCommunicator extends AbstractService  {
               // TODO: for other exceptions
               // TODO: for other exceptions
             }
             }
           } catch (InterruptedException e) {
           } catch (InterruptedException e) {
-            LOG.warn("Allocated thread interrupted. Returning.");
+            if (!stopped.get()) {
+              LOG.warn("Allocated thread interrupted. Returning.");
+            }
             return;
             return;
           }
           }
         }
         }

+ 11 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -32,6 +32,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -84,7 +85,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   private static final Priority PRIORITY_MAP;
   private static final Priority PRIORITY_MAP;
 
 
   private Thread eventHandlingThread;
   private Thread eventHandlingThread;
-  private volatile boolean stopEventHandling;
+  private final AtomicBoolean stopped;
 
 
   static {
   static {
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
@@ -145,6 +146,7 @@ public class RMContainerAllocator extends RMContainerRequestor
 
 
   public RMContainerAllocator(ClientService clientService, AppContext context) {
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
     super(clientService, context);
+    this.stopped = new AtomicBoolean(false);
   }
   }
 
 
   @Override
   @Override
@@ -176,11 +178,13 @@ public class RMContainerAllocator extends RMContainerRequestor
 
 
         ContainerAllocatorEvent event;
         ContainerAllocatorEvent event;
 
 
-        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           try {
           try {
             event = RMContainerAllocator.this.eventQueue.take();
             event = RMContainerAllocator.this.eventQueue.take();
           } catch (InterruptedException e) {
           } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
+            if (!stopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
             return;
             return;
           }
           }
 
 
@@ -234,7 +238,10 @@ public class RMContainerAllocator extends RMContainerRequestor
 
 
   @Override
   @Override
   public void stop() {
   public void stop() {
-    this.stopEventHandling = true;
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
     eventHandlingThread.interrupt();
     eventHandlingThread.interrupt();
     super.stop();
     super.stop();
     LOG.info("Final Stats: " + getStat());
     LOG.info("Final Stats: " + getStat());

+ 11 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java

@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -43,10 +44,12 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
   private Thread eventHandlingThread;
   private Thread eventHandlingThread;
   private BlockingQueue<TaskCleanupEvent> eventQueue =
   private BlockingQueue<TaskCleanupEvent> eventQueue =
       new LinkedBlockingQueue<TaskCleanupEvent>();
       new LinkedBlockingQueue<TaskCleanupEvent>();
+  private final AtomicBoolean stopped;
 
 
   public TaskCleanerImpl(AppContext context) {
   public TaskCleanerImpl(AppContext context) {
     super("TaskCleaner");
     super("TaskCleaner");
     this.context = context;
     this.context = context;
+    this.stopped = new AtomicBoolean(false);
   }
   }
 
 
   public void start() {
   public void start() {
@@ -59,11 +62,13 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
       @Override
       @Override
       public void run() {
       public void run() {
         TaskCleanupEvent event = null;
         TaskCleanupEvent event = null;
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           try {
           try {
             event = eventQueue.take();
             event = eventQueue.take();
           } catch (InterruptedException e) {
           } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
+            if (!stopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
             return;
             return;
           }
           }
           // the events from the queue are handled in parallel
           // the events from the queue are handled in parallel
@@ -77,6 +82,10 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
   }
   }
 
 
   public void stop() {
   public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
     eventHandlingThread.interrupt();
     eventHandlingThread.interrupt();
     launcherPool.shutdown();
     launcherPool.shutdown();
     super.stop();
     super.stop();