|
@@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import static org.apache.hadoop.net.NetUtils.getHostname;
|
|
|
+
|
|
|
/** An implementation of a round-robin scheme for disk allocation for creating
|
|
|
* files. The way it works is that it is kept track what disk was last
|
|
|
* allocated for a file write. For the current request, the next disk from
|
|
@@ -65,7 +67,10 @@ import org.slf4j.LoggerFactory;
|
|
|
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
|
|
@InterfaceStability.Unstable
|
|
|
public class LocalDirAllocator {
|
|
|
-
|
|
|
+
|
|
|
+ static final String E_NO_SPACE_AVAILABLE =
|
|
|
+ "No space available in any of the local directories";
|
|
|
+
|
|
|
//A Map from the config item names like "mapred.local.dir"
|
|
|
//to the instance of the AllocatorPerContext. This
|
|
|
//is a static object to make sure there exists exactly one instance per JVM
|
|
@@ -384,6 +389,24 @@ public class LocalDirAllocator {
|
|
|
return currentContext.get().dirNumLastAccessed.get();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Format a string, log at debug and append it to the history as a new line.
|
|
|
+ *
|
|
|
+ * @param history history to fill in
|
|
|
+ * @param fmt format string
|
|
|
+ * @param args varags
|
|
|
+ */
|
|
|
+ private void note(StringBuilder history, String fmt, Object... args) {
|
|
|
+ try {
|
|
|
+ final String s = String.format(fmt, args);
|
|
|
+ history.append(s).append("\n");
|
|
|
+ LOG.debug(s);
|
|
|
+ } catch (Exception e) {
|
|
|
+ // some resilience in case the format string is wrong
|
|
|
+ LOG.debug(fmt, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Get a path from the local FS. If size is known, we go
|
|
|
* round-robin over the set of disks (via the configured dirs) and return
|
|
|
* the first complete path which has enough space.
|
|
@@ -393,6 +416,12 @@ public class LocalDirAllocator {
|
|
|
*/
|
|
|
public Path getLocalPathForWrite(String pathStr, long size,
|
|
|
Configuration conf, boolean checkWrite) throws IOException {
|
|
|
+
|
|
|
+ // history is built up and logged at error if the alloc
|
|
|
+ StringBuilder history = new StringBuilder();
|
|
|
+
|
|
|
+ note(history, "Searching for a directory for file \"%s\", size = %,d; checkWrite=%s",
|
|
|
+ pathStr, size, checkWrite);
|
|
|
Context ctx = confChanged(conf);
|
|
|
int numDirs = ctx.localDirs.length;
|
|
|
int numDirsSearched = 0;
|
|
@@ -406,27 +435,62 @@ public class LocalDirAllocator {
|
|
|
pathStr = pathStr.substring(1);
|
|
|
}
|
|
|
Path returnPath = null;
|
|
|
-
|
|
|
- if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
|
|
|
- //proportional to available size
|
|
|
- long[] availableOnDisk = new long[ctx.dirDF.length];
|
|
|
- long totalAvailable = 0;
|
|
|
-
|
|
|
- //build the "roulette wheel"
|
|
|
- for(int i =0; i < ctx.dirDF.length; ++i) {
|
|
|
- final DF target = ctx.dirDF[i];
|
|
|
- // attempt to recreate the dir so that getAvailable() is valid
|
|
|
- // if it fails, getAvailable() will return 0, so the dir will
|
|
|
- // be declared unavailable.
|
|
|
- // return value is logged at debug to keep spotbugs quiet.
|
|
|
- final boolean b = new File(target.getDirPath()).mkdirs();
|
|
|
- LOG.debug("mkdirs of {}={}", target, b);
|
|
|
- availableOnDisk[i] = target.getAvailable();
|
|
|
+
|
|
|
+ final int dirCount = ctx.dirDF.length;
|
|
|
+ long[] availableOnDisk = new long[dirCount];
|
|
|
+ long totalAvailable = 0;
|
|
|
+
|
|
|
+ StringBuilder pathNames = new StringBuilder();
|
|
|
+
|
|
|
+ //build the "roulette wheel"
|
|
|
+ for (int i =0; i < dirCount; ++i) {
|
|
|
+ final DF target = ctx.dirDF[i];
|
|
|
+ // attempt to recreate the dir so that getAvailable() is valid
|
|
|
+ // if it fails, getAvailable() will return 0, so the dir will
|
|
|
+ // be declared unavailable.
|
|
|
+ // return value is logged at debug to keep spotbugs quiet.
|
|
|
+ final String name = target.getDirPath();
|
|
|
+ pathNames.append(" ").append(name);
|
|
|
+ final File dirPath = new File(name);
|
|
|
+
|
|
|
+ // existence probe with directory recreation
|
|
|
+ if (!dirPath.exists()) {
|
|
|
+ LOG.debug("Creating buffer dir {}", name);
|
|
|
+ if (dirPath.mkdirs()) {
|
|
|
+ note(history, "Created buffer dir %s", name);
|
|
|
+ } else {
|
|
|
+ note(history, "Failed to create buffer dir %s", name);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // path already existed or the mkdir call had an outcome
|
|
|
+ // make sure the path is present and a dir, and if so add its availability
|
|
|
+ if (dirPath.isDirectory()) {
|
|
|
+ final long available = target.getAvailable();
|
|
|
+ availableOnDisk[i] = available;
|
|
|
+ note(history, "%,d bytes available under path %s", available, name);
|
|
|
totalAvailable += availableOnDisk[i];
|
|
|
+ } else {
|
|
|
+ note(history, "%s does not exist/is not a directory", name);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if (totalAvailable == 0){
|
|
|
- throw new DiskErrorException("No space available in any of the local directories.");
|
|
|
+ note(history, "Directory count is %d; total available capacity is %,d",
|
|
|
+ dirCount, totalAvailable);
|
|
|
+
|
|
|
+ if (size == SIZE_UNKNOWN) {
|
|
|
+ //do roulette selection: pick dir with probability
|
|
|
+ // proportional to available size
|
|
|
+ note(history, "Size not specified, so picking directories at random.");
|
|
|
+
|
|
|
+ if (totalAvailable == 0) {
|
|
|
+ // log error and history
|
|
|
+ String newErrorText = E_NO_SPACE_AVAILABLE + pathNames
|
|
|
+ + " on host" + getHostname();
|
|
|
+ LOG.error(newErrorText);
|
|
|
+ LOG.error(history.toString());
|
|
|
+ // then raise the exception
|
|
|
+ throw new DiskErrorException(newErrorText);
|
|
|
}
|
|
|
|
|
|
// Keep rolling the wheel till we get a valid path
|
|
@@ -439,14 +503,20 @@ public class LocalDirAllocator {
|
|
|
dir++;
|
|
|
}
|
|
|
ctx.dirNumLastAccessed.set(dir);
|
|
|
- returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
|
|
|
+ final Path localDir = ctx.localDirs[dir];
|
|
|
+ returnPath = createPath(localDir, pathStr, checkWrite);
|
|
|
if (returnPath == null) {
|
|
|
totalAvailable -= availableOnDisk[dir];
|
|
|
availableOnDisk[dir] = 0; // skip this disk
|
|
|
numDirsSearched++;
|
|
|
+ note(history, "No capacity in %s", localDir);
|
|
|
+ } else {
|
|
|
+ note(history, "Allocated file %s in %s", returnPath, localDir);
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
+ note(history, "Requested file size is %,d; searching for a suitable directory",
|
|
|
+ size);
|
|
|
// Start linear search with random increment if possible
|
|
|
int randomInc = 1;
|
|
|
if (numDirs > 2) {
|
|
@@ -459,17 +529,22 @@ public class LocalDirAllocator {
|
|
|
maxCapacity = capacity;
|
|
|
}
|
|
|
if (capacity > size) {
|
|
|
+ final Path localDir = ctx.localDirs[dirNum];
|
|
|
try {
|
|
|
- returnPath = createPath(ctx.localDirs[dirNum], pathStr,
|
|
|
- checkWrite);
|
|
|
+ returnPath = createPath(localDir, pathStr, checkWrite);
|
|
|
} catch (IOException e) {
|
|
|
errorText = e.getMessage();
|
|
|
diskException = e;
|
|
|
- LOG.debug("DiskException caught for dir {}", ctx.localDirs[dirNum], e);
|
|
|
+ note(history, "Exception while creating path %s: %s", localDir, errorText);
|
|
|
+ LOG.debug("DiskException caught for dir {}", localDir, e);
|
|
|
}
|
|
|
if (returnPath != null) {
|
|
|
+ // success
|
|
|
ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
|
|
|
+ note(history, "Allocated file %s in %s", returnPath, localDir);
|
|
|
break;
|
|
|
+ } else {
|
|
|
+ note(history, "No capacity in %s", localDir);
|
|
|
}
|
|
|
}
|
|
|
dirNum++;
|
|
@@ -482,12 +557,18 @@ public class LocalDirAllocator {
|
|
|
}
|
|
|
|
|
|
//no path found
|
|
|
- String newErrorText = "Could not find any valid local directory for " +
|
|
|
- pathStr + " with requested size " + size +
|
|
|
- " as the max capacity in any directory is " + maxCapacity;
|
|
|
+ String hostname = getHostname();
|
|
|
+ String newErrorText = "Could not find any valid local directory for "
|
|
|
+ + pathStr + " with requested size " + size
|
|
|
+ + " on host " + hostname
|
|
|
+ + " as the max capacity in any directory"
|
|
|
+ + " (" + pathNames + " )"
|
|
|
+ + " is " + maxCapacity;
|
|
|
if (errorText != null) {
|
|
|
newErrorText = newErrorText + " due to " + errorText;
|
|
|
}
|
|
|
+ LOG.error(newErrorText);
|
|
|
+ LOG.error(history.toString());
|
|
|
throw new DiskErrorException(newErrorText, diskException);
|
|
|
}
|
|
|
|