|
@@ -25,10 +25,12 @@ import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS
|
|
|
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS_DEFAULT;
|
|
|
import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
|
|
|
|
|
|
+import java.util.function.Function;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
@@ -279,7 +281,7 @@ public class ViewFileSystem extends FileSystem {
|
|
|
enableInnerCache = config.getBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE,
|
|
|
CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT);
|
|
|
FsGetter fsGetter = fsGetter();
|
|
|
- final InnerCache innerCache = new InnerCache(fsGetter);
|
|
|
+ cache = new InnerCache(fsGetter);
|
|
|
// Now build client side view (i.e. client side mount table) from config.
|
|
|
final String authority = theUri.getAuthority();
|
|
|
String tableName = authority;
|
|
@@ -295,15 +297,32 @@ public class ViewFileSystem extends FileSystem {
|
|
|
fsState = new InodeTree<FileSystem>(conf, tableName, myUri,
|
|
|
initingUriAsFallbackOnNoMounts) {
|
|
|
@Override
|
|
|
- protected FileSystem getTargetFileSystem(final URI uri)
|
|
|
- throws URISyntaxException, IOException {
|
|
|
- FileSystem fs;
|
|
|
- if (enableInnerCache) {
|
|
|
- fs = innerCache.get(uri, config);
|
|
|
- } else {
|
|
|
- fs = fsGetter.get(uri, config);
|
|
|
+ protected Function<URI, FileSystem> initAndGetTargetFs() {
|
|
|
+ return new Function<URI, FileSystem>() {
|
|
|
+ @Override
|
|
|
+ public FileSystem apply(final URI uri) {
|
|
|
+ FileSystem fs;
|
|
|
+ try {
|
|
|
+ fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
|
|
+ @Override
|
|
|
+ public FileSystem run() throws IOException {
|
|
|
+ if (enableInnerCache) {
|
|
|
+ synchronized (cache) {
|
|
|
+ return cache.get(uri, config);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return fsGetter().get(uri, config);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return new ChRootedFileSystem(fs, uri);
|
|
|
+ } catch (IOException | InterruptedException ex) {
|
|
|
+ LOG.error("Could not initialize the underlying FileSystem "
|
|
|
+ + "object. Exception: " + ex.toString());
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
- return new ChRootedFileSystem(fs, uri);
|
|
|
+ };
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -327,13 +346,6 @@ public class ViewFileSystem extends FileSystem {
|
|
|
} catch (URISyntaxException e) {
|
|
|
throw new IOException("URISyntax exception: " + theUri);
|
|
|
}
|
|
|
-
|
|
|
- if (enableInnerCache) {
|
|
|
- // All fs instances are created and cached on startup. The cache is
|
|
|
- // readonly after the initialize() so the concurrent access of the cache
|
|
|
- // is safe.
|
|
|
- cache = innerCache.unmodifiableCache();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -365,7 +377,7 @@ public class ViewFileSystem extends FileSystem {
|
|
|
@Override
|
|
|
public Path resolvePath(final Path f) throws IOException {
|
|
|
final InodeTree.ResolveResult<FileSystem> res;
|
|
|
- res = fsState.resolve(getUriPath(f), true);
|
|
|
+ res = fsState.resolve(getUriPath(f), true);
|
|
|
if (res.isInternalDir()) {
|
|
|
return f;
|
|
|
}
|
|
@@ -851,9 +863,34 @@ public class ViewFileSystem extends FileSystem {
|
|
|
public void setVerifyChecksum(final boolean verifyChecksum) {
|
|
|
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
|
|
fsState.getMountPoints();
|
|
|
+ Map<String, FileSystem> fsMap = initializeMountedFileSystems(mountPoints);
|
|
|
+ for (InodeTree.MountPoint<FileSystem> mount : mountPoints) {
|
|
|
+ fsMap.get(mount.src).setVerifyChecksum(verifyChecksum);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize the target filesystem for all mount points.
|
|
|
+ * @param mountPoints The mount points
|
|
|
+ * @return Mapping of mount point and the initialized target filesystems
|
|
|
+ * @throws RuntimeException when the target file system cannot be initialized
|
|
|
+ */
|
|
|
+ private Map<String, FileSystem> initializeMountedFileSystems(
|
|
|
+ List<InodeTree.MountPoint<FileSystem>> mountPoints) {
|
|
|
+ FileSystem fs = null;
|
|
|
+ Map<String, FileSystem> fsMap = new HashMap<>(mountPoints.size());
|
|
|
for (InodeTree.MountPoint<FileSystem> mount : mountPoints) {
|
|
|
- mount.target.targetFileSystem.setVerifyChecksum(verifyChecksum);
|
|
|
+ try {
|
|
|
+ fs = mount.target.getTargetFileSystem();
|
|
|
+ fsMap.put(mount.src, fs);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ String errMsg = "Not able to initialize FileSystem for mount path " +
|
|
|
+ mount.src + " with exception " + ex;
|
|
|
+ LOG.error(errMsg);
|
|
|
+ throw new RuntimeException(errMsg, ex);
|
|
|
+ }
|
|
|
}
|
|
|
+ return fsMap;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -879,6 +916,9 @@ public class ViewFileSystem extends FileSystem {
|
|
|
return res.targetFileSystem.getDefaultBlockSize(res.remainingPath);
|
|
|
} catch (FileNotFoundException e) {
|
|
|
throw new NotInMountpointException(f, "getDefaultBlockSize");
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException("Not able to initialize fs in "
|
|
|
+ + " getDefaultBlockSize for path " + f + " with exception", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -890,6 +930,9 @@ public class ViewFileSystem extends FileSystem {
|
|
|
return res.targetFileSystem.getDefaultReplication(res.remainingPath);
|
|
|
} catch (FileNotFoundException e) {
|
|
|
throw new NotInMountpointException(f, "getDefaultReplication");
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException("Not able to initialize fs in "
|
|
|
+ + " getDefaultReplication for path " + f + " with exception", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -922,8 +965,9 @@ public class ViewFileSystem extends FileSystem {
|
|
|
public void setWriteChecksum(final boolean writeChecksum) {
|
|
|
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
|
|
fsState.getMountPoints();
|
|
|
+ Map<String, FileSystem> fsMap = initializeMountedFileSystems(mountPoints);
|
|
|
for (InodeTree.MountPoint<FileSystem> mount : mountPoints) {
|
|
|
- mount.target.targetFileSystem.setWriteChecksum(writeChecksum);
|
|
|
+ fsMap.get(mount.src).setWriteChecksum(writeChecksum);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -931,9 +975,10 @@ public class ViewFileSystem extends FileSystem {
|
|
|
public FileSystem[] getChildFileSystems() {
|
|
|
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
|
|
fsState.getMountPoints();
|
|
|
+ Map<String, FileSystem> fsMap = initializeMountedFileSystems(mountPoints);
|
|
|
Set<FileSystem> children = new HashSet<FileSystem>();
|
|
|
for (InodeTree.MountPoint<FileSystem> mountPoint : mountPoints) {
|
|
|
- FileSystem targetFs = mountPoint.target.targetFileSystem;
|
|
|
+ FileSystem targetFs = fsMap.get(mountPoint.src);
|
|
|
children.addAll(Arrays.asList(targetFs.getChildFileSystems()));
|
|
|
}
|
|
|
return children.toArray(new FileSystem[]{});
|