|
@@ -128,8 +128,26 @@ public class LocalDirAllocator {
|
|
*/
|
|
*/
|
|
public Path getLocalPathForWrite(String pathStr, long size,
|
|
public Path getLocalPathForWrite(String pathStr, long size,
|
|
Configuration conf) throws IOException {
|
|
Configuration conf) throws IOException {
|
|
|
|
+ return getLocalPathForWrite(pathStr, size, conf, true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Get a path from the local FS. Pass size as
|
|
|
|
+ * SIZE_UNKNOWN if not known apriori. We
|
|
|
|
+ * round-robin over the set of disks (via the configured dirs) and return
|
|
|
|
+ * the first complete path which has enough space
|
|
|
|
+ * @param pathStr the requested path (this will be created on the first
|
|
|
|
+ * available disk)
|
|
|
|
+ * @param size the size of the file that is going to be written
|
|
|
|
+ * @param conf the Configuration object
|
|
|
|
+ * @param checkWrite ensure that the path is writable
|
|
|
|
+ * @return the complete path to the file on a local disk
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ public Path getLocalPathForWrite(String pathStr, long size,
|
|
|
|
+ Configuration conf,
|
|
|
|
+ boolean checkWrite) throws IOException {
|
|
AllocatorPerContext context = obtainContext(contextCfgItemName);
|
|
AllocatorPerContext context = obtainContext(contextCfgItemName);
|
|
- return context.getLocalPathForWrite(pathStr, size, conf);
|
|
|
|
|
|
+ return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
|
|
}
|
|
}
|
|
|
|
|
|
/** Get a path from the local FS for reading. We search through all the
|
|
/** Get a path from the local FS for reading. We search through all the
|
|
@@ -214,7 +232,8 @@ public class LocalDirAllocator {
|
|
/** This method gets called everytime before any read/write to make sure
|
|
/** This method gets called everytime before any read/write to make sure
|
|
* that any change to localDirs is reflected immediately.
|
|
* that any change to localDirs is reflected immediately.
|
|
*/
|
|
*/
|
|
- private void confChanged(Configuration conf) throws IOException {
|
|
|
|
|
|
+ private synchronized void confChanged(Configuration conf
|
|
|
|
+ ) throws IOException {
|
|
String newLocalDirs = conf.get(contextCfgItemName);
|
|
String newLocalDirs = conf.get(contextCfgItemName);
|
|
if (!newLocalDirs.equals(savedLocalDirs)) {
|
|
if (!newLocalDirs.equals(savedLocalDirs)) {
|
|
localDirs = conf.getTrimmedStrings(contextCfgItemName);
|
|
localDirs = conf.getTrimmedStrings(contextCfgItemName);
|
|
@@ -252,18 +271,21 @@ public class LocalDirAllocator {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private Path createPath(String path) throws IOException {
|
|
|
|
|
|
+ private Path createPath(String path,
|
|
|
|
+ boolean checkWrite) throws IOException {
|
|
Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
|
|
Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
|
|
path);
|
|
path);
|
|
- //check whether we are able to create a directory here. If the disk
|
|
|
|
- //happens to be RDONLY we will fail
|
|
|
|
- try {
|
|
|
|
- DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
|
|
|
|
- return file;
|
|
|
|
- } catch (DiskErrorException d) {
|
|
|
|
- LOG.warn(StringUtils.stringifyException(d));
|
|
|
|
- return null;
|
|
|
|
|
|
+ if (checkWrite) {
|
|
|
|
+ //check whether we are able to create a directory here. If the disk
|
|
|
|
+ //happens to be RDONLY we will fail
|
|
|
|
+ try {
|
|
|
|
+ DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
|
|
|
|
+ } catch (DiskErrorException d) {
|
|
|
|
+ LOG.warn(StringUtils.stringifyException(d));
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ return file;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -274,17 +296,6 @@ public class LocalDirAllocator {
|
|
return dirNumLastAccessed;
|
|
return dirNumLastAccessed;
|
|
}
|
|
}
|
|
|
|
|
|
- /** Get a path from the local FS. This method should be used if the size of
|
|
|
|
- * the file is not known a priori.
|
|
|
|
- *
|
|
|
|
- * It will use roulette selection, picking directories
|
|
|
|
- * with probability proportional to their available space.
|
|
|
|
- */
|
|
|
|
- public synchronized Path getLocalPathForWrite(String path,
|
|
|
|
- Configuration conf) throws IOException {
|
|
|
|
- return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/** Get a path from the local FS. If size is known, we go
|
|
/** Get a path from the local FS. If size is known, we go
|
|
* round-robin over the set of disks (via the configured dirs) and return
|
|
* round-robin over the set of disks (via the configured dirs) and return
|
|
* the first complete path which has enough space.
|
|
* the first complete path which has enough space.
|
|
@@ -292,8 +303,10 @@ public class LocalDirAllocator {
|
|
* If size is not known, use roulette selection -- pick directories
|
|
* If size is not known, use roulette selection -- pick directories
|
|
* with probability proportional to their available space.
|
|
* with probability proportional to their available space.
|
|
*/
|
|
*/
|
|
- public synchronized Path getLocalPathForWrite(String pathStr, long size,
|
|
|
|
- Configuration conf) throws IOException {
|
|
|
|
|
|
+ public synchronized
|
|
|
|
+ Path getLocalPathForWrite(String pathStr, long size,
|
|
|
|
+ Configuration conf, boolean checkWrite
|
|
|
|
+ ) throws IOException {
|
|
confChanged(conf);
|
|
confChanged(conf);
|
|
int numDirs = localDirs.length;
|
|
int numDirs = localDirs.length;
|
|
int numDirsSearched = 0;
|
|
int numDirsSearched = 0;
|
|
@@ -325,7 +338,7 @@ public class LocalDirAllocator {
|
|
dir++;
|
|
dir++;
|
|
}
|
|
}
|
|
dirNumLastAccessed = dir;
|
|
dirNumLastAccessed = dir;
|
|
- returnPath = createPath(pathStr);
|
|
|
|
|
|
+ returnPath = createPath(pathStr, checkWrite);
|
|
if (returnPath == null) {
|
|
if (returnPath == null) {
|
|
totalAvailable -= availableOnDisk[dir];
|
|
totalAvailable -= availableOnDisk[dir];
|
|
availableOnDisk[dir] = 0; // skip this disk
|
|
availableOnDisk[dir] = 0; // skip this disk
|
|
@@ -336,7 +349,7 @@ public class LocalDirAllocator {
|
|
while (numDirsSearched < numDirs && returnPath == null) {
|
|
while (numDirsSearched < numDirs && returnPath == null) {
|
|
long capacity = dirDF[dirNumLastAccessed].getAvailable();
|
|
long capacity = dirDF[dirNumLastAccessed].getAvailable();
|
|
if (capacity > size) {
|
|
if (capacity > size) {
|
|
- returnPath = createPath(pathStr);
|
|
|
|
|
|
+ returnPath = createPath(pathStr, checkWrite);
|
|
}
|
|
}
|
|
dirNumLastAccessed++;
|
|
dirNumLastAccessed++;
|
|
dirNumLastAccessed = dirNumLastAccessed % numDirs;
|
|
dirNumLastAccessed = dirNumLastAccessed % numDirs;
|
|
@@ -362,7 +375,7 @@ public class LocalDirAllocator {
|
|
Configuration conf) throws IOException {
|
|
Configuration conf) throws IOException {
|
|
|
|
|
|
// find an appropriate directory
|
|
// find an appropriate directory
|
|
- Path path = getLocalPathForWrite(pathStr, size, conf);
|
|
|
|
|
|
+ Path path = getLocalPathForWrite(pathStr, size, conf, true);
|
|
File dir = new File(path.getParent().toUri().getPath());
|
|
File dir = new File(path.getParent().toUri().getPath());
|
|
String prefix = path.getName();
|
|
String prefix = path.getName();
|
|
|
|
|
|
@@ -399,6 +412,7 @@ public class LocalDirAllocator {
|
|
" the configured local directories");
|
|
" the configured local directories");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
/** We search through all the configured dirs for the file's existence
|
|
/** We search through all the configured dirs for the file's existence
|
|
* and return true when we find one
|
|
* and return true when we find one
|
|
*/
|
|
*/
|