|
@@ -97,12 +97,23 @@ public class TestAsyncDispatcher {
|
|
}
|
|
}
|
|
|
|
|
|
private static class TestHandler implements EventHandler<Event> {
|
|
private static class TestHandler implements EventHandler<Event> {
|
|
|
|
+
|
|
|
|
+ private long sleepTime = 1500;
|
|
|
|
+
|
|
|
|
+ TestHandler() {
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ TestHandler(long sleepTime) {
|
|
|
|
+ this.sleepTime = sleepTime;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void handle(Event event) {
|
|
public void handle(Event event) {
|
|
try {
|
|
try {
|
|
// As long as 10000 events queued
|
|
// As long as 10000 events queued
|
|
- Thread.sleep(1500);
|
|
|
|
- } catch (InterruptedException e) {}
|
|
|
|
|
|
+ Thread.sleep(this.sleepTime);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -170,11 +181,54 @@ public class TestAsyncDispatcher {
|
|
//Make sure more than one event to take
|
|
//Make sure more than one event to take
|
|
verify(log, atLeastOnce()).
|
|
verify(log, atLeastOnce()).
|
|
info("Latest dispatch event type: TestEventType");
|
|
info("Latest dispatch event type: TestEventType");
|
|
|
|
+ } finally {
|
|
|
|
+ //... restore logger object
|
|
|
|
+ logger.set(null, oldLog);
|
|
dispatcher.stop();
|
|
dispatcher.stop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //Test print dispatcher details when the blocking queue is heavy
|
|
|
|
+ @Test(timeout = 60000)
|
|
|
|
+ public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception {
|
|
|
|
+ for (int i = 0; i < 5; i++) {
|
|
|
|
+ testPrintDispatcherEventDetailsAvoidDeadLoopInternal();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal()
|
|
|
|
+ throws Exception {
|
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
|
+ conf.setInt(YarnConfiguration.
|
|
|
|
+ YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10);
|
|
|
|
+ Log log = mock(Log.class);
|
|
|
|
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
|
|
|
|
+ dispatcher.init(conf);
|
|
|
|
+
|
|
|
|
+ Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
|
|
|
|
+ logger.setAccessible(true);
|
|
|
|
+ Field modifiers = Field.class.getDeclaredField("modifiers");
|
|
|
|
+ modifiers.setAccessible(true);
|
|
|
|
+ modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
|
|
|
|
+ Object oldLog = logger.get(null);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ logger.set(null, log);
|
|
|
|
+ dispatcher.register(TestEnum.class, new TestHandler(0));
|
|
|
|
+ dispatcher.start();
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < 10000; ++i) {
|
|
|
|
+ Event event = mock(Event.class);
|
|
|
|
+ when(event.getType()).thenReturn(TestEnum.TestEventType);
|
|
|
|
+ dispatcher.getEventHandler().handle(event);
|
|
|
|
+ }
|
|
|
|
+ Thread.sleep(3000);
|
|
} finally {
|
|
} finally {
|
|
//... restore logger object
|
|
//... restore logger object
|
|
logger.set(null, oldLog);
|
|
logger.set(null, oldLog);
|
|
|
|
+ dispatcher.stop();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|