|
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.never;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
@@ -46,6 +47,8 @@ import java.util.concurrent.CompletionService;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
@@ -77,6 +80,7 @@ import org.mockito.stubbing.Answer;
|
|
|
|
|
|
public class TestContainerLocalizer {
|
|
|
|
|
|
+ static final Log LOG = LogFactory.getLog(TestContainerLocalizer.class);
|
|
|
static final Path basedir =
|
|
|
new Path("target", TestContainerLocalizer.class.getName());
|
|
|
|
|
@@ -94,7 +98,10 @@ public class TestContainerLocalizer {
|
|
|
|
|
|
@Test
|
|
|
public void testContainerLocalizerMain() throws Exception {
|
|
|
- ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
|
|
+ FileContext fs = FileContext.getLocalFSFileContext();
|
|
|
+ spylfs = spy(fs.getDefaultFileSystem());
|
|
|
+ ContainerLocalizer localizer =
|
|
|
+ setupContainerLocalizerForTest();
|
|
|
|
|
|
// verify created cache
|
|
|
List<Path> privCacheList = new ArrayList<Path>();
|
|
@@ -190,11 +197,25 @@ public class TestContainerLocalizer {
|
|
|
}
|
|
|
}));
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
|
|
+ FileContext fs = FileContext.getLocalFSFileContext();
|
|
|
+ spylfs = spy(fs.getDefaultFileSystem());
|
|
|
+ ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
|
|
+ doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
|
|
+ any(CompletionService.class), any(UserGroupInformation.class));
|
|
|
+ localizer.runLocalization(nmAddr);
|
|
|
+ verify(spylfs, times(1)).delete(tokenPath, false);
|
|
|
+ }
|
|
|
|
|
|
@Test
|
|
|
@SuppressWarnings("unchecked") // mocked generics
|
|
|
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
|
|
// verify filesystems are closed when localizer doesn't fail
|
|
|
+ FileContext fs = FileContext.getLocalFSFileContext();
|
|
|
+ spylfs = spy(fs.getDefaultFileSystem());
|
|
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
|
|
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
|
|
any(CompletionService.class), any(UserGroupInformation.class));
|
|
@@ -203,6 +224,7 @@ public class TestContainerLocalizer {
|
|
|
localizer.runLocalization(nmAddr);
|
|
|
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
|
|
|
|
|
+ spylfs = spy(fs.getDefaultFileSystem());
|
|
|
// verify filesystems are closed when localizer fails
|
|
|
localizer = setupContainerLocalizerForTest();
|
|
|
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
|
|
@@ -217,7 +239,6 @@ public class TestContainerLocalizer {
|
|
|
@SuppressWarnings("unchecked") // mocked generics
|
|
|
private ContainerLocalizer setupContainerLocalizerForTest()
|
|
|
throws Exception {
|
|
|
- spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
|
|
// don't actually create dirs
|
|
|
doNothing().when(spylfs).mkdir(
|
|
|
isA(Path.class), isA(FsPermission.class), anyBoolean());
|
|
@@ -245,10 +266,10 @@ public class TestContainerLocalizer {
|
|
|
containerId)));
|
|
|
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
|
|
).when(spylfs).open(tokenPath);
|
|
|
-
|
|
|
nmProxy = mock(LocalizationProtocol.class);
|
|
|
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
|
|
doNothing().when(localizer).sleep(anyInt());
|
|
|
+
|
|
|
|
|
|
// return result instantly for deterministic test
|
|
|
ExecutorService syncExec = mock(ExecutorService.class);
|