|
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.Constructor;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
@@ -69,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FSError;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
|
@@ -715,6 +717,86 @@ public class TestResourceLocalizationService {
|
|
|
stateStore.close();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ @Test( timeout = 10000)
|
|
|
+ @SuppressWarnings("unchecked") // mocked generics
|
|
|
+ public void testLocalizerRunnerException() throws Exception {
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+ dispatcher.init(conf);
|
|
|
+ dispatcher.start();
|
|
|
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
|
|
|
+ dispatcher.register(ApplicationEventType.class, applicationBus);
|
|
|
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
|
|
|
+ dispatcher.register(ContainerEventType.class, containerBus);
|
|
|
+
|
|
|
+ ContainerExecutor exec = mock(ContainerExecutor.class);
|
|
|
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
|
|
+ LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
|
|
|
+ dirsHandlerSpy.init(conf);
|
|
|
+
|
|
|
+ DeletionService delServiceReal = new DeletionService(exec);
|
|
|
+ DeletionService delService = spy(delServiceReal);
|
|
|
+ delService.init(new Configuration());
|
|
|
+ delService.start();
|
|
|
+
|
|
|
+ ResourceLocalizationService rawService =
|
|
|
+ new ResourceLocalizationService(dispatcher, exec, delService,
|
|
|
+ dirsHandlerSpy, nmContext);
|
|
|
+ ResourceLocalizationService spyService = spy(rawService);
|
|
|
+ doReturn(mockServer).when(spyService).createServer();
|
|
|
+ try {
|
|
|
+ spyService.init(conf);
|
|
|
+ spyService.start();
|
|
|
+
|
|
|
+ // init application
|
|
|
+ final Application app = mock(Application.class);
|
|
|
+ final ApplicationId appId =
|
|
|
+ BuilderUtils.newApplicationId(314159265358979L, 3);
|
|
|
+ when(app.getUser()).thenReturn("user0");
|
|
|
+ when(app.getAppId()).thenReturn(appId);
|
|
|
+ spyService.handle(new ApplicationLocalizationEvent(
|
|
|
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ Random r = new Random();
|
|
|
+ long seed = r.nextLong();
|
|
|
+ System.out.println("SEED: " + seed);
|
|
|
+ r.setSeed(seed);
|
|
|
+ final Container c = getMockContainer(appId, 42, "user0");
|
|
|
+ final LocalResource resource1 = getPrivateMockedResource(r);
|
|
|
+ System.out.println("Here 4");
|
|
|
+
|
|
|
+ final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
|
|
|
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
|
|
+ new HashMap<LocalResourceVisibility,
|
|
|
+ Collection<LocalResourceRequest>>();
|
|
|
+ List<LocalResourceRequest> privateResourceList =
|
|
|
+ new ArrayList<LocalResourceRequest>();
|
|
|
+ privateResourceList.add(req1);
|
|
|
+ rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
|
|
|
+
|
|
|
+ final Constructor<?>[] constructors =
|
|
|
+ FSError.class.getDeclaredConstructors();
|
|
|
+ constructors[0].setAccessible(true);
|
|
|
+ FSError fsError =
|
|
|
+ (FSError) constructors[0].newInstance(new IOException("Disk Error"));
|
|
|
+
|
|
|
+ Mockito
|
|
|
+ .doThrow(fsError)
|
|
|
+ .when(dirsHandlerSpy)
|
|
|
+ .getLocalPathForWrite(isA(String.class));
|
|
|
+ spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
|
|
|
+ Thread.sleep(1000);
|
|
|
+ dispatcher.await();
|
|
|
+ // Verify if ContainerResourceFailedEvent is invoked on FSError
|
|
|
+ verify(containerBus).handle(isA(ContainerResourceFailedEvent.class));
|
|
|
+ } finally {
|
|
|
+ spyService.stop();
|
|
|
+ dispatcher.stop();
|
|
|
+ delService.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Test( timeout = 10000)
|
|
|
@SuppressWarnings("unchecked") // mocked generics
|