Browse Source

[partial-ns] Implement delete().

Haohui Mai 10 năm trước cách đây
mục cha
commit
707775c39f

+ 104 - 108
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java

@@ -17,52 +17,40 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.INode.ReclaimContext;
 import org.apache.hadoop.util.ChunkedArrayList;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 import static org.apache.hadoop.util.Time.now;
 
 class FSDirDeleteOp {
   /**
    * Delete the target directory and collect the blocks under it
    *
-   * @param fsd the FSDirectory instance
-   * @param iip the INodesInPath instance containing all the INodes for the path
+   * @param tx the Transaction
+   * @param paths the path to be deleted
    * @param collectedBlocks Blocks under the deleted directory
-   * @param removedINodes INodes that should be removed from inodeMap
    * @return the number of files that have been removed
    */
-  static long delete(FSDirectory fsd, INodesInPath iip,
-      BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes,
-      List<Long> removedUCFiles, long mtime) throws IOException {
+  static long delete(
+      RWTransaction tx, Resolver.Result paths,
+      BlocksMapUpdateInfo collectedBlocks, List<Long> removedUCFiles, long mtime) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + iip.getPath());
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + paths.src);
     }
-    long filesRemoved = -1;
-    fsd.writeLock();
-    try {
-      if (deleteAllowed(iip, iip.getPath()) ) {
-        List<INodeDirectory> snapshottableDirs = new ArrayList<>();
-        FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
-        ReclaimContext context = new ReclaimContext(
-            fsd.getBlockStoragePolicySuite(), collectedBlocks, removedINodes,
-            removedUCFiles);
-        if (unprotectedDelete(fsd, iip, context, mtime)) {
-          filesRemoved = context.quotaDelta().getNsDelta();
-        }
-        fsd.getFSNamesystem().removeSnapshottableDirs(snapshottableDirs);
-        fsd.updateCount(iip, context.quotaDelta(), false);
-      }
-    } finally {
-      fsd.writeUnlock();
+    final long filesRemoved;
+    if (!deleteAllowed(paths)) {
+      filesRemoved = -1;
+    } else {
+      filesRemoved = unprotectedDelete(tx, paths, collectedBlocks,
+                                       removedUCFiles, mtime);
     }
     return filesRemoved;
   }
@@ -89,19 +77,29 @@ class FSDirDeleteOp {
       throws IOException {
     FSDirectory fsd = fsn.getFSDirectory();
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+      Resolver.Result paths = Resolver.resolve(tx, src);
+      if (paths.notFound()) {
+        // The caller expects no exception when the file does not exist
+        return null;
+      } else if (paths.invalidPath()) {
+        throw new InvalidPathException(src);
+      }
 
-    src = fsd.resolvePath(pc, src, pathComponents);
-    final INodesInPath iip = fsd.getINodesInPath4Write(src, false);
-    if (!recursive && fsd.isNonEmptyDirectory(iip)) {
-      throw new PathIsNotEmptyDirectoryException(src + " is non empty");
-    }
-    if (fsd.isPermissionEnabled()) {
-      fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
-                          FsAction.ALL, true);
+      FlatINodesInPath iip = paths.inodesInPath();
+      if (!recursive && FSDirectory.isNonEmptyDirectory(tx, iip)) {
+        throw new PathIsNotEmptyDirectoryException(src + " is non empty");
+      }
+
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
+                            FsAction.ALL, true);
+      }
+      BlocksMapUpdateInfo ret = deleteInternal(tx, fsn, paths, logRetryCache);
+      tx.commit();
+      return ret;
     }
 
-    return deleteInternal(fsn, src, iip, logRetryCache);
   }
 
   /**
@@ -118,28 +116,22 @@ class FSDirDeleteOp {
    */
   static void deleteForEditLog(FSDirectory fsd, String src, long mtime)
       throws IOException {
-    assert fsd.hasWriteLock();
     FSNamesystem fsn = fsd.getFSNamesystem();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    List<INode> removedINodes = new ChunkedArrayList<>();
+    List<Long> removedINodes = new ChunkedArrayList<>();
     List<Long> removedUCFiles = new ChunkedArrayList<>();
-
-    final INodesInPath iip = fsd.getINodesInPath4Write(
-        FSDirectory.normalizePath(src), false);
-    if (!deleteAllowed(iip, src)) {
-      return;
-    }
-    List<INodeDirectory> snapshottableDirs = new ArrayList<>();
-    FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
-    boolean filesRemoved = unprotectedDelete(fsd, iip,
-        new ReclaimContext(fsd.getBlockStoragePolicySuite(),
-            collectedBlocks, removedINodes, removedUCFiles),
-        mtime);
-    fsn.removeSnapshottableDirs(snapshottableDirs);
-
-    if (filesRemoved) {
-      fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false);
-      fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+    try (ReplayTransaction tx = fsd.newReplayTransaction().begin()) {
+      Resolver.Result paths = Resolver.resolve(tx, src);
+      if (!deleteAllowed(paths)) {
+        return;
+      }
+      long filesRemoved = unprotectedDelete(tx, paths, collectedBlocks,
+                                            removedUCFiles, mtime);
+      if (filesRemoved >= 0) {
+        fsn.removeLeases(removedUCFiles);
+        fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+      }
+      tx.commit();
     }
   }
 
