
[partial-ns] Implement mkdirs().

Haohui Mai, 10 years ago
commit 8cc95f693d

+ 160 - 52
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java

@@ -18,10 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -32,7 +32,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.List;
 import java.util.Map;
@@ -47,61 +49,68 @@ class FSDirMkdirOp {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
+
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
+
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    fsd.writeLock();
-    try {
-      src = fsd.resolvePath(pc, src, pathComponents);
-      INodesInPath iip = fsd.getINodesInPath4Write(src);
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkTraverse(pc, iip);
+    try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+      Resolver.Result paths =  Resolver.resolve(tx, src);
+      FlatINodesInPath iip = paths.inodesInPath();
+      if (paths.invalidPath()) {
+        throw new InvalidPathException(src);
       }
 
-      final INode lastINode = iip.getLastINode();
-      if (lastINode != null && lastINode.isFile()) {
-        throw new FileAlreadyExistsException("Path is not a directory: " + src);
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkTraverse(pc, paths);
       }
 
-      INodesInPath existing = lastINode != null ? iip : iip.getExistingINodes();
-      if (lastINode == null) {
-        if (fsd.isPermissionEnabled()) {
-          fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
+      FlatINode parent = iip.getLastINode();
+      if (paths.ok()) {
+        switch (parent.type()) {
+          case FILE:
+            throw new FileAlreadyExistsException("Path is not a directory: "
+                                                     + src);
+          case DIRECTORY:
+            // mkdir is a no-op (i.e. idempotent) when the directory exists.
+            return fsd.getAuditFileInfo(iip);
+          default:
+            throw new IllegalArgumentException();
         }
+      }
 
-        if (!createParent) {
-          fsd.verifyParentDir(iip, src);
-        }
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkAncestorAccess(pc, paths, FsAction.WRITE);
+      }
 
-        // validate that we have enough inodes. This is, at best, a
-        // heuristic because the mkdirs() operation might need to
-        // create multiple inodes.
-        fsn.checkFsObjectLimit();
-
-        List<String> nonExisting = iip.getPath(existing.length(),
-            iip.length() - existing.length());
-        int length = nonExisting.size();
-        if (length > 1) {
-          List<String> ancestors = nonExisting.subList(0, length - 1);
-          // Ensure that the user can traversal the path by adding implicit
-          // u+wx permission to all ancestor directories
-          existing = createChildrenDirectories(fsd, existing, ancestors,
-              addImplicitUwx(permissions, permissions));
-          if (existing == null) {
-            throw new IOException("Failed to create directory: " + src);
-          }
-        }
+      if (!createParent && FlatNSUtil.hasNextLevelInPath(src, paths.offset)) {
+        throw new FileNotFoundException("Parent directory doesn't exist: ");
+      }
 
-        if ((existing = createChildrenDirectories(fsd, existing,
-            nonExisting.subList(length - 1, length), permissions)) == null) {
-          throw new IOException("Failed to create directory: " + src);
-        }
+      // validate that we have enough inodes. This is, at best, a
+      // heuristic because the mkdirs() operation might need to
+      // create multiple inodes.
+      fsn.checkFsObjectLimit();
+
+      int offset = paths.offset;
+      String component = FlatNSUtil.getNextComponent(src, offset);
+      while (component != null) {
+        offset += 1 + component.length();
+        String nextComponent = FlatNSUtil.getNextComponent(src, offset);
+        PermissionStatus perm = nextComponent == null
+            ? permissions
+            : addImplicitUwx(permissions, permissions);
+
+        long inodeId = tx.allocateNewInodeId();
+        iip = createSingleDir(tx, iip, inodeId, component, perm, null, now());
+        component = nextComponent;
       }
-      return fsd.getAuditFileInfo(existing);
-    } finally {
-      fsd.writeUnlock();
+      ByteString newParent = new FlatINode.Builder().mergeFrom(parent)
+          .mtime(now()).build();
+      tx.putINode(parent.id(), newParent);
+      tx.commit();
+      return fsd.getAuditFileInfo(iip);
     }
   }
 
@@ -124,7 +133,7 @@ class FSDirMkdirOp {
     final String last = new String(iip.getLastLocalName(), Charsets.UTF_8);
     INodesInPath existing = iip.getExistingINodes();
     List<String> children = iip.getPath(existing.length(),
-        iip.length() - existing.length());
+                                        iip.length() - existing.length());
     int size = children.size();
     if (size > 1) { // otherwise all ancestors have been created
       List<String> directories = children.subList(0, size - 1);
@@ -140,6 +149,56 @@ class FSDirMkdirOp {
     return new AbstractMap.SimpleImmutableEntry<>(existing, last);
   }
 
+  /**
+   * For a given absolute path, create all ancestors as directories along the
+   * path. All ancestors inherit their parent's permission plus an implicit
+   * u+wx permission. This is used by create() and addSymlink() for
+   * implicitly creating all directories along the path.
+   *
+   * For example, path="/foo/bar/spam", "/foo" is an existing directory,
+   * "/foo/bar" is not existing yet, the function will create directory bar.
+   *
+   * @return a tuple which contains both the new INodesInPath (with all the
+   * existing and newly created directories) and the last component in the
+   * relative path. Or return null if there are errors.
+   */
+  static Map.Entry<FlatINodesInPath, String> createAncestorDirectories(
+      RWTransaction tx, FSDirectory fsd, Resolver.Result paths,
+      PermissionStatus
+      permission)
+      throws IOException {
+    int offset = paths.offset;
+    String src = paths.src;
+    String last = null;
+    String component = FlatNSUtil.getNextComponent(src, offset);
+    FlatINode parentINode = paths.inodesInPath().getLastINode();
+    FlatINodesInPath iip = paths.inodesInPath();
+    boolean changed = false;
+    FlatINode tipINode = paths.inodesInPath().getLastINode();
+    while (component != null) {
+      last = component;
+      offset += 1 + component.length();
+      String nextComponent = FlatNSUtil.getNextComponent(src, offset);
+      if (nextComponent == null) {
+        break;
+      }
+      PermissionStatus perm = addImplicitUwx(
+          parentINode.permissionStatus(fsd.ugid()), permission);
+
+      long inodeId = tx.allocateNewInodeId();
+      iip = createSingleDir(tx, iip, inodeId, component, perm, null, now());
+      component = nextComponent;
+      changed = true;
+    }
+
+    if (changed) {
+      ByteString b = new FlatINode.Builder().mergeFrom(tipINode).mtime(now())
+          .build();
+      tx.putINode(tipINode.id(), b);
+    }
+    return new AbstractMap.SimpleImmutableEntry<>(iip, last);
+  }
+
   /**
    * Create the directory {@code parent} / {@code children} and all ancestors
    * along the path.
@@ -173,15 +232,64 @@ class FSDirMkdirOp {
 
   static void mkdirForEditLog(FSDirectory fsd, long inodeId, String src,
       PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
-      throws QuotaExceededException, UnresolvedLinkException, AclException,
-      FileAlreadyExistsException {
+      throws IOException {
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath(src, false);
-    final byte[] localName = iip.getLastLocalName();
-    final INodesInPath existing = iip.getParentINodesInPath();
-    Preconditions.checkState(existing.getLastINode() != null);
-    unprotectedMkdir(fsd, inodeId, existing, localName, permissions, aclEntries,
-        timestamp);
+    try (ReplayTransaction tx = fsd.newReplayTransaction()) {
+      Resolver.Result paths =  Resolver.resolve(tx, src);
+      FlatINodesInPath iip = paths.inodesInPath();
+      Preconditions.checkState(
+          paths.notFound() && !FlatNSUtil.hasNextLevelInPath(src, paths.offset));
+      String localName = FlatNSUtil.getNextComponent(src, paths.offset);
+      createSingleDir(tx, iip, inodeId, localName, permissions, aclEntries,
+                      timestamp);
+      tx.commit();
+    }
+  }
+
+  private static FlatINodesInPath createSingleDir(
+      RWTransaction tx, FlatINodesInPath iip, long inodeId, String localName,
+      PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
+      throws FileAlreadyExistsException {
+
+    FlatINode parent = iip.getLastINode();
+    if (!parent.isDirectory()) {
+      throw new FileAlreadyExistsException("Parent path is not a directory");
+    }
+
+    int userId = tx.getStringId(permissions.getUserName());
+    int groupId = permissions.getGroupName() == null ? parent.groupId() : tx
+        .getStringId(permissions.getGroupName());
+
+    ByteString b = new FlatINode.Builder()
+        .id(inodeId)
+        .parentId(parent.id())
+        .userId(userId)
+        .groupId(groupId)
+        .permission(permissions.getPermission().toShort())
+        .mtime(timestamp)
+        .build();
+
+    tx.putINode(inodeId, b);
+    tx.putChild(parent.id(),
+                ByteBuffer.wrap(localName.getBytes(Charsets.UTF_8)), inodeId);
+
+    // TODO: Handle ACL
+    Preconditions.checkState(aclEntries == null, "Unimplemented");
+//    if (iip != null && aclEntries != null) {
+//      AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
+//    }
+
+    // TODO: Perform various verification in {@link FSDirectory#addLastINode}
+    FlatINodesInPath newIIP = FlatINodesInPath.addINode(
+        iip, ByteString.copyFromUtf8(localName),
+        FlatINode.wrap(b));
+
+    tx.logMkDir(newIIP);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("mkdirs: created directory " + newIIP.path());
+    }
+
+    return newIIP;
   }
 
   private static INodesInPath createSingleDirectory(FSDirectory fsd,

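The rewrite above drops the write-locked INodesInPath manipulation in favor of a single RWTransaction that walks the unresolved remainder of the path one component at a time: each missing component becomes one new inode, only the final component keeps the caller's permissions as given, and the intermediate ones get the implicit u+wx bits. Below is a minimal, self-contained sketch of that walk; the transaction, createSingleDir() and addImplicitUwx() calls are only described in comments, and the class name and the inlined copy of getNextComponent() are illustrative, not part of the commit.

public class MkdirsWalkSketch {
  // Mirrors FlatNSUtil.getNextComponent() introduced by this commit.
  static String getNextComponent(String src, int offset) {
    if (offset >= src.length()) {
      return null;
    }
    assert src.charAt(offset) == '/';
    int next = src.indexOf('/', offset + 1);
    if (next == -1) {
      next = src.length();
    }
    return src.substring(offset + 1, next);
  }

  public static void main(String[] args) {
    String src = "/foo/bar/spam";
    // Pretend the resolver stopped after an existing "/foo", as in the javadoc example.
    int offset = "/foo".length();

    String component = getNextComponent(src, offset);
    while (component != null) {
      offset += 1 + component.length();
      String nextComponent = getNextComponent(src, offset);
      boolean last = (nextComponent == null);
      // Real code: perm = last ? permissions : addImplicitUwx(permissions, permissions);
      //            iip  = createSingleDir(tx, iip, tx.allocateNewInodeId(), component, perm, null, now());
      System.out.println("create " + component
          + (last ? " with the caller's permissions" : " with implicit u+wx added"));
      component = nextComponent;
    }
    // Finally the existing parent's mtime is bumped and the transaction committed.
  }
}

Run against "/foo/bar/spam" with "/foo" already present, the sketch reports one ancestor ("bar") created with implicit u+wx and the final directory ("spam") created with the caller's permissions, matching the two permission branches in the new mkdirs().
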
+ 15 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -564,7 +564,8 @@ public class FSDirectory implements Closeable {
       getStorageTypeDeltas(fileINode.getStoragePolicyID(), ssDelta,
           replication, replication);;
     updateCount(iip, iip.length() - 1,
-      new QuotaCounts.Builder().nameSpace(nsDelta).storageSpace(ssDelta * replication).
+      new QuotaCounts.Builder().nameSpace(nsDelta).storageSpace(
+          ssDelta * replication).
           typeSpaces(typeSpaceDeltas).build(),
         checkQuota);
   }
@@ -578,10 +579,9 @@ public class FSDirectory implements Closeable {
     EnumCounters<StorageType> typeSpaceDeltas =
         getStorageTypeDeltas(fileINode.getStoragePolicyID(), ssDelta, oldRep, newRep);
     updateCount(iip, iip.length() - 1,
-        new QuotaCounts.Builder().nameSpace(nsDelta).
-            storageSpace(ssDelta * (newRep - oldRep)).
-            typeSpaces(typeSpaceDeltas).build(),
-        checkQuota);
+                new QuotaCounts.Builder().nameSpace(nsDelta).
+                    storageSpace(ssDelta * (newRep - oldRep)).
+                    typeSpaces(typeSpaceDeltas).build(), checkQuota);
   }
 
   /** update count of each inode with quota
@@ -1568,6 +1568,16 @@ public class FSDirectory implements Closeable {
     checkPermission(pc, iip, false, null, null, null, null);
   }
 
+  void checkTraverse(FSPermissionChecker pc, Resolver.Result iip)
+      throws AccessControlException {
+    // TODO
+  }
+
+  void checkAncestorAccess(FSPermissionChecker pc, Resolver.Result iip,
+      FsAction access) throws AccessControlException {
+    // TODO
+  }
+
   /**
    * Check whether current user have permissions to access the path. For more
    * details of the parameters, see

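The re-indented updateCount() calls keep the same arithmetic: when a file's replication changes, the storage-space quota moves by ssDelta * (newRep - oldRep), i.e. one extra copy is charged per added replica and refunded per removed one. A quick illustrative calculation, with made-up numbers and a hypothetical class name:

// Illustrative only: the replication-change delta applied in the hunk above.
public class ReplicationDeltaSketch {
  public static void main(String[] args) {
    long ssDelta = 128L * 1024 * 1024;   // hypothetical space consumed by one copy of the file
    short oldRep = 2, newRep = 3;
    long delta = ssDelta * (newRep - oldRep);
    System.out.println(delta);           // 134217728: one extra copy charged against the quota
    // With newRep = 1 the same expression yields -134217728, refunding one copy.
  }
}
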
+ 25 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -813,7 +813,31 @@ public class FSEditLog implements LogsPurgeable {
     }
     logEdit(op);
   }
-  
+
+  /**
+   * Add create directory record to edit log
+   */
+  public void logMkDir(FlatINodesInPath iip, StringMap ugidMap) {
+    FlatINode inode = iip.getLastINode();
+    PermissionStatus permissions = inode.permissionStatus(ugidMap);
+    MkdirOp op = MkdirOp.getInstance(cache.get())
+        .setInodeId(inode.id())
+        .setPath(iip.path())
+        .setTimestamp(inode.mtime())
+        .setPermissionStatus(permissions);
+
+    // TODO: Handle ACL and XAttr
+//    AclFeature f = newNode.getAclFeature();
+//    if (f != null) {
+//      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
+//    }
+//
+//    XAttrFeature x = newNode.getXAttrFeature();
+//    if (x != null) {
+//      op.setXAttrs(x.getXAttrs());
+//    }
+    logEdit(op);
+  }
   /** 
    * Add rename record to edit log.
    *

+ 38 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FlatNSUtil.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+public class FlatNSUtil {
+  public static String getNextComponent(String src, int offset) {
+    if (offset >= src.length()) {
+      return null;
+    }
+
+    assert src.charAt(offset) == '/';
+
+    int next = src.indexOf('/', offset + 1);
+    if (next == -1) {
+      next = src.length();
+    }
+    return src.substring(offset + 1, next);
+  }
+
+  public static boolean hasNextLevelInPath(String src, int offset) {
+    return src.indexOf('/', offset + 1) != -1;
+  }
+}
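
For reference, this is how the two helpers behave on a small absolute path, assuming (as the assert requires) that offset always points at a '/' separator. The snippet is illustrative and not part of the commit; it compiles against the FlatNSUtil class above.

public class FlatNSUtilExample {
  public static void main(String[] args) {
    String src = "/a/b/c";
    System.out.println(FlatNSUtil.getNextComponent(src, 0));    // "a"
    System.out.println(FlatNSUtil.getNextComponent(src, 2));    // "b"   (offset sits on the '/' before "b")
    System.out.println(FlatNSUtil.getNextComponent(src, 4));    // "c"   (last component, no trailing '/')
    System.out.println(FlatNSUtil.getNextComponent(src, 6));    // null  (offset past the end of the path)
    System.out.println(FlatNSUtil.hasNextLevelInPath(src, 2));  // true:  "/c" still follows "/b"
    System.out.println(FlatNSUtil.hasNextLevelInPath(src, 4));  // false: "c" is the final component
  }
}

In the new mkdirs(), hasNextLevelInPath(src, paths.offset) returning true means more than one component is still missing, which is exactly the "parent directory doesn't exist" case rejected when createParent is false.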