Parcourir la source

YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. Contributed by Varun Saxena

Jian He il y a 9 ans
Parent
commit
393fe71771

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -682,6 +682,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3535. Scheduler must re-request container resources when RMContainer transitions
     from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh)
 
+    YARN-3878. AsyncDispatcher can hang while stopping if it is configured for
+    draining events on stop. (Varun Saxena via jianhe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java

@@ -246,6 +246,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
         if (!stopped) {
           LOG.warn("AsyncDispatcher thread interrupted", e);
         }
+        // Need to reset drained flag to true if event queue is empty,
+        // otherwise dispatcher will hang on stop.
+        drained = eventQueue.isEmpty();
         throw new YarnRuntimeException(e);
       }
     };
@@ -286,6 +289,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     };
   }
 
+  @VisibleForTesting
+  protected boolean isEventThreadWaiting() {
+    return eventHandlingThread.getState() == Thread.State.WAITING;
+  }
+
   @VisibleForTesting
   protected boolean isDrained() {
     return this.drained;

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java

@@ -27,10 +27,19 @@ public class DrainDispatcher extends AsyncDispatcher {
     this(new LinkedBlockingQueue<Event>());
   }
 
-  private DrainDispatcher(BlockingQueue<Event> eventQueue) {
+  public DrainDispatcher(BlockingQueue<Event> eventQueue) {
     super(eventQueue);
   }
 
+  /**
+   *  Wait till event thread enters WAITING state (i.e. waiting for new events).
+   */
+  public void waitForEventThreadToWait() {
+    while (!isEventThreadWaiting()) {
+      Thread.yield();
+    }
+  }
+
   /**
    * Busy loop waiting for all queued events to drain.
    */

+ 62 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.event;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAsyncDispatcher {
+
+  /* This test checks whether dispatcher hangs on close if following two things
+   * happen :
+   * 1. A thread which was putting event to event queue is interrupted.
+   * 2. Event queue is empty on close.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout=10000)
+  public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
+    BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
+    Event event = mock(Event.class);
+    doThrow(new InterruptedException()).when(eventQueue).put(event);
+    DrainDispatcher disp = new DrainDispatcher(eventQueue);
+    disp.init(new Configuration());
+    disp.setDrainEventsOnStop();
+    disp.start();
+    // Wait for event handler thread to start and begin waiting for events.
+    disp.waitForEventThreadToWait();
+    try {
+      disp.getEventHandler().handle(event);
+    } catch (YarnRuntimeException e) {
+    }
+    // Queue should be empty and dispatcher should not hang on close
+    Assert.assertTrue("Event Queue should have been empty",
+        eventQueue.isEmpty());
+    disp.close();
+  }
+}
+