
HDFS-610. Support o.a.h.fs.FileContext. Contributed by Sanjay Radia

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@816785 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 15 years ago
parent
commit 08fae5f024

+ 2 - 0
CHANGES.txt

@@ -47,6 +47,8 @@ Trunk (unreleased changes)
     missing blocks from the HDFS logs.
     (Bill Zeller, Jithendra Pandey via suresh).
 
+    HDFS-610. Support o.a.h.fs.FileContext.  (Sanjay Radia via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file
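
For orientation, o.a.h.fs.FileContext is the client-facing API this change wires up for HDFS. A minimal usage sketch, assuming `fs` is an initialized HDFS FileSystem (e.g. obtained from a MiniDFSCluster as in the new test below); the path is arbitrary:

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: obtain a FileContext over an existing FileSystem and use it.
FileContext fc = FileContext.getFileContext(fs);
Path dir = new Path("/tmp/filecontext-demo");
fc.mkdirs(dir, FileContext.DEFAULT_PERM);  // create a directory
fc.setWorkingDirectory(dir);               // working dir is per-context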

+ 51 - 0
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -574,6 +574,35 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     leasechecker.put(src, result);
     return result;
   }
+  
+  /**
+   * Same as {@link #create(String, FsPermission, EnumSet, short, long,
+   * Progressable, int)} except that the permission
+   * is absolute (i.e. it has already been masked with the umask).
+   */
+  public OutputStream primitiveCreate(String src, 
+                             FsPermission absPermission,
+                             EnumSet<CreateFlag> flag,
+                             boolean createParent,
+                             short replication,
+                             long blockSize,
+                             Progressable progress,
+                             int buffersize,
+                             int bytesPerChecksum)
+    throws IOException {
+    checkOpen();
+    if (absPermission == null) {
+      absPermission = 
+        FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
+    } 
+    LOG.debug(src + ": masked=" + absPermission);
+    OutputStream result = new DFSOutputStream(src, absPermission,
+        flag, createParent, replication, blockSize, progress, buffersize,
+        bytesPerChecksum);
+    leasechecker.put(src, result);
+    return result;
+  } 
 
   /**
    * Append to an existing HDFS file.  
@@ -1002,6 +1031,28 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                                      FileAlreadyExistsException.class);
     }
   }
+  
+  /**
+   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+   * that the permission has already been masked against the umask.
+   */
+  public boolean primitiveMkdir(String src, FsPermission absPermission)
+    throws IOException{
+    checkOpen();
+    if (absPermission == null) {
+      absPermission = 
+        FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
+    } 
+
+    LOG.debug(src + ": masked=" + absPermission);
+    try {
+      return namenode.mkdirs(src, absPermission, true);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     NSQuotaExceededException.class,
+                                     DSQuotaExceededException.class);
+    }
+  }
 
   ContentSummary getContentSummary(String src) throws IOException {
     try {
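
A quick illustration of the "absolute permission" distinction the new primitiveCreate/primitiveMkdir methods rely on. This is a sketch with arbitrary values; an absolute permission has the umask already applied, so these methods use it verbatim instead of masking again:

import org.apache.hadoop.fs.permission.FsPermission;

// Sketch: applying a umask turns a requested mode into an absolute one.
FsPermission requested = new FsPermission((short) 0777);
FsPermission umask     = new FsPermission((short) 0022);
FsPermission absolute  = requested.applyUMask(umask);  // rwxr-xr-x (0755)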

+ 26 - 2
src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -177,6 +177,13 @@ public class DistributedFileSystem extends FileSystem {
     }
     return dfs.getBlockLocations(getPathName(file.getPath()), start, len);
   }
+  
+  @Override
+  public BlockLocation[] getFileBlockLocations(Path p, 
+      long start, long len) throws IOException {
+    return dfs.getBlockLocations(getPathName(p), start, len);
+  }
 
   @Override
   public void setVerifyChecksum(boolean verifyChecksum) {
@@ -203,11 +210,21 @@ public class DistributedFileSystem extends FileSystem {
     EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
 
-    return new FSDataOutputStream
-       (dfs.create(getPathName(f), permission,
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
                    flag, replication, blockSize, progress, bufferSize),
         statistics);
   }
+  
+  @SuppressWarnings("deprecation")
+  @Override
+  protected FSDataOutputStream primitiveCreate(Path f,
+    FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
+    short replication, long blockSize, Progressable progress,
+    int bytesPerChecksum) throws IOException {
+    return new FSDataOutputStream(dfs.primitiveCreate(getPathName(f),
+        absolutePermission, flag, true, replication, blockSize,
+        progress, bufferSize, bytesPerChecksum), statistics);
+  }
 
   /**
    * Same as create(), except fails if parent directory doesn't already exist.
@@ -293,6 +310,13 @@ public class DistributedFileSystem extends FileSystem {
     return dfs.mkdirs(getPathName(f), permission, true);
   }
 
+  @SuppressWarnings("deprecation")
+  @Override
+  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
+    throws IOException {
+    return dfs.primitiveMkdir(getPathName(f), absolutePermission);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void close() throws IOException {
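
The new Path-based getFileBlockLocations override goes straight to the DFS client rather than through the base class's FileStatus-based path. A hedged usage sketch (assumes `fs` is an initialized FileSystem backed by HDFS and the file exists):

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: print the hosts holding the blocks backing the first KB of a file.
static void printBlockHosts(FileSystem fs, Path p) throws IOException {
  BlockLocation[] locs = fs.getFileBlockLocations(p, 0L, 1024L);
  for (BlockLocation loc : locs) {
    System.out.println(Arrays.toString(loc.getHosts()));
  }
}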

+ 1 - 4
src/java/org/apache/hadoop/hdfs/HftpFileSystem.java

@@ -39,14 +39,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -59,7 +57,6 @@ import org.xml.sax.SAXException;
 import org.xml.sax.XMLReader;
 import org.xml.sax.helpers.DefaultHandler;
 import org.xml.sax.helpers.XMLReaderFactory;
-import org.apache.hadoop.hdfs.ByteRangeInputStream;
 
 
 
@@ -298,7 +295,7 @@ public class HftpFileSystem extends FileSystem {
 
   @Override
   public Path getWorkingDirectory() {
-    return new Path("/").makeQualified(this);
+    return new Path("/").makeQualified(getUri(), null);
   }
 
   @Override
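
The single-argument makeQualified(FileSystem) form was deprecated in favor of the (URI, Path) form; for an absolute path like "/" the working directory is unused, which is why the patch passes null. A small sketch of the two spellings (assumes an initialized FileSystem `fs`):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Path root = new Path("/");
// Deprecated form:
//   Path q = root.makeQualified(fs);
// Replacement, qualifying against the filesystem's URI:
Path q = root.makeQualified(fs.getUri(), fs.getWorkingDirectory());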

+ 76 - 0
src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHDFSFileContextMainOperations extends
+                                  FileContextMainOperationsBaseTest {
+  
+  private static MiniDFSCluster cluster;
+  private static Path defaultWorkingDirectory;
+  
+  @BeforeClass
+  public static void clusterSetupAtBeginning()
+                                    throws IOException, LoginException {
+    cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
+    fc = FileContext.getFileContext(cluster.getFileSystem());
+    defaultWorkingDirectory = fc.makeQualified(new Path("/user/" +
+        UnixUserGroupInformation.login().getUserName()));
+    fc.mkdirs(defaultWorkingDirectory, FileContext.DEFAULT_PERM);
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    cluster.shutdown();
+  }
+  
+  @Before
+  public void setUp() throws Exception {
+  }
+  
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  @Override
+  protected Path getDefaultWorkingDirectory() {
+    return defaultWorkingDirectory;
+  } 
+  
+  @Override
+  @Test
+  public void testRenameFileAsExistingFile() throws Exception {
+    // Ignore the base class test until HADOOP-6240 is fixed.
+  }
+}
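
For completeness, a hedged sketch of driving the new suite programmatically with the JUnit 4 runner. The test class name is from this patch; the wrapper class itself is purely illustrative:

import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

public class RunFileContextTests {
  public static void main(String[] args) {
    // Runs the new HDFS FileContext suite and prints a summary.
    Result r = JUnitCore.runClasses(
        org.apache.hadoop.fs.TestHDFSFileContextMainOperations.class);
    System.out.println("run=" + r.getRunCount()
        + " failed=" + r.getFailureCount());
  }
}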