
[partial-ns] Implement completeFile().

Haohui Mai, 10 years ago
commit ba100175f9

+ 50 - 44
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java

@@ -710,60 +710,66 @@ class FSDirWriteFileOp {
       FSNamesystem fsn, String src, String holder, Block last, long fileId)
       throws IOException {
     assert fsn.hasWriteLock();
-    final INodeFile pendingFile;
-    final INodesInPath iip;
-    INode inode = null;
-    try {
-      if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+    FSDirectory fsd = fsn.getFSDirectory();
+    try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+      final Resolver.Result paths;
+      if (true || fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
         // Older clients may not have given us an inode ID to work with.
         // In this case, we have to try to resolve the path and hope it
         // hasn't changed or been deleted since the file was opened for write.
-        iip = fsn.dir.getINodesInPath(src, true);
-        inode = iip.getLastINode();
+        paths = Resolver.resolve(tx, src);
       } else {
-        inode = fsn.dir.getInode(fileId);
-        iip = INodesInPath.fromINode(inode);
-        if (inode != null) {
-          src = iip.getPath();
-        }
+        // Newer clients pass the inode ID, so we can just get the inode
+        // directly.
+        paths = Resolver.resolveById(tx, fileId);
       }
-      pendingFile = fsn.checkLease(src, holder, inode, fileId);
-    } catch (LeaseExpiredException lee) {
-      if (inode != null && inode.isFile() &&
-          !inode.asFile().isUnderConstruction()) {
-        // This could be a retry RPC - i.e the client tried to close
-        // the file, but missed the RPC response. Thus, it is trying
-        // again to close the file. If the file still exists and
-        // the client's view of the last block matches the actual
-        // last block, then we'll treat it as a successful close.
-        // See HDFS-3031.
-        final Block realLastBlock = inode.asFile().getLastBlock();
-        if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
-          NameNode.stateChangeLog.info("DIR* completeFile: " +
-              "request from " + holder + " to complete inode " + fileId +
-              "(" + src + ") which is already closed. But, it appears to be " +
-              "an RPC retry. Returning success");
-          return true;
+      if (paths.invalidPath()) {
+        throw new InvalidPathException(src);
+      } else if (paths.notFound()) {
+        throw new FileNotFoundException(src);
+      }
+      FlatINode inode = paths.inodesInPath().getLastINode();
+      FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+      try {
+        fsn.checkLease(src, holder, inode);
+      } catch (LeaseExpiredException lee) {
+        if (file != null && !file.inConstruction()) {
+          // This could be a retry RPC - i.e the client tried to close
+          // the file, but missed the RPC response. Thus, it is trying
+          // again to close the file. If the file still exists and
+          // the client's view of the last block matches the actual
+          // last block, then we'll treat it as a successful close.
+          // See HDFS-3031.
+          final Block realLastBlock = file.lastBlock();
+          if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
+            NameNode.stateChangeLog.info("DIR* completeFile: request from "
+                + holder + " to complete inode " + fileId + "(" + src + ") "
+                + "which is already closed. But, it appears to be an RPC "
+                + "retry. Returning success");
+            return true;
+          }
         }
+        throw lee;
       }
-      throw lee;
-    }
-    // Check the state of the penultimate block. It should be completed
-    // before attempting to complete the last one.
-    if (!fsn.checkFileProgress(src, pendingFile, false)) {
-      return false;
-    }
 
-    // commit the last block and complete it if it has minimum replicas
-    fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
+      // Check the state of the penultimate block. It should be completed
+      // before attempting to complete the last one.
+      if (!fsn.checkFileProgress(src, file, false)) {
+        return false;
+      }
 
-    if (!fsn.checkFileProgress(src, pendingFile, true)) {
-      return false;
-    }
+      // commit the last block and complete it if it has minimum replicas
+      FlatINodeFileFeature.Builder newFile =
+          fsn.commitOrCompleteLastBlock(file, last);
 
-    fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
-        Snapshot.CURRENT_STATE_ID);
-    return true;
+      if (!fsn.checkFileProgress(src, file, true)) {
+        return false;
+      }
+      FlatINode.Builder newINode = new FlatINode.Builder().mergeFrom(inode);
+      fsn.finalizeINodeFileUnderConstruction(tx, src, newINode, newFile);
+      tx.commit();
+      return true;
+    }
   }
 
   private static INodeFile newINodeFile(

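For reference (not part of this patch): the retry branch above hinges on Block.matchingIdAndGenStamp(), which treats the client's and the NameNode's view of the last block as the same when block ID and generation stamp agree, regardless of reported length. A minimal standalone illustration, assuming only the public Block class from hadoop-hdfs:

import org.apache.hadoop.hdfs.protocol.Block;

public class CompleteFileRetryCheck {
  public static void main(String[] args) {
    // Client's view of the last block vs. the block the NameNode has on file.
    Block clientView = new Block(1001L, 512L, 7L);   // id, numBytes, genStamp
    Block serverView = new Block(1001L, 1024L, 7L);  // same id/genStamp, longer
    // Prints true: the lengths differ, but the id and generation stamp match,
    // which is the condition completeFile() uses to treat a retried close of
    // an already-closed file as a success (HDFS-3031).
    System.out.println(Block.matchingIdAndGenStamp(clientView, serverView));
  }
}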
+ 36 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -127,6 +127,7 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -2987,25 +2988,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return b;
   }
 
-  /**
-   * Check that the indicated file's blocks are present and
-   * replicated.  If not, return false. If checkall is true, then check
-   * all blocks, otherwise check only penultimate block.
-   */
-  boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
-    assert hasReadLock();
-    if (checkall) {
-      return blockManager.checkBlocksProperlyReplicated(src, v
-          .getBlocks());
-    } else {
-      // check the penultimate block of this file
-      BlockInfoContiguous b = v.getPenultimateBlock();
-      return b == null ||
-          blockManager.checkBlocksProperlyReplicated(
-              src, new BlockInfoContiguous[] { b });
-    }
-  }
-
   /**
    * Check that the indicated file's blocks are present and
    * replicated.  If not, return false. If checkall is true, then check
@@ -3630,6 +3612,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     blockManager.checkReplication(pendingFile);
   }
 
+  void finalizeINodeFileUnderConstruction(RWTransaction tx,
+      String src, FlatINode.Builder inode, FlatINodeFileFeature.Builder file)
+      throws IOException {
+    assert hasWriteLock();
+
+    Preconditions.checkArgument(file != null && file.inConstruction());
+    leaseManager.removeLeases(Collections.singletonList(inode.id()));
+
+    ByteString f = file.inConstruction(false).clientName(null)
+        .clientMachine(null).build();
+    ByteString b = inode.replaceFeature(FlatINodeFileFeature.wrap(f))
+        .mtime(now()).build();
+    tx.putINode(inode.id(), b);
+
+    // close file and persist block allocations for this file
+    closeFile(tx, src, FlatINode.wrap(b));
+
+    blockManager.checkReplication(file.blocks());
+  }
+
   @VisibleForTesting
   BlockInfoContiguous getStoredBlock(Block block) {
     return blockManager.getStoredBlock(block);
@@ -4042,6 +4044,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  /**
+   * Close file.
+   */
+  private void closeFile(RWTransaction tx, String path, FlatINode inode) {
+    assert hasWriteLock();
+    waitForLoadingFSImage();
+    FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+    tx.logCloseFile(path, inode);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("closeFile: "
+          + path + " with " + file.numBlocks()
+          + " blocks is persisted to the file system");
+    }
+  }
+
   /**
    * Periodically calls hasAvailableResources of NameNodeResourceChecker, and if
    * there are found to be insufficient resources available, causes the NN to

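For context (not part of this patch): the finalize/close path above is driven by an ordinary client close. A minimal client-side sketch using only the public FileSystem API, assuming fs.defaultFS points at an HDFS cluster; the path below is made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CompleteFileDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      // create() adds the file as under-construction on the NameNode.
      Path p = new Path("/tmp/complete-file-example");
      try (FSDataOutputStream out = fs.create(p, true)) {
        out.writeBytes("hello, completeFile");
      }
      // Closing the stream sends the complete RPC; on the NameNode that ends
      // in finalizeINodeFileUnderConstruction() and closeFile() above, which
      // release the lease, persist the inode, and log the close to the edit
      // log.
    }
  }
}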
+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java

@@ -163,4 +163,8 @@ class RWTransaction extends Transaction {
   public void logAddBlock(String src, FlatINodeFileFeature file) {
     fsd.getEditLog().logAddBlock(src, file);
   }
+
+  public void logCloseFile(String path, FlatINode inode) {
+    fsd.getEditLog().logCloseFile(fsd.ugid(), path, inode);
+  }
 }

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java

@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.EnumSet;
+
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -136,8 +138,12 @@ public class TestAddBlockRetry {
   boolean checkFileProgress(String src, boolean checkall) throws IOException {
     final FSNamesystem ns = cluster.getNamesystem();
     ns.readLock();
-    try {
-      return ns.checkFileProgress(src, ns.dir.getINode(src).asFile(), checkall);
+    try (ROTransaction tx = ns.dir.newROTransaction().begin()) {
+      Resolver.Result paths = Resolver.resolve(tx, src);
+      Preconditions.checkState(paths.ok());
+      FlatINode inode = paths.inodesInPath().getLastINode();
+      FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+      return ns.checkFileProgress(src, file, checkall);
     } finally {
       ns.readUnlock();
     }
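
As an aside (not part of this patch): the test now follows the branch's read-side pattern of resolving a path under an ROTransaction and reading features off the FlatINode. A hedged sketch of a similar package-local helper, assuming the ROTransaction, Resolver, FlatINode, and FlatINodeFileFeature APIs behave exactly as they appear in this diff (they exist only on the partial-ns branch):

  // Hypothetical helper in org.apache.hadoop.hdfs.server.namenode, not part
  // of this patch: counts the blocks of a file using the same read-side
  // pattern as the test above.
  static int numBlocks(FSNamesystem ns, String src) throws IOException {
    ns.readLock();
    try (ROTransaction tx = ns.dir.newROTransaction().begin()) {
      Resolver.Result paths = Resolver.resolve(tx, src);
      if (!paths.ok()) {
        throw new FileNotFoundException(src);
      }
      FlatINode inode = paths.inodesInPath().getLastINode();
      FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
      // Directories carry no file feature; report zero blocks for them.
      return file == null ? 0 : file.numBlocks();
    } finally {
      ns.readUnlock();
    }
  }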