
Merge r1401869 through r1402273 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1402278 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 12 years ago
Commit 40d2b6f308
18 files changed, 395 insertions and 180 deletions
  1. +12 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +1 -2     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
  3. +2 -14    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  4. +2 -2     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  5. +1 -8     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  6. +57 -94   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  7. +12 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  8. +11 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
  9. +1 -1     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
  10. +19 -6   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
  11. +1 -4    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
  12. +7 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
  13. +1 -2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
  14. +90 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
  15. +3 -0    hadoop-mapreduce-project/CHANGES.txt
  16. +46 -42  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
  17. +13 -2   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
  18. +116 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -150,6 +150,8 @@ Trunk (Unreleased)
     HDFS-4052. BlockManager#invalidateWork should print log outside the lock.
     (Jing Zhao via suresh)
 
+    HDFS-4110. Refine a log printed in JNStorage. (Liang Xie via suresh)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -233,6 +235,9 @@ Trunk (Unreleased)
     HDFS-2434. TestNameNodeMetrics.testCorruptBlock fails intermittently.
     (Jing Zhao via suresh)
 
+    HDFS-4067. TestUnderReplicatedBlocks intermittently fails due to 
+    ReplicaAlreadyExistsException. (Jing Zhao via suresh)
+
   BREAKDOWN OF HDFS-3077 SUBTASKS
 
     HDFS-3077. Quorum-based protocol for reading and writing edit logs.
@@ -413,6 +418,9 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4099. Clean up replication code and add more javadoc. (szetszwo)
 
+    HDFS-4107. Add utility methods for casting INode to INodeFile and
+    INodeFileUnderConstruction. (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -488,6 +496,10 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4022. Replication not happening for appended block.
     (Vinay via umamahesh)
 
+    HDFS-3948. Do not use hflush in TestWebHDFS.testNamenodeRestart() since the
+    out stream returned by WebHdfsFileSystem does not support it. (Jing Zhao
+    via szetszwo)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java

@@ -171,8 +171,7 @@ class JNStorage extends Storage {
 
   void format(NamespaceInfo nsInfo) throws IOException {
     setStorageInfo(nsInfo);
-    LOG.info("Formatting journal storage directory " + 
-        sd + " with nsid: " + getNamespaceID());
+    LOG.info("Formatting journal " + sd + " with nsid: " + getNamespaceID());
     // Unlock the directory before formatting, because we will
     // re-analyze it after format(). The analyzeStorage() call
     // below is responsible for re-locking it. This is a no-op

+ 2 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1012,7 +1012,7 @@ public class FSDirectory implements Closeable {
     int i = 0;
     int totalBlocks = 0;
     for(String src : srcs) {
-      INodeFile srcInode = getFileINode(src);
+      INodeFile srcInode = (INodeFile)getINode(src);
       allSrcInodes[i++] = srcInode;
       totalBlocks += srcInode.blocks.length;  
     }
@@ -1300,25 +1300,13 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  /**
-   * Get {@link INode} associated with the file.
-   */
-  INodeFile getFileINode(String src) throws UnresolvedLinkException {
-    INode inode = getINode(src);
-    if (inode == null || inode.isDirectory())
-      return null;
-    assert !inode.isLink();
-    return (INodeFile) inode;
-  }
-  
   /**
    * Get {@link INode} associated with the file / directory.
    */
   public INode getINode(String src) throws UnresolvedLinkException {
     readLock();
     try {
-      INode iNode = rootDir.getNode(src, true);
-      return iNode;
+      return rootDir.getNode(src, true);
     } finally {
       readUnlock();
     }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -477,8 +477,8 @@ public class FSEditLogLoader {
       Lease lease = fsNamesys.leaseManager.getLease(
           reassignLeaseOp.leaseHolder);
       INodeFileUnderConstruction pendingFile =
-          (INodeFileUnderConstruction) fsDir.getFileINode(
-              reassignLeaseOp.path);
+          INodeFileUnderConstruction.valueOf( 
+              fsDir.getINode(reassignLeaseOp.path), reassignLeaseOp.path);
       fsNamesys.reassignLeaseInternal(lease,
           reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
       break;

+ 1 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -365,14 +365,7 @@ class FSImageFormat {
 
         // verify that file exists in namespace
         String path = cons.getLocalName();
-        INode old = fsDir.getFileINode(path);
-        if (old == null) {
-          throw new IOException("Found lease for non-existent file " + path);
-        }
-        if (old.isDirectory()) {
-          throw new IOException("Found lease for directory " + path);
-        }
-        INodeFile oldnode = (INodeFile) old;
+        INodeFile oldnode = INodeFile.valueOf(fsDir.getINode(path), path);
         fsDir.replaceNode(path, oldnode, cons);
         namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }

+ 57 - 94
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1270,11 +1270,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         }
 
         long now = now();
-        INodeFile inode = dir.getFileINode(src);
-        if (inode == null) {
-          throw new FileNotFoundException("File does not exist: " + src);
-        }
-        assert !inode.isLink();
+        final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
         if (doAccessTime && isAccessTimeSupported()) {
           if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
             // if we have to set access time but we only have the readlock, then
@@ -1390,28 +1386,27 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
     // we put the following prerequisite for the operation
     // replication and blocks sizes should be the same for ALL the blocks
-    // check the target
-    INode inode = dir.getFileINode(target);
 
-    if(inode == null) {
-      throw new IllegalArgumentException("concat: trg file doesn't exist");
-    }
-    if(inode.isUnderConstruction()) {
-      throw new IllegalArgumentException("concat: trg file is uner construction");
+    // check the target
+    final INodeFile trgInode = INodeFile.valueOf(dir.getINode(target), target);
+    if(trgInode.isUnderConstruction()) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is under construction");
     }
-
-    INodeFile trgInode = (INodeFile) inode;
-
-    // per design trg shouldn't be empty and all the blocks same size
+    // per design target shouldn't be empty and all the blocks same size
     if(trgInode.blocks.length == 0) {
-      throw new IllegalArgumentException("concat: "+ target + " file is empty");
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is empty");
     }
 
     long blockSize = trgInode.getPreferredBlockSize();
 
     // check the end block to be full
     if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
-      throw new IllegalArgumentException(target + " blocks size should be the same");
+      throw new HadoopIllegalArgumentException("The last block in " + target
+          + " is not full; last block size = "
+          + trgInode.blocks[trgInode.blocks.length-1].getNumBytes()
+          + " but file block size = " + blockSize);
     }
 
     si.add(trgInode);
@@ -1424,21 +1419,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if(i==srcs.length-1)
         endSrc=true;
 
-      INodeFile srcInode = dir.getFileINode(src);
-
+      final INodeFile srcInode = INodeFile.valueOf(dir.getINode(src), src);
       if(src.isEmpty() 
-          || srcInode == null
           || srcInode.isUnderConstruction()
           || srcInode.blocks.length == 0) {
-        throw new IllegalArgumentException("concat: file " + src + 
-        " is invalid or empty or underConstruction");
+        throw new HadoopIllegalArgumentException("concat: source file " + src
+            + " is invalid or empty or underConstruction");
       }
 
       // check replication and blocks size
       if(repl != srcInode.getFileReplication()) {
-        throw new IllegalArgumentException(src + " and " + target + " " +
-            "should have same replication: "
-            + repl + " vs. " + srcInode.getFileReplication());
+        throw new HadoopIllegalArgumentException("concat: the soruce file "
+            + src + " and the target file " + target
+            + " should have the same replication: source replication is "
+            + srcInode.getBlockReplication()
+            + " but target replication is " + repl);
       }
 
       //boolean endBlock=false;
@@ -1448,8 +1443,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if(endSrc)
         idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
       if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
-        throw new IllegalArgumentException("concat: blocks sizes of " + 
-            src + " and " + target + " should all be the same");
+        throw new HadoopIllegalArgumentException("concat: the soruce file "
+            + src + " and the target file " + target
+            + " should have the same blocks sizes: target block size is "
+            + blockSize + " but the size of source block " + idx + " is "
+            + srcInode.blocks[idx].getNumBytes());
       }
 
       si.add(srcInode);
@@ -1458,7 +1456,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     // make sure no two files are the same
     if(si.size() < srcs.length+1) { // trg + srcs
       // it means at least two files are the same
-      throw new IllegalArgumentException("at least two files are the same");
+      throw new HadoopIllegalArgumentException(
+          "concat: at least two of the source files are the same");
     }
 
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1797,13 +1796,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     try {
-      INodeFile myFile = dir.getFileINode(src);
-      try {
-        blockManager.verifyReplication(src, replication, clientMachine);
-      } catch(IOException e) {
-        throw new IOException("failed to create "+e.getMessage());
-      }
+      blockManager.verifyReplication(src, replication, clientMachine);
       boolean create = flag.contains(CreateFlag.CREATE);
+      final INode myFile = dir.getINode(src);
       if (myFile == null) {
         if (!create) {
           throw new FileNotFoundException("failed to overwrite or append to non-existent file "
@@ -1829,8 +1824,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
 
       if (append && myFile != null) {
+        final INodeFile f = INodeFile.valueOf(myFile, src); 
         return prepareFileForWrite(
-            src, myFile, holder, clientMachine, clientNode, true);
+            src, f, holder, clientMachine, clientNode, true);
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1925,11 +1921,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new IOException("Invalid file name: " + src);
       }
   
-      INode inode = dir.getFileINode(src);
-      if (inode == null) {
-        throw new FileNotFoundException("File not found " + src);
-      }
-  
+      final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
         return true;
       }
@@ -2330,35 +2322,32 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private INodeFileUnderConstruction checkLease(String src, String holder) 
       throws LeaseExpiredException, UnresolvedLinkException {
     assert hasReadOrWriteLock();
-    INodeFile file = dir.getFileINode(src);
-    checkLease(src, holder, file);
-    return (INodeFileUnderConstruction)file;
+    return checkLease(src, holder, dir.getINode(src));
   }
 
-  private void checkLease(String src, String holder, INode file)
-      throws LeaseExpiredException {
+  private INodeFileUnderConstruction checkLease(String src, String holder,
+      INode file) throws LeaseExpiredException {
     assert hasReadOrWriteLock();
-    if (file == null || file.isDirectory()) {
+    if (file == null || !(file instanceof INodeFile)) {
       Lease lease = leaseManager.getLease(holder);
-      throw new LeaseExpiredException("No lease on " + src +
-                                      " File does not exist. " +
-                                      (lease != null ? lease.toString() :
-                                       "Holder " + holder + 
-                                       " does not have any open files."));
+      throw new LeaseExpiredException(
+          "No lease on " + src + ": File does not exist. "
+          + (lease != null ? lease.toString()
+              : "Holder " + holder + " does not have any open files."));
     }
     if (!file.isUnderConstruction()) {
       Lease lease = leaseManager.getLease(holder);
-      throw new LeaseExpiredException("No lease on " + src + 
-                                      " File is not open for writing. " +
-                                      (lease != null ? lease.toString() :
-                                       "Holder " + holder + 
-                                       " does not have any open files."));
+      throw new LeaseExpiredException(
+          "No lease on " + src + ": File is not open for writing. "
+          + (lease != null ? lease.toString()
+              : "Holder " + holder + " does not have any open files."));
     }
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
     if (holder != null && !pendingFile.getClientName().equals(holder)) {
       throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
           + pendingFile.getClientName() + " but is accessed by " + holder);
     }
+    return pendingFile;
   }
  
   /**
@@ -2400,15 +2389,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       pendingFile = checkLease(src, holder);
     } catch (LeaseExpiredException lee) {
-      INodeFile file = dir.getFileINode(src);
-      if (file != null && !file.isUnderConstruction()) {
+      final INode inode = dir.getINode(src);
+      if (inode != null && inode instanceof INodeFile && !inode.isUnderConstruction()) {
         // This could be a retry RPC - i.e the client tried to close
         // the file, but missed the RPC response. Thus, it is trying
         // again to close the file. If the file still exists and
         // the client's view of the last block matches the actual
         // last block, then we'll treat it as a successful close.
         // See HDFS-3031.
-        Block realLastBlock = file.getLastBlock();
+        final Block realLastBlock = ((INodeFile)inode).getLastBlock();
         if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
           NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " +
               "received request from " + holder + " to complete file " + src +
@@ -2994,23 +2983,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     LOG.info("Recovering lease=" + lease + ", src=" + src);
     assert !isInSafeMode();
     assert hasWriteLock();
-    INodeFile iFile = dir.getFileINode(src);
-    if (iFile == null) {
-      final String message = "DIR* NameSystem.internalReleaseLease: "
-        + "attempt to release a create lock on "
-        + src + " file does not exist.";
-      NameNode.stateChangeLog.warn(message);
-      throw new IOException(message);
-    }
-    if (!iFile.isUnderConstruction()) {
-      final String message = "DIR* NameSystem.internalReleaseLease: "
-        + "attempt to release a create lock on "
-        + src + " but file is already closed.";
-      NameNode.stateChangeLog.warn(message);
-      throw new IOException(message);
-    }
 
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+    final INodeFileUnderConstruction pendingFile
+        = INodeFileUnderConstruction.valueOf(dir.getINode(src), src);
     int nrBlocks = pendingFile.numBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
 
@@ -4318,17 +4293,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       for (Lease lease : leaseManager.getSortedLeases()) {
         for (String path : lease.getPaths()) {
-          INode node;
+          final INodeFileUnderConstruction cons;
           try {
-            node = dir.getFileINode(path);
+            cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
           } catch (UnresolvedLinkException e) {
             throw new AssertionError("Lease files should reside on this FS");
+          } catch (IOException e) {
+            throw new RuntimeException(e);
           }
-          assert node != null : "Found a lease for nonexisting file.";
-          assert node.isUnderConstruction() :
-            "Found a lease for file " + path + " that is not under construction." +
-            " lease=" + lease;
-          INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
           BlockInfo[] blocks = cons.getBlocks();
           if(blocks == null)
             continue;
@@ -4911,21 +4883,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       for (Lease lease : leaseManager.getSortedLeases()) {
         for(String path : lease.getPaths()) {
           // verify that path exists in namespace
-          INode node;
+          final INodeFileUnderConstruction cons;
           try {
-            node = dir.getFileINode(path);
+            cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
           } catch (UnresolvedLinkException e) {
             throw new AssertionError("Lease files should reside on this FS");
           }
-          if (node == null) {
-            throw new IOException("saveLeases found path " + path +
-                                  " but no matching entry in namespace.");
-          }
-          if (!node.isUnderConstruction()) {
-            throw new IOException("saveLeases found path " + path +
-                                  " but is not under construction.");
-          }
-          INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
           FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
         }
       }

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
 
@@ -32,6 +33,17 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 /** I-node for closed file. */
 @InterfaceAudience.Private
 public class INodeFile extends INode implements BlockCollection {
+  /** Cast INode to INodeFile. */
+  public static INodeFile valueOf(INode inode, String path) throws IOException {
+    if (inode == null) {
+      throw new FileNotFoundException("File does not exist: " + path);
+    }
+    if (!(inode instanceof INodeFile)) {
+      throw new FileNotFoundException("Path is not a file: " + path);
+    }
+    return (INodeFile)inode;
+  }
+
   static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
 
   //Number of bits for Block size
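
The valueOf helper above replaces the null-and-type checks that FSNamesystem, FSImageFormat, and FSEditLogLoader previously repeated at every call site; callers now collapse to INodeFile.valueOf(dir.getINode(src), src), as in the FSNamesystem hunks. A minimal self-contained sketch of the same cast-and-check pattern, with Node and FileNode as hypothetical stand-ins for INode and INodeFile:

import java.io.FileNotFoundException;
import java.io.IOException;

class Node {}

class FileNode extends Node {
  // Same shape as INodeFile.valueOf(INode, String) in the hunk above:
  // reject null, reject the wrong type, otherwise cast.
  static FileNode valueOf(Node node, String path) throws IOException {
    if (node == null) {
      throw new FileNotFoundException("File does not exist: " + path);
    }
    if (!(node instanceof FileNode)) {
      throw new FileNotFoundException("Path is not a file: " + path);
    }
    return (FileNode) node;
  }
}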

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java

@@ -25,8 +25,8 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 
 import com.google.common.base.Joiner;
 
@@ -35,6 +35,16 @@ import com.google.common.base.Joiner;
  */
 @InterfaceAudience.Private
 class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
+  /** Cast INode to INodeFileUnderConstruction. */
+  public static INodeFileUnderConstruction valueOf(INode inode, String path
+      ) throws IOException {
+    final INodeFile file = INodeFile.valueOf(inode, path);
+    if (!file.isUnderConstruction()) {
+      throw new IOException("File is not under construction: " + path);
+    }
+    return (INodeFileUnderConstruction)file;
+  }
+
   private  String clientName;         // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -253,7 +253,7 @@ public class LeaseManager {
     private String findPath(INodeFileUnderConstruction pendingFile) {
       try {
         for (String src : paths) {
-          if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
+          if (fsnamesystem.dir.getINode(src) == pendingFile) {
             return src;
           }
         }

+ 19 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -876,7 +876,14 @@ public class TestDFSClientRetries {
       new Random().nextBytes(bytes);
       out4.write(bytes);
       out4.write(bytes);
-      out4.hflush();
+      if (isWebHDFS) {
+        // WebHDFS does not support hflush. To avoid DataNode communicating with
+        // NN while we're shutting down NN, we call out4.close() to finish
+        // writing the data
+        out4.close();
+      } else {
+        out4.hflush();
+      }
 
       //shutdown namenode
       assertTrue(HdfsUtils.isHealthy(uri));
@@ -889,10 +896,12 @@ public class TestDFSClientRetries {
         public void run() {
           try {
             //write some more data and then close the file
-            out4.write(bytes);
-            out4.write(bytes);
-            out4.write(bytes);
-            out4.close();
+            if (!isWebHDFS) {
+              out4.write(bytes);
+              out4.write(bytes);
+              out4.write(bytes);
+              out4.close();
+            }
           } catch (Exception e) {
             exceptions.add(e);
           }
@@ -975,7 +984,11 @@ public class TestDFSClientRetries {
           Assert.assertEquals(String.format("count=%d", count),
               bytes[count % bytes.length], (byte)r);
         }
-        Assert.assertEquals(5 * bytes.length, count);
+        if (!isWebHDFS) {
+          Assert.assertEquals(5 * bytes.length, count);
+        } else {
+          Assert.assertEquals(2 * bytes.length, count);
+        }
         in.close();
       }
 

+ 1 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -119,8 +119,6 @@ public class TestDistributedFileSystem {
       DFSTestUtil.createFile(fileSys, p, 1L, (short)1, 0L);
       DFSTestUtil.readFile(fileSys, p);
       
-      DFSClient client = ((DistributedFileSystem)fileSys).dfs;
-
       fileSys.close();
       
     } finally {
@@ -476,7 +474,7 @@ public class TestDistributedFileSystem {
       fail("Expecting FileNotFoundException");
     } catch (FileNotFoundException e) {
       assertTrue("Not throwing the intended exception message", e.getMessage()
-          .contains("File does not exist: /test/TestExistingDir"));
+          .contains("Path is not a file: /test/TestExistingDir"));
     }
     
     //hftp
@@ -712,7 +710,6 @@ public class TestDistributedFileSystem {
   @Test
   public void testCreateWithCustomChecksum() throws Exception {
     Configuration conf = getTestConfiguration();
-    final long grace = 1000L;
     MiniDFSCluster cluster = null;
     Path testBasePath = new Path("/test/csum");
     // create args 

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
 public class TestUnderReplicatedBlocks {
@@ -49,6 +50,12 @@ public class TestUnderReplicatedBlocks {
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
       DatanodeDescriptor dn = bm.blocksMap.nodeIterator(b.getLocalBlock()).next();
       bm.addToInvalidates(b.getLocalBlock(), dn);
+      // Compute the invalidate work in NN, and trigger the heartbeat from DN
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+      DataNodeTestUtils.triggerHeartbeat(cluster.getDataNode(dn.getIpcPort()));
+      // Wait to make sure the DataNode receives the deletion request 
+      Thread.sleep(1000);
+      // Remove the record from blocksMap
       bm.blocksMap.removeNode(b.getLocalBlock(), dn);
       
       // increment this file's replication factor

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java

@@ -83,8 +83,7 @@ public class TestBlockUnderConstruction {
   private void verifyFileBlocks(String file,
                                 boolean isFileOpen) throws IOException {
     FSNamesystem ns = cluster.getNamesystem();
-    INodeFile inode = ns.dir.getFileINode(file);
-    assertTrue("File does not exist: " + inode.toString(), inode != null);
+    final INodeFile inode = INodeFile.valueOf(ns.dir.getINode(file), file);
     assertTrue("File " + inode.toString() +
         " isUnderConstruction = " + inode.isUnderConstruction() +
         " expected to be " + isFileOpen,

+ 90 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java

@@ -18,13 +18,17 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-
 import org.junit.Test;
 
 public class TestINodeFile {
@@ -199,4 +203,88 @@ public class TestINodeFile {
     
     return iNodes;
   }
+
+  /**
+   * Test for the static {@link INodeFile#valueOf(INode, String)}
+   * and {@link INodeFileUnderConstruction#valueOf(INode, String)} methods.
+   * @throws IOException 
+   */
+  @Test
+  public void testValueOf() throws IOException {
+    final String path = "/testValueOf";
+    final PermissionStatus perm = new PermissionStatus(
+        userName, null, FsPermission.getDefault());
+    final short replication = 3;
+
+    {//cast from null
+      final INode from = null;
+
+      //cast to INodeFile, should fail
+      try {
+        INodeFile.valueOf(from, path);
+        fail();
+      } catch(FileNotFoundException fnfe) {
+        assertTrue(fnfe.getMessage().contains("File does not exist"));
+      }
+
+      //cast to INodeFileUnderConstruction, should fail
+      try {
+        INodeFileUnderConstruction.valueOf(from, path);
+        fail();
+      } catch(FileNotFoundException fnfe) {
+        assertTrue(fnfe.getMessage().contains("File does not exist"));
+      }
+    }
+
+    {//cast from INodeFile
+      final INode from = new INodeFile(
+          perm, null, replication, 0L, 0L, preferredBlockSize);
+      
+      //cast to INodeFile, should succeed
+      final INodeFile f = INodeFile.valueOf(from, path);
+      assertTrue(f == from);
+
+      //cast to INodeFileUnderConstruction, should fail
+      try {
+        INodeFileUnderConstruction.valueOf(from, path);
+        fail();
+      } catch(IOException ioe) {
+        assertTrue(ioe.getMessage().contains("File is not under construction"));
+      }
+    }
+
+    {//cast from INodeFileUnderConstruction
+      final INode from = new INodeFileUnderConstruction(
+          perm, replication, 0L, 0L, "client", "machine", null);
+      
+      //cast to INodeFile, should succeed
+      final INodeFile f = INodeFile.valueOf(from, path);
+      assertTrue(f == from);
+
+      //cast to INodeFileUnderConstruction, should succeed
+      final INodeFileUnderConstruction u = INodeFileUnderConstruction.valueOf(
+          from, path);
+      assertTrue(u == from);
+    }
+
+    {//cast from INodeDirectory
+      final INode from = new INodeDirectory(perm, 0L);
+
+      //cast to INodeFile, should fail
+      try {
+        INodeFile.valueOf(from, path);
+        fail();
+      } catch(FileNotFoundException fnfe) {
+        assertTrue(fnfe.getMessage().contains("Path is not a file"));
+      }
+
+      //cast to INodeFileUnderConstruction, should fail
+      try {
+        INodeFileUnderConstruction.valueOf(from, path);
+        fail();
+      } catch(FileNotFoundException fnfe) {
+        assertTrue(fnfe.getMessage().contains("Path is not a file"));
+      }
+    }
+  }
 }

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -610,6 +610,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
     (Vinod Kumar Vavilapalli via jlowe)
 
+    MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion
+    requests slowly to avoid HADOOP-8942. (Jason Lowe via vinodkv)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 46 - 42
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java

@@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-@SuppressWarnings("deprecation")
 class EventFetcher<K,V> extends Thread {
   private static final long SLEEP_TIME = 1000;
-  private static final int MAX_EVENTS_TO_FETCH = 10000;
   private static final int MAX_RETRIES = 10;
   private static final int RETRY_PERIOD = 5000;
   private static final Log LOG = LogFactory.getLog(EventFetcher.class);
@@ -38,7 +36,8 @@ class EventFetcher<K,V> extends Thread {
   private final TaskAttemptID reduce;
   private final TaskUmbilicalProtocol umbilical;
   private final ShuffleScheduler<K,V> scheduler;
-  private int fromEventId = 0;
+  private int fromEventIdx = 0;
+  private int maxEventsToFetch;
   private ExceptionReporter exceptionReporter = null;
   
   private int maxMapRuntime = 0;
@@ -48,13 +47,15 @@ class EventFetcher<K,V> extends Thread {
   public EventFetcher(TaskAttemptID reduce,
                       TaskUmbilicalProtocol umbilical,
                       ShuffleScheduler<K,V> scheduler,
-                      ExceptionReporter reporter) {
+                      ExceptionReporter reporter,
+                      int maxEventsToFetch) {
     setName("EventFetcher for fetching Map Completion Events");
     setDaemon(true);    
     this.reduce = reduce;
     this.umbilical = umbilical;
     this.scheduler = scheduler;
     exceptionReporter = reporter;
+    this.maxEventsToFetch = maxEventsToFetch;
   }
 
   @Override
@@ -112,46 +113,47 @@ class EventFetcher<K,V> extends Thread {
    * from a given event ID.
    * @throws IOException
    */  
-  private int getMapCompletionEvents() throws IOException {
+  protected int getMapCompletionEvents() throws IOException {
     
     int numNewMaps = 0;
-    
-    MapTaskCompletionEventsUpdate update = 
-      umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID)
-                                       reduce.getJobID(), 
-                                       fromEventId, 
-                                       MAX_EVENTS_TO_FETCH,
-                                       (org.apache.hadoop.mapred.TaskAttemptID)
-                                         reduce);
-    TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
-    LOG.debug("Got " + events.length + " map completion events from " + 
-             fromEventId);
-      
-    // Check if the reset is required.
-    // Since there is no ordering of the task completion events at the 
-    // reducer, the only option to sync with the new jobtracker is to reset 
-    // the events index
-    if (update.shouldReset()) {
-      fromEventId = 0;
-      scheduler.resetKnownMaps();
-    }
-    
-    // Update the last seen event ID
-    fromEventId += events.length;
-    
-    // Process the TaskCompletionEvents:
-    // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
-    // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop 
-    //    fetching from those maps.
-    // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
-    //    outputs at all.
-    for (TaskCompletionEvent event : events) {
-      switch (event.getTaskStatus()) {
+    TaskCompletionEvent events[] = null;
+
+    do {
+      MapTaskCompletionEventsUpdate update =
+          umbilical.getMapCompletionEvents(
+              (org.apache.hadoop.mapred.JobID)reduce.getJobID(),
+              fromEventIdx,
+              maxEventsToFetch,
+              (org.apache.hadoop.mapred.TaskAttemptID)reduce);
+      events = update.getMapTaskCompletionEvents();
+      LOG.debug("Got " + events.length + " map completion events from " +
+               fromEventIdx);
+
+      // Check if the reset is required.
+      // Since there is no ordering of the task completion events at the
+      // reducer, the only option to sync with the new jobtracker is to reset
+      // the events index
+      if (update.shouldReset()) {
+        fromEventIdx = 0;
+        scheduler.resetKnownMaps();
+      }
+
+      // Update the last seen event ID
+      fromEventIdx += events.length;
+
+      // Process the TaskCompletionEvents:
+      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
+      //    fetching from those maps.
+      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+      //    outputs at all.
+      for (TaskCompletionEvent event : events) {
+        switch (event.getTaskStatus()) {
         case SUCCEEDED:
           URI u = getBaseURI(event.getTaskTrackerHttp());
           scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
-                                      u.toString(),
-                                      event.getTaskAttemptId());
+              u.toString(),
+              event.getTaskAttemptId());
           numNewMaps ++;
           int duration = event.getTaskRunTime();
           if (duration > maxMapRuntime) {
@@ -164,15 +166,17 @@ class EventFetcher<K,V> extends Thread {
         case OBSOLETE:
           scheduler.obsoleteMapOutput(event.getTaskAttemptId());
           LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
-                   " map-task: '" + event.getTaskAttemptId() + "'");
+              " map-task: '" + event.getTaskAttemptId() + "'");
           break;
         case TIPFAILED:
           scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
           LOG.info("Ignoring output of failed map TIP: '" +  
-               event.getTaskAttemptId() + "'");
+              event.getTaskAttemptId() + "'");
           break;
+        }
       }
-    }
+    } while (events.length == maxEventsToFetch);
+
     return numNewMaps;
   }
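
The rewritten getMapCompletionEvents() above now pages through the event stream: it keeps requesting batches of maxEventsToFetch and stops once a batch comes back short, which signals that no more events are available. A self-contained sketch of just that paging loop, where fetchBatch() is a hypothetical stand-in for umbilical.getMapCompletionEvents() (reset handling omitted):

import java.util.ArrayList;
import java.util.List;

public class PagingSketch {
  // Pretend the server holds 237 events in all and serves them in slices.
  static List<Integer> fetchBatch(int fromIdx, int max) {
    final int total = 237;
    List<Integer> batch = new ArrayList<Integer>();
    for (int i = fromIdx; i < Math.min(fromIdx + max, total); i++) {
      batch.add(i);
    }
    return batch;
  }

  public static void main(String[] args) {
    final int maxEventsToFetch = 100;
    int fromEventIdx = 0;
    List<Integer> events;
    do {
      events = fetchBatch(fromEventIdx, maxEventsToFetch);
      fromEventIdx += events.size();   // advance past the batch just seen
    } while (events.size() == maxEventsToFetch);
    System.out.println("fetched " + fromEventIdx + " events");   // 237
  }
}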
   

+ 13 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java

@@ -40,9 +40,12 @@ import org.apache.hadoop.util.Progress;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class Shuffle<K, V> implements ExceptionReporter {
   private static final int PROGRESS_FREQUENCY = 2000;
+  private static final int MAX_EVENTS_TO_FETCH = 10000;
+  private static final int MIN_EVENTS_TO_FETCH = 100;
+  private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
   
   private final TaskAttemptID reduceId;
   private final JobConf jobConf;
@@ -99,9 +102,17 @@ public class Shuffle<K, V> implements ExceptionReporter {
   }
 
   public RawKeyValueIterator run() throws IOException, InterruptedException {
+    // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+    // on the ApplicationMaster when a thundering herd of reducers fetch events
+    // TODO: This should not be necessary after HADOOP-8942
+    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
+        MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
+    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
+
     // Start the map-completion events fetcher thread
     final EventFetcher<K,V> eventFetcher = 
-      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this);
+      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
+          maxEventsToFetch);
     eventFetcher.start();
     
     // Start the map-output fetcher threads
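
The scaling logic added to run() above bounds the aggregate events outstanding across all reducers by MAX_RPC_OUTSTANDING_EVENTS while keeping each RPC between MIN_EVENTS_TO_FETCH and MAX_EVENTS_TO_FETCH. A worked example using the same constants (the class and method names here are illustrative only):

public class FetchScalingExample {
  static final int MAX_EVENTS_TO_FETCH = 10000;
  static final int MIN_EVENTS_TO_FETCH = 100;
  static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;

  // Mirrors the computation in Shuffle.run() above.
  static int maxEventsToFetch(int numReduceTasks) {
    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
        MAX_RPC_OUTSTANDING_EVENTS / numReduceTasks);
    return Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
  }

  public static void main(String[] args) {
    System.out.println(maxEventsToFetch(10));      // 10000: small job, capped by the max
    System.out.println(maxEventsToFetch(1000));    // 3000:  3000000 / 1000
    System.out.println(maxEventsToFetch(30000));   // 100:   floored at the min
  }
}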

+ 116 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class TestEventFetcher {
+
+  @Test
+  public void testConsecutiveFetch() throws IOException {
+    final int MAX_EVENTS_TO_FETCH = 100;
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
+
+    TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
+    when(umbilical.getMapCompletionEvents(any(JobID.class),
+        anyInt(), anyInt(), any(TaskAttemptID.class)))
+      .thenReturn(getMockedCompletionEventsUpdate(0, 0));
+    when(umbilical.getMapCompletionEvents(any(JobID.class),
+        eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+      .thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
+    when(umbilical.getMapCompletionEvents(any(JobID.class),
+        eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+      .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
+          MAX_EVENTS_TO_FETCH));
+    when(umbilical.getMapCompletionEvents(any(JobID.class),
+        eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+      .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
+
+    @SuppressWarnings("unchecked")
+    ShuffleScheduler<String,String> scheduler = mock(ShuffleScheduler.class);
+    ExceptionReporter reporter = mock(ExceptionReporter.class);
+
+    EventFetcherForTest<String,String> ef =
+        new EventFetcherForTest<String,String>(tid, umbilical, scheduler,
+            reporter, MAX_EVENTS_TO_FETCH);
+    ef.getMapCompletionEvents();
+
+    verify(reporter, never()).reportException(any(Throwable.class));
+    InOrder inOrder = inOrder(umbilical);
+    inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
+        eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid));
+    inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
+        eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
+    inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
+        eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
+    verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).addKnownMapOutput(
+        anyString(), anyString(), any(TaskAttemptID.class));
+  }
+
+  private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
+      int startIdx, int numEvents) {
+    ArrayList<TaskCompletionEvent> tceList =
+        new ArrayList<TaskCompletionEvent>(numEvents);
+    for (int i = 0; i < numEvents; ++i) {
+      int eventIdx = startIdx + i;
+      TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
+          new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
+          eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
+          "http://somehost:8888");
+      tceList.add(tce);
+    }
+    TaskCompletionEvent[] events = {};
+    return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
+  }
+
+  private static class EventFetcherForTest<K,V> extends EventFetcher<K,V> {
+
+    public EventFetcherForTest(TaskAttemptID reduce,
+        TaskUmbilicalProtocol umbilical, ShuffleScheduler<K,V> scheduler,
+        ExceptionReporter reporter, int maxEventsToFetch) {
+      super(reduce, umbilical, scheduler, reporter, maxEventsToFetch);
+    }
+
+    @Override
+    public int getMapCompletionEvents() throws IOException {
+      return super.getMapCompletionEvents();
+    }
+
+  }
+}