@@ -152,41 +144,39 @@ class FSDirDeleteOp {
    * <p>
    * For small directory or file the deletion is done in one shot.
    * @param fsn namespace
-   * @param src path name to be deleted
-   * @param iip the INodesInPath instance containing all the INodes for the path
+   * @param paths the path to be deleted
    * @param logRetryCache whether to record RPC ids in editlog for retry cache
    *          rebuilding
    * @return blocks collected from the deleted path
    * @throws IOException
    */
   static BlocksMapUpdateInfo deleteInternal(
-      FSNamesystem fsn, String src, INodesInPath iip, boolean logRetryCache)
+      RWTransaction tx, FSNamesystem fsn, Resolver.Result paths,
+      boolean logRetryCache)
       throws IOException {
-    assert fsn.hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + paths.src);
     }
 
     FSDirectory fsd = fsn.getFSDirectory();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    List<INode> removedINodes = new ChunkedArrayList<>();
+    List<Long> removedINodes = new ChunkedArrayList<>();
     List<Long> removedUCFiles = new ChunkedArrayList<>();
 
     long mtime = now();
     // Unlink the target directory from directory tree
-    long filesRemoved = delete(
-        fsd, iip, collectedBlocks, removedINodes, removedUCFiles, mtime);
+    long filesRemoved = delete(tx, paths, collectedBlocks, removedUCFiles, mtime);
     if (filesRemoved < 0) {
       return null;
     }
-    fsd.getEditLog().logDelete(src, mtime, logRetryCache);
+    fsd.getEditLog().logDelete(paths.src, mtime, logRetryCache);
     incrDeletedFileCount(filesRemoved);
 
-    fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, true);
+    fsn.removeLeases(removedUCFiles);
 
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
-                                        + src +" is removed");
+      NameNode.stateChangeLog.debug(
+          "DIR* Namesystem.delete: " + paths.src + " is removed");
     }
     return collectedBlocks;
   }
@@ -195,16 +185,16 @@ class FSDirDeleteOp {
     NameNode.getNameNodeMetrics().incrFilesDeleted(count);
   }
 
