|
@@ -65,10 +65,7 @@ import org.slf4j.LoggerFactory;
|
|
|
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
|
|
@InterfaceStability.Unstable
|
|
|
public class LocalDirAllocator {
|
|
|
-
|
|
|
- static final String E_NO_SPACE_AVAILABLE =
|
|
|
- "No space available in any of the local directories";
|
|
|
-
|
|
|
+
|
|
|
//A Map from the config item names like "mapred.local.dir"
|
|
|
//to the instance of the AllocatorPerContext. This
|
|
|
//is a static object to make sure there exists exactly one instance per JVM
|
|
@@ -387,24 +384,6 @@ public class LocalDirAllocator {
|
|
|
return currentContext.get().dirNumLastAccessed.get();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Format a string, log at debug and append it to the history as a new line.
|
|
|
- *
|
|
|
- * @param history history to fill in
|
|
|
- * @param fmt format string
|
|
|
- * @param args varags
|
|
|
- */
|
|
|
- private void note(StringBuilder history, String fmt, Object... args) {
|
|
|
- try {
|
|
|
- final String s = String.format(fmt, args);
|
|
|
- history.append(s).append("\n");
|
|
|
- LOG.debug(s);
|
|
|
- } catch (Exception e) {
|
|
|
- // some resilience in case the format string is wrong
|
|
|
- LOG.debug(fmt, e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/** Get a path from the local FS. If size is known, we go
|
|
|
* round-robin over the set of disks (via the configured dirs) and return
|
|
|
* the first complete path which has enough space.
|
|
@@ -414,12 +393,6 @@ public class LocalDirAllocator {
|
|
|
*/
|
|
|
public Path getLocalPathForWrite(String pathStr, long size,
|
|
|
Configuration conf, boolean checkWrite) throws IOException {
|
|
|
-
|
|
|
- // history is built up and logged at error if the alloc
|
|
|
- StringBuilder history = new StringBuilder();
|
|
|
-
|
|
|
- note(history, "Searchng for a directory for file at %s, size = %,d; checkWrite=%s",
|
|
|
- pathStr, size, checkWrite);
|
|
|
Context ctx = confChanged(conf);
|
|
|
int numDirs = ctx.localDirs.length;
|
|
|
int numDirsSearched = 0;
|
|
@@ -433,56 +406,27 @@ public class LocalDirAllocator {
|
|
|
pathStr = pathStr.substring(1);
|
|
|
}
|
|
|
Path returnPath = null;
|
|
|
-
|
|
|
- final int dirCount = ctx.dirDF.length;
|
|
|
- long[] availableOnDisk = new long[dirCount];
|
|
|
- long totalAvailable = 0;
|
|
|
-
|
|
|
- StringBuilder pathNames = new StringBuilder();
|
|
|
-
|
|
|
- //build the "roulette wheel"
|
|
|
- for (int i =0; i < dirCount; ++i) {
|
|
|
- final DF target = ctx.dirDF[i];
|
|
|
- // attempt to recreate the dir so that getAvailable() is valid
|
|
|
- // if it fails, getAvailable() will return 0, so the dir will
|
|
|
- // be declared unavailable.
|
|
|
- // return value is logged at debug to keep spotbugs quiet.
|
|
|
- final String name = target.getDirPath();
|
|
|
- pathNames.append(" ").append(name);
|
|
|
- final File dirPath = new File(name);
|
|
|
-
|
|
|
- // existence probe
|
|
|
- if (!dirPath.exists()) {
|
|
|
- LOG.debug("creating buffer dir {}", name);
|
|
|
- if (dirPath.mkdirs()) {
|
|
|
- note(history, "Created buffer dir %s", name);
|
|
|
- } else {
|
|
|
- note(history, "Failed to create buffer dir %s", name);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // path already existed or the mkdir call had an outcome
|
|
|
- // make sure the path is present and a dir, and if so add its availability
|
|
|
- if (dirPath.isDirectory()) {
|
|
|
- final long available = target.getAvailable();
|
|
|
- availableOnDisk[i] = available;
|
|
|
- note(history, "%s available under path %s", pathStr, available);
|
|
|
+
|
|
|
+ if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
|
|
|
+ //proportional to available size
|
|
|
+ long[] availableOnDisk = new long[ctx.dirDF.length];
|
|
|
+ long totalAvailable = 0;
|
|
|
+
|
|
|
+ //build the "roulette wheel"
|
|
|
+ for(int i =0; i < ctx.dirDF.length; ++i) {
|
|
|
+ final DF target = ctx.dirDF[i];
|
|
|
+ // attempt to recreate the dir so that getAvailable() is valid
|
|
|
+ // if it fails, getAvailable() will return 0, so the dir will
|
|
|
+ // be declared unavailable.
|
|
|
+ // return value is logged at debug to keep spotbugs quiet.
|
|
|
+ final boolean b = new File(target.getDirPath()).mkdirs();
|
|
|
+ LOG.debug("mkdirs of {}={}", target, b);
|
|
|
+ availableOnDisk[i] = target.getAvailable();
|
|
|
totalAvailable += availableOnDisk[i];
|
|
|
- } else {
|
|
|
- note(history, "%s does not exist/is not a directory", pathStr);
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- note(history, "Directory count is %d; total available capacity is %,d{}",
|
|
|
- dirCount, totalAvailable);
|
|
|
|
|
|
- if (size == SIZE_UNKNOWN) {
|
|
|
- //do roulette selection: pick dir with probability
|
|
|
- // proportional to available size
|
|
|
- note(history, "Size not specified, so picking at random.");
|
|
|
-
|
|
|
- if (totalAvailable == 0) {
|
|
|
- throw new DiskErrorException(E_NO_SPACE_AVAILABLE + pathNames + "; history=" + history);
|
|
|
+ if (totalAvailable == 0){
|
|
|
+ throw new DiskErrorException("No space available in any of the local directories.");
|
|
|
}
|
|
|
|
|
|
// Keep rolling the wheel till we get a valid path
|
|
@@ -495,19 +439,14 @@ public class LocalDirAllocator {
|
|
|
dir++;
|
|
|
}
|
|
|
ctx.dirNumLastAccessed.set(dir);
|
|
|
- final Path localDir = ctx.localDirs[dir];
|
|
|
- returnPath = createPath(localDir, pathStr, checkWrite);
|
|
|
+ returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
|
|
|
if (returnPath == null) {
|
|
|
totalAvailable -= availableOnDisk[dir];
|
|
|
availableOnDisk[dir] = 0; // skip this disk
|
|
|
numDirsSearched++;
|
|
|
- note(history, "No capacity in %s", localDir);
|
|
|
- } else {
|
|
|
- note(history, "Allocated file %s in %s", returnPath, localDir);
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- note(history, "Size is %,d; searching", size);
|
|
|
// Start linear search with random increment if possible
|
|
|
int randomInc = 1;
|
|
|
if (numDirs > 2) {
|
|
@@ -520,22 +459,17 @@ public class LocalDirAllocator {
|
|
|
maxCapacity = capacity;
|
|
|
}
|
|
|
if (capacity > size) {
|
|
|
- final Path localDir = ctx.localDirs[dirNum];
|
|
|
try {
|
|
|
- returnPath = createPath(localDir, pathStr, checkWrite);
|
|
|
+ returnPath = createPath(ctx.localDirs[dirNum], pathStr,
|
|
|
+ checkWrite);
|
|
|
} catch (IOException e) {
|
|
|
errorText = e.getMessage();
|
|
|
diskException = e;
|
|
|
- note(history, "Exception while creating path %s: %s", localDir, errorText);
|
|
|
- LOG.debug("DiskException caught for dir {}", localDir, e);
|
|
|
+ LOG.debug("DiskException caught for dir {}", ctx.localDirs[dirNum], e);
|
|
|
}
|
|
|
if (returnPath != null) {
|
|
|
- // success
|
|
|
ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
|
|
|
- note(history, "Allocated file %s in %s", returnPath, localDir);
|
|
|
break;
|
|
|
- } else {
|
|
|
- note(history, "No capacity in %s", localDir);
|
|
|
}
|
|
|
}
|
|
|
dirNum++;
|
|
@@ -550,14 +484,10 @@ public class LocalDirAllocator {
|
|
|
//no path found
|
|
|
String newErrorText = "Could not find any valid local directory for " +
|
|
|
pathStr + " with requested size " + size +
|
|
|
- " as the max capacity in any directory"
|
|
|
- + " (" + pathNames + " )"
|
|
|
- + " is " + maxCapacity;
|
|
|
+ " as the max capacity in any directory is " + maxCapacity;
|
|
|
if (errorText != null) {
|
|
|
newErrorText = newErrorText + " due to " + errorText;
|
|
|
}
|
|
|
- LOG.error(newErrorText);
|
|
|
- LOG.error(history.toString());
|
|
|
throw new DiskErrorException(newErrorText, diskException);
|
|
|
}
|
|
|
|