Browse Source

HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up split calculation (gera)

(cherry picked from commit 6d2cf9fbbd02482315a091ab07af26e40cc5134f)
(cherry picked from commit 1544c63602089b690e850e0e30af4589513a2371)
Gera Shegalov 10 năm trước cách đây
mục cha
commit
736ebeca33

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -51,6 +51,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-11710. Make CryptoOutputStream behave like DFSOutputStream wrt
     synchronization. (Sean Busbey via yliu)
 
+    HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up
+    split calculation (gera)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 6 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocatedFileStatus.java

@@ -32,6 +32,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
 public class LocatedFileStatus extends FileStatus {
   private BlockLocation[] locations;
 
+
+  public LocatedFileStatus() {
+    super();
+  }
+
   /**
    * Constructor 
    * @param stat a file status
@@ -43,7 +48,7 @@ public class LocatedFileStatus extends FileStatus {
         stat.getBlockSize(), stat.getModificationTime(),
         stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
         stat.getGroup(), null, stat.getPath(), locations);
-    if (isSymlink()) {
+    if (stat.isSymlink()) {
       setSymlink(stat.getSymlink());
     }
   }

+ 9 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java

@@ -37,8 +37,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -240,7 +242,13 @@ class ChRootedFileSystem extends FilterFileSystem {
       throws IOException {
     return super.listStatus(fullPath(f));
   }
-  
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+      throws IOException {
+    return super.listLocatedStatus(fullPath(f));
+  }
+
   @Override
   public boolean mkdirs(final Path f, final FsPermission permission)
       throws IOException {

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java

@@ -363,7 +363,7 @@ abstract class InodeTree<T> {
       kind = k;
       targetFileSystem = targetFs;
       resolvedPath = resolveP;
-      remainingPath = remainingP;  
+      remainingPath = remainingP;
     }
     
     // isInternalDir of path resolution completed within the mount table 

+ 73 - 21
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java

@@ -45,7 +45,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -115,8 +118,7 @@ public class ViewFileSystem extends FileSystem {
    */
   private String getUriPath(final Path p) {
     checkPath(p);
-    String s = makeAbsolute(p).toUri().getPath();
-    return s;
+    return makeAbsolute(p).toUri().getPath();
   }
   
   private Path makeAbsolute(final Path f) {
@@ -282,7 +284,7 @@ public class ViewFileSystem extends FileSystem {
     }
     assert(res.remainingPath != null);
     return res.targetFileSystem.createNonRecursive(res.remainingPath, permission,
-         flags, bufferSize, replication, blockSize, progress);
+        flags, bufferSize, replication, blockSize, progress);
   }
   
   @Override
@@ -297,7 +299,7 @@ public class ViewFileSystem extends FileSystem {
     }
     assert(res.remainingPath != null);
     return res.targetFileSystem.create(res.remainingPath, permission,
-         overwrite, bufferSize, replication, blockSize, progress);
+        overwrite, bufferSize, replication, blockSize, progress);
   }
 
   
@@ -328,7 +330,7 @@ public class ViewFileSystem extends FileSystem {
     final InodeTree.ResolveResult<FileSystem> res = 
       fsState.resolve(getUriPath(fs.getPath()), true);
     return res.targetFileSystem.getFileBlockLocations(
-          new ViewFsFileStatus(fs, res.remainingPath), start, len);
+        new ViewFsFileStatus(fs, res.remainingPath), start, len);
   }
 
   @Override
@@ -340,24 +342,42 @@ public class ViewFileSystem extends FileSystem {
     return res.targetFileSystem.getFileChecksum(res.remainingPath);
   }
 
