|
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
+import org.apache.hadoop.yarn.event.InlineDispatcher;
|
|
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|
@@ -202,6 +203,32 @@ public class TestNonAggregatingLogHandler {
|
|
|
verify(logHandler.mockSched).shutdownNow();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testHandlingApplicationFinishedEvent() {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ LocalDirsHandlerService dirsService = new LocalDirsHandlerService();
|
|
|
+ DeletionService delService = new DeletionService(null);
|
|
|
+ NonAggregatingLogHandler aggregatingLogHandler =
|
|
|
+ new NonAggregatingLogHandler(new InlineDispatcher(),
|
|
|
+ delService,
|
|
|
+ dirsService);
|
|
|
+
|
|
|
+ dirsService.init(conf);
|
|
|
+ dirsService.start();
|
|
|
+ delService.init(conf);
|
|
|
+ delService.start();
|
|
|
+ aggregatingLogHandler.init(conf);
|
|
|
+ aggregatingLogHandler.start();
|
|
|
+ ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
|
|
|
+ // It should NOT throw RejectedExecutionException
|
|
|
+ aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
|
|
|
+ aggregatingLogHandler.stop();
|
|
|
+
|
|
|
+ // It should NOT throw RejectedExecutionException after stopping
|
|
|
+ // handler service.
|
|
|
+ aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
|
|
|
+ }
|
|
|
+
|
|
|
private class NonAggregatingLogHandlerWithMockExecutor extends
|
|
|
NonAggregatingLogHandler {
|
|
|
|