|
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.Future;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.jar.JarOutputStream;
|
|
import java.util.jar.JarOutputStream;
|
|
import java.util.jar.Manifest;
|
|
import java.util.jar.Manifest;
|
|
@@ -276,6 +277,9 @@ public class TestFSDownload {
|
|
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
|
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
|
destPath, rsrc);
|
|
destPath, rsrc);
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
|
|
+ exec.shutdown();
|
|
|
|
+ while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
|
|
|
+ Assert.assertTrue(pending.get(rsrc).isDone());
|
|
|
|
|
|
try {
|
|
try {
|
|
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
|
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
|
@@ -284,8 +288,6 @@ public class TestFSDownload {
|
|
}
|
|
}
|
|
} catch (ExecutionException e) {
|
|
} catch (ExecutionException e) {
|
|
Assert.assertTrue(e.getCause() instanceof IOException);
|
|
Assert.assertTrue(e.getCause() instanceof IOException);
|
|
- } finally {
|
|
|
|
- exec.shutdown();
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -333,6 +335,12 @@ public class TestFSDownload {
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ exec.shutdown();
|
|
|
|
+ while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
|
|
|
+ for (Future<Path> path: pending.values()) {
|
|
|
|
+ Assert.assertTrue(path.isDone());
|
|
|
|
+ }
|
|
|
|
+
|
|
try {
|
|
try {
|
|
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
|
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
|
Path localized = p.getValue().get();
|
|
Path localized = p.getValue().get();
|
|
@@ -354,12 +362,9 @@ public class TestFSDownload {
|
|
}
|
|
}
|
|
} catch (ExecutionException e) {
|
|
} catch (ExecutionException e) {
|
|
throw new IOException("Failed exec", e);
|
|
throw new IOException("Failed exec", e);
|
|
- } finally {
|
|
|
|
- exec.shutdown();
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
|
@Test (timeout=10000)
|
|
@Test (timeout=10000)
|
|
public void testDownloadArchive() throws IOException, URISyntaxException,
|
|
public void testDownloadArchive() throws IOException, URISyntaxException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
@@ -392,12 +397,15 @@ public class TestFSDownload {
|
|
FSDownload fsd = new FSDownload(files,
|
|
FSDownload fsd = new FSDownload(files,
|
|
UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
|
|
UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
|
|
+ exec.shutdown();
|
|
|
|
+ while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
|
|
|
+ Assert.assertTrue(pending.get(rsrc).isDone());
|
|
|
|
|
|
try {
|
|
try {
|
|
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
basedir);
|
|
basedir);
|
|
for (FileStatus filestatus : filesstatus) {
|
|
for (FileStatus filestatus : filesstatus) {
|
|
- if (filestatus.isDir()) {
|
|
|
|
|
|
+ if (filestatus.isDirectory()) {
|
|
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
filestatus.getPath());
|
|
filestatus.getPath());
|
|
for (FileStatus childfile : childFiles) {
|
|
for (FileStatus childfile : childFiles) {
|
|
@@ -411,12 +419,8 @@ public class TestFSDownload {
|
|
}catch (Exception e) {
|
|
}catch (Exception e) {
|
|
throw new IOException("Failed exec", e);
|
|
throw new IOException("Failed exec", e);
|
|
}
|
|
}
|
|
- finally {
|
|
|
|
- exec.shutdown();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
|
|
|
+
|
|
@Test (timeout=10000)
|
|
@Test (timeout=10000)
|
|
public void testDownloadPatternJar() throws IOException, URISyntaxException,
|
|
public void testDownloadPatternJar() throws IOException, URISyntaxException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
@@ -450,12 +454,15 @@ public class TestFSDownload {
|
|
FSDownload fsdjar = new FSDownload(files,
|
|
FSDownload fsdjar = new FSDownload(files,
|
|
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
|
|
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
|
|
pending.put(rsrcjar, exec.submit(fsdjar));
|
|
pending.put(rsrcjar, exec.submit(fsdjar));
|
|
|
|
+ exec.shutdown();
|
|
|
|
+ while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
|
|
|
+ Assert.assertTrue(pending.get(rsrcjar).isDone());
|
|
|
|
|
|
try {
|
|
try {
|
|
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
basedir);
|
|
basedir);
|
|
for (FileStatus filestatus : filesstatus) {
|
|
for (FileStatus filestatus : filesstatus) {
|
|
- if (filestatus.isDir()) {
|
|
|
|
|
|
+ if (filestatus.isDirectory()) {
|
|
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
filestatus.getPath());
|
|
filestatus.getPath());
|
|
for (FileStatus childfile : childFiles) {
|
|
for (FileStatus childfile : childFiles) {
|
|
@@ -469,12 +476,8 @@ public class TestFSDownload {
|
|
}catch (Exception e) {
|
|
}catch (Exception e) {
|
|
throw new IOException("Failed exec", e);
|
|
throw new IOException("Failed exec", e);
|
|
}
|
|
}
|
|
- finally {
|
|
|
|
- exec.shutdown();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
|
@Test (timeout=10000)
|
|
@Test (timeout=10000)
|
|
public void testDownloadArchiveZip() throws IOException, URISyntaxException,
|
|
public void testDownloadArchiveZip() throws IOException, URISyntaxException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
@@ -507,12 +510,15 @@ public class TestFSDownload {
|
|
FSDownload fsdzip = new FSDownload(files,
|
|
FSDownload fsdzip = new FSDownload(files,
|
|
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
|
|
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
|
|
pending.put(rsrczip, exec.submit(fsdzip));
|
|
pending.put(rsrczip, exec.submit(fsdzip));
|
|
|
|
+ exec.shutdown();
|
|
|
|
+ while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
|
|
|
+ Assert.assertTrue(pending.get(rsrczip).isDone());
|
|
|
|
|
|
try {
|
|
try {
|
|
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
|
basedir);
|
|
basedir);
|
|
for (FileStatus filestatus : filesstatus) {
|
|
for (FileStatus filestatus : filesstatus) {
|
|
- if (filestatus.isDir()) {
|
|
|
|
|
|
+ if (filestatus.isDirectory()) {
|
|
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
|
filestatus.getPath());
|
|
filestatus.getPath());
|
|
for (FileStatus childfile : childFiles) {
|
|
for (FileStatus childfile : childFiles) {
|
|
@@ -526,9 +532,6 @@ public class TestFSDownload {
|
|
}catch (Exception e) {
|
|
}catch (Exception e) {
|
|
throw new IOException("Failed exec", e);
|
|
throw new IOException("Failed exec", e);
|
|
}
|
|
}
|
|
- finally {
|
|
|
|
- exec.shutdown();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private void verifyPermsRecursively(FileSystem fs,
|
|
private void verifyPermsRecursively(FileSystem fs,
|
|
@@ -603,7 +606,13 @@ public class TestFSDownload {
|
|
destPath, rsrc);
|
|
destPath, rsrc);
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
pending.put(rsrc, exec.submit(fsd));
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ exec.shutdown();
|
|
|
|
+ while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
|
|
|
+ for (Future<Path> path: pending.values()) {
|
|
|
|
+ Assert.assertTrue(path.isDone());
|
|
|
|
+ }
|
|
|
|
+
|
|
try {
|
|
try {
|
|
|
|
|
|
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
|
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
|
@@ -619,15 +628,10 @@ public class TestFSDownload {
|
|
}
|
|
}
|
|
} catch (ExecutionException e) {
|
|
} catch (ExecutionException e) {
|
|
throw new IOException("Failed exec", e);
|
|
throw new IOException("Failed exec", e);
|
|
- } finally {
|
|
|
|
- exec.shutdown();
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- @Test(timeout = 1000)
|
|
|
|
|
|
+ @Test (timeout=10000)
|
|
public void testUniqueDestinationPath() throws Exception {
|
|
public void testUniqueDestinationPath() throws Exception {
|
|
Configuration conf = new Configuration();
|
|
Configuration conf = new Configuration();
|
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
|
@@ -644,20 +648,20 @@ public class TestFSDownload {
|
|
destPath =
|
|
destPath =
|
|
new Path(destPath, Long.toString(uniqueNumberGenerator
|
|
new Path(destPath, Long.toString(uniqueNumberGenerator
|
|
.incrementAndGet()));
|
|
.incrementAndGet()));
|
|
- try {
|
|
|
|
- Path p = new Path(basedir, "dir" + 0 + ".jar");
|
|
|
|
- LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
|
|
|
- LocalResource rsrc = createJar(files, p, vis);
|
|
|
|
- FSDownload fsd =
|
|
|
|
- new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
|
|
|
- destPath, rsrc);
|
|
|
|
- Future<Path> rPath = singleThreadedExec.submit(fsd);
|
|
|
|
- // Now FSDownload will not create a random directory to localize the
|
|
|
|
- // resource. Therefore the final localizedPath for the resource should be
|
|
|
|
- // destination directory (passed as an argument) + file name.
|
|
|
|
- Assert.assertEquals(destPath, rPath.get().getParent());
|
|
|
|
- } finally {
|
|
|
|
- singleThreadedExec.shutdown();
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ Path p = new Path(basedir, "dir" + 0 + ".jar");
|
|
|
|
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
|
|
|
+ LocalResource rsrc = createJar(files, p, vis);
|
|
|
|
+ FSDownload fsd =
|
|
|
|
+ new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
|
|
|
+ destPath, rsrc);
|
|
|
|
+ Future<Path> rPath = singleThreadedExec.submit(fsd);
|
|
|
|
+ singleThreadedExec.shutdown();
|
|
|
|
+ while (!singleThreadedExec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
|
|
|
+ Assert.assertTrue(rPath.isDone());
|
|
|
|
+ // Now FSDownload will not create a random directory to localize the
|
|
|
|
+ // resource. Therefore the final localizedPath for the resource should be
|
|
|
|
+ // destination directory (passed as an argument) + file name.
|
|
|
|
+ Assert.assertEquals(destPath, rPath.get().getParent());
|
|
}
|
|
}
|
|
}
|
|
}
|