|
@@ -766,66 +766,145 @@ public class DistributedFileSystem extends FileSystem {
|
|
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
|
|
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
|
|
final PathFilter filter)
|
|
final PathFilter filter)
|
|
throws IOException {
|
|
throws IOException {
|
|
- final Path absF = fixRelativePart(p);
|
|
|
|
- return new RemoteIterator<LocatedFileStatus>() {
|
|
|
|
- private DirectoryListing thisListing;
|
|
|
|
- private int i;
|
|
|
|
- private String src;
|
|
|
|
- private LocatedFileStatus curStat = null;
|
|
|
|
-
|
|
|
|
- { // initializer
|
|
|
|
- // Fully resolve symlinks in path first to avoid additional resolution
|
|
|
|
- // round-trips as we fetch more batches of listings
|
|
|
|
- src = getPathName(resolvePath(absF));
|
|
|
|
- // fetch the first batch of entries in the directory
|
|
|
|
- thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
|
|
|
|
- statistics.incrementReadOps(1);
|
|
|
|
- if (thisListing == null) { // the directory does not exist
|
|
|
|
- throw new FileNotFoundException("File " + p + " does not exist.");
|
|
|
|
|
|
+ Path absF = fixRelativePart(p);
|
|
|
|
+ return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
|
|
|
|
+ @Override
|
|
|
|
+ public RemoteIterator<LocatedFileStatus> doCall(final Path p)
|
|
|
|
+ throws IOException, UnresolvedLinkException {
|
|
|
|
+ return new DirListingIterator<LocatedFileStatus>(p, filter, true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (fs instanceof DistributedFileSystem) {
|
|
|
|
+ return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
|
|
}
|
|
}
|
|
|
|
+ // symlink resolution for this method does not work across file systems
|
|
|
|
+ // because it is a protected method.
|
|
|
|
+ throw new IOException("Link resolution does not work with multiple " +
|
|
|
|
+ "file systems for listLocatedStatus(): " + p);
|
|
|
|
+ }
|
|
|
|
+ }.resolve(this, absF);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Returns a remote iterator so that followup calls are made on demand
|
|
|
|
+ * while consuming the entries. This reduces memory consumption during
|
|
|
|
+ * listing of a large directory.
|
|
|
|
+ *
|
|
|
|
+ * @param p target path
|
|
|
|
+ * @return remote iterator
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
|
|
|
|
+ throws IOException {
|
|
|
|
+ Path absF = fixRelativePart(p);
|
|
|
|
+ return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
|
|
|
|
+ @Override
|
|
|
|
+ public RemoteIterator<FileStatus> doCall(final Path p)
|
|
|
|
+ throws IOException, UnresolvedLinkException {
|
|
|
|
+ return new DirListingIterator<FileStatus>(p, false);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public boolean hasNext() throws IOException {
|
|
|
|
- while (curStat == null && hasNextNoFilter()) {
|
|
|
|
- LocatedFileStatus next =
|
|
|
|
- ((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
|
|
|
|
- .makeQualifiedLocated(getUri(), absF);
|
|
|
|
- if (filter.accept(next.getPath())) {
|
|
|
|
- curStat = next;
|
|
|
|
- }
|
|
|
|
|
|
+ public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return ((DistributedFileSystem)fs).listStatusIterator(p);
|
|
|
|
+ }
|
|
|
|
+ }.resolve(this, absF);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * This class defines an iterator that returns
|
|
|
|
+ * the file status of each file/subdirectory of a directory
|
|
|
|
+ *
|
|
|
|
+ * If needLocation is true, each status also contains block locations when
|
|
|
|
+ * the entry is a file. Failures while listing are reported as an IOException.
|
|
|
|
+ *
|
|
|
|
+ * @param <T> the type of the file status
|
|
|
|
+ */
|
|
|
|
+ private class DirListingIterator<T extends FileStatus>
|
|
|
|
+ implements RemoteIterator<T> {
|
|
|
|
+ private DirectoryListing thisListing;
|
|
|
|
+ private int i;
|
|
|
|
+ private Path p;
|
|
|
|
+ private String src;
|
|
|
|
+ private T curStat = null;
|
|
|
|
+ private PathFilter filter;
|
|
|
|
+ private boolean needLocation;
|
|
|
|
+
|
|
|
|
+ private DirListingIterator(Path p, PathFilter filter,
|
|
|
|
+ boolean needLocation) throws IOException {
|
|
|
|
+ this.p = p;
|
|
|
|
+ this.src = getPathName(p);
|
|
|
|
+ this.filter = filter;
|
|
|
|
+ this.needLocation = needLocation;
|
|
|
|
+ // fetch the first batch of entries in the directory
|
|
|
|
+ thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
|
|
|
|
+ needLocation);
|
|
|
|
+ statistics.incrementReadOps(1);
|
|
|
|
+ if (thisListing == null) { // the directory does not exist
|
|
|
|
+ throw new FileNotFoundException("File " + p + " does not exist.");
|
|
|
|
+ }
|
|
|
|
+ i = 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private DirListingIterator(Path p, boolean needLocation)
|
|
|
|
+ throws IOException {
|
|
|
|
+ this(p, null, needLocation);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
+ public boolean hasNext() throws IOException {
|
|
|
|
+ while (curStat == null && hasNextNoFilter()) {
|
|
|
|
+ T next;
|
|
|
|
+ HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
|
|
|
|
+ if (needLocation) {
|
|
|
|
+ next = (T)((HdfsLocatedFileStatus)fileStat)
|
|
|
|
+ .makeQualifiedLocated(getUri(), p);
|
|
|
|
+ } else {
|
|
|
|
+ next = (T)fileStat.makeQualified(getUri(), p);
|
|
|
|
+ }
|
|
|
|
+ // apply filter if not null
|
|
|
|
+ if (filter == null || filter.accept(next.getPath())) {
|
|
|
|
+ curStat = next;
|
|
}
|
|
}
|
|
- return curStat != null;
|
|
|
|
}
|
|
}
|
|
|
|
+ return curStat != null;
|
|
|
|
+ }
|
|
|
|
|
|
- /** Check if there is a next item before applying the given filter */
|
|
|
|
- private boolean hasNextNoFilter() throws IOException {
|
|
|
|
|
|
+ /** Check if there is a next item before applying the given filter */
|
|
|
|
+ private boolean hasNextNoFilter() throws IOException {
|
|
|
|
+ if (thisListing == null) {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ if (i >= thisListing.getPartialListing().length
|
|
|
|
+ && thisListing.hasMore()) {
|
|
|
|
+ // current listing is exhausted & fetch a new listing
|
|
|
|
+ thisListing = dfs.listPaths(src, thisListing.getLastName(),
|
|
|
|
+ needLocation);
|
|
|
|
+ statistics.incrementReadOps(1);
|
|
if (thisListing == null) {
|
|
if (thisListing == null) {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
- if (i>=thisListing.getPartialListing().length
|
|
|
|
- && thisListing.hasMore()) {
|
|
|
|
- // current listing is exhausted & fetch a new listing
|
|
|
|
- thisListing = dfs.listPaths(src, thisListing.getLastName(), true);
|
|
|
|
- statistics.incrementReadOps(1);
|
|
|
|
- if (thisListing == null) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
- i = 0;
|
|
|
|
- }
|
|
|
|
- return (i<thisListing.getPartialListing().length);
|
|
|
|
|
|
+ i = 0;
|
|
}
|
|
}
|
|
|
|
+ return (i < thisListing.getPartialListing().length);
|
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
|
- public LocatedFileStatus next() throws IOException {
|
|
|
|
- if (hasNext()) {
|
|
|
|
- LocatedFileStatus tmp = curStat;
|
|
|
|
- curStat = null;
|
|
|
|
- return tmp;
|
|
|
|
- }
|
|
|
|
- throw new java.util.NoSuchElementException("No more entry in " + p);
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public T next() throws IOException {
|
|
|
|
+ if (hasNext()) {
|
|
|
|
+ T tmp = curStat;
|
|
|
|
+ curStat = null;
|
|
|
|
+ return tmp;
|
|
|
|
+ }
|
|
|
|
+ throw new java.util.NoSuchElementException("No more entry in " + p);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|