|
@@ -235,7 +235,7 @@ public class LocalDirAllocator {
|
|
|
private FileSystem localFS;
|
|
|
private DF[] dirDF;
|
|
|
private String contextCfgItemName;
|
|
|
- private String[] localDirs;
|
|
|
+ private Path[] localDirsPath;
|
|
|
private String savedLocalDirs = "";
|
|
|
|
|
|
public AllocatorPerContext(String contextCfgItemName) {
|
|
@@ -249,7 +249,7 @@ public class LocalDirAllocator {
|
|
|
) throws IOException {
|
|
|
String newLocalDirs = conf.get(contextCfgItemName);
|
|
|
if (!newLocalDirs.equals(savedLocalDirs)) {
|
|
|
- localDirs = conf.getStrings(contextCfgItemName);
|
|
|
+ String[] localDirs = conf.getStrings(contextCfgItemName);
|
|
|
localFS = FileSystem.getLocal(conf);
|
|
|
int numDirs = localDirs.length;
|
|
|
ArrayList<String> dirs = new ArrayList<String>(numDirs);
|
|
@@ -275,7 +275,10 @@ public class LocalDirAllocator {
|
|
|
ie.getMessage() + "\n" + StringUtils.stringifyException(ie));
|
|
|
} //ignore
|
|
|
}
|
|
|
- localDirs = dirs.toArray(new String[dirs.size()]);
|
|
|
+ localDirsPath = new Path[dirs.size()];
|
|
|
+ for(int i=0;i<localDirsPath.length;i++) {
|
|
|
+ localDirsPath[i] = new Path(dirs.get(i));
|
|
|
+ }
|
|
|
dirDF = dfList.toArray(new DF[dirs.size()]);
|
|
|
savedLocalDirs = newLocalDirs;
|
|
|
|
|
@@ -284,10 +287,10 @@ public class LocalDirAllocator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Path createPath(String path,
|
|
|
+ private Path createPath(Path path,
|
|
|
boolean checkWrite) throws IOException {
|
|
|
- Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
|
|
|
- path);
|
|
|
+ Path file = new Path(localDirsPath[dirNumLastAccessed], path);
|
|
|
+
|
|
|
if (checkWrite) {
|
|
|
//check whether we are able to create a directory here. If the disk
|
|
|
//happens to be RDONLY we will fail
|
|
@@ -321,7 +324,7 @@ public class LocalDirAllocator {
|
|
|
Configuration conf, boolean checkWrite
|
|
|
) throws IOException {
|
|
|
confChanged(conf);
|
|
|
- int numDirs = localDirs.length;
|
|
|
+ int numDirs = localDirsPath.length;
|
|
|
int numDirsSearched = 0;
|
|
|
//remove the leading slash from the path (to make sure that the uri
|
|
|
//resolution results in a valid path on the dir being checked)
|
|
@@ -329,6 +332,7 @@ public class LocalDirAllocator {
|
|
|
pathStr = pathStr.substring(1);
|
|
|
}
|
|
|
Path returnPath = null;
|
|
|
+ Path path = new Path(pathStr);
|
|
|
|
|
|
if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
|
|
|
//proportional to available size
|
|
@@ -351,7 +355,7 @@ public class LocalDirAllocator {
|
|
|
dir++;
|
|
|
}
|
|
|
dirNumLastAccessed = dir;
|
|
|
- returnPath = createPath(pathStr, checkWrite);
|
|
|
+ returnPath = createPath(path, checkWrite);
|
|
|
if (returnPath == null) {
|
|
|
totalAvailable -= availableOnDisk[dir];
|
|
|
availableOnDisk[dir] = 0; // skip this disk
|
|
@@ -362,7 +366,7 @@ public class LocalDirAllocator {
|
|
|
while (numDirsSearched < numDirs && returnPath == null) {
|
|
|
long capacity = dirDF[dirNumLastAccessed].getAvailable();
|
|
|
if (capacity > size) {
|
|
|
- returnPath = createPath(pathStr, checkWrite);
|
|
|
+ returnPath = createPath(path, checkWrite);
|
|
|
}
|
|
|
dirNumLastAccessed++;
|
|
|
dirNumLastAccessed = dirNumLastAccessed % numDirs;
|
|
@@ -405,15 +409,16 @@ public class LocalDirAllocator {
|
|
|
public synchronized Path getLocalPathToRead(String pathStr,
|
|
|
Configuration conf) throws IOException {
|
|
|
confChanged(conf);
|
|
|
- int numDirs = localDirs.length;
|
|
|
+ int numDirs = localDirsPath.length;
|
|
|
int numDirsSearched = 0;
|
|
|
//remove the leading slash from the path (to make sure that the uri
|
|
|
//resolution results in a valid path on the dir being checked)
|
|
|
if (pathStr.startsWith("/")) {
|
|
|
pathStr = pathStr.substring(1);
|
|
|
}
|
|
|
+ Path childPath = new Path(pathStr);
|
|
|
while (numDirsSearched < numDirs) {
|
|
|
- Path file = new Path(localDirs[numDirsSearched], pathStr);
|
|
|
+ Path file = new Path(localDirsPath[numDirsSearched], childPath);
|
|
|
if (localFS.exists(file)) {
|
|
|
return file;
|
|
|
}
|
|
@@ -430,10 +435,10 @@ public class LocalDirAllocator {
|
|
|
private final FileSystem fs;
|
|
|
private final String pathStr;
|
|
|
private int i = 0;
|
|
|
- private final String[] rootDirs;
|
|
|
+ private final Path[] rootDirs;
|
|
|
private Path next = null;
|
|
|
|
|
|
- private PathIterator(FileSystem fs, String pathStr, String[] rootDirs
|
|
|
+ private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs
|
|
|
) throws IOException {
|
|
|
this.fs = fs;
|
|
|
this.pathStr = pathStr;
|
|
@@ -492,7 +497,7 @@ public class LocalDirAllocator {
|
|
|
if (pathStr.startsWith("/")) {
|
|
|
pathStr = pathStr.substring(1);
|
|
|
}
|
|
|
- return new PathIterator(localFS, pathStr, localDirs);
|
|
|
+ return new PathIterator(localFS, pathStr, localDirsPath);
|
|
|
}
|
|
|
|
|
|
/** We search through all the configured dirs for the file's existence
|
|
@@ -500,15 +505,16 @@ public class LocalDirAllocator {
|
|
|
*/
|
|
|
public synchronized boolean ifExists(String pathStr,Configuration conf) {
|
|
|
try {
|
|
|
- int numDirs = localDirs.length;
|
|
|
+ int numDirs = localDirsPath.length;
|
|
|
int numDirsSearched = 0;
|
|
|
//remove the leading slash from the path (to make sure that the uri
|
|
|
//resolution results in a valid path on the dir being checked)
|
|
|
if (pathStr.startsWith("/")) {
|
|
|
pathStr = pathStr.substring(1);
|
|
|
}
|
|
|
+ Path childPath = new Path(pathStr);
|
|
|
while (numDirsSearched < numDirs) {
|
|
|
- Path file = new Path(localDirs[numDirsSearched], pathStr);
|
|
|
+ Path file = new Path(localDirsPath[numDirsSearched], childPath);
|
|
|
if (localFS.exists(file)) {
|
|
|
return true;
|
|
|
}
|