Merging trunk to branch-trunk-win

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-trunk-win@1452454 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas committed 12 years ago
parent commit ca26bb8974
24 changed files with 445 additions and 195 deletions
  1. + 2 - 0     hadoop-common-project/hadoop-common/CHANGES.txt
  2. + 4 - 1     hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
  3. + 7 - 0     hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
  4. + 5 - 0     hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  5. + 2 - 0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  6. + 8 - 0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
  7. + 1 - 0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointFaultInjector.java
  8. + 79 - 167  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  9. + 50 - 4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
  10. + 9 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  11. + 107 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
  12. + 6 - 0    hadoop-mapreduce-project/CHANGES.txt
  13. + 2 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
  14. + 14 - 14  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  15. + 10 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  16. + 6 - 3    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
  17. + 5 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
  18. + 6 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  19. + 6 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
  20. + 7 - 0    hadoop-yarn-project/CHANGES.txt
  21. + 6 - 3    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
  22. + 4 - 1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
  23. + 18 - 2   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java
  24. + 81 - 0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/view/TestInfoBlock.java

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -1383,6 +1383,8 @@ Release 0.23.7 - UNRELEASED
     HADOOP-9336. Allow UGI of current connection to be queried. (Daryn Sharp
     via kihwal)
 
+    HADOOP-9352. Expose UGI.setLoginUser for tests (daryn)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 4 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java

@@ -67,6 +67,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * User and group information for Hadoop.
  * This class wraps around a JAAS Subject and provides methods to determine the
@@ -713,7 +715,8 @@ public class UserGroupInformation {
 
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
-  synchronized static void setLoginUser(UserGroupInformation ugi) {
+  @VisibleForTesting
+  public synchronized static void setLoginUser(UserGroupInformation ugi) {
     // if this is to become stable, should probably logout the currently
     // logged in ugi if it's different
     loginUser = ugi;

+ 7 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java

@@ -664,4 +664,11 @@ public class TestUserGroupInformation {
     // Restore hasSufficientTimElapsed back to private
     method.setAccessible(false);
   }
+  
+  @Test(timeout=1000)
+  public void testSetLoginUser() throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test-user");
+    UserGroupInformation.setLoginUser(ugi);
+    assertEquals(ugi, UserGroupInformation.getLoginUser());
+  }
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -2310,6 +2310,8 @@ Release 0.23.7 - UNRELEASED
 
   OPTIMIZATIONS
 
+    HDFS-4532. RPC call queue may fill due to current user lookup (daryn)
+
   BUG FIXES
 
     HDFS-4288. NN accepts incremental BR as IBR in safemode (daryn via kihwal)
@@ -2317,6 +2319,9 @@ Release 0.23.7 - UNRELEASED
     HDFS-4495. Allow client-side lease renewal to be retried beyond soft-limit
     (kihwal)
 
+    HDFS-4128. 2NN gets stuck in inconsistent state if edit log replay fails
+    in the middle (kihwal via daryn)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -121,6 +121,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT = 3600;
   public static final String  DFS_NAMENODE_CHECKPOINT_TXNS_KEY = "dfs.namenode.checkpoint.txns";
   public static final long    DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000;
+  public static final String  DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY = "dfs.namenode.checkpoint.max-retries";
+  public static final int     DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
   public static final String  DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
   public static final int     DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
   public static final String  DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier";

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java

@@ -39,6 +39,8 @@ public class CheckpointConf {
   /** checkpoint once every this many transactions, regardless of time */
   private final long checkpointTxnCount;
 
+  /** maximum number of retries when merge errors occur */
+  private final int maxRetriesOnMergeError;
   
   public CheckpointConf(Configuration conf) {
     checkpointCheckPeriod = conf.getLong(
@@ -49,6 +51,8 @@ public class CheckpointConf {
                                     DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
     checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 
                                   DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+    maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY,
+                                  DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT);
     warnForDeprecatedConfigs(conf);
   }
   
@@ -75,4 +79,8 @@ public class CheckpointConf {
   public long getTxnCount() {
     return checkpointTxnCount;
   }
+
+  public int getMaxRetriesOnMergeError() {
+    return maxRetriesOnMergeError;
+  }
 }
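
The new knob threads from DFSConfigKeys through the CheckpointConf constructor to the getter added above. A minimal sketch of reading it back, assuming the hunks in this commit are applied; the demo class and the value 5 are illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;

    public class CheckpointRetryConfigDemo {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY, 5);
        // When the key is unset, the constructor falls back to
        // DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT (3).
        CheckpointConf checkpointConf = new CheckpointConf(conf);
        System.out.println(checkpointConf.getMaxRetriesOnMergeError()); // prints 5
      }
    }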

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointFaultInjector.java

@@ -33,6 +33,7 @@ class CheckpointFaultInjector {
   
   public void beforeGetImageSetsHeaders() throws IOException {}
   public void afterSecondaryCallsRollEditLog() throws IOException {}
+  public void duringMerge() throws IOException {}
   public void afterSecondaryUploadsNewImage() throws IOException {}
   public void aboutToSendFile(File localfile) throws IOException {}
 

+ 79 - 167
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -255,10 +255,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return !isDefaultAuditLogger || auditLog.isInfoEnabled();
   }
 
-  private void logAuditEvent(UserGroupInformation ugi,
-      InetAddress addr, String cmd, String src, String dst,
-      HdfsFileStatus stat) {
-    logAuditEvent(true, ugi, addr, cmd, src, dst, stat);
+  private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
+      throws IOException {
+    return (isAuditEnabled() && isExternalInvocation())
+        ? dir.getFileInfo(path, resolveSymlink) : null;
+  }
+  
+  private void logAuditEvent(boolean succeeded, String cmd, String src)
+      throws IOException {
+    logAuditEvent(succeeded, cmd, src, null, null);
+  }
+  
+  private void logAuditEvent(boolean succeeded, String cmd, String src,
+      String dst, HdfsFileStatus stat) throws IOException {
+    if (isAuditEnabled() && isExternalInvocation()) {
+      logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
+                    cmd, src, dst, stat);
+    }
   }
 
   private void logAuditEvent(boolean succeeded,
@@ -1179,11 +1192,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       setPermissionInt(src, permission);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "setPermission", src, null, null);
-      }
+      logAuditEvent(false, "setPermission", src);
       throw e;
     }
   }
@@ -1202,18 +1211,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
       checkOwner(pc, src);
       dir.setPermission(src, permission);
-      if (isAuditEnabled() && isExternalInvocation()) {
-        resultingStat = dir.getFileInfo(src, false);
-      }
+      resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "setPermission", src, null, resultingStat);
-    }
+    logAuditEvent(true, "setPermission", src, null, resultingStat);
   }
 
   /**
@@ -1226,11 +1229,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       setOwnerInt(src, username, group);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "setOwner", src, null, null);
-      }
+      logAuditEvent(false, "setOwner", src);
       throw e;
     } 
   }
@@ -1257,18 +1256,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         }
       }
       dir.setOwner(src, username, group);
-      if (isAuditEnabled() && isExternalInvocation()) {
-        resultingStat = dir.getFileInfo(src, false);
-      }
+      resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "setOwner", src, null, resultingStat);
-    }
+    logAuditEvent(true, "setOwner", src, null, resultingStat);
   }
 
   /**
@@ -1308,11 +1301,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       return getBlockLocationsInt(pc, src, offset, length, doAccessTime,
                                   needBlockToken, checkSafeMode);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "open", src, null, null);
-      }
+      logAuditEvent(false, "open", src);
       throw e;
     }
   }
@@ -1335,11 +1324,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
         offset, length, doAccessTime, needBlockToken);  
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "open", src, null, null);
-    }
+    logAuditEvent(true, "open", src);
     if (checkSafeMode && isInSafeMode()) {
       for (LocatedBlock b : ret.getLocatedBlocks()) {
         // if safemode & no block locations yet then throw safemodeException
@@ -1416,11 +1401,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       concatInt(target, srcs);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getLoginUser(),
-                      getRemoteIp(),
-                      "concat", Arrays.toString(srcs), target, null);
-      }
+      logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
       throw e;
     }
   }
@@ -1460,18 +1441,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
       concatInternal(pc, target, srcs);
-      if (isAuditEnabled() && isExternalInvocation()) {
-        resultingStat = dir.getFileInfo(target, false);
-      }
+      resultingStat = getAuditFileInfo(target, false);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getLoginUser(),
-                    getRemoteIp(),
-                    "concat", Arrays.toString(srcs), target, resultingStat);
-    }
+    logAuditEvent(true, "concat", Arrays.toString(srcs), target, resultingStat);
   }
 
   /** See {@link #concat(String, String[])} */
@@ -1588,11 +1563,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       setTimesInt(src, mtime, atime);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "setTimes", src, null, null);
-      }
+      logAuditEvent(false, "setTimes", src);
       throw e;
     }
   }
@@ -1603,6 +1574,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
     }
+    HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
@@ -1615,18 +1587,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       INode inode = dir.getINode(src);
       if (inode != null) {
         dir.setTimes(src, inode, mtime, atime, true);
-        if (isAuditEnabled() && isExternalInvocation()) {
-          final HdfsFileStatus stat = dir.getFileInfo(src, false);
-          logAuditEvent(UserGroupInformation.getCurrentUser(),
-                        getRemoteIp(),
-                        "setTimes", src, null, stat);
-        }
+        resultingStat = getAuditFileInfo(src, false);
       } else {
         throw new FileNotFoundException("File/Directory " + src + " does not exist.");
       }
     } finally {
       writeUnlock();
     }
+    logAuditEvent(true, "setTimes", src, null, resultingStat);
   }
 
   /**
@@ -1638,11 +1606,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "createSymlink", link, target, null);
-      }
+      logAuditEvent(false, "createSymlink", link, target, null);
       throw e;
     }
   }
@@ -1660,18 +1624,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         verifyParentDir(link);
       }
       createSymlinkInternal(pc, target, link, dirPerms, createParent);
-      if (isAuditEnabled() && isExternalInvocation()) {
-        resultingStat = dir.getFileInfo(link, false);
-      }
+      resultingStat = getAuditFileInfo(link, false);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "createSymlink", link, target, resultingStat);
-    }
+    logAuditEvent(true, "createSymlink", link, target, resultingStat);
   }
 
   /**
@@ -1723,11 +1681,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       return setReplicationInt(src, replication);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "setReplication", src, null, null);
-      }
+      logAuditEvent(false, "setReplication", src);
       throw e;
     }
   }
@@ -1758,10 +1712,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     getEditLog().logSync();
-    if (isFile && isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "setReplication", src, null, null);
+    if (isFile) {
+      logAuditEvent(true, "setReplication", src);
     }
     return isFile;
   }
@@ -1817,11 +1769,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       return startFileInt(src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "create", src, null, null);
-      }
+      logAuditEvent(false, "create", src);
       throw e;
     }
   }
@@ -1853,11 +1801,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
     } 
 
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "create", src, null, stat);
-    }
+    logAuditEvent(true, "create", src, null, stat);
     return stat;
   }
 
@@ -2156,11 +2100,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       return appendFileInt(src, holder, clientMachine);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "append", src, null, null);
-      }
+      logAuditEvent(false, "append", src);
       throw e;
     }
   }
@@ -2203,11 +2143,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
             +" block size " + lb.getBlock().getNumBytes());
       }
     }
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "append", src, null, null);
-    }
+    logAuditEvent(true, "append", src);
     return lb;
   }
 
@@ -2701,11 +2637,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       return renameToInt(src, dst);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "rename", src, dst, null);
-      }
+      logAuditEvent(false, "rename", src, dst, null);
       throw e;
     }
   }
@@ -2724,17 +2656,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkOperation(OperationCategory.WRITE);
 
       status = renameToInternal(pc, src, dst);
-      if (status && isAuditEnabled() && isExternalInvocation()) {
-        resultingStat = dir.getFileInfo(dst, false);
+      if (status) {
+        resultingStat = getAuditFileInfo(dst, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (status && isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "rename", src, dst, resultingStat);
+    if (status) {
+      logAuditEvent(true, "rename", src, dst, resultingStat);
     }
     return status;
   }
@@ -2781,20 +2711,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.WRITE);
       renameToInternal(pc, src, dst, options);
-      if (isAuditEnabled() && isExternalInvocation()) {
-        resultingStat = dir.getFileInfo(dst, false); 
-      }
+      resultingStat = getAuditFileInfo(dst, false);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (isAuditEnabled() && isExternalInvocation()) {
+    if (resultingStat != null) {
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
       }
-      logAuditEvent(UserGroupInformation.getCurrentUser(), getRemoteIp(),
-                    cmd.toString(), src, dst, resultingStat);
+      logAuditEvent(true, cmd.toString(), src, dst, resultingStat);
     }
   }
 
@@ -2827,11 +2754,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       return deleteInt(src, recursive);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "delete", src, null, null);
-      }
+      logAuditEvent(false, "delete", src);
       throw e;
     }
   }
@@ -2843,10 +2766,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     }
     boolean status = deleteInternal(src, recursive, true);
-    if (status && isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "delete", src, null, null);
+    if (status) {
+      logAuditEvent(true, "delete", src);
     }
     return status;
   }
@@ -3012,20 +2933,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "getfileinfo", src, null, null);
-      }
+      logAuditEvent(false, "getfileinfo", src);
       throw e;
     } finally {
       readUnlock();
     }
-    if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "getfileinfo", src, null, null);
-    }
+    logAuditEvent(true, "getfileinfo", src);
     return stat;
   }
 
@@ -3037,17 +2950,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       return mkdirsInt(src, permissions, createParent);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "mkdirs", src, null, null);
-      }
+      logAuditEvent(false, "mkdirs", src);
       throw e;
     }
   }
 
   private boolean mkdirsInt(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
+    HdfsFileStatus resultingStat = null;
     boolean status = false;
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
@@ -3057,15 +2967,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.WRITE);
       status = mkdirsInternal(pc, src, permissions, createParent);
+      if (status) {
+        resultingStat = dir.getFileInfo(src, false);
+      }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (status && isAuditEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(src, false);
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    getRemoteIp(),
-                    "mkdirs", src, null, stat);
+    if (status) {
+      logAuditEvent(true, "mkdirs", src, null, resultingStat);
     }
     return status;
   }
@@ -3494,11 +3404,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       return getListingInt(src, startAfter, needLocation);
     } catch (AccessControlException e) {
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "listStatus", src, null, null);
-      }
+      logAuditEvent(false, "listStatus", src);
       throw e;
     }
   }
@@ -3519,11 +3425,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           checkTraverse(pc, src);
         }
       }
-      if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(UserGroupInformation.getCurrentUser(),
-                      getRemoteIp(),
-                      "listStatus", src, null, null);
-      }
+      logAuditEvent(true, "listStatus", src);
       dl = dir.getListing(src, startAfter, needLocation);
     } finally {
       readUnlock();
@@ -5270,7 +5172,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         return null;
       }
 
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      UserGroupInformation ugi = getRemoteUser();
       String user = ugi.getUserName();
       Text owner = new Text(user);
       Text realUser = null;
@@ -5311,7 +5213,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new IOException(
             "Delegation Token can be renewed only with kerberos or web authentication");
       }
-      String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
+      String renewer = getRemoteUser().getShortUserName();
       expiryTime = dtSecretManager.renewToken(token, renewer);
       DelegationTokenIdentifier id = new DelegationTokenIdentifier();
       ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
@@ -5339,7 +5241,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot cancel delegation token", safeMode);
       }
-      String canceller = UserGroupInformation.getCurrentUser().getUserName();
+      String canceller = getRemoteUser().getUserName();
       DelegationTokenIdentifier id = dtSecretManager
         .cancelToken(token, canceller);
       getEditLog().logCancelDelegationToken(id);
@@ -5408,7 +5310,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   private AuthenticationMethod getConnectionAuthenticationMethod()
       throws IOException {
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation ugi = getRemoteUser();
     AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
     if (authMethod == AuthenticationMethod.PROXY) {
       authMethod = ugi.getRealUser().getAuthenticationMethod();
@@ -5432,12 +5334,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return NamenodeWebHdfsMethods.getRemoteIp();
   }
   
+  // optimize ugi lookup for RPC operations to avoid a trip through
+  // UGI.getCurrentUser which is synch'ed
+  private static UserGroupInformation getRemoteUser() throws IOException {
+    UserGroupInformation ugi = null;
+    if (Server.isRpcInvocation()) {
+      ugi = Server.getRemoteUser();
+    }
+    return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
+  }
+  
   /**
    * Log fsck event in the audit log 
    */
   void logFsckEvent(String src, InetAddress remoteAddress) throws IOException {
     if (isAuditEnabled()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
+      logAuditEvent(true, getRemoteUser(),
                     remoteAddress,
                     "fsck", src, null, null);
     }

+ 50 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -144,6 +144,11 @@ public class SecondaryNameNode implements Runnable {
     return checkpointImage;
   }
 
+  @VisibleForTesting
+  int getMergeErrorCount() {
+    return checkpointImage.getMergeErrorCount();
+  }
+
   @VisibleForTesting
   FSNamesystem getFSNamesystem() {
     return namesystem;
@@ -339,6 +344,7 @@ public class SecondaryNameNode implements Runnable {
     // number of transactions in the edit log that haven't yet been checkpointed.
     //
     long period = checkpointConf.getCheckPeriod();
+    int maxRetries = checkpointConf.getMaxRetriesOnMergeError();
 
     while (shouldRun) {
       try {
@@ -364,6 +370,13 @@ public class SecondaryNameNode implements Runnable {
       } catch (IOException e) {
         LOG.error("Exception in doCheckpoint", e);
         e.printStackTrace();
+        // Prevent a huge number of edits from being created due to
+        // unrecoverable conditions and endless retries.
+        if (checkpointImage.getMergeErrorCount() > maxRetries) {
+          LOG.fatal("Merging failed " + 
+              checkpointImage.getMergeErrorCount() + " times.");
+          terminate(1);
+        }
       } catch (Throwable e) {
         LOG.fatal("Throwable Exception in doCheckpoint", e);
         e.printStackTrace();
@@ -498,9 +511,21 @@ public class SecondaryNameNode implements Runnable {
     RemoteEditLogManifest manifest =
       namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);
 
+    // Fetch fsimage and edits. Reload the image if previous merge failed.
     loadImage |= downloadCheckpointFiles(
-        fsName, checkpointImage, sig, manifest);   // Fetch fsimage and edits
-    doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
+        fsName, checkpointImage, sig, manifest) |
+        checkpointImage.hasMergeError();
+    try {
+      doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
+    } catch (IOException ioe) {
+      // A merge error occurred. The in-memory file system state may be
+      // inconsistent, so the image and edits need to be reloaded.
+      checkpointImage.setMergeError();
+      throw ioe;
+    }
+    // Clear any error since merge was successful.
+    checkpointImage.clearMergeError();
+
     
     //
     // Upload the new image into the NameNode. Then tell the Namenode
@@ -754,6 +779,7 @@ public class SecondaryNameNode implements Runnable {
   
   static class CheckpointStorage extends FSImage {
     
+    private int mergeErrorCount;
     private static class CheckpointLogPurger implements LogsPurgeable {
       
       private NNStorage storage;
@@ -815,6 +841,7 @@ public class SecondaryNameNode implements Runnable {
       // we shouldn't have any editLog instance. Setting to null
       // makes sure we don't accidentally depend on it.
       editLog = null;
+      mergeErrorCount = 0;
       
       // Replace the archival manager with one that can actually work on the
       // 2NN's edits storage.
@@ -881,7 +908,24 @@ public class SecondaryNameNode implements Runnable {
         }
       }
     }
-    
+
+
+    boolean hasMergeError() {
+      return (mergeErrorCount > 0);
+    }
+
+    int getMergeErrorCount() {
+      return mergeErrorCount;
+    }
+
+    void setMergeError() {
+      mergeErrorCount++;
+    }
+
+    void clearMergeError() {
+      mergeErrorCount = 0;
+    }
+ 
     /**
      * Ensure that the current/ directory exists in all storage
      * directories
@@ -915,7 +959,9 @@ public class SecondaryNameNode implements Runnable {
       dstImage.reloadFromImageFile(file, dstNamesystem);
       dstNamesystem.dir.imageLoadComplete();
     }
-    
+    // error simulation code for junit test
+    CheckpointFaultInjector.getInstance().duringMerge();   
+
     Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
     // The following has the side effect of purging old fsimages/edit logs.
     dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -640,6 +640,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.checkpoint.max-retries</name>
+  <value>3</value>
+  <description>The SecondaryNameNode retries failed checkpointing. If the 
+  failure occurs while loading fsimage or replaying edits, the number of
+  retries is limited by this variable. 
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.num.checkpoints.retained</name>
   <value>2</value>

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -74,6 +74,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -226,6 +228,111 @@ public class TestCheckpoint {
                toString().indexOf("storageDirToCheck") != -1);
   }
 
+  /*
+   * Simulate exception during edit replay.
+   */
+  @Test(timeout=5000)
+  public void testReloadOnEditReplayFailure () throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    FSDataOutputStream fos = null;
+    SecondaryNameNode secondary = null;
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+          .build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      secondary = startSecondaryNameNode(conf);
+      fos = fs.create(new Path("tmpfile0"));
+      fos.write(new byte[] { 0, 1, 2, 3 });
+      secondary.doCheckpoint();
+      fos.write(new byte[] { 0, 1, 2, 3 });
+      fos.hsync();
+
+      // Cause merge to fail in next checkpoint.
+      Mockito.doThrow(new IOException(
+          "Injecting failure during merge"))
+          .when(faultInjector).duringMerge();
+
+      try {
+        secondary.doCheckpoint();
+        fail("Fault injection failed.");
+      } catch (IOException ioe) {
+        // This is expected.
+      } 
+      Mockito.reset(faultInjector);
+ 
+      // The error must be recorded, so next checkpoint will reload image.
+      fos.write(new byte[] { 0, 1, 2, 3 });
+      fos.hsync();
+      
+      assertTrue("Another checkpoint should have reloaded image",
+          secondary.doCheckpoint());
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      Mockito.reset(faultInjector);
+    }
+  }
+
+  /*
+   * Simulate 2NN exit due to too many merge failures.
+   */
+  @Test(timeout=10000)
+  public void testTooManyEditReplayFailures() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY, "1");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, "1");
+
+    FSDataOutputStream fos = null;
+    SecondaryNameNode secondary = null;
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+          .checkExitOnShutdown(false).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      fos = fs.create(new Path("tmpfile0"));
+      fos.write(new byte[] { 0, 1, 2, 3 });
+
+      // Cause merge to fail in next checkpoint.
+      Mockito.doThrow(new IOException(
+          "Injecting failure during merge"))
+          .when(faultInjector).duringMerge();
+
+      secondary = startSecondaryNameNode(conf);
+      secondary.doWork();
+      // Fail if we get here.
+      fail("2NN did not exit.");
+    } catch (ExitException ee) {
+      // ignore
+      ExitUtil.resetFirstExitException();
+      assertEquals("Max retries", 1, secondary.getMergeErrorCount() - 1);
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      Mockito.reset(faultInjector);
+    }
+  }
+
   /*
    * Simulate namenode crashing after rolling edit log.
    */

+ 6 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -754,6 +754,12 @@ Release 0.23.7 - UNRELEASED
     mapred-default has mapreduce.job.split.metainfo.maxsize (Jason Lowe via
     jeagles)
 
+    MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
+    shutdown (Jason Lowe via jeagles)
+
+    MAPREDUCE-5043. Fetch failure processing can cause AM event queue to
+    backup and eventually OOM (Jason Lowe via bobby)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.job;
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -37,6 +38,7 @@ public interface TaskAttempt {
   List<String> getDiagnostics();
   Counters getCounters();
   float getProgress();
+  Phase getPhase();
   TaskAttemptState getState();
 
   /** 

+ 14 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -1672,6 +1672,20 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
+      //get number of shuffling reduces
+      int shufflingReduceTasks = 0;
+      for (TaskId taskId : job.reduceTasks) {
+        Task task = job.tasks.get(taskId);
+        if (TaskState.RUNNING.equals(task.getState())) {
+          for(TaskAttempt attempt : task.getAttempts().values()) {
+            if(attempt.getPhase() == Phase.SHUFFLE) {
+              shufflingReduceTasks++;
+              break;
+            }
+          }
+        }
+      }
+
       JobTaskAttemptFetchFailureEvent fetchfailureEvent = 
         (JobTaskAttemptFetchFailureEvent) event;
       for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId : 
@@ -1680,20 +1694,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
         job.fetchFailuresMapping.put(mapId, fetchFailures);
         
-        //get number of shuffling reduces
-        int shufflingReduceTasks = 0;
-        for (TaskId taskId : job.reduceTasks) {
-          Task task = job.tasks.get(taskId);
-          if (TaskState.RUNNING.equals(task.getState())) {
-            for(TaskAttempt attempt : task.getAttempts().values()) {
-              if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
-                shufflingReduceTasks++;
-                break;
-              }
-            }
-          }
-        }
-        
         float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
           (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -993,6 +993,16 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  @Override
+  public Phase getPhase() {
+    readLock.lock();
+    try {
+      return reportedStatus.phase;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public TaskAttemptState getState() {
     readLock.lock();

+ 6 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java

@@ -91,6 +91,7 @@ public class DefaultSpeculator extends AbstractService implements
   private final Configuration conf;
   private AppContext context;
   private Thread speculationBackgroundThread = null;
+  private volatile boolean stopped = false;
   private BlockingQueue<SpeculatorEvent> eventQueue
       = new LinkedBlockingQueue<SpeculatorEvent>();
   private TaskRuntimeEstimator estimator;
@@ -170,7 +171,7 @@ public class DefaultSpeculator extends AbstractService implements
         = new Runnable() {
             @Override
             public void run() {
-              while (!Thread.currentThread().isInterrupted()) {
+              while (!stopped && !Thread.currentThread().isInterrupted()) {
                 long backgroundRunStartTime = clock.getTime();
                 try {
                   int speculations = computeSpeculations();
@@ -189,8 +190,9 @@ public class DefaultSpeculator extends AbstractService implements
                   Object pollResult
                       = scanControl.poll(wait, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
-                  LOG.error("Background thread returning, interrupted : " + e);
-                  e.printStackTrace(System.out);
+                  if (!stopped) {
+                    LOG.error("Background thread returning, interrupted", e);
+                  }
                   return;
                 }
               }
@@ -205,6 +207,7 @@ public class DefaultSpeculator extends AbstractService implements
 
   @Override
   public void stop() {
+    stopped = true;
     // this could be called before background thread is established
     if (speculationBackgroundThread != null) {
       speculationBackgroundThread.interrupt();
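
The fix for MAPREDUCE-4794 is the common stop-flag idiom: a volatile boolean is set before the thread is interrupted, so the InterruptedException handler can tell a deliberate stop() apart from an unexpected interrupt and stay quiet on normal shutdown. A generic, self-contained sketch of that idiom; the class and names are illustrative, not from the patch:

    public class StoppableWorker implements Runnable {
      private volatile boolean stopped = false;
      private Thread worker;

      public void start() {
        worker = new Thread(this, "background-sketch");
        worker.start();
      }

      public void stop() {
        // Set the flag before interrupting so the catch block below can
        // distinguish a deliberate stop() from a stray interrupt.
        stopped = true;
        if (worker != null) {
          worker.interrupt();
        }
      }

      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(1000); // stand-in for the real periodic work
          } catch (InterruptedException e) {
            if (!stopped) {
              // Only an unexpected interrupt is worth logging.
              System.err.println("Background thread returning, interrupted: " + e);
            }
            return;
          }
        }
      }
    }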

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java

@@ -276,6 +276,11 @@ public class MockJobs extends MockApps {
         return report.getProgress();
       }
 
+      @Override
+      public Phase getPhase() {
+        return report.getPhase();
+      }
+
       @Override
       public TaskAttemptState getState() {
         return report.getTaskAttemptState();

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -638,6 +639,11 @@ public class TestRuntimeEstimators {
       return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress();
     }
 
+    @Override
+    public Phase getPhase() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
     @Override
     public TaskAttemptState getState() {
       if (overridingState != null) {

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java

@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -106,6 +107,11 @@ public class CompletedTaskAttempt implements TaskAttempt {
     return report;
   }
 
+  @Override
+  public Phase getPhase() {
+    return Phase.CLEANUP;
+  }
+
   @Override
   public TaskAttemptState getState() {
     return state;

+ 7 - 0
hadoop-yarn-project/CHANGES.txt

@@ -25,6 +25,13 @@ Release 2.0.4-beta - UNRELEASED
     YARN-365. Change NM heartbeat handling to not generate a scheduler event
     on each heartbeat. (Xuan Gong via sseth)
 
+    YARN-380. Fix yarn node -status output to be better readable. (Omkar Vinit
+    Joshi via vinodkv)
+
+    YARN-410. Fixed RM UI so that the new lines diagnostics for a failed app on
+    the per-application page are translated to html line breaks. (Omkar Vinit
+    Joshi via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java

@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.client.cli;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Date;
 import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang.time.DateFormatUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -129,9 +131,10 @@ public class NodeCLI extends YarnCLI {
       nodeReportStr.print("\tHealth-Status(isNodeHealthy) : ");
       nodeReportStr.println(nodeReport.getNodeHealthStatus()
           .getIsNodeHealthy());
-      nodeReportStr.print("\tLast-Last-Health-Update : ");
-      nodeReportStr.println(nodeReport.getNodeHealthStatus()
-          .getLastHealthReportTime());
+      nodeReportStr.print("\tLast-Health-Update : ");
+      nodeReportStr.println(DateFormatUtils.format(
+          new Date(nodeReport.getNodeHealthStatus().
+            getLastHealthReportTime()),"E dd/MMM/yy hh:mm:ss:SSzz"));
       nodeReportStr.print("\tHealth-Report : ");
       nodeReportStr
           .println(nodeReport.getNodeHealthStatus().getHealthReport());
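
For reference, a quick sketch of what the new format string produces, using the same commons-lang DateFormatUtils call; the exact day name and zone suffix depend on the JVM's default locale and time zone, so the output shown in the comment is indicative only:

    import java.util.Date;
    import org.apache.commons.lang.time.DateFormatUtils;

    public class LastHealthUpdateFormatDemo {
      public static void main(String[] args) {
        // Epoch zero, matching the expectation updated in TestYarnCLI below.
        String ts = DateFormatUtils.format(new Date(0),
            "E dd/MMM/yy hh:mm:ss:SSzz");
        // Prints something like "Thu 01/Jan/70 12:00:00:00UTC" in a UTC JVM.
        System.out.println(ts);
      }
    }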

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java

@@ -31,10 +31,12 @@ import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.commons.lang.time.DateFormatUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -185,7 +187,8 @@ public class TestYarnCLI {
     pw.println("\tNode-State : RUNNING");
     pw.println("\tNode-Http-Address : host1:8888");
     pw.println("\tHealth-Status(isNodeHealthy) : false");
-    pw.println("\tLast-Last-Health-Update : 0");
+    pw.println("\tLast-Health-Update : "
+      + DateFormatUtils.format(new Date(0), "E dd/MMM/yy hh:mm:ss:SSzz"));
     pw.println("\tHealth-Report : null");
     pw.println("\tContainers : 0");
     pw.println("\tMemory-Used : 0M");

+ 18 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java

@@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.webapp.view;
 
 import org.apache.hadoop.yarn.webapp.ResponseInfo;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
+
 
 import com.google.inject.Inject;
 
@@ -47,7 +51,19 @@ public class InfoBlock extends HtmlBlock {
       String value = String.valueOf(item.value);
       if (item.url == null) {
         if (!item.isRaw) {
-          tr.td(value);
+          TD<TR<TABLE<DIV<Hamlet>>>> td = tr.td();
+          if ( value.lastIndexOf('\n') > 0) {
+            String []lines = value.split("\n");
+        	DIV<TD<TR<TABLE<DIV<Hamlet>>>>> singleLineDiv;
+            for ( String line :lines) {
+              singleLineDiv = td.div();
+              singleLineDiv._r(line);
+              singleLineDiv._();
+            }
+          } else {
+            td._r(value);
+          }
+          td._();
         } else {
           tr.td()._r(value)._();
         }

+ 81 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/view/TestInfoBlock.java

@@ -0,0 +1,81 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.webapp.view;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
+import org.apache.hadoop.yarn.webapp.test.WebAppTests;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInfoBlock {
+
+  public static StringWriter sw;
+
+  public static PrintWriter pw;
+
+  public static class MultilineInfoBlock extends InfoBlock{
+    
+    static ResponseInfo resInfo;
+
+    static {
+      resInfo = new ResponseInfo();
+      resInfo._("Single_line_value", "This is one line.");
+      resInfo._("Multiple_line_value", "This is first line.\nThis is second line.");	
+    }
+
+    @Override
+    public PrintWriter writer() {
+      return TestInfoBlock.pw;
+    }
+
+    MultilineInfoBlock(ResponseInfo info) {
+      super(resInfo);
+    }
+
+    public MultilineInfoBlock() {
+      super(resInfo);
+    }
+  }
+
+  @Before
+  public void setup() {
+    sw = new StringWriter();
+    pw = new PrintWriter(sw);
+  }
+
+  @Test(timeout=60000L)
+  public void testMultilineInfoBlock() throws Exception{
+
+    WebAppTests.testBlock(MultilineInfoBlock.class);
+    TestInfoBlock.pw.flush();
+    String output = TestInfoBlock.sw.toString().replaceAll(" +", " ");
+    String expectedSinglelineData = "<tr class=\"odd\">\n"
+      + " <th>\n Single_line_value\n <td>\n This is one line.\n";
+    String expectedMultilineData = "<tr class=\"even\">\n"
+      + " <th>\n Multiple_line_value\n <td>\n <div>\n"
+      + " This is first line.\n </div>\n <div>\n"
+      + " This is second line.\n </div>\n";
+    assertTrue(output.contains(expectedSinglelineData) && output.contains(expectedMultilineData));
+  }
+}