-  @Override
-  public FileStatus getFileStatus(final Path f) throws AccessControlException,
-      FileNotFoundException, IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
-      fsState.resolve(getUriPath(f), true);
-    
-    // FileStatus#getPath is a fully qualified path relative to the root of 
+
+  private static FileStatus fixFileStatus(FileStatus orig,
+      Path qualified) throws IOException {
+    // FileStatus#getPath is a fully qualified path relative to the root of
     // target file system.
     // We need to change it to viewfs URI - relative to root of mount table.
-    
+
     // The implementors of RawLocalFileSystem were trying to be very smart.
-    // They implement FileStatus#getOwener lazily -- the object
+    // They implement FileStatus#getOwner lazily -- the object
     // returned is really a RawLocalFileSystem that expect the
     // FileStatus#getPath to be unchanged so that it can get owner when needed.
-    // Hence we need to interpose a new ViewFileSystemFileStatus that 
+    // Hence we need to interpose a new ViewFileSystemFileStatus that
     // works around.
+    if ("file".equals(orig.getPath().toUri().getScheme())) {
+      orig = wrapLocalFileStatus(orig, qualified);
+    }
+
+    orig.setPath(qualified);
+    return orig;
+  }
+
+  private static FileStatus wrapLocalFileStatus(FileStatus orig,
+      Path qualified) {
+    return orig instanceof LocatedFileStatus
+        ? new ViewFsLocatedFileStatus((LocatedFileStatus)orig, qualified)
+        : new ViewFsFileStatus(orig, qualified);
+  }
+
+
+  @Override
+  public FileStatus getFileStatus(final Path f) throws AccessControlException,
+      FileNotFoundException, IOException {
+    InodeTree.ResolveResult<FileSystem> res =
+      fsState.resolve(getUriPath(f), true);
     FileStatus status =  res.targetFileSystem.getFileStatus(res.remainingPath);
-    return new ViewFsFileStatus(status, this.makeQualified(f));
+    return fixFileStatus(status, this.makeQualified(f));
   }
   
   @Override
@@ -378,18 +398,50 @@ public class ViewFileSystem extends FileSystem {
     if (!res.isInternalDir()) {
       // We need to change the name in the FileStatus as described in
       // {@link #getFileStatus }
-      ChRootedFileSystem targetFs;
-      targetFs = (ChRootedFileSystem) res.targetFileSystem;
       int i = 0;
       for (FileStatus status : statusLst) {
-          String suffix = targetFs.stripOutRoot(status.getPath());
-          statusLst[i++] = new ViewFsFileStatus(status, this.makeQualified(
-              suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix)));
+          statusLst[i++] = fixFileStatus(status,
+              getChrootedPath(res, status, f));
       }
     }
     return statusLst;
   }
 
