|
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
|
|
|
import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
@@ -47,10 +48,12 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
@@ -113,7 +116,127 @@ public class TestFSDownload {
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ static LocalResource createTarFile(FileContext files, Path p, int len,
|
|
|
+ Random r, LocalResourceVisibility vis) throws IOException,
|
|
|
+ URISyntaxException {
|
|
|
+
|
|
|
+ FSDataOutputStream outFile = null;
|
|
|
+ try {
|
|
|
+ byte[] bytes = new byte[len];
|
|
|
+ Path tarPath = new Path(p.toString());
|
|
|
+ outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
|
|
|
+ r.nextBytes(bytes);
|
|
|
+ outFile.write(bytes);
|
|
|
+ } finally {
|
|
|
+ if (outFile != null)
|
|
|
+ outFile.close();
|
|
|
+ }
|
|
|
+ StringBuffer tarCommand = new StringBuffer();
|
|
|
+ URI u = new URI(p.getParent().toString());
|
|
|
+ tarCommand.append("cd '");
|
|
|
+ tarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
|
|
|
+ tarCommand.append("' ; ");
|
|
|
+ tarCommand.append("tar -czf " + p.getName() + ".tar " + p.getName());
|
|
|
+ String[] shellCmd = { "bash", "-c", tarCommand.toString() };
|
|
|
+ ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
|
|
|
+ shexec.execute();
|
|
|
+ int exitcode = shexec.getExitCode();
|
|
|
+ if (exitcode != 0) {
|
|
|
+ throw new IOException("Error untarring file " + p
|
|
|
+ + ". Tar process exited with exit code " + exitcode);
|
|
|
+ }
|
|
|
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
|
|
|
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
|
|
|
+ + ".tar")));
|
|
|
+ ret.setSize(len);
|
|
|
+ ret.setType(LocalResourceType.ARCHIVE);
|
|
|
+ ret.setVisibility(vis);
|
|
|
+ ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar"))
|
|
|
+ .getModificationTime());
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ static LocalResource createJarFile(FileContext files, Path p, int len,
|
|
|
+ Random r, LocalResourceVisibility vis) throws IOException,
|
|
|
+ URISyntaxException {
|
|
|
+
|
|
|
+ FSDataOutputStream outFile = null;
|
|
|
+ try {
|
|
|
+ byte[] bytes = new byte[len];
|
|
|
+ Path tarPath = new Path(p.toString());
|
|
|
+ outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
|
|
|
+ r.nextBytes(bytes);
|
|
|
+ outFile.write(bytes);
|
|
|
+ } finally {
|
|
|
+ if (outFile != null)
|
|
|
+ outFile.close();
|
|
|
+ }
|
|
|
+ StringBuffer tarCommand = new StringBuffer();
|
|
|
+ URI u = new URI(p.getParent().toString());
|
|
|
+ tarCommand.append("cd '");
|
|
|
+ tarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
|
|
|
+ tarCommand.append("' ; ");
|
|
|
+ tarCommand.append("jar cf " + p.getName() + ".jar " + p.getName());
|
|
|
+ String[] shellCmd = { "bash", "-c", tarCommand.toString() };
|
|
|
+ ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
|
|
|
+ shexec.execute();
|
|
|
+ int exitcode = shexec.getExitCode();
|
|
|
+ if (exitcode != 0) {
|
|
|
+ throw new IOException("Error untarring file " + p
|
|
|
+ + ". Tar process exited with exit code " + exitcode);
|
|
|
+ }
|
|
|
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
|
|
|
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
|
|
|
+ + ".jar")));
|
|
|
+ ret.setSize(len);
|
|
|
+ ret.setType(LocalResourceType.ARCHIVE);
|
|
|
+ ret.setVisibility(vis);
|
|
|
+ ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".jar"))
|
|
|
+ .getModificationTime());
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ static LocalResource createZipFile(FileContext files, Path p, int len,
|
|
|
+ Random r, LocalResourceVisibility vis) throws IOException,
|
|
|
+ URISyntaxException {
|
|
|
+
|
|
|
+ FSDataOutputStream outFile = null;
|
|
|
+ try {
|
|
|
+ byte[] bytes = new byte[len];
|
|
|
+ Path tarPath = new Path(p.toString());
|
|
|
+ outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
|
|
|
+ r.nextBytes(bytes);
|
|
|
+ outFile.write(bytes);
|
|
|
+ } finally {
|
|
|
+ if (outFile != null)
|
|
|
+ outFile.close();
|
|
|
+ }
|
|
|
+ StringBuffer zipCommand = new StringBuffer();
|
|
|
+ URI u = new URI(p.getParent().toString());
|
|
|
+ zipCommand.append("cd '");
|
|
|
+ zipCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
|
|
|
+ zipCommand.append("' ; ");
|
|
|
+ zipCommand.append("gzip " + p.getName());
|
|
|
+ String[] shellCmd = { "bash", "-c", zipCommand.toString() };
|
|
|
+ ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
|
|
|
+ shexec.execute();
|
|
|
+ int exitcode = shexec.getExitCode();
|
|
|
+ if (exitcode != 0) {
|
|
|
+ throw new IOException("Error untarring file " + p
|
|
|
+ + ". Tar process exited with exit code " + exitcode);
|
|
|
+ }
|
|
|
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
|
|
|
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
|
|
|
+ + ".zip")));
|
|
|
+ ret.setSize(len);
|
|
|
+ ret.setType(LocalResourceType.ARCHIVE);
|
|
|
+ ret.setVisibility(vis);
|
|
|
+ ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".gz"))
|
|
|
+ .getModificationTime());
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (timeout=10000)
|
|
|
public void testDownloadBadPublic() throws IOException, URISyntaxException,
|
|
|
InterruptedException {
|
|
|
Configuration conf = new Configuration();
|
|
@@ -161,7 +284,7 @@ public class TestFSDownload {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test (timeout=10000)
|
|
|
public void testDownload() throws IOException, URISyntaxException,
|
|
|
InterruptedException {
|
|
|
Configuration conf = new Configuration();
|
|
@@ -229,6 +352,175 @@ public class TestFSDownload {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ @Test (timeout=10000)
|
|
|
+ public void testDownloadArchive() throws IOException, URISyntaxException,
|
|
|
+ InterruptedException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
|
|
+ FileContext files = FileContext.getLocalFSFileContext(conf);
|
|
|
+ final Path basedir = files.makeQualified(new Path("target",
|
|
|
+ TestFSDownload.class.getSimpleName()));
|
|
|
+ files.mkdir(basedir, null, true);
|
|
|
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
|
|
|
+
|
|
|
+ Random rand = new Random();
|
|
|
+ long sharedSeed = rand.nextLong();
|
|
|
+ rand.setSeed(sharedSeed);
|
|
|
+ System.out.println("SEED: " + sharedSeed);
|
|
|
+
|
|
|
+ Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
|
|
|
+ ExecutorService exec = Executors.newSingleThreadExecutor();
|
|
|
+ LocalDirAllocator dirs = new LocalDirAllocator(
|
|
|
+ TestFSDownload.class.getName());
|
|
|
+
|
|
|
+ int size = rand.nextInt(512) + 512;
|
|
|
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
|
|
+
|
|
|
+ Path p = new Path(basedir, "" + 1);
|
|
|
+ LocalResource rsrc = createTarFile(files, p, size, rand, vis);
|
|
|
+ Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
|
|
+ FSDownload fsd = new FSDownload(files,
|
|
|
+ UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
|
|
|
+ new Random(sharedSeed));
|
|
|
+ pending.put(rsrc, exec.submit(fsd));
|
|
|
+
|
|
|
+ try {
|
|
|
+ FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
|
+ basedir);
|
|
|
+ for (FileStatus filestatus : filesstatus) {
|
|
|
+ if (filestatus.isDir()) {
|
|
|
+ FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
|
+ filestatus.getPath());
|
|
|
+ for (FileStatus childfile : childFiles) {
|
|
|
+ if (childfile.getPath().getName().equalsIgnoreCase("1.tar.tmp")) {
|
|
|
+ Assert.fail("Tmp File should not have been there "
|
|
|
+ + childfile.getPath());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch (Exception e) {
|
|
|
+ throw new IOException("Failed exec", e);
|
|
|
+ }
|
|
|
+ finally {
|
|
|
+ exec.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ @Test (timeout=10000)
|
|
|
+ public void testDownloadPatternJar() throws IOException, URISyntaxException,
|
|
|
+ InterruptedException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
|
|
+ FileContext files = FileContext.getLocalFSFileContext(conf);
|
|
|
+ final Path basedir = files.makeQualified(new Path("target",
|
|
|
+ TestFSDownload.class.getSimpleName()));
|
|
|
+ files.mkdir(basedir, null, true);
|
|
|
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
|
|
|
+
|
|
|
+ Random rand = new Random();
|
|
|
+ long sharedSeed = rand.nextLong();
|
|
|
+ rand.setSeed(sharedSeed);
|
|
|
+ System.out.println("SEED: " + sharedSeed);
|
|
|
+
|
|
|
+ Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
|
|
|
+ ExecutorService exec = Executors.newSingleThreadExecutor();
|
|
|
+ LocalDirAllocator dirs = new LocalDirAllocator(
|
|
|
+ TestFSDownload.class.getName());
|
|
|
+
|
|
|
+ int size = rand.nextInt(512) + 512;
|
|
|
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
|
|
+
|
|
|
+ Path p = new Path(basedir, "" + 1);
|
|
|
+ LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
|
|
|
+ rsrcjar.setType(LocalResourceType.PATTERN);
|
|
|
+ Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
|
|
+ FSDownload fsdjar = new FSDownload(files,
|
|
|
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
|
|
|
+ new Random(sharedSeed));
|
|
|
+ pending.put(rsrcjar, exec.submit(fsdjar));
|
|
|
+
|
|
|
+ try {
|
|
|
+ FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
|
+ basedir);
|
|
|
+ for (FileStatus filestatus : filesstatus) {
|
|
|
+ if (filestatus.isDir()) {
|
|
|
+ FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
|
+ filestatus.getPath());
|
|
|
+ for (FileStatus childfile : childFiles) {
|
|
|
+ if (childfile.getPath().getName().equalsIgnoreCase("1.jar.tmp")) {
|
|
|
+ Assert.fail("Tmp File should not have been there "
|
|
|
+ + childfile.getPath());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch (Exception e) {
|
|
|
+ throw new IOException("Failed exec", e);
|
|
|
+ }
|
|
|
+ finally {
|
|
|
+ exec.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ @Test (timeout=10000)
|
|
|
+ public void testDownloadArchiveZip() throws IOException, URISyntaxException,
|
|
|
+ InterruptedException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
|
|
+ FileContext files = FileContext.getLocalFSFileContext(conf);
|
|
|
+ final Path basedir = files.makeQualified(new Path("target",
|
|
|
+ TestFSDownload.class.getSimpleName()));
|
|
|
+ files.mkdir(basedir, null, true);
|
|
|
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
|
|
|
+
|
|
|
+ Random rand = new Random();
|
|
|
+ long sharedSeed = rand.nextLong();
|
|
|
+ rand.setSeed(sharedSeed);
|
|
|
+ System.out.println("SEED: " + sharedSeed);
|
|
|
+
|
|
|
+ Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
|
|
|
+ ExecutorService exec = Executors.newSingleThreadExecutor();
|
|
|
+ LocalDirAllocator dirs = new LocalDirAllocator(
|
|
|
+ TestFSDownload.class.getName());
|
|
|
+
|
|
|
+ int size = rand.nextInt(512) + 512;
|
|
|
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
|
|
+
|
|
|
+ Path p = new Path(basedir, "" + 1);
|
|
|
+ LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
|
|
|
+ Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
|
|
+ FSDownload fsdzip = new FSDownload(files,
|
|
|
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
|
|
|
+ new Random(sharedSeed));
|
|
|
+ pending.put(rsrczip, exec.submit(fsdzip));
|
|
|
+
|
|
|
+ try {
|
|
|
+ FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
|
+ basedir);
|
|
|
+ for (FileStatus filestatus : filesstatus) {
|
|
|
+ if (filestatus.isDir()) {
|
|
|
+ FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
|
+ filestatus.getPath());
|
|
|
+ for (FileStatus childfile : childFiles) {
|
|
|
+ if (childfile.getPath().getName().equalsIgnoreCase("1.gz.tmp")) {
|
|
|
+ Assert.fail("Tmp File should not have been there "
|
|
|
+ + childfile.getPath());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch (Exception e) {
|
|
|
+ throw new IOException("Failed exec", e);
|
|
|
+ }
|
|
|
+ finally {
|
|
|
+ exec.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void verifyPermsRecursively(FileSystem fs,
|
|
|
FileContext files, Path p,
|
|
|
LocalResourceVisibility vis) throws IOException {
|
|
@@ -261,7 +553,7 @@ public class TestFSDownload {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test (timeout=10000)
|
|
|
public void testDirDownload() throws IOException, InterruptedException {
|
|
|
Configuration conf = new Configuration();
|
|
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|