|
@@ -128,8 +128,26 @@ public class LocalDirAllocator {
|
|
*/
|
|
*/
|
|
public Path getLocalPathForWrite(String pathStr, long size,
|
|
public Path getLocalPathForWrite(String pathStr, long size,
|
|
Configuration conf) throws IOException {
|
|
Configuration conf) throws IOException {
|
|
|
|
+ return getLocalPathForWrite(pathStr, size, conf, true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Get a path from the local FS. Pass size as
|
|
|
|
+ * SIZE_UNKNOWN if not known apriori. We
|
|
|
|
+ * round-robin over the set of disks (via the configured dirs) and return
|
|
|
|
+ * the first complete path which has enough space
|
|
|
|
+ * @param pathStr the requested path (this will be created on the first
|
|
|
|
+ * available disk)
|
|
|
|
+ * @param size the size of the file that is going to be written
|
|
|
|
+ * @param conf the Configuration object
|
|
|
|
+ * @param checkWrite ensure that the path is writable
|
|
|
|
+ * @return the complete path to the file on a local disk
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ public Path getLocalPathForWrite(String pathStr, long size,
|
|
|
|
+ Configuration conf,
|
|
|
|
+ boolean checkWrite) throws IOException {
|
|
AllocatorPerContext context = obtainContext(contextCfgItemName);
|
|
AllocatorPerContext context = obtainContext(contextCfgItemName);
|
|
- return context.getLocalPathForWrite(pathStr, size, conf);
|
|
|
|
|
|
+ return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
|
|
}
|
|
}
|
|
|
|
|
|
/** Get a path from the local FS for reading. We search through all the
|
|
/** Get a path from the local FS for reading. We search through all the
|
|
@@ -146,6 +164,23 @@ public class LocalDirAllocator {
|
|
return context.getLocalPathToRead(pathStr, conf);
|
|
return context.getLocalPathToRead(pathStr, conf);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get all of the paths that currently exist in the working directories.
|
|
|
|
+ * @param pathStr the path underneath the roots
|
|
|
|
+ * @param conf the configuration to look up the roots in
|
|
|
|
+ * @return all of the paths that exist under any of the roots
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ public Iterable<Path> getAllLocalPathsToRead(String pathStr,
|
|
|
|
+ Configuration conf
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ AllocatorPerContext context;
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ context = obtainContext(contextCfgItemName);
|
|
|
|
+ }
|
|
|
|
+ return context.getAllLocalPathsToRead(pathStr, conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
/** Creates a temporary file in the local FS. Pass size as -1 if not known
|
|
/** Creates a temporary file in the local FS. Pass size as -1 if not known
|
|
* apriori. We round-robin over the set of disks (via the configured dirs)
|
|
* apriori. We round-robin over the set of disks (via the configured dirs)
|
|
* and select the first complete path which has enough space. A file is
|
|
* and select the first complete path which has enough space. A file is
|
|
@@ -214,7 +249,8 @@ public class LocalDirAllocator {
|
|
/** This method gets called everytime before any read/write to make sure
|
|
/** This method gets called everytime before any read/write to make sure
|
|
* that any change to localDirs is reflected immediately.
|
|
* that any change to localDirs is reflected immediately.
|
|
*/
|
|
*/
|
|
- private void confChanged(Configuration conf) throws IOException {
|
|
|
|
|
|
+ private synchronized void confChanged(Configuration conf
|
|
|
|
+ ) throws IOException {
|
|
String newLocalDirs = conf.get(contextCfgItemName);
|
|
String newLocalDirs = conf.get(contextCfgItemName);
|
|
if (!newLocalDirs.equals(savedLocalDirs)) {
|
|
if (!newLocalDirs.equals(savedLocalDirs)) {
|
|
localDirs = conf.getTrimmedStrings(contextCfgItemName);
|
|
localDirs = conf.getTrimmedStrings(contextCfgItemName);
|
|
@@ -252,18 +288,21 @@ public class LocalDirAllocator {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private Path createPath(String path) throws IOException {
|
|
|
|
|
|
+ private Path createPath(String path,
|
|
|
|
+ boolean checkWrite) throws IOException {
|
|
Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
|
|
Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
|
|
path);
|
|
path);
|
|
- //check whether we are able to create a directory here. If the disk
|
|
|
|
- //happens to be RDONLY we will fail
|
|
|
|
- try {
|
|
|
|
- DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
|
|
|
|
- return file;
|
|
|
|
- } catch (DiskErrorException d) {
|
|
|
|
- LOG.warn(StringUtils.stringifyException(d));
|
|
|
|
- return null;
|
|
|
|
|
|
+ if (checkWrite) {
|
|
|
|
+ //check whether we are able to create a directory here. If the disk
|
|
|
|
+ //happens to be RDONLY we will fail
|
|
|
|
+ try {
|
|
|
|
+ DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
|
|
|
|
+ } catch (DiskErrorException d) {
|
|
|
|
+ LOG.warn(StringUtils.stringifyException(d));
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ return file;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -274,17 +313,6 @@ public class LocalDirAllocator {
|
|
return dirNumLastAccessed;
|
|
return dirNumLastAccessed;
|
|
}
|
|
}
|
|
|
|
|
|
- /** Get a path from the local FS. This method should be used if the size of
|
|
|
|
- * the file is not known a priori.
|
|
|
|
- *
|
|
|
|
- * It will use roulette selection, picking directories
|
|
|
|
- * with probability proportional to their available space.
|
|
|
|
- */
|
|
|
|
- public synchronized Path getLocalPathForWrite(String path,
|
|
|
|
- Configuration conf) throws IOException {
|
|
|
|
- return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/** Get a path from the local FS. If size is known, we go
|
|
/** Get a path from the local FS. If size is known, we go
|
|
* round-robin over the set of disks (via the configured dirs) and return
|
|
* round-robin over the set of disks (via the configured dirs) and return
|
|
* the first complete path which has enough space.
|
|
* the first complete path which has enough space.
|
|
@@ -292,8 +320,10 @@ public class LocalDirAllocator {
|
|
* If size is not known, use roulette selection -- pick directories
|
|
* If size is not known, use roulette selection -- pick directories
|
|
* with probability proportional to their available space.
|
|
* with probability proportional to their available space.
|
|
*/
|
|
*/
|
|
- public synchronized Path getLocalPathForWrite(String pathStr, long size,
|
|
|
|
- Configuration conf) throws IOException {
|
|
|
|
|
|
+ public synchronized
|
|
|
|
+ Path getLocalPathForWrite(String pathStr, long size,
|
|
|
|
+ Configuration conf, boolean checkWrite
|
|
|
|
+ ) throws IOException {
|
|
confChanged(conf);
|
|
confChanged(conf);
|
|
int numDirs = localDirs.length;
|
|
int numDirs = localDirs.length;
|
|
int numDirsSearched = 0;
|
|
int numDirsSearched = 0;
|
|
@@ -325,7 +355,7 @@ public class LocalDirAllocator {
|
|
dir++;
|
|
dir++;
|
|
}
|
|
}
|
|
dirNumLastAccessed = dir;
|
|
dirNumLastAccessed = dir;
|
|
- returnPath = createPath(pathStr);
|
|
|
|
|
|
+ returnPath = createPath(pathStr, checkWrite);
|
|
if (returnPath == null) {
|
|
if (returnPath == null) {
|
|
totalAvailable -= availableOnDisk[dir];
|
|
totalAvailable -= availableOnDisk[dir];
|
|
availableOnDisk[dir] = 0; // skip this disk
|
|
availableOnDisk[dir] = 0; // skip this disk
|
|
@@ -336,7 +366,7 @@ public class LocalDirAllocator {
|
|
while (numDirsSearched < numDirs && returnPath == null) {
|
|
while (numDirsSearched < numDirs && returnPath == null) {
|
|
long capacity = dirDF[dirNumLastAccessed].getAvailable();
|
|
long capacity = dirDF[dirNumLastAccessed].getAvailable();
|
|
if (capacity > size) {
|
|
if (capacity > size) {
|
|
- returnPath = createPath(pathStr);
|
|
|
|
|
|
+ returnPath = createPath(pathStr, checkWrite);
|
|
}
|
|
}
|
|
dirNumLastAccessed++;
|
|
dirNumLastAccessed++;
|
|
dirNumLastAccessed = dirNumLastAccessed % numDirs;
|
|
dirNumLastAccessed = dirNumLastAccessed % numDirs;
|
|
@@ -362,7 +392,7 @@ public class LocalDirAllocator {
|
|
Configuration conf) throws IOException {
|
|
Configuration conf) throws IOException {
|
|
|
|
|
|
// find an appropriate directory
|
|
// find an appropriate directory
|
|
- Path path = getLocalPathForWrite(pathStr, size, conf);
|
|
|
|
|
|
+ Path path = getLocalPathForWrite(pathStr, size, conf, true);
|
|
File dir = new File(path.getParent().toUri().getPath());
|
|
File dir = new File(path.getParent().toUri().getPath());
|
|
String prefix = path.getName();
|
|
String prefix = path.getName();
|
|
|
|
|
|
@@ -399,6 +429,76 @@ public class LocalDirAllocator {
|
|
" the configured local directories");
|
|
" the configured local directories");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static
|
|
|
|
+ class PathIterator implements Iterator<Path>, Iterable<Path> {
|
|
|
|
+ private final FileSystem fs;
|
|
|
|
+ private final String pathStr;
|
|
|
|
+ private int i = 0;
|
|
|
|
+ private final String[] rootDirs;
|
|
|
|
+ private Path next = null;
|
|
|
|
+
|
|
|
|
+ private PathIterator(FileSystem fs, String pathStr, String[] rootDirs
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ this.fs = fs;
|
|
|
|
+ this.pathStr = pathStr;
|
|
|
|
+ this.rootDirs = rootDirs;
|
|
|
|
+ advance();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean hasNext() {
|
|
|
|
+ return next != null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void advance() throws IOException {
|
|
|
|
+ while (i < rootDirs.length) {
|
|
|
|
+ next = new Path(rootDirs[i++], pathStr);
|
|
|
|
+ if (fs.exists(next)) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ next = null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Path next() {
|
|
|
|
+ Path result = next;
|
|
|
|
+ try {
|
|
|
|
+ advance();
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ throw new RuntimeException("Can't check existance of " + next, ie);
|
|
|
|
+ }
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void remove() {
|
|
|
|
+ throw new UnsupportedOperationException("read only iterator");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Iterator<Path> iterator() {
|
|
|
|
+ return this;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get all of the paths that currently exist in the working directories.
|
|
|
|
+ * @param pathStr the path underneath the roots
|
|
|
|
+ * @param conf the configuration to look up the roots in
|
|
|
|
+ * @return all of the paths that exist under any of the roots
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
|
|
|
|
+ Configuration conf
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ confChanged(conf);
|
|
|
|
+ if (pathStr.startsWith("/")) {
|
|
|
|
+ pathStr = pathStr.substring(1);
|
|
|
|
+ }
|
|
|
|
+ return new PathIterator(localFS, pathStr, localDirs);
|
|
|
|
+ }
|
|
|
|
+
|
|
/** We search through all the configured dirs for the file's existence
|
|
/** We search through all the configured dirs for the file's existence
|
|
* and return true when we find one
|
|
* and return true when we find one
|
|
*/
|
|
*/
|