+  @Override
+  public RemoteIterator<LocatedFileStatus>listLocatedStatus(final Path f,
+      final PathFilter filter) throws FileNotFoundException, IOException {
+    final InodeTree.ResolveResult<FileSystem> res = fsState
+        .resolve(getUriPath(f), true);
+    final RemoteIterator<LocatedFileStatus> statusIter = res.targetFileSystem
+        .listLocatedStatus(res.remainingPath);
+
+    if (res.isInternalDir()) {
+      return statusIter;
+    }
+
+    return new RemoteIterator<LocatedFileStatus>() {
+      @Override
+      public boolean hasNext() throws IOException {
+        return statusIter.hasNext();
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        final LocatedFileStatus status = statusIter.next();
+        return (LocatedFileStatus)fixFileStatus(status,
+            getChrootedPath(res, status, f));
+      }
+    };
+  }
+
+  private Path getChrootedPath(InodeTree.ResolveResult<FileSystem> res,
+      FileStatus status, Path f) throws IOException {
+    final String suffix = ((ChRootedFileSystem)res.targetFileSystem)
+        .stripOutRoot(status.getPath());
+    return this.makeQualified(
+        suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix));
+  }
+
   @Override
   public boolean mkdirs(final Path dir, final FsPermission permission)
       throws IOException {

+ 136 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFsLocatedFileStatus.java

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+
+class ViewFsLocatedFileStatus extends LocatedFileStatus {
+  final LocatedFileStatus myFs;
+  Path modifiedPath;
+
+  ViewFsLocatedFileStatus(LocatedFileStatus locatedFileStatus, Path path) {
+    myFs = locatedFileStatus;
+    modifiedPath = path;
+  }
+
+  @Override
+  public long getLen() {
+    return myFs.getLen();
+  }
+
+  @Override
+  public boolean isFile() {
+    return myFs.isFile();
+  }
+
+  @Override
+  public boolean isDirectory() {
+    return myFs.isDirectory();
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public boolean isDir() {
+    return myFs.isDirectory();
+  }
+
+  @Override
+  public boolean isSymlink() {
+    return myFs.isSymlink();
+  }
+
+  @Override
+  public long getBlockSize() {
+    return myFs.getBlockSize();
+  }
+
+  @Override
+  public short getReplication() {
+    return myFs.getReplication();
+  }
+
+  @Override
+  public long getModificationTime() {
+    return myFs.getModificationTime();
+  }
+
+  @Override
+  public long getAccessTime() {
+    return myFs.getAccessTime();
+  }
+
+  @Override
+  public FsPermission getPermission() {
+    return myFs.getPermission();
+  }
+
+  @Override
+  public String getOwner() {
+    return myFs.getOwner();
+  }
+
+  @Override
+  public String getGroup() {
+    return myFs.getGroup();
+  }
+
+  @Override
+  public Path getPath() {
+    return modifiedPath;
+  }
+
+  @Override
+  public void setPath(final Path p) {
+    modifiedPath = p;
+  }
+
+  @Override
+  public Path getSymlink() throws IOException {
+    return myFs.getSymlink();
+  }
+
+  @Override
+  public void setSymlink(Path p) {
+    myFs.setSymlink(p);
+  }
+
+  @Override
+  public BlockLocation[] getBlockLocations() {
+    return myFs.getBlockLocations();
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    return super.compareTo(o);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+}

+ 14 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestChRootedFileSystem.java

@@ -395,6 +395,20 @@ public class TestChRootedFileSystem {
     verify(mockFs).getAclStatus(rawPath);
   }
 
+  @Test
+  public void testListLocatedFileStatus() throws IOException {
+    final Path mockMount = new Path("mockfs://foo/user");
+    final Path mockPath = new Path("/usermock");
+    final Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    ConfigUtil.addLink(conf, mockPath.toString(), mockMount.toUri());
+    FileSystem vfs = FileSystem.get(URI.create("viewfs:///"), conf);
+    vfs.listLocatedStatus(mockPath);
+    final FileSystem mockFs = ((MockFileSystem)mockMount.getFileSystem(conf))
+        .getRawFileSystem();
+    verify(mockFs).listLocatedStatus(new Path(mockMount.toUri().getPath()));
+  }
+
   static class MockFileSystem extends FilterFileSystem {
     MockFileSystem() {
       super(mock(FileSystem.class));

+ 85 - 23
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.fs.viewfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,9 +37,11 @@ import static org.junit.Assert.assertFalse;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.AclUtil;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.viewfs.ConfigUtil;
@@ -125,7 +128,7 @@ public class ViewFileSystemBaseTest {
   
   void setupMountPoints() {
     ConfigUtil.addLink(conf, "/targetRoot", targetTestRoot.toUri());
-    ConfigUtil.addLink(conf, "/user", new Path(targetTestRoot,"user").toUri());
+    ConfigUtil.addLink(conf, "/user", new Path(targetTestRoot, "user").toUri());
     ConfigUtil.addLink(conf, "/user2", new Path(targetTestRoot,"user").toUri());
     ConfigUtil.addLink(conf, "/data", new Path(targetTestRoot,"data").toUri());
     ConfigUtil.addLink(conf, "/internalDir/linkToDir2",
@@ -133,9 +136,9 @@ public class ViewFileSystemBaseTest {
     ConfigUtil.addLink(conf, "/internalDir/internalDir2/linkToDir3",
         new Path(targetTestRoot,"dir3").toUri());
     ConfigUtil.addLink(conf, "/danglingLink",
-        new Path(targetTestRoot,"missingTarget").toUri());
+        new Path(targetTestRoot, "missingTarget").toUri());
     ConfigUtil.addLink(conf, "/linkToAFile",
-        new Path(targetTestRoot,"aFile").toUri());
+        new Path(targetTestRoot, "aFile").toUri());
   }
   
   @Test
@@ -204,19 +207,28 @@ public class ViewFileSystemBaseTest {
         fsView.makeQualified(new Path("/foo/bar")));
   }
 
-  
-  /** 
-   * Test modify operations (create, mkdir, delete, etc) 
+  @Test
+  public void testLocatedOperationsThroughMountLinks() throws IOException {
+    testOperationsThroughMountLinksInternal(true);
+  }
+
+  @Test
+  public void testOperationsThroughMountLinks() throws IOException {
+    testOperationsThroughMountLinksInternal(false);
+  }
+
+  /**
+   * Test modify operations (create, mkdir, delete, etc)
    * on the mount file system where the pathname references through
    * the mount points.  Hence these operation will modify the target
    * file system.
-   * 
+   *
    * Verify the operation via mountfs (ie fSys) and *also* via the
    *  target file system (ie fSysLocal) that the mount link points-to.
    */
-  @Test
-  public void testOperationsThroughMountLinks() throws IOException {
-    // Create file 
+  private void testOperationsThroughMountLinksInternal(boolean located)
+      throws IOException {
+    // Create file
     fileSystemTestHelper.createFile(fsView, "/user/foo");
     Assert.assertTrue("Created file should be type file",
         fsView.isFile(new Path("/user/foo")));
@@ -329,7 +341,8 @@ public class ViewFileSystemBaseTest {
     fsView.mkdirs(new Path("/targetRoot/dirFoo"));
     Assert.assertTrue(fsView.exists(new Path("/targetRoot/dirFoo")));
     boolean dirFooPresent = false;
-    for (FileStatus fileStatus : fsView.listStatus(new Path("/targetRoot/"))) {
+    for (FileStatus fileStatus :
+        listStatusInternal(located, new Path("/targetRoot/"))) {
       if (fileStatus.getPath().getName().equals("dirFoo")) {
         dirFooPresent = true;
       }
@@ -394,9 +407,13 @@ public class ViewFileSystemBaseTest {
       i++;     
     } 
   }
-  
-  
-  
+
+  @Test
+  public void testLocatedListOnInternalDirsOfMountTable() throws IOException {
+    testListOnInternalDirsOfMountTableInternal(true);
+  }
+
+
   /**
    * Test "readOps" (e.g. list, listStatus) 
    * on internal dirs of mount table
@@ -406,15 +423,20 @@ public class ViewFileSystemBaseTest {
   // test list on internal dirs of mount table 
   @Test
   public void testListOnInternalDirsOfMountTable() throws IOException {
+    testListOnInternalDirsOfMountTableInternal(false);
+  }
+
+  private void testListOnInternalDirsOfMountTableInternal(boolean located)
+      throws IOException {
     
     // list on Slash
-    
-    FileStatus[] dirPaths = fsView.listStatus(new Path("/"));
+
+    FileStatus[] dirPaths = listStatusInternal(located, new Path("/"));
     FileStatus fs;
     verifyRootChildren(dirPaths);
 
     // list on internal dir
-    dirPaths = fsView.listStatus(new Path("/internalDir"));
+    dirPaths = listStatusInternal(located, new Path("/internalDir"));
     Assert.assertEquals(2, dirPaths.length);
 
     fs = fileSystemTestHelper.containsPath(fsView, "/internalDir/internalDir2", dirPaths);
@@ -452,13 +474,26 @@ public class ViewFileSystemBaseTest {
   
   @Test
   public void testListOnMountTargetDirs() throws IOException {
-    FileStatus[] dirPaths = fsView.listStatus(new Path("/data"));
+    testListOnMountTargetDirsInternal(false);
+  }
+
+  @Test
+  public void testLocatedListOnMountTargetDirs() throws IOException {
+    testListOnMountTargetDirsInternal(true);
+  }
+
+  private void testListOnMountTargetDirsInternal(boolean located)
+      throws IOException {
+    final Path dataPath = new Path("/data");
+
+    FileStatus[] dirPaths = listStatusInternal(located, dataPath);
+
     FileStatus fs;
     Assert.assertEquals(0, dirPaths.length);
     
     // add a file
     long len = fileSystemTestHelper.createFile(fsView, "/data/foo");
-    dirPaths = fsView.listStatus(new Path("/data"));
+    dirPaths = listStatusInternal(located, dataPath);
     Assert.assertEquals(1, dirPaths.length);
     fs = fileSystemTestHelper.containsPath(fsView, "/data/foo", dirPaths);
     Assert.assertNotNull(fs);
@@ -467,7 +502,7 @@ public class ViewFileSystemBaseTest {
     
     // add a dir
     fsView.mkdirs(fileSystemTestHelper.getTestRootPath(fsView, "/data/dirX"));
-    dirPaths = fsView.listStatus(new Path("/data"));
+    dirPaths = listStatusInternal(located, dataPath);
     Assert.assertEquals(2, dirPaths.length);
     fs = fileSystemTestHelper.containsPath(fsView, "/data/foo", dirPaths);
     Assert.assertNotNull(fs);
@@ -476,7 +511,23 @@ public class ViewFileSystemBaseTest {
     Assert.assertNotNull(fs);
     Assert.assertTrue("Created dir should appear as a dir", fs.isDirectory()); 
   }
-      
+
+  private FileStatus[] listStatusInternal(boolean located, Path dataPath) throws IOException {
+    FileStatus[] dirPaths = new FileStatus[0];
+    if (located) {
+      RemoteIterator<LocatedFileStatus> statIter =
+          fsView.listLocatedStatus(dataPath);
+      ArrayList<LocatedFileStatus> tmp = new ArrayList<LocatedFileStatus>(10);
+      while (statIter.hasNext()) {
+        tmp.add(statIter.next());
+      }
+      dirPaths = tmp.toArray(dirPaths);
+    } else {
+      dirPaths = fsView.listStatus(dataPath);
+    }
+    return dirPaths;
+  }
+
   @Test
   public void testFileStatusOnMountLink() throws IOException {
     Assert.assertTrue(fsView.getFileStatus(new Path("/")).isDirectory());
@@ -692,11 +743,21 @@ public class ViewFileSystemBaseTest {
     Assert.assertTrue("Created file should be type file",
         fsView.isFile(new Path("/user/foo")));
     Assert.assertTrue("Target of created file should be type file",
-        fsTarget.isFile(new Path(targetTestRoot,"user/foo")));
+        fsTarget.isFile(new Path(targetTestRoot, "user/foo")));
   }
 
   @Test
   public void testRootReadableExecutable() throws IOException {
+    testRootReadableExecutableInternal(false);
+  }
+
+  @Test
+  public void testLocatedRootReadableExecutable() throws IOException {
+    testRootReadableExecutableInternal(true);
+  }
+
+  private void testRootReadableExecutableInternal(boolean located)
+      throws IOException {
     // verify executable permission on root: cd /
     //
     Assert.assertFalse("In root before cd",
@@ -707,7 +768,8 @@ public class ViewFileSystemBaseTest {
 
     // verify readable
     //
-    verifyRootChildren(fsView.listStatus(fsView.getWorkingDirectory()));
+    verifyRootChildren(listStatusInternal(located,
+        fsView.getWorkingDirectory()));
 
     // verify permissions
     //