|
@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
|
|
|
* interface.</b>
|
|
|
*/
|
|
|
public class TrackerDistributedCacheManager {
|
|
|
+ public static final String WORK_DIR_FIX = "-work-";
|
|
|
// cacheID to cacheStatus mapping
|
|
|
private LinkedHashMap<String, CacheStatus> cachedArchives =
|
|
|
new LinkedHashMap<String, CacheStatus>();
|
|
@@ -372,7 +373,7 @@ public class TrackerDistributedCacheManager {
|
|
|
}
|
|
|
|
|
|
private static Path createRandomPath(Path base) throws IOException {
|
|
|
- return new Path(base.toString() + "-work-" + random.nextLong());
|
|
|
+ return new Path(base.toString() + WORK_DIR_FIX + random.nextLong());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -427,39 +428,44 @@ public class TrackerDistributedCacheManager {
|
|
|
if (!localFs.mkdirs(workDir, permission)) {
|
|
|
throw new IOException("Mkdirs failed to create directory " + workDir);
|
|
|
}
|
|
|
- Path workFile = new Path(workDir, parchive.getName());
|
|
|
- sourceFs.copyToLocalFile(sourcePath, workFile);
|
|
|
- localFs.setPermission(workFile, permission);
|
|
|
- if (isArchive) {
|
|
|
- String tmpArchive = workFile.getName().toLowerCase();
|
|
|
- File srcFile = new File(workFile.toString());
|
|
|
- File destDir = new File(workDir.toString());
|
|
|
- LOG.info(String.format("Extracting %s to %s",
|
|
|
- srcFile.toString(), destDir.toString()));
|
|
|
- if (tmpArchive.endsWith(".jar")) {
|
|
|
- RunJar.unJar(srcFile, destDir);
|
|
|
- } else if (tmpArchive.endsWith(".zip")) {
|
|
|
- FileUtil.unZip(srcFile, destDir);
|
|
|
- } else if (isTarFile(tmpArchive)) {
|
|
|
- FileUtil.unTar(srcFile, destDir);
|
|
|
- } else {
|
|
|
- LOG.warn(String.format(
|
|
|
- "Cache file %s specified as archive, but not valid extension.",
|
|
|
- srcFile.toString()));
|
|
|
- // else will not do anyhting
|
|
|
- // and copy the file into the dir as it is
|
|
|
+ try {
|
|
|
+ Path workFile = new Path(workDir, parchive.getName());
|
|
|
+ sourceFs.copyToLocalFile(sourcePath, workFile);
|
|
|
+ localFs.setPermission(workFile, permission);
|
|
|
+ if (isArchive) {
|
|
|
+ String tmpArchive = workFile.getName().toLowerCase();
|
|
|
+ File srcFile = new File(workFile.toString());
|
|
|
+ File destDir = new File(workDir.toString());
|
|
|
+ LOG.info(String.format("Extracting %s to %s",
|
|
|
+ srcFile.toString(), destDir.toString()));
|
|
|
+ if (tmpArchive.endsWith(".jar")) {
|
|
|
+ RunJar.unJar(srcFile, destDir);
|
|
|
+ } else if (tmpArchive.endsWith(".zip")) {
|
|
|
+ FileUtil.unZip(srcFile, destDir);
|
|
|
+ } else if (isTarFile(tmpArchive)) {
|
|
|
+ FileUtil.unTar(srcFile, destDir);
|
|
|
+ } else {
|
|
|
+ LOG.warn(String.format(
|
|
|
+ "Cache file %s specified as archive, but not valid extension.",
|
|
|
+ srcFile.toString()));
|
|
|
+ // else will not do anyhting
|
|
|
+ // and copy the file into the dir as it is
|
|
|
+ }
|
|
|
+ FileUtil.chmod(destDir.toString(), "ugo+rx", true);
|
|
|
}
|
|
|
- FileUtil.chmod(destDir.toString(), "ugo+rx", true);
|
|
|
- }
|
|
|
- // promote the output to the final location
|
|
|
- if (!localFs.rename(workDir, finalDir)) {
|
|
|
- localFs.delete(workDir, true);
|
|
|
- if (!localFs.exists(finalDir)) {
|
|
|
- throw new IOException("Failed to promote distributed cache object " +
|
|
|
- workDir + " to " + finalDir);
|
|
|
+ // promote the output to the final location
|
|
|
+ if (!localFs.rename(workDir, finalDir)) {
|
|
|
+ if (!localFs.exists(finalDir)) {
|
|
|
+ throw new IOException("Failed to promote distributed cache object " +
|
|
|
+ workDir + " to " + finalDir);
|
|
|
+ }
|
|
|
+ // someone else promoted first
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (localFs.exists(workDir)) {
|
|
|
+ localFs.delete(workDir, true);
|
|
|
}
|
|
|
- // someone else promoted first
|
|
|
- return 0;
|
|
|
}
|
|
|
|
|
|
LOG.info(String.format("Cached %s as %s",
|