|
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Matchers.anyBoolean;
|
|
|
import static org.mockito.Matchers.anyInt;
|
|
|
import static org.mockito.Matchers.argThat;
|
|
@@ -27,6 +28,7 @@ import static org.mockito.Matchers.isA;
|
|
|
import static org.mockito.Matchers.same;
|
|
|
import static org.mockito.Mockito.doNothing;
|
|
|
import static org.mockito.Mockito.doReturn;
|
|
|
+import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.never;
|
|
|
import static org.mockito.Mockito.spy;
|
|
@@ -57,6 +59,7 @@ import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
@@ -76,47 +79,28 @@ public class TestContainerLocalizer {
|
|
|
static final Path basedir =
|
|
|
new Path("target", TestContainerLocalizer.class.getName());
|
|
|
|
|
|
+ static final String appUser = "yak";
|
|
|
+ static final String appId = "app_RM_0";
|
|
|
+ static final String containerId = "container_0";
|
|
|
+ static final InetSocketAddress nmAddr =
|
|
|
+ new InetSocketAddress("foobar", 8040);
|
|
|
+
|
|
|
+ private AbstractFileSystem spylfs;
|
|
|
+ private Random random;
|
|
|
+ private List<Path> localDirs;
|
|
|
+ private Path tokenPath;
|
|
|
+ private LocalizationProtocol nmProxy;
|
|
|
+
|
|
|
@Test
|
|
|
- @SuppressWarnings("unchecked") // mocked generics
|
|
|
public void testContainerLocalizerMain() throws Exception {
|
|
|
- Configuration conf = new Configuration();
|
|
|
- AbstractFileSystem spylfs =
|
|
|
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
|
|
- // don't actually create dirs
|
|
|
- doNothing().when(spylfs).mkdir(
|
|
|
- isA(Path.class), isA(FsPermission.class), anyBoolean());
|
|
|
- FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
|
|
- final String user = "yak";
|
|
|
- final String appId = "app_RM_0";
|
|
|
- final String cId = "container_0";
|
|
|
- final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 8040);
|
|
|
- final List<Path> localDirs = new ArrayList<Path>();
|
|
|
- for (int i = 0; i < 4; ++i) {
|
|
|
- localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
|
|
- }
|
|
|
- RecordFactory mockRF = getMockLocalizerRecordFactory();
|
|
|
- ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
|
|
|
- appId, cId, localDirs, mockRF);
|
|
|
- ContainerLocalizer localizer = spy(concreteLoc);
|
|
|
-
|
|
|
- // return credential stream instead of opening local file
|
|
|
- final Random r = new Random();
|
|
|
- long seed = r.nextLong();
|
|
|
- r.setSeed(seed);
|
|
|
- System.out.println("SEED: " + seed);
|
|
|
- DataInputBuffer appTokens = createFakeCredentials(r, 10);
|
|
|
- Path tokenPath =
|
|
|
- lfs.makeQualified(new Path(
|
|
|
- String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, cId)));
|
|
|
- doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
|
|
- ).when(spylfs).open(tokenPath);
|
|
|
+ ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
|
|
|
|
|
// mock heartbeat responses from NM
|
|
|
- LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
|
|
|
- LocalResource rsrcA = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
|
|
|
- LocalResource rsrcB = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
|
|
|
- LocalResource rsrcC = getMockRsrc(r, LocalResourceVisibility.APPLICATION);
|
|
|
- LocalResource rsrcD = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
|
|
|
+ LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
|
|
|
+ LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
|
|
|
+ LocalResource rsrcC = getMockRsrc(random,
|
|
|
+ LocalResourceVisibility.APPLICATION);
|
|
|
+ LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
|
|
|
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
|
|
|
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
|
|
|
Collections.singletonList(rsrcA)))
|
|
@@ -130,6 +114,7 @@ public class TestContainerLocalizer {
|
|
|
Collections.<LocalResource>emptyList()))
|
|
|
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
|
|
|
null));
|
|
|
+
|
|
|
doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
|
|
|
localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
|
|
|
isA(UserGroupInformation.class));
|
|
@@ -142,33 +127,13 @@ public class TestContainerLocalizer {
|
|
|
doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
|
|
|
localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
|
|
|
isA(UserGroupInformation.class));
|
|
|
- doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
|
|
- doNothing().when(localizer).sleep(anyInt());
|
|
|
-
|
|
|
- // return result instantly for deterministic test
|
|
|
- ExecutorService syncExec = mock(ExecutorService.class);
|
|
|
- CompletionService<Path> cs = mock(CompletionService.class);
|
|
|
- when(cs.submit(isA(Callable.class)))
|
|
|
- .thenAnswer(new Answer<Future<Path>>() {
|
|
|
- @Override
|
|
|
- public Future<Path> answer(InvocationOnMock invoc)
|
|
|
- throws Throwable {
|
|
|
- Future<Path> done = mock(Future.class);
|
|
|
- when(done.isDone()).thenReturn(true);
|
|
|
- FakeDownload d = (FakeDownload) invoc.getArguments()[0];
|
|
|
- when(done.get()).thenReturn(d.call());
|
|
|
- return done;
|
|
|
- }
|
|
|
- });
|
|
|
- doReturn(syncExec).when(localizer).createDownloadThreadPool();
|
|
|
- doReturn(cs).when(localizer).createCompletionService(syncExec);
|
|
|
|
|
|
// run localization
|
|
|
assertEquals(0, localizer.runLocalization(nmAddr));
|
|
|
|
|
|
// verify created cache
|
|
|
for (Path p : localDirs) {
|
|
|
- Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
|
|
|
+ Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
|
|
|
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
|
|
|
// $x/usercache/$user/filecache
|
|
|
verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
|
|
@@ -194,11 +159,91 @@ public class TestContainerLocalizer {
|
|
|
@Override
|
|
|
public boolean matches(Object o) {
|
|
|
LocalizerStatus status = (LocalizerStatus) o;
|
|
|
- return !cId.equals(status.getLocalizerId());
|
|
|
+ return !containerId.equals(status.getLocalizerId());
|
|
|
}
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ @SuppressWarnings("unchecked") // mocked generics
|
|
|
+ public void testContainerLocalizerClosesFilesystems() throws Exception {
|
|
|
+ // verify filesystems are closed when localizer doesn't fail
|
|
|
+ ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
|
|
+ doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
|
|
+ any(CompletionService.class), any(UserGroupInformation.class));
|
|
|
+ verify(localizer, never()).closeFileSystems(
|
|
|
+ any(UserGroupInformation.class));
|
|
|
+ localizer.runLocalization(nmAddr);
|
|
|
+ verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
|
|
+
|
|
|
+ // verify filesystems are closed when localizer fails
|
|
|
+ localizer = setupContainerLocalizerForTest();
|
|
|
+ doThrow(new YarnException("Forced Failure")).when(localizer).localizeFiles(
|
|
|
+ any(LocalizationProtocol.class), any(CompletionService.class),
|
|
|
+ any(UserGroupInformation.class));
|
|
|
+ verify(localizer, never()).closeFileSystems(
|
|
|
+ any(UserGroupInformation.class));
|
|
|
+ localizer.runLocalization(nmAddr);
|
|
|
+ verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked") // mocked generics
|
|
|
+ private ContainerLocalizer setupContainerLocalizerForTest()
|
|
|
+ throws Exception {
|
|
|
+ spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
|
|
+ // don't actually create dirs
|
|
|
+ doNothing().when(spylfs).mkdir(
|
|
|
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
|
|
+ localDirs = new ArrayList<Path>();
|
|
|
+ for (int i = 0; i < 4; ++i) {
|
|
|
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
|
|
+ }
|
|
|
+ RecordFactory mockRF = getMockLocalizerRecordFactory();
|
|
|
+ ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
|
|
|
+ appId, containerId, localDirs, mockRF);
|
|
|
+ ContainerLocalizer localizer = spy(concreteLoc);
|
|
|
+
|
|
|
+ // return credential stream instead of opening local file
|
|
|
+ random = new Random();
|
|
|
+ long seed = random.nextLong();
|
|
|
+ System.out.println("SEED: " + seed);
|
|
|
+ random.setSeed(seed);
|
|
|
+ DataInputBuffer appTokens = createFakeCredentials(random, 10);
|
|
|
+ tokenPath =
|
|
|
+ lfs.makeQualified(new Path(
|
|
|
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
|
|
|
+ containerId)));
|
|
|
+ doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
|
|
+ ).when(spylfs).open(tokenPath);
|
|
|
+
|
|
|
+ nmProxy = mock(LocalizationProtocol.class);
|
|
|
+ doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
|
|
+ doNothing().when(localizer).sleep(anyInt());
|
|
|
+
|
|
|
+ // return result instantly for deterministic test
|
|
|
+ ExecutorService syncExec = mock(ExecutorService.class);
|
|
|
+ CompletionService<Path> cs = mock(CompletionService.class);
|
|
|
+ when(cs.submit(isA(Callable.class)))
|
|
|
+ .thenAnswer(new Answer<Future<Path>>() {
|
|
|
+ @Override
|
|
|
+ public Future<Path> answer(InvocationOnMock invoc)
|
|
|
+ throws Throwable {
|
|
|
+ Future<Path> done = mock(Future.class);
|
|
|
+ when(done.isDone()).thenReturn(true);
|
|
|
+ FakeDownload d = (FakeDownload) invoc.getArguments()[0];
|
|
|
+ when(done.get()).thenReturn(d.call());
|
|
|
+ return done;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ doReturn(syncExec).when(localizer).createDownloadThreadPool();
|
|
|
+ doReturn(cs).when(localizer).createCompletionService(syncExec);
|
|
|
+
|
|
|
+ return localizer;
|
|
|
+ }
|
|
|
+
|
|
|
static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
|
|
|
final LocalResource rsrc;
|
|
|
HBMatches(LocalResource rsrc) {
|