-  private static boolean deleteAllowed(final INodesInPath iip,
-      final String src) {
-    if (iip.length() < 1 || iip.getLastINode() == null) {
+  private static boolean deleteAllowed(Resolver.Result paths) {
+    String src = paths.src;
+    if (paths.notFound()) {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug(
             "DIR* FSDirectory.unprotectedDelete: failed to remove "
                 + src + " because it does not exist");
       }
       return false;
-    } else if (iip.length() == 1) { // src is the root
+    } else if (paths.inodesInPath().length() == 1) { // src is the root
       NameNode.stateChangeLog.warn(
           "DIR* FSDirectory.unprotectedDelete: failed to remove " + src +
               " because the root is not allowed to be deleted");
@@ -216,47 +206,53 @@ class FSDirDeleteOp {
   /**
    * Delete a path from the name space
    * Update the count at each ancestor directory with quota
-   * @param fsd the FSDirectory instance
-   * @param iip the inodes resolved from the path
-   * @param reclaimContext used to collect blocks and inodes to be removed
+   *
+   * @param tx Transaction
+   * @param paths the path to be deleted
+   * @param collectedBlocks blocks collected from the deleted path
+   * @param removedUCFiles inodes whose leases need to be released
    * @param mtime the time the inode is removed
    * @return true if there are inodes deleted
    */
-  private static boolean unprotectedDelete(FSDirectory fsd, INodesInPath iip,
-      ReclaimContext reclaimContext, long mtime) {
-    assert fsd.hasWriteLock();
-
-    // check if target node exists
-    INode targetNode = iip.getLastINode();
-    if (targetNode == null) {
-      return false;
-    }
-
-    // record modification
-    final int latestSnapshot = iip.getLatestSnapshotId();
-    targetNode.recordModification(latestSnapshot);
-
-    // Remove the node from the namespace
-    long removed = fsd.removeLastINode(iip);
-    if (removed == -1) {
-      return false;
-    }
-
-    // set the parent's modification time
-    final INodeDirectory parent = targetNode.getParent();
-    parent.updateModificationTime(mtime, latestSnapshot);
-
-    // collect block and update quota
-    if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
-      targetNode.destroyAndCollectBlocks(reclaimContext);
-    } else {
-      targetNode.cleanSubtree(reclaimContext, CURRENT_STATE_ID, latestSnapshot);
-    }
-
+  private static long unprotectedDelete(
+      RWTransaction tx, Resolver.Result paths,
+      BlocksMapUpdateInfo collectedBlocks, List<Long> removedUCFiles, long mtime) {
+    // TODO: Update quota
+    FlatINode parent = paths.inodesInPath().getLastINode(-2);
+    FlatINode inode = paths.inodesInPath().getLastINode();
+    long deleted = deleteSubtree(tx, inode.id(), collectedBlocks,
+                                 removedUCFiles);
+    ByteString newParent = new FlatINode.Builder()
+        .mergeFrom(parent).mtime(mtime).build();
+    tx.putINode(parent.id(), newParent);
+    tx.deleteChild(parent.id(),
+                   paths.inodesInPath().getLastPathComponent().asReadOnlyByteBuffer());
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-          + iip.getPath() + " is removed");
+          + paths.src + " is removed");
     }
-    return true;
+    return deleted + 1;
+  }
+
+  private static long deleteSubtree(
+      RWTransaction tx, long parentId, BlocksMapUpdateInfo collectedBlocks,
+      List<Long> removedUCFiles) {
+    long deleted = 0;
+    for (long child : tx.childrenView(parentId).values()) {
+      FlatINode node = tx.getINode(child);
+      if (node.isFile()) {
+        FlatINodeFileFeature f = node.feature(FlatINodeFileFeature.class);
+        assert f != null;
+        if (f.inConstruction()) {
+          removedUCFiles.add(child);
+        }
+        for (Block b : f.blocks()) {
+          collectedBlocks.addDeleteBlock(b);
+        }
+      } else if (node.isDirectory()) {
+        deleted += deleteSubtree(tx, child, collectedBlocks, removedUCFiles);
+      }
+    }
+    return deleted;
   }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -105,7 +105,7 @@ class FSDirStatAndListingOp {
 //      byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
 //          i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
 
-      byte policyId = HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+      byte policyId = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
       return createFileStatus(tx, fsd, paths.inodesInPath().getLastINode(),
                               HdfsFileStatus.EMPTY_NAME,
                               policyId);
@@ -261,7 +261,7 @@ class FSDirStatAndListingOp {
 //      byte curPolicy = isSuperUser && !cur.isSymlink()?
 //          cur.getLocalStoragePolicyID():
 //          HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      byte curPolicy = HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+      byte curPolicy = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
       ByteBuffer b =e.getKey().duplicate();
       byte[] localName = new byte[b.remaining()];
       b.get(localName);

+ 10 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java

@@ -389,15 +389,16 @@ class FSDirWriteFileOp {
 
     if (myFile != null) {
       if (overwrite) {
-        List<INode> toRemoveINodes = new ChunkedArrayList<>();
-        List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
-        long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
-                                        toRemoveINodes, toRemoveUCFiles, now());
-        if (ret >= 0) {
-          iip = INodesInPath.replace(iip, iip.length() - 1, null);
-          FSDirDeleteOp.incrDeletedFileCount(ret);
-          fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
-        }
+        // TODO
+//        List<INode> toRemoveINodes = new ChunkedArrayList<>();
+//        List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
+//        long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
+//                                        toRemoveINodes, toRemoveUCFiles, now());
+//        if (ret >= 0) {
+//          iip = INodesInPath.replace(iip, iip.length() - 1, null);
+//          FSDirDeleteOp.incrDeletedFileCount(ret);
+//          fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
+//        }
       } else {
         // If lease soft limit time is expired, recover the lease
         fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,

+ 25 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -474,6 +474,12 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  static boolean isNonEmptyDirectory(
+      Transaction tx, FlatINodesInPath iip) {
+    FlatINode inode = iip.getLastINode();
+    return inode.isDirectory() && !tx.childrenView(inode.id()).isEmpty();
+  }
+
   /**
    * Check whether the filepath could be created
    * @throws SnapshotAccessControlException if path is in RO snapshot
@@ -573,8 +579,9 @@ public class FSDirectory implements Closeable {
   /**
    * Update usage count with replication factor change due to setReplication
    */
-  void updateCount(INodesInPath iip, long nsDelta, long ssDelta, short oldRep,
-      short newRep, boolean checkQuota) throws QuotaExceededException {
+  void updateCount(
+      INodesInPath iip, long nsDelta, long ssDelta, short oldRep, short newRep,
+      boolean checkQuota) throws QuotaExceededException {
     final INodeFile fileINode = iip.getLastINode().asFile();
     EnumCounters<StorageType> typeSpaceDeltas =
         getStorageTypeDeltas(fileINode.getStoragePolicyID(), ssDelta, oldRep, newRep);
@@ -1011,7 +1018,7 @@ public class FSDirectory implements Closeable {
    * @return true if on the block boundary or false if recovery is need
    */
   boolean unprotectedTruncate(INodesInPath iip, long newLength,
-                              BlocksMapUpdateInfo collectedBlocks,
+      BlocksMapUpdateInfo collectedBlocks,
                               long mtime, QuotaCounts delta) throws IOException {
     assert hasWriteLock();
     INodeFile file = iip.getLastINode().asFile();
@@ -1090,7 +1097,14 @@ public class FSDirectory implements Closeable {
       }
     }
   }
-  
+
+  public final void removeFromInodeMap(RWTransaction tx, List<Long> inodes) {
+    for (long inode : inodes) {
+      tx.deleteINode(inode);
+      ezManager.removeEncryptionZone(inode);
+    }
+  }
+
   /**
    * Get the inode from inodeMap based on its inode id.
    * @param id The given id
@@ -1632,6 +1646,13 @@ public class FSDirectory implements Closeable {
 //    }
   }
 
+  void checkPermission(FSPermissionChecker pc, FlatINodesInPath iip,
+      boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
+      FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
+      throws AccessControlException {
+    // TODO
+  }
+
   HdfsFileStatus getAuditFileInfo(INodesInPath iip)
       throws IOException {
     return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())

+ 15 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3042,7 +3042,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * Remove the indicated file from namespace.
    * 
-   * @see ClientProtocol#delete(String, boolean) for detailed description and 
+   * @see ClientProtocol#delete(String, boolean) for detailed description and
    * description of exceptions
    */
   boolean delete(String src, boolean recursive, boolean logRetryCache)
@@ -3129,6 +3129,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  void removeLeases(List<Long> removedUCFiles) {
+    assert hasWriteLock();
+    leaseManager.removeLeases(removedUCFiles);
+    // remove inodes from inodesMap
+  }
+
   /**
    * Removes the blocks from blocksmap and updates the safemode blocks total
    * 
@@ -4072,21 +4078,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             filesToDelete.add(blockInfo.getBlockCollection());
           }
         }
+      } finally {
+        writeUnlock();
+      }
 
+      try (RWTransaction tx = getFSDirectory().newRWTransaction().begin()) {
         for (BlockCollection bc : filesToDelete) {
           LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
+          Resolver.Result paths = Resolver.resolve(tx, bc.getName());
           BlocksMapUpdateInfo toRemoveBlocks =
-              FSDirDeleteOp.deleteInternal(
-                  FSNamesystem.this, bc.getName(),
-                  INodesInPath.fromINode((INodeFile) bc), false);
+              FSDirDeleteOp.deleteInternal(tx, FSNamesystem.this,
+                                           paths, false);
           changed |= toRemoveBlocks != null;
           if (toRemoveBlocks != null) {
             removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
           }
         }
-      } finally {
-        writeUnlock();
       }
+
       if (changed) {
         getEditLog().logSync();
       }

+ 2 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java

@@ -60,7 +60,7 @@ public class TestGetBlockLocations {
 
   @Test(timeout = 30000)
   public void testGetBlockLocationsRacingWithDelete() throws IOException {
-    FSNamesystem fsn = spy(setupFileSystem());
+    final FSNamesystem fsn = spy(setupFileSystem());
     final FSDirectory fsd = fsn.getFSDirectory();
     FSEditLog editlog = fsn.getEditLog();
 
@@ -68,10 +68,7 @@ public class TestGetBlockLocations {
 
       @Override
       public Void answer(InvocationOnMock invocation) throws Throwable {
-        INodesInPath iip = fsd.getINodesInPath(FILE_PATH, true);
-        FSDirDeleteOp.delete(fsd, iip, new INode.BlocksMapUpdateInfo(),
-                             new ArrayList<INode>(), new ArrayList<Long>(),
-                             now());
+        FSDirDeleteOp.delete(fsn, FILE_PATH, true, false);
         invocation.callRealMethod();
         return null;
       }