
HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE blocks.

Tsz-Wo Nicholas Sze, 9 years ago
Parent commit: b10d8ced21
15 changed files with 271 additions and 106 deletions
  1. + 33 - 8   hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  2. + 32 - 23  hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
  3. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
  4. + 3 - 0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  5. + 4 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  6. + 10 - 21  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  7. + 12 - 3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
  8. + 3 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
  9. + 4 - 4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  10. + 58 - 16 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  11. + 42 - 14 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  12. + 7 - 10  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
  13. + 55 - 1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
  14. + 5 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
  15. + 2 - 2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java

+ 33 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -34,7 +34,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Proxy;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -168,6 +167,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RpcNoSuchMethodException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
@@ -182,16 +182,15 @@ import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.InetAddresses;
-import org.apache.htrace.core.Tracer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and
@@ -1355,17 +1354,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Invoke namenode append RPC.
+   * It retries in case of a {@link RetriableException}.
+   */
+  private LastBlockWithStatus callAppend(String src,
+      EnumSetWritable<CreateFlag> flag) throws IOException {
+    final long startTime = Time.monotonicNow();
+    for(;;) {
+      try {
+        return namenode.append(src, clientName, flag);
+      } catch(RemoteException re) {
+        if (Time.monotonicNow() - startTime > 5000
+            || !RetriableException.class.getName().equals(
+                re.getClassName())) {
+          throw re;
+        }
+
+        try { // sleep and retry
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          throw DFSUtilClient.toInterruptedIOException("callAppend", e);
+        }
+      }
+    }
+  }
+
   /** Method to get stream returned by append call */
   private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag,
       Progressable progress, String[] favoredNodes) throws IOException {
     CreateFlag.validateForAppend(flag);
     try {
-      LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
+      final LastBlockWithStatus blkWithStatus = callAppend(src,
           new EnumSetWritable<>(flag, CreateFlag.class));
       HdfsFileStatus status = blkWithStatus.getFileStatus();
       if (status == null) {
-        DFSClient.LOG.debug("NameNode is on an older version, request file " +
-            "info with additional RPC call for file: " + src);
+        LOG.debug("NameNode is on an older version, request file " +
+            "info with additional RPC call for file: {}", src);
         status = getFileInfo(src);
       }
       return DFSOutputStream.newStreamForAppend(this, src, flag, progress,
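
The new private callAppend bounds its retries to roughly 5 seconds, sleeping 500 ms between attempts whenever the NameNode reports a RetriableException (delivered to the client wrapped in a RemoteException). Below is a minimal standalone sketch of the same bounded-retry pattern, assuming a hypothetical Rpc callback in place of namenode.append():

    import java.io.IOException;

    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.ipc.RemoteException;
    import org.apache.hadoop.ipc.RetriableException;
    import org.apache.hadoop.util.Time;

    public class BoundedRetry {
      /** Hypothetical RPC callback; stands in for namenode.append(...). */
      interface Rpc<T> { T call() throws IOException; }

      static <T> T retry(Rpc<T> rpc) throws IOException {
        final long startTime = Time.monotonicNow();
        for(;;) {
          try {
            return rpc.call();
          } catch (RemoteException re) {
            // Give up after ~5 s, or when the server-side exception
            // is anything other than a RetriableException.
            if (Time.monotonicNow() - startTime > 5000
                || !RetriableException.class.getName().equals(
                    re.getClassName())) {
              throw re;
            }
            try { // sleep and retry
              Thread.sleep(500);
            } catch (InterruptedException e) {
              throw DFSUtilClient.toInterruptedIOException("retry", e);
            }
          }
        }
      }
    }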

+ 32 - 23
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java

@@ -17,9 +17,29 @@
  */
 package org.apache.hadoop.hdfs;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.SignedBytes;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -54,26 +74,9 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.SocketFactory;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.SignedBytes;
 
 public class DFSUtilClient {
   public static final byte[] EMPTY_BYTES = {};
@@ -684,4 +687,10 @@ public class DFSUtilClient {
     }
   }
 
+  public static InterruptedIOException toInterruptedIOException(String message,
+      InterruptedException e) {
+    final InterruptedIOException iioe = new InterruptedIOException(message);
+    iioe.initCause(e);
+    return iioe;
+  }
 }
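
The new helper preserves a thread interrupt as the cause of an InterruptedIOException, an IOException subtype that can propagate through IOException-only method signatures. The callAppend retry loop above uses it exactly like this:

    try { // sleep and retry
      Thread.sleep(500);
    } catch (InterruptedException e) {
      throw DFSUtilClient.toInterruptedIOException("callAppend", e);
    }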

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

@@ -455,8 +455,7 @@ class DataStreamer extends Daemon {
     setPipeline(lastBlock);
     if (nodes.length < 1) {
       throw new IOException("Unable to retrieve blocks locations " +
-          " for last block " + block +
-          "of file " + src);
+          " for last block " + block + " of file " + src);
     }
   }
 

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1020,6 +1020,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10
     datanodes by default. (Mingliang Liu via shv)
 
+    HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE
+    blocks.  (szetszwo)
+
   BUG FIXES
 
     HDFS-8091: ACLStatus and XAttributes should be presented to

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -194,6 +194,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_REPLICATION_MIN_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
   public static final int     DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
+  public static final String  DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY
+      = "dfs.namenode.file.close.num-committed-allowed";
+  public static final int     DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT
+      = 0;
   public static final String  DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY =
       "dfs.namenode.safemode.replication.min";
   public static final String  DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY =
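
The new key, dfs.namenode.file.close.num-committed-allowed, caps how many trailing blocks of a file may still be COMMITTED rather than COMPLETE when the file is closed; the default of 0 keeps the previous strict behavior. A sketch of enabling it programmatically, as testMultipleAppends does below (it can equally be set in hdfs-site.xml):

    // Allow close() to succeed while the last block is still COMMITTED.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);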

+ 10 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -626,6 +626,10 @@ public class BlockManager implements BlockStatsMXBean {
     return (countNodes(block).liveReplicas() >= minReplication);
   }
 
+  public short getMinReplication() {
+    return minReplication;
+  }
+
   /**
    * Commit a block of a file
    * 
@@ -673,7 +677,7 @@ public class BlockManager implements BlockStatsMXBean {
     final boolean b = commitBlock(lastBlock, commitBlock);
     if (countNodes(lastBlock).liveReplicas() >= minReplication) {
       if (b) {
-        addExpectedReplicasToPending(lastBlock);
+        addExpectedReplicasToPending(lastBlock, bc);
       }
       completeBlock(lastBlock, false);
     }
@@ -685,6 +689,10 @@ public class BlockManager implements BlockStatsMXBean {
    * pendingReplications in order to keep ReplicationMonitor from scheduling
    * the block.
    */
+  public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) {
+    addExpectedReplicasToPending(blk);
+  }
+
   private void addExpectedReplicasToPending(BlockInfo lastBlock) {
     DatanodeStorageInfo[] expectedStorages =
         lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
@@ -2656,7 +2664,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
-      addExpectedReplicasToPending(storedBlock);
+      addExpectedReplicasToPending(storedBlock, bc);
       completeBlock(storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -3485,25 +3493,6 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  /**
-   * Check that the indicated blocks are present and
-   * replicated.
-   */
-  public boolean checkBlocksProperlyReplicated(
-      String src, BlockInfo[] blocks) {
-    for (BlockInfo b: blocks) {
-      if (!b.isComplete()) {
-        final int numNodes = b.numNodes();
-        LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
-          + b.getBlockUCState() + ", replication# = " + numNodes
-          + (numNodes < minReplication ? " < ": " >= ")
-          + " minimum = " + minReplication + ") in file " + src);
-        return false;
-      }
-    }
-    return true;
-  }
-
   /** 
    * @return 0 if the block is not found;
    *         otherwise, return the replication factor of the block.

+ 12 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java

@@ -33,8 +33,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
+import org.apache.hadoop.ipc.RetriableException;
 
 import com.google.common.base.Preconditions;
 
@@ -119,10 +121,17 @@ final class FSDirAppendOp {
 
       final BlockInfo lastBlock = file.getLastBlock();
       // Check that the block has at least minimum replication.
-      if (lastBlock != null && lastBlock.isComplete()
+      if (lastBlock != null) {
+        if (lastBlock.getBlockUCState() == BlockUCState.COMMITTED) {
+          throw new RetriableException(
+              new NotReplicatedYetException("append: lastBlock="
+                  + lastBlock + " of src=" + path
+                  + " is COMMITTED but not yet COMPLETE."));
+        } else if (lastBlock.isComplete()
           && !blockManager.isSufficientlyReplicated(lastBlock)) {
-        throw new IOException("append: lastBlock=" + lastBlock + " of src="
-            + path + " is not sufficiently replicated yet.");
+          throw new IOException("append: lastBlock=" + lastBlock + " of src="
+              + path + " is not sufficiently replicated yet.");
+        }
       }
       lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
           true, logRetryCache);
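
On the wire, this RetriableException reaches the client wrapped in a RemoteException; the callAppend retry loop in DFSClient above unwraps it by class name. A short sketch of that check:

    /** Mirrors the retry condition in DFSClient.callAppend above. */
    static boolean isRetriable(RemoteException re) {
      return RetriableException.class.getName().equals(re.getClassName());
    }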

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java

@@ -758,8 +758,10 @@ class FSDirWriteFileOp {
       return false;
     }
 
+    fsn.addCommittedBlocksToPending(pendingFile);
+
     fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
-        Snapshot.CURRENT_STATE_ID);
+        Snapshot.CURRENT_STATE_ID, true);
     return true;
   }
 

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
@@ -29,7 +28,6 @@ import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -90,6 +88,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -449,8 +448,9 @@ public class FSEditLogLoader {
       // One might expect that you could use removeLease(holder, path) here,
       // but OP_CLOSE doesn't serialize the holder. So, remove the inode.
       if (file.isUnderConstruction()) {
-        fsNamesys.leaseManager.removeLeases(Lists.newArrayList(file.getId()));
-        file.toCompleteFile(file.getModificationTime());
+        fsNamesys.getLeaseManager().removeLease(file.getId());
+        file.toCompleteFile(file.getModificationTime(), 0,
+            fsNamesys.getBlockManager().getMinReplication());
       }
       break;
     }

+ 58 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -445,6 +445,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final long minBlockSize;         // minimum block size
   final long maxBlocksPerFile;     // maximum # of blocks per file
+  private final int numCommittedAllowed;
 
   /** Lock to protect FSNamesystem. */
   private final FSNamesystemLock fsLock;
@@ -741,6 +742,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
       this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
           DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
+      this.numCommittedAllowed = conf.getInt(
+              DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
+              DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);
       this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
       LOG.info("Append Enabled: " + supportAppends);
 
@@ -2563,17 +2567,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
     assert hasReadLock();
     if (checkall) {
-      return blockManager.checkBlocksProperlyReplicated(src, v
-          .getBlocks());
+      return checkBlocksComplete(src, true, v.getBlocks());
     } else {
-      // check the penultimate block of this file
-      BlockInfo b = v.getPenultimateBlock();
-      return b == null ||
-          blockManager.checkBlocksProperlyReplicated(
-              src, new BlockInfo[] { b });
+      final BlockInfo[] blocks = v.getBlocks();
+      final int i = blocks.length - numCommittedAllowed - 2;
+      return i < 0 || blocks[i] == null
+          || checkBlocksComplete(src, false, blocks[i]);
     }
   }
 
+  /**
+   * Check if the blocks are COMPLETE;
+   * it may allow the last block to be COMMITTED.
+   */
+  private boolean checkBlocksComplete(String src, boolean allowCommittedBlock,
+      BlockInfo... blocks) {
+    final int n = allowCommittedBlock? numCommittedAllowed: 0;
+    for(int i = 0; i < blocks.length; i++) {
+      final short min = blockManager.getMinReplication();
+      final String err = INodeFile.checkBlockComplete(blocks, i, n, min);
+      if (err != null) {
+        final int numNodes = blocks[i].numNodes();
+        LOG.info("BLOCK* " + err + "(numNodes= " + numNodes
+            + (numNodes < min ? " < " : " >= ")
+            + " minimum = " + min + ") in file " + src);
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Change the indicated filename. 
    * @deprecated Use {@link #renameTo(String, String, boolean,
@@ -2704,7 +2727,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       List<INode> removedINodes,
       final boolean acquireINodeMapLock) {
     assert hasWriteLock();
-    leaseManager.removeLeases(removedUCFiles);
+    for(long i : removedUCFiles) {
+      leaseManager.removeLease(i);
+    }
     // remove inodes from inodesMap
     if (removedINodes != null) {
       if (acquireINodeMapLock) {
@@ -2963,7 +2988,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // then reap lease immediately and close the file.
     if(nrCompleteBlocks == nrBlocks) {
       finalizeINodeFileUnderConstruction(src, pendingFile,
-          iip.getLatestSnapshotId());
+          iip.getLatestSnapshotId(), false);
       NameNode.stateChangeLog.warn("BLOCK*"
         + " internalReleaseLease: All existing blocks are COMPLETE,"
         + " lease removed, file closed.");
@@ -3002,7 +3027,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if(penultimateBlockMinReplication &&
           blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
-            iip.getLatestSnapshotId());
+            iip.getLatestSnapshotId(), false);
         NameNode.stateChangeLog.warn("BLOCK*"
           + " internalReleaseLease: Committed blocks are minimally replicated,"
           + " lease removed, file closed.");
@@ -3046,7 +3071,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // We can remove this block and close the file.
         pendingFile.removeLastBlock(lastBlock);
         finalizeINodeFileUnderConstruction(src, pendingFile,
-            iip.getLatestSnapshotId());
+            iip.getLatestSnapshotId(), false);
         NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
             + "Removed empty last block and closed file.");
         return true;
@@ -3111,8 +3136,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  void finalizeINodeFileUnderConstruction(
-      String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
+  void addCommittedBlocksToPending(final INodeFile pendingFile) {
+    final BlockInfo[] blocks = pendingFile.getBlocks();
+    int i = blocks.length - numCommittedAllowed;
+    if (i < 0) {
+      i = 0;
+    }
+    for(; i < blocks.length; i++) {
+      final BlockInfo b = blocks[i];
+      if (b != null && b.getBlockUCState() == BlockUCState.COMMITTED) {
+        // b is COMMITTED but not yet COMPLETE, add it to pending replication.
+        blockManager.addExpectedReplicasToPending(b, pendingFile);
+      }
+    }
+  }
+
+  void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile,
+      int latestSnapshot, boolean allowCommittedBlock) throws IOException {
     assert hasWriteLock();
 
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
@@ -3127,7 +3167,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // The file is no longer pending.
     // Create permanent INode, update blocks. No need to replace the inode here
     // since we just remove the uc feature from pendingFile
-    pendingFile.toCompleteFile(now());
+    pendingFile.toCompleteFile(now(),
+        allowCommittedBlock? numCommittedAllowed: 0,
+        blockManager.getMinReplication());
 
     // close file and persist block allocations for this file
     closeFile(src, pendingFile);
@@ -3375,8 +3417,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
 
     //remove lease, close file
-    finalizeINodeFileUnderConstruction(src, pendingFile,
-        Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID));
+    int s = Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID);
+    finalizeINodeFileUnderConstruction(src, pendingFile, s, false);
   }
 
   /**

+ 42 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -201,28 +201,56 @@ public class INodeFile extends INodeWithAdditionalFields
    * Convert the file to a complete file, i.e., to remove the Under-Construction
    * feature.
    */
-  public INodeFile toCompleteFile(long mtime) {
-    Preconditions.checkState(isUnderConstruction(),
-        "file is no longer under construction");
-    FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
-    if (uc != null) {
-      assertAllBlocksComplete();
-      removeFeature(uc);
-      this.setModificationTime(mtime);
-    }
-    return this;
+  void toCompleteFile(long mtime, int numCommittedAllowed, short minReplication) {
+    final FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
+    Preconditions.checkNotNull(uc, "File %s is not under construction", this);
+    assertAllBlocksComplete(numCommittedAllowed, minReplication);
+    removeFeature(uc);
+    setModificationTime(mtime);
   }
 
   /** Assert all blocks are complete. */
-  private void assertAllBlocksComplete() {
+  private void assertAllBlocksComplete(int numCommittedAllowed,
+      short minReplication) {
     if (blocks == null) {
       return;
     }
     for (int i = 0; i < blocks.length; i++) {
-      Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
-          + " %s %s since blocks[%s] is non-complete, where blocks=%s.",
-          getClass().getSimpleName(), this, i, Arrays.asList(blocks));
+      final String err = checkBlockComplete(blocks, i, numCommittedAllowed,
+          minReplication);
+      Preconditions.checkState(err == null,
+          "Unexpected block state: %s, file=%s (%s), blocks=%s (i=%s)",
+          err, this, getClass().getSimpleName(), Arrays.asList(blocks), i);
+    }
+  }
+
+  /**
+   * Check if the i-th block is COMPLETE;
+   * when the i-th block is the last block, it may be allowed to be COMMITTED.
+   *
+   * @return null if the block passes the check;
+   *              otherwise, return an error message.
+   */
+  static String checkBlockComplete(BlockInfo[] blocks, int i,
+      int numCommittedAllowed, short minReplication) {
+    final BlockInfo b = blocks[i];
+    final BlockUCState state = b.getBlockUCState();
+    if (state == BlockUCState.COMPLETE) {
+      return null;
+    }
+    if (i < blocks.length - numCommittedAllowed) {
+      return b + " is " + state + " but not COMPLETE";
     }
+    if (state != BlockUCState.COMMITTED) {
+      return b + " is " + state + " but neither COMPLETE nor COMMITTED";
+    }
+    final int numExpectedLocations
+        = b.getUnderConstructionFeature().getNumExpectedLocations();
+    if (numExpectedLocations <= minReplication) {
+      return b + " is " + state + " but numExpectedLocations = "
+          + numExpectedLocations + " <= minReplication = " + minReplication;
+    }
+    return null;
   }
 
   @Override // BlockCollection
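
To make the new rule concrete: with blocks.length = 3, numCommittedAllowed = 1 and minReplication = 1, blocks 0 and 1 must be COMPLETE, while block 2 may be COMMITTED provided it has at least two expected locations. Below is a simplified, self-contained model of the check (illustrative only: State stands in for BlockUCState, and the expected-location count is passed explicitly):

    enum State { COMPLETE, COMMITTED, UNDER_CONSTRUCTION }

    /** @return null if blocks[i] passes the check; otherwise an error message. */
    static String checkBlockComplete(State[] blocks, int i,
        int numCommittedAllowed, int numExpectedLocations, short minReplication) {
      final State state = blocks[i];
      if (state == State.COMPLETE) {
        return null;
      }
      // Only the trailing numCommittedAllowed blocks may be non-COMPLETE.
      if (i < blocks.length - numCommittedAllowed) {
        return "blocks[" + i + "] is " + state + " but not COMPLETE";
      }
      if (state != State.COMMITTED) {
        return "blocks[" + i + "] is " + state
            + " but neither COMPLETE nor COMMITTED";
      }
      // A COMMITTED block must be expected on more nodes than minReplication.
      if (numExpectedLocations <= minReplication) {
        return "blocks[" + i + "] is COMMITTED but numExpectedLocations = "
            + numExpectedLocations + " <= minReplication = " + minReplication;
      }
      return null;
    }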

+ 7 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -160,6 +160,13 @@ public class LeaseManager {
     return lease;
   }
 
+  synchronized void removeLease(long inodeId) {
+    final Lease lease = leasesById.get(inodeId);
+    if (lease != null) {
+      removeLease(lease, inodeId);
+    }
+  }
+
   /**
    * Remove the specified lease and src.
    */
@@ -298,16 +305,6 @@ public class LeaseManager {
     }
   }
 
-  @VisibleForTesting
-  synchronized void removeLeases(Collection<Long> inodes) {
-    for (long inode : inodes) {
-      Lease lease = leasesById.get(inode);
-      if (lease != null) {
-        removeLease(lease, inode);
-      }
-    }
-  }
-
   public void setLeasePeriod(long softLimit, long hardLimit) {
     this.softLimit = softLimit;
     this.hardLimit = hardLimit; 

+ 55 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java

@@ -27,10 +27,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HardLink;
@@ -41,12 +43,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -55,6 +57,8 @@ import org.junit.Test;
  * support HDFS appends.
  */
 public class TestFileAppend{
+  private static final long RANDOM_TEST_RUNTIME = 10000;
+
   final boolean simulatedStorage = false;
 
   private static byte[] fileContents = null;
@@ -381,6 +385,56 @@ public class TestFileAppend{
     }
   }
 
+
+  @Test
+  public void testMultipleAppends() throws Exception {
+    final long startTime = Time.monotonicNow();
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(4).build();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      final Path p = new Path("/testMultipleAppend/foo");
+      final int blockSize = 1 << 16;
+      final byte[] data = AppendTestUtil.initBuffer(blockSize);
+
+      // create an empty file.
+      fs.create(p, true, 4096, (short)3, blockSize).close();
+
+      int fileLen = 0;
+      for(int i = 0;
+          i < 10 || Time.monotonicNow() - startTime < RANDOM_TEST_RUNTIME;
+          i++) {
+        int appendLen = ThreadLocalRandom.current().nextInt(100) + 1;
+        if (fileLen + appendLen > data.length) {
+          break;
+        }
+
+        AppendTestUtil.LOG.info(i + ") fileLen="  + fileLen
+            + ", appendLen=" + appendLen);
+        final FSDataOutputStream out = fs.append(p);
+        out.write(data, fileLen, appendLen);
+        out.close();
+        fileLen += appendLen;
+      }
+
+      Assert.assertEquals(fileLen, fs.getFileStatus(p).getLen());
+      final byte[] actual = new byte[fileLen];
+      final FSDataInputStream in = fs.open(p);
+      in.readFully(actual);
+      in.close();
+      for(int i = 0; i < fileLen; i++) {
+        Assert.assertEquals(data[i], actual[i]);
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
   /** Tests appending after soft-limit expires. */
   @Test
   public void testAppendAfterSoftLimit() 

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java

@@ -93,6 +93,10 @@ public class TestINodeFile {
         (short)3, 1024L);
   }
 
+  static void toCompleteFile(INodeFile file) {
+    file.toCompleteFile(Time.now(), 0, (short)1);
+  }
+
   INodeFile createINodeFile(short replication, long preferredBlockSize) {
     return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
         null, replication, preferredBlockSize, (byte)0);
@@ -1090,7 +1094,7 @@ public class TestINodeFile {
     assertEquals(clientName, uc.getClientName());
     assertEquals(clientMachine, uc.getClientMachine());
 
-    file.toCompleteFile(Time.now());
+    toCompleteFile(file);
     assertFalse(file.isUnderConstruction());
   }
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java

@@ -51,8 +51,8 @@ public class TestLeaseManager {
     }
 
     assertEquals(4, lm.getINodeIdWithLeases().size());
-    synchronized (lm) {
-      lm.removeLeases(ids);
+    for (long id : ids) {
+      lm.removeLease(id);
     }
     assertEquals(0, lm.getINodeIdWithLeases().size());
   }