|
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.any;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
+import java.io.DataOutputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.FileReader;
|
|
@@ -41,14 +42,6 @@ import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Random;
|
|
|
|
|
|
-import org.apache.hadoop.fs.FileUtil;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
|
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
|
@@ -57,20 +50,30 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
+import org.apache.hadoop.fs.FsStatus;
|
|
|
import org.apache.hadoop.fs.Options.CreateOpts;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.io.DataInputBuffer;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
|
|
|
|
|
|
+import org.junit.After;
|
|
|
import org.junit.AfterClass;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
-import org.junit.After;
|
|
|
-import org.junit.Assert;
|
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
|
import org.mockito.stubbing.Answer;
|
|
|
|
|
@@ -296,6 +299,102 @@ public class TestDefaultContainerExecutor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 30000)
|
|
|
+ public void testStartLocalizer()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ InetSocketAddress localizationServerAddress;
|
|
|
+ final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
|
|
|
+ List<String> localDirs = new ArrayList<String>();
|
|
|
+ final Path secondDir = new Path(BASE_TMP_PATH, "localDir2");
|
|
|
+ List<String> logDirs = new ArrayList<String>();
|
|
|
+ final Path logDir = new Path(BASE_TMP_PATH, "logDir");
|
|
|
+ final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir");
|
|
|
+ FsPermission perms = new FsPermission((short)0770);
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ localizationServerAddress = conf.getSocketAddr(
|
|
|
+ YarnConfiguration.NM_BIND_HOST,
|
|
|
+ YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
|
|
+
|
|
|
+ final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
|
|
|
+ final FileContext.Util mockUtil = spy(mockLfs.util());
|
|
|
+ doAnswer(new Answer() {
|
|
|
+ @Override
|
|
|
+ public Object answer(InvocationOnMock invocationOnMock)
|
|
|
+ throws Throwable {
|
|
|
+ return mockUtil;
|
|
|
+ }
|
|
|
+ }).when(mockLfs).util();
|
|
|
+ doAnswer(new Answer() {
|
|
|
+ @Override
|
|
|
+ public Object answer(InvocationOnMock invocationOnMock)
|
|
|
+ throws Throwable {
|
|
|
+ Path dest = (Path) invocationOnMock.getArguments()[1];
|
|
|
+ if (dest.toString().contains(firstDir.toString())) {
|
|
|
+ // throw an Exception when copy token to the first local dir
|
|
|
+ // to simulate no space on the first drive
|
|
|
+ throw new IOException("No space on this drive " +
|
|
|
+ dest.toString());
|
|
|
+ } else {
|
|
|
+ // copy token to the second local dir
|
|
|
+ DataOutputStream tokenOut = null;
|
|
|
+ try {
|
|
|
+ Credentials credentials = new Credentials();
|
|
|
+ tokenOut = mockLfs.create(dest,
|
|
|
+ EnumSet.of(CREATE, OVERWRITE));
|
|
|
+ credentials.writeTokenStorageToStream(tokenOut);
|
|
|
+ } finally {
|
|
|
+ if (tokenOut != null) {
|
|
|
+ tokenOut.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(mockUtil).copy(any(Path.class), any(Path.class));
|
|
|
+ doAnswer(new Answer() {
|
|
|
+ @Override
|
|
|
+ public Object answer(InvocationOnMock invocationOnMock)
|
|
|
+ throws Throwable {
|
|
|
+ Path p = (Path) invocationOnMock.getArguments()[0];
|
|
|
+ // let second local directory return more free space than
|
|
|
+ // first local directory
|
|
|
+ if (p.toString().contains(firstDir.toString())) {
|
|
|
+ return new FsStatus(2000, 2000, 0);
|
|
|
+ } else {
|
|
|
+ return new FsStatus(1000, 0, 1000);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }).when(mockLfs).getFsStatus(any(Path.class));
|
|
|
+
|
|
|
+ DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(
|
|
|
+ mockLfs));
|
|
|
+ mockExec.setConf(conf);
|
|
|
+ localDirs.add(mockLfs.makeQualified(firstDir).toString());
|
|
|
+ localDirs.add(mockLfs.makeQualified(secondDir).toString());
|
|
|
+ logDirs.add(mockLfs.makeQualified(logDir).toString());
|
|
|
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
|
|
|
+ localDirs.toArray(new String[localDirs.size()]));
|
|
|
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
|
|
|
+ mockLfs.mkdir(tokenDir, perms, true);
|
|
|
+ Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens");
|
|
|
+ String appSubmitter = "nobody";
|
|
|
+ String appId = "APP_ID";
|
|
|
+ String locId = "LOC_ID";
|
|
|
+ try {
|
|
|
+ mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
|
|
|
+ appSubmitter, appId, locId, localDirs, logDirs);
|
|
|
+ } catch (IOException e) {
|
|
|
+ Assert.fail("StartLocalizer failed to copy token file " + e);
|
|
|
+ } finally {
|
|
|
+ mockExec.deleteAsUser(appSubmitter, firstDir);
|
|
|
+ mockExec.deleteAsUser(appSubmitter, secondDir);
|
|
|
+ mockExec.deleteAsUser(appSubmitter, logDir);
|
|
|
+ deleteTmpFiles();
|
|
|
+ }
|
|
|
+ }
|
|
|
// @Test
|
|
|
// public void testInit() throws IOException, InterruptedException {
|
|
|
// Configuration conf = new Configuration();
|