|
@@ -97,7 +97,7 @@ public class TrackerDistributedCacheManager {
|
|
|
|
|
|
private Configuration trackerConf;
|
|
|
|
|
|
- private Random random = new Random();
|
|
|
+ private static final Random random = new Random();
|
|
|
|
|
|
public TrackerDistributedCacheManager(Configuration conf,
|
|
|
TaskController controller
|
|
@@ -242,9 +242,11 @@ public class TrackerDistributedCacheManager {
|
|
|
}
|
|
|
|
|
|
void setSize(CacheStatus status, long size) throws IOException {
|
|
|
- synchronized (status) {
|
|
|
- status.size = size;
|
|
|
- addCacheInfoUpdate(status);
|
|
|
+ if (size != 0) {
|
|
|
+ synchronized (status) {
|
|
|
+ status.size = size;
|
|
|
+ addCacheInfoUpdate(status);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -412,6 +414,10 @@ public class TrackerDistributedCacheManager {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ private static Path createRandomPath(Path base) throws IOException {
|
|
|
+ return new Path(base.toString() + "-work-" + random.nextLong());
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Download a given path to the local file system.
|
|
|
* @param conf the job's configuration
|
|
@@ -447,24 +453,32 @@ public class TrackerDistributedCacheManager {
|
|
|
|
|
|
Path parchive = null;
|
|
|
if (isArchive) {
|
|
|
- parchive = new Path(destination,
|
|
|
- new Path(destination.getName()));
|
|
|
+ parchive = new Path(destination, destination.getName());
|
|
|
} else {
|
|
|
parchive = destination;
|
|
|
}
|
|
|
- LOG.info("Creating " + destination + " with " + permission);
|
|
|
- if (!localFs.mkdirs(destination.getParent(), permission)) {
|
|
|
- throw new IOException("Mkdirs failed to create directory " +
|
|
|
- destination);
|
|
|
- }
|
|
|
-
|
|
|
- sourceFs.copyToLocalFile(sourcePath, parchive);
|
|
|
+ // if the file already exists, we are done
|
|
|
+ if (localFs.exists(parchive)) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ // the final directory for the object
|
|
|
+ Path finalDir = parchive.getParent();
|
|
|
+ // the work directory for the object
|
|
|
+ Path workDir = createRandomPath(finalDir);
|
|
|
+ LOG.info("Creating " + destination.getName() + " in " + workDir + " with " +
|
|
|
+ permission);
|
|
|
+ if (!localFs.mkdirs(workDir, permission)) {
|
|
|
+ throw new IOException("Mkdirs failed to create directory " + workDir);
|
|
|
+ }
|
|
|
+ Path workFile = new Path(workDir, parchive.getName());
|
|
|
+ sourceFs.copyToLocalFile(sourcePath, workFile);
|
|
|
+ localFs.setPermission(workFile, permission);
|
|
|
if (isArchive) {
|
|
|
- String tmpArchive = destination.toString().toLowerCase();
|
|
|
- File srcFile = new File(parchive.toString());
|
|
|
- File destDir = new File(parchive.getParent().toString());
|
|
|
+ String tmpArchive = workFile.getName().toLowerCase();
|
|
|
+ File srcFile = new File(workFile.toString());
|
|
|
+ File destDir = new File(workDir.toString());
|
|
|
LOG.info(String.format("Extracting %s to %s",
|
|
|
- srcFile.toString(), destDir.toString()));
|
|
|
+ srcFile.toString(), destDir.toString()));
|
|
|
if (tmpArchive.endsWith(".jar")) {
|
|
|
RunJar.unJar(srcFile, destDir);
|
|
|
} else if (tmpArchive.endsWith(".zip")) {
|
|
@@ -479,11 +493,16 @@ public class TrackerDistributedCacheManager {
|
|
|
// and copy the file into the dir as it is
|
|
|
}
|
|
|
}
|
|
|
- // set proper permissions for the localized copy
|
|
|
- if (isArchive) {
|
|
|
- localFs.setPermission(parchive, permission);
|
|
|
+ // promote the output to the final location
|
|
|
+ if (!localFs.rename(workDir, finalDir)) {
|
|
|
+ localFs.delete(workDir, true);
|
|
|
+ if (!localFs.exists(finalDir)) {
|
|
|
+ throw new IOException("Failed to promote distributed cache object " +
|
|
|
+ workDir + " to " + finalDir);
|
|
|
+ }
|
|
|
+ // someone else promoted first
|
|
|
+ return 0;
|
|
|
}
|
|
|
- localFs.setPermission(destination, permission);
|
|
|
|
|
|
LOG.info(String.format("Cached %s as %s",
|
|
|
source.toString(), destination.toString()));
|