
Merge branch 'condor-branch-1' of https://github.com/hortonworks/hadoop into condor-branch-1

brandonli committed 12 years ago (commit a8e9e3c401)

+ 39 - 5
CHANGES.txt

@@ -8,6 +8,9 @@ Release 1.3.0 - unreleased
 
   IMPROVEMENTS
 
+    HADOOP-9450. HADOOP_USER_CLASSPATH_FIRST is not honored; CLASSPATH
+    is PREpended instead of APpended. (Chris Nauroth and harsh via harsh)
+
   BUG FIXES
 
     MAPREDUCE-5047. keep.failed.task.files=true causes job failure on 
@@ -34,6 +37,12 @@ Release 1.3.0 - unreleased
     MAPREDUCE-2817. MiniRMCluster hardcodes 'mapred.local.dir' configuration 
     to 'build/test/mapred/local'. (rkanter via tucu)
 
+    MAPREDUCE-5133. TestSubmitJob.testSecureJobExecution is flaky due to job 
+    dir deletion race. (sandyr via tucu)
+
+    HDFS-4699. Additional conditions for avoiding unnecessary 
+    DataNode.checkDiskError calls. (Chris Nauroth via kihwal)
+
 Release 1.2.0 - unreleased
 
   INCOMPATIBLE CHANGES
@@ -105,6 +114,8 @@ Release 1.2.0 - unreleased
     MAPREDUCE-4824. Provide a mechanism for jobs to indicate they should not
     be recovered on JobTracker restart. (tomwhite & acmurthy via acmurthy) 
 
+    HDFS-4776. Backport SecondaryNameNode web ui.  (szetszwo)
+
   IMPROVEMENTS
 
     HADOOP-9434. Backport HADOOP-9267: hadoop -h|-{0,2}help should print usage.
@@ -244,6 +255,8 @@ Release 1.2.0 - unreleased
 
   BUG FIXES
 
+    MAPREDUCE-5202. Revert MAPREDUCE-4397 to improve LinuxTaskController security. (omalley)
+
     HADOOP-9467. Metrics2 record filter should check name as well as tags.
     (Chris Nauroth and Ganeshan Iyler via llu)
 
@@ -599,6 +612,22 @@ Release 1.2.0 - unreleased
 
     HADOOP-9473. Typo in FileUtil copy() method. (Glen Mazza via suresh)
 
+    MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
+    Mitic via acmurthy)
+
+    HDFS-4715. Backport HDFS-3577, HDFS-3318 and HDFS-3788: fix some WebHDFS
+    performance issues.  (Mark Wagner via szetszwo)
+
+    HADOOP-9492. Update testConf.xml for HADOOP-9473.  (Jing Zhao via szetszwo)
+
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense
+    that cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users
+    can override Mapper.run and Reducer.run to get the old (inconsistent)
+    behaviour. (acmurthy) 
+
     MAPREDUCE-5169. Fixed JobTracker recovery to ensure both job-info and
     jobToken file are saved correctly to prevent race-condition between job
     submission and initialization. (acmurthy)
@@ -609,11 +638,6 @@ Release 1.2.0 - unreleased
     HADOOP-9458. Fix RPC.getProxy to ensure it uses retries for
     getProtocolVersion too. (szetszwo via acmurthy)
 
-    HDFS-4715. Backport HDFS-3577, HDFS-3318 and HDFS-3788: fix some WebHDFS
-    performance issues.  (Mark Wagner via szetszwo)
-
-    HADOOP-9492. Update testConf.xml for HADOOP-9473.  (Jing Zhao via szetszwo)
-
     HADOOP-9502. chmod/chown do not return error exit codes for some exceptions.
     (szetszwo)
 
@@ -622,6 +646,16 @@ Release 1.2.0 - unreleased
     false to indicate they don't want to be recovered. (Mayank Bansal via
     acmurthy) 
 
+    MAPREDUCE-5198. Fix a race condition during TaskTracker re-init which was
+    causing failures since task directories were being cleaned up in multiple
+    threads. (arpit via acmurthy)
+
+    MAPREDUCE-5154. Ensure job delegation token renewal is cancelled after job
+    staging directory is deleted. (Sandy Ryza & Arun C. Murthy via acmurthy)
+
+    HADOOP-9537. Backport changes to add support running Hadoop client on AIX.
+    (Aaron T. Myers, backported by Arpit Agarwal via suresh)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

+ 11 - 6
bin/hadoop

@@ -156,9 +156,7 @@ fi
 
 # CLASSPATH initially contains $HADOOP_CONF_DIR
 CLASSPATH="${HADOOP_CONF_DIR}"
-if [ "$HADOOP_USER_CLASSPATH_FIRST" != "" ] && [ "$HADOOP_CLASSPATH" != "" ] ; then
-  CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
-fi
+
 CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
 
 # for developers, add Hadoop classes to CLASSPATH
@@ -232,9 +230,16 @@ else
   done
 fi
 
-# add user-specified CLASSPATH last
-if [ "$HADOOP_USER_CLASSPATH_FIRST" = "" ] && [ "$HADOOP_CLASSPATH" != "" ]; then
-  CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
+# Add the user-specified CLASSPATH via HADOOP_CLASSPATH
+# Add it first or last depending on if user has
+# set env-var HADOOP_USER_CLASSPATH_FIRST
+if [ "$HADOOP_CLASSPATH" != "" ]; then
+  # Prefix it if its to be preceded
+  if [ "$HADOOP_USER_CLASSPATH_FIRST" != "" ]; then
+    CLASSPATH=${HADOOP_CLASSPATH}:${CLASSPATH}
+  else
+    CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
+  fi
 fi
 
 # default log directory & file

+ 7 - 0
build.xml

@@ -576,6 +576,13 @@
      webxml="${build.webapps}/datanode/WEB-INF/web.xml">
     </jsp-compile>
 
+    <jsp-compile
+     uriroot="${src.webapps}/secondary"
+     outputdir="${build.src}"
+     package="org.apache.hadoop.hdfs.server.namenode"
+     webxml="${build.webapps}/secondary/WEB-INF/web.xml">
+    </jsp-compile>
+
     <!-- Compile Java files (excluding JSPs) checking warnings -->
     <javac 
      encoding="${build.encoding}" 

+ 0 - 23
src/c++/task-controller/impl/configuration.c

@@ -87,29 +87,6 @@ static int is_only_root_writable(const char *file) {
   return 1;
 }
 
-/**
- * Get the full path of the configuration file.
- * Use $HADOOP_SECURITY_CONF_DIR for the configuration directory, and if
- * it's not set, use the default value in default_conf_dir.
- */
-void get_config_path(char* conf_file_path, int size,
-                     char* default_conf_dir,
-                     const char* conf_file_name) {
-  if (conf_file_name == NULL) {
-    fprintf(LOGFILE, "Null configuration filename passed in\n");
-    exit(INVALID_CONFIG_FILE);
-  }
-  char *orig_conf_dir = getenv("HADOOP_SECURITY_CONF_DIR");
-  if (orig_conf_dir == NULL) {
-    if (default_conf_dir == NULL) {
-      fprintf(LOGFILE, "Null default configuration directory passed in\n");
-      exit(INVALID_CONFIG_FILE);
-    }
-    orig_conf_dir = default_conf_dir;
-  }
-  snprintf(conf_file_path, size, "%s/%s", orig_conf_dir, conf_file_name);
-}
-
 /**
  * Ensure that the configuration file and all of the containing directories
  * are only writable by root. Otherwise, an attacker can change the 

+ 0 - 9
src/c++/task-controller/impl/configuration.h

@@ -16,15 +16,6 @@
  * limitations under the License.
  */
 
-/**
- * Get the full path of the configuration file.
- * Use $HADOOP_SECURITY_CONF_DIR for the configuration directory, and if
- * it's not set, use the default value in default_conf_dir.
- */
-void get_config_path(char* conf_file_path, int size,
-                     char* default_conf_dir,
-                     const char* conf_file_name);
-
 /**
  * Ensure that the configuration file and all of the containing directories
  * are only writable by root. Otherwise, an attacker can change the 

+ 1 - 5
src/c++/task-controller/impl/main.c

@@ -80,11 +80,7 @@ int main(int argc, char **argv) {
   #error HADOOP_CONF_DIR must be defined
 #endif
 
-  char orig_conf_file[PATH_MAX + 1]; // realpath is limitted by PATH_MAX
-  orig_conf_file[PATH_MAX] = 0; // in case of snprintf error
-  get_config_path(orig_conf_file, PATH_MAX + 1,
-                  STRINGIFY(HADOOP_CONF_DIR),
-                  CONF_FILENAME);
+  char *orig_conf_file = STRINGIFY(HADOOP_CONF_DIR) "/" CONF_FILENAME;
   char *conf_file = realpath(orig_conf_file, NULL);
 
   if (conf_file == NULL) {

+ 0 - 23
src/c++/task-controller/test/test-task-controller.c

@@ -281,27 +281,6 @@ void test_check_user() {
   }
 }
 
-void test_get_config_path() {
-  printf("\nTesting get_config_path\n");
-  char conf_file_1[PATH_MAX];
-  char conf_file_2[PATH_MAX];
-  get_config_path(conf_file_1, PATH_MAX, TEST_ROOT, "test.cfg");
-  char *conf_dir = getenv("HADOOP_SECURITY_CONF_DIR");
-  if (conf_dir == NULL) {
-    if (strcmp(conf_file_1, TEST_ROOT "/test.cfg") != 0) {
-      printf("FAIL: got wrong configuration file path\n");
-      exit(1);
-    }
-  }
-  else {
-    snprintf(conf_file_2, PATH_MAX, "%s/%s", conf_dir, "test.cfg");
-    if (strcmp(conf_file_1, conf_file_2) != 0) {
-      printf("FAIL: got wrong configuration file path\n");
-      exit(1);
-    }
-  }
-}
-
 void test_check_configuration_permissions() {
   printf("\nTesting check_configuration_permissions\n");
   if (check_configuration_permissions("/etc/passwd") != 0) {
@@ -843,8 +822,6 @@ int main(int argc, char **argv) {
   printf("\nTesting get_job_log_dir()\n");
   test_get_job_log_dir();
 
-  test_get_config_path();
-
   test_check_configuration_permissions();
 
   printf("\nTesting get_task_log_dir()\n");

+ 62 - 25
src/core/org/apache/hadoop/security/UserGroupInformation.java

@@ -253,20 +253,30 @@ public class UserGroupInformation {
   
   private static String OS_LOGIN_MODULE_NAME;
   private static Class<? extends Principal> OS_PRINCIPAL_CLASS;
+
   private static final boolean windows =
       System.getProperty("os.name").startsWith("Windows");
   private static final boolean is64Bit =
       System.getProperty("os.arch").contains("64");
+
+  private static final boolean ibmJava = System.getProperty("java.vendor").contains("IBM");
+  private static final boolean aix = System.getProperty("os.name").equals("AIX");
+
   private static Thread renewerThread = null;
   private static volatile boolean shouldRunRenewerThread = true;
   
   /* Return the OS login module class name */
   private static String getOSLoginModuleName() {
-    if (System.getProperty("java.vendor").contains("IBM")) {
-      return windows ? (is64Bit
-          ? "com.ibm.security.auth.module.Win64LoginModule"
-          : "com.ibm.security.auth.module.NTLoginModule")
-        : "com.ibm.security.auth.module.LinuxLoginModule";
+    if (ibmJava) {
+      if (windows) {
+        return is64Bit ? "com.ibm.security.auth.module.Win64LoginModule"
+            : "com.ibm.security.auth.module.NTLoginModule";
+      } else if (aix) {
+        return is64Bit ? "com.ibm.security.auth.module.AIX64LoginModule"
+            : "com.ibm.security.auth.module.AIXLoginModule";
+      } else {
+        return "com.ibm.security.auth.module.LinuxLoginModule";
+      }
     } else {
       return windows ? "com.sun.security.auth.module.NTLoginModule"
         : "com.sun.security.auth.module.UnixLoginModule";
@@ -278,21 +288,24 @@ public class UserGroupInformation {
   private static Class<? extends Principal> getOsPrincipalClass() {
     ClassLoader cl = ClassLoader.getSystemClassLoader();
     try {
-      if (System.getProperty("java.vendor").contains("IBM")) {
-        if (windows) {
-          return (Class<? extends Principal>) (is64Bit
-            ? cl.loadClass("com.ibm.security.auth.UsernamePrincipal")
-            : cl.loadClass("com.ibm.security.auth.NTUserPrincipal"));
+      String principalClass = null;
+      if (ibmJava) {
+        if (is64Bit) {
+          principalClass = "com.ibm.security.auth.UsernamePrincipal";
         } else {
-          return (Class<? extends Principal>) (is64Bit
-            ? cl.loadClass("com.ibm.security.auth.UsernamePrincipal")
-            : cl.loadClass("com.ibm.security.auth.LinuxPrincipal"));
+          if (windows) {
+            principalClass = "com.ibm.security.auth.NTUserPrincipal";
+          } else if (aix) {
+            principalClass = "com.ibm.security.auth.AIXPrincipal";
+          } else {
+            principalClass = "com.ibm.security.auth.LinuxPrincipal";
+          }
         }
       } else {
-        return (Class<? extends Principal>) (windows
-           ? cl.loadClass("com.sun.security.auth.NTUserPrincipal")
-           : cl.loadClass("com.sun.security.auth.UnixPrincipal"));
+        principalClass = windows ? "com.sun.security.auth.NTUserPrincipal"
+            : "com.sun.security.auth.UnixPrincipal";
       }
+      return (Class<? extends Principal>) cl.loadClass(principalClass);  
     } catch (ClassNotFoundException e) {
       LOG.error("Unable to find JAAS classes:" + e.getMessage());
     }
@@ -365,12 +378,21 @@ public class UserGroupInformation {
     private static final Map<String,String> USER_KERBEROS_OPTIONS = 
       new HashMap<String,String>();
     static {
-      USER_KERBEROS_OPTIONS.put("doNotPrompt", "true");
-      USER_KERBEROS_OPTIONS.put("useTicketCache", "true");
-      USER_KERBEROS_OPTIONS.put("renewTGT", "true");
+      if (ibmJava) {
+        USER_KERBEROS_OPTIONS.put("useDefaultCcache", "true");
+      } else {
+        USER_KERBEROS_OPTIONS.put("doNotPrompt", "true");
+        USER_KERBEROS_OPTIONS.put("useTicketCache", "true");
+        USER_KERBEROS_OPTIONS.put("renewTGT", "true");
+      }
       String ticketCache = System.getenv("KRB5CCNAME");
       if (ticketCache != null) {
-        USER_KERBEROS_OPTIONS.put("ticketCache", ticketCache);
+        if (ibmJava) {
+          // The first value searched when "useDefaultCcache" is used.
+          System.setProperty("KRB5CCNAME", ticketCache);
+        } else {
+          USER_KERBEROS_OPTIONS.put("ticketCache", ticketCache);
+        }
       }
     }
     private static final AppConfigurationEntry USER_KERBEROS_LOGIN =
@@ -380,10 +402,14 @@ public class UserGroupInformation {
     private static final Map<String,String> KEYTAB_KERBEROS_OPTIONS = 
       new HashMap<String,String>();
     static {
-      KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
-      KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
-      KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
-      KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
+      if (ibmJava) {
+        KEYTAB_KERBEROS_OPTIONS.put("credsType", "both");
+      } else {
+        KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
+        KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
+        KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
+        KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
+      }
     }
     private static final AppConfigurationEntry KEYTAB_KERBEROS_LOGIN =
       new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
@@ -424,7 +450,12 @@ public class UserGroupInformation {
       } else if (USER_KERBEROS_CONFIG_NAME.equals(appName)) {
         return USER_KERBEROS_CONF;
       } else if (KEYTAB_KERBEROS_CONFIG_NAME.equals(appName)) {
-        KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
+        if (ibmJava) {
+          KEYTAB_KERBEROS_OPTIONS.put("useKeytab",
+              prependFileAuthority(keytabFile));
+        } else {
+          KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
+        }
         KEYTAB_KERBEROS_OPTIONS.put("principal", keytabPrincipal);
         return KEYTAB_KERBEROS_CONF;
       }
@@ -460,6 +491,11 @@ public class UserGroupInformation {
     user.setLogin(login);
   }
 
+  private static String prependFileAuthority(String keytabPath) {
+    return keytabPath.startsWith("file://") ? keytabPath
+        : "file://" + keytabPath;
+  }
+
   /**
    * Create a UserGroupInformation for the given subject.
    * This does not change the subject or acquire new credentials.
@@ -539,6 +575,7 @@ public class UserGroupInformation {
         }
         loginUser.spawnAutoRenewalThreadForUserCreds();
       } catch (LoginException le) {
+        LOG.debug("failure to login", le);
         throw new IOException("failure to login", le);
       }
       if (LOG.isDebugEnabled()) {

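The UserGroupInformation change above (HADOOP-9537) makes the JAAS setup vendor-aware, so the same client code can log in on Oracle/OpenJDK, on IBM Java for Linux/Windows, and on IBM Java for AIX; on IBM JDKs the keytab is handed to the Krb5LoginModule as a file:// URI via the "useKeytab" option. A minimal keytab-login sketch that exercises this path; the principal and keytab path are placeholders, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KeytabLoginSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Placeholder principal/keytab; UserGroupInformation picks the login
        // module and Kerberos options appropriate for the running JVM and OS.
        UserGroupInformation.loginUserFromKeytab(
            "hdfs/namenode.example.com@EXAMPLE.COM", "/etc/hadoop/hdfs.keytab");
        System.out.println("Logged in as " + UserGroupInformation.getLoginUser());
      }
    }
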
+ 4 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -929,7 +929,10 @@ public class DataNode extends Configured
     LOG.warn("checkDiskError: exception: ", e);  
     if (e instanceof SocketException || e instanceof SocketTimeoutException
     	  || e instanceof ClosedByInterruptException 
-    	  || e.getMessage().startsWith("Broken pipe")) {
+    	  || e.getMessage().startsWith("An established connection was aborted")
+    	  || e.getMessage().startsWith("Broken pipe")
+    	  || e.getMessage().startsWith("Connection reset")
+    	  || e.getMessage().contains("java.nio.channels.SocketChannel")) {
       LOG.info("Not checking disk as checkDiskError was called on a network" +
         " related exception");	
       return;

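HDFS-4699 above widens the set of network-related failures for which DataNode.checkDiskError skips the disk scan. A rough sketch (not the committed code) of that inline test pulled out into a helper, with a null-message guard added for illustration:

    import java.net.SocketException;
    import java.net.SocketTimeoutException;
    import java.nio.channels.ClosedByInterruptException;

    final class DiskCheckHeuristic {
      /** Null-safe version of the inline test in DataNode.checkDiskError(). */
      static boolean isNetworkRelated(Exception e) {
        String msg = e.getMessage();
        return e instanceof SocketException
            || e instanceof SocketTimeoutException
            || e instanceof ClosedByInterruptException
            || (msg != null
                && (msg.startsWith("An established connection was aborted")
                    || msg.startsWith("Broken pipe")
                    || msg.startsWith("Connection reset")
                    || msg.contains("java.nio.channels.SocketChannel")));
      }
    }
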
+ 8 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java

@@ -637,4 +637,12 @@ public class JspHelper {
         + VersionInfo.getDate() + " by " + VersionInfo.getUser()
         + "</td></tr>\n</table></div>";
   }
+
+  /** Return a table containing version information. */
+  public static String getVersionTable() {
+    return "<div id='dfstable'><table>"       
+        + "\n  <tr><td id='col1'>Version:</td><td>" + VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
+        + "\n  <tr><td id='col1'>Compiled:</td><td>" + VersionInfo.getDate() + " by " + VersionInfo.getUser()
+        + "\n</table></div>";
+  }
 }

+ 21 - 6
src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -17,17 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.protocol.FSConstants.BUFFER_SIZE;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -35,11 +37,9 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import static org.apache.hadoop.hdfs.protocol.FSConstants.BUFFER_SIZE;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.MD5Hash;
@@ -76,6 +76,9 @@ public class SecondaryNameNode implements Runnable {
   public static final Log LOG = 
     LogFactory.getLog(SecondaryNameNode.class.getName());
 
+  private final long starttime = System.currentTimeMillis();
+  private volatile long lastCheckpointTime = 0;
+
   private String fsName;
   private CheckpointStorage checkpointImage;
 
@@ -91,8 +94,20 @@ public class SecondaryNameNode implements Runnable {
   private Collection<File> checkpointDirs;
   private Collection<File> checkpointEditsDirs;
   private long checkpointPeriod;	// in seconds
-  private long checkpointSize;    // size (in MB) of current Edit Log
-
+  private long checkpointSize;    // size (in bytes) of current Edit Log
+
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + " Status" 
+      + "\nName Node Address    : " + nameNodeAddr   
+      + "\nStart Time           : " + new Date(starttime)
+      + "\nLast Checkpoint Time : " + (lastCheckpointTime == 0? "--": new Date(lastCheckpointTime))
+      + "\nCheckpoint Period    : " + checkpointPeriod + " seconds"
+      + "\nCheckpoint Size      : " + StringUtils.byteDesc(checkpointSize)
+                                    + " (= " + checkpointSize + " bytes)" 
+      + "\nCheckpoint Dirs      : " + checkpointDirs
+      + "\nCheckpoint Edits Dirs: " + checkpointEditsDirs;
+  }
   /**
    * Utility class to facilitate junit test error simulation.
    */
@@ -255,6 +270,7 @@ public class SecondaryNameNode implements Runnable {
         }
       };
 
+    infoServer.setAttribute("secondary.name.node", this);
     infoServer.setAttribute("name.system.image", checkpointImage);
     infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     infoServer.addInternalServlet("getimage", "/getimage",
@@ -354,7 +370,6 @@ public class SecondaryNameNode implements Runnable {
     // pending edit log.
     //
     long period = 5 * 60;              // 5 minutes
-    long lastCheckpointTime = 0;
     if (checkpointPeriod < period) {
       period = checkpointPeriod;
     }

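With HDFS-4776 the SecondaryNameNode registers itself on its HTTP server under the attribute "secondary.name.node", and its new toString() summarizes checkpoint status. A hedged sketch of how a page in the same webapp could read that attribute; the actual status JSP added by the backport is not part of this excerpt, and the servlet below is hypothetical:

    import java.io.IOException;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    public class SecondaryStatusServlet extends HttpServlet {
      @Override
      public void doGet(HttpServletRequest req, HttpServletResponse resp)
          throws IOException {
        // Set by SecondaryNameNode via infoServer.setAttribute(...) above.
        Object snn = getServletContext().getAttribute("secondary.name.node");
        resp.setContentType("text/html");
        resp.getWriter().print("<pre>" + snn + "</pre>");
      }
    }
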
+ 31 - 1
src/mapred/org/apache/hadoop/mapred/CleanupQueue.java

@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.security.UserGroupInformation;
 
 public class CleanupQueue {
@@ -57,16 +58,38 @@ public class CleanupQueue {
     final Path fullPath;// full path of file or dir
     final Configuration conf;
     final UserGroupInformation ugi;
+    final JobID jobIdTokenRenewalToCancel;
 
     public PathDeletionContext(Path fullPath, Configuration conf) {
-      this(fullPath, conf, null);
+      this(fullPath, conf, null, null);
     }
 
     public PathDeletionContext(Path fullPath, Configuration conf,
         UserGroupInformation ugi) {
+      this(fullPath, conf, ugi, null);
+    }
+    
+    /**
+     * PathDeletionContext ctor which also allows for a job-delegation token
+     * renewal to be cancelled.
+     * 
+     * This is usually used at the end of a job to delete it's final path and 
+     * to cancel renewal of it's job-delegation token.
+     * 
+     * @param fullPath path to be deleted
+     * @param conf job configuration
+     * @param ugi ugi of the job to be used to delete the path
+     * @param jobIdTokenRenewalToCancel jobId of the job whose job-delegation
+     *                                  token renewal should be cancelled. No
+     *                                  cancellation is attempted if this is
+     *                                  <code>null</code>
+     */
+    public PathDeletionContext(Path fullPath, Configuration conf,
+        UserGroupInformation ugi, JobID jobIdTokenRenewalToCancel) {
       this.fullPath = fullPath;
       this.conf = conf;
       this.ugi = ugi;
+      this.jobIdTokenRenewalToCancel = jobIdTokenRenewalToCancel;
     }
     
     protected Path getPathForCleanup() {
@@ -86,6 +109,13 @@ public class CleanupQueue {
              return null;
             }
           });
+      
+      // Cancel renewal of job-delegation token if necessary
+      if (jobIdTokenRenewalToCancel != null && 
+          conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
+        DelegationTokenRenewal.removeDelegationTokenRenewalForJob(
+            jobIdTokenRenewalToCancel);
+      }
     }
 
     @Override

+ 67 - 52
src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java

@@ -39,51 +39,51 @@ public class JobEndNotifier {
   private static volatile boolean running;
   private static BlockingQueue<JobEndStatusInfo> queue =
     new DelayQueue<JobEndStatusInfo>();
+  // Set default timeout to 5 seconds
+  public static final int MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT = 5000;
 
   public static void startNotifier() {
     running = true;
     thread = new Thread(
-                        new Runnable() {
-                          public void run() {
-                            try {
-                              while (running) {
-                                sendNotification(queue.take());
-                              }
-                            }
-                            catch (InterruptedException irex) {
-                              if (running) {
-                                LOG.error("Thread has ended unexpectedly", irex);
-                              }
-                            }
-                          }
-
-                          private void sendNotification(JobEndStatusInfo notification) {
-                            try {
-                              int code = httpNotification(notification.getUri());
-                              if (code != 200) {
-                                throw new IOException("Invalid response status code: " + code);
-                              }
-                            }
-                            catch (IOException ioex) {
-                              LOG.error("Notification failure [" + notification + "]", ioex);
-                              if (notification.configureForRetry()) {
-                                try {
-                                  queue.put(notification);
-                                }
-                                catch (InterruptedException iex) {
-                                  LOG.error("Notification queuing error [" + notification + "]",
-                                            iex);
-                                }
-                              }
-                            }
-                            catch (Exception ex) {
-                              LOG.error("Notification failure [" + notification + "]", ex);
-                            }
-                          }
-
-                        }
-
-                        );
+        new Runnable() {
+          public void run() {
+            try {
+              while (running) {
+                sendNotification(queue.take());
+              }
+            }
+            catch (InterruptedException irex) {
+              if (running) {
+                LOG.error("Thread has ended unexpectedly", irex);
+              }
+            }
+          }
+
+          private void sendNotification(JobEndStatusInfo notification) {
+            try {
+              int code = httpNotification(notification.getUri(),
+                  notification.getTimeout());
+              if (code != 200) {
+                throw new IOException("Invalid response status code: " + code);
+              }
+            }
+            catch (IOException ioex) {
+              LOG.error("Notification failure [" + notification + "]", ioex);
+              if (notification.configureForRetry()) {
+                try {
+                  queue.put(notification);
+                }
+                catch (InterruptedException iex) {
+                  LOG.error("Notification queuing error [" + notification + "]",
+                      iex);
+                }
+              }
+            }
+            catch (Exception ex) {
+              LOG.error("Notification failure [" + notification + "]", ex);
+            }
+          }
+        });
     thread.start();
   }
 
@@ -97,9 +97,10 @@ public class JobEndNotifier {
     JobEndStatusInfo notification = null;
     String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
-      // +1 to make logic for first notification identical to a retry
-      int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
+      int retryAttempts = conf.getInt("job.end.retry.attempts", 0);
       long retryInterval = conf.getInt("job.end.retry.interval", 30000);
+      int timeout = conf.getInt("mapreduce.job.end-notification.timeout",
+          MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT);
       if (uri.contains("$jobId")) {
         uri = uri.replace("$jobId", status.getJobID().toString());
       }
@@ -109,7 +110,8 @@ public class JobEndNotifier {
             (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
         uri = uri.replace("$jobStatus", statusStr);
       }
-      notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
+      notification = new JobEndStatusInfo(
+          uri, retryAttempts, retryInterval, timeout);
     }
     return notification;
   }
@@ -126,12 +128,17 @@ public class JobEndNotifier {
     }
   }
 
-  private static int httpNotification(String uri) throws IOException {
+  private static int httpNotification(String uri, int timeout)
+      throws IOException {
     URI url = new URI(uri, false);
-    HttpClient m_client = new HttpClient();
+
+    HttpClient httpClient = new HttpClient();
+    httpClient.getParams().setSoTimeout(timeout);
+    httpClient.getParams().setConnectionManagerTimeout(timeout);
+
     HttpMethod method = new GetMethod(url.getEscapedURI());
     method.setRequestHeader("Accept", "*/*");
-    return m_client.executeMethod(method);
+    return httpClient.executeMethod(method);
   }
 
   // for use by the LocalJobRunner, without using a thread&queue,
@@ -139,9 +146,10 @@ public class JobEndNotifier {
   public static void localRunnerNotification(JobConf conf, JobStatus status) {
     JobEndStatusInfo notification = createNotification(conf, status);
     if (notification != null) {
-      while (notification.configureForRetry()) {
+      do {
         try {
-          int code = httpNotification(notification.getUri());
+          int code = httpNotification(notification.getUri(),
+              notification.getTimeout());
           if (code != 200) {
             throw new IOException("Invalid response status code: " + code);
           }
@@ -157,13 +165,13 @@ public class JobEndNotifier {
         }
         try {
           synchronized (Thread.currentThread()) {
-            Thread.currentThread().sleep(notification.getRetryInterval());
+            Thread.sleep(notification.getRetryInterval());
           }
         }
         catch (InterruptedException iex) {
           LOG.error("Notification retry error [" + notification + "]", iex);
         }
-      }
+      } while (notification.configureForRetry());
     }
   }
 
@@ -172,11 +180,14 @@ public class JobEndNotifier {
     private int retryAttempts;
     private long retryInterval;
     private long delayTime;
+    private int timeout;
 
-    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
+    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval,
+        int timeout) {
       this.uri = uri;
       this.retryAttempts = retryAttempts;
       this.retryInterval = retryInterval;
+      this.timeout = timeout;
       this.delayTime = System.currentTimeMillis();
     }
 
@@ -192,6 +203,10 @@ public class JobEndNotifier {
       return retryInterval;
     }
 
+    public int getTimeout() {
+      return timeout;
+    }
+
     public long getDelayTime() {
       return delayTime;
     }

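MAPREDUCE-5066 above adds a per-request timeout (default 5000 ms) to the job-end notification HTTP call, configurable via mapreduce.job.end-notification.timeout. A minimal configuration sketch with a hypothetical callback endpoint; the retry keys already existed, only the timeout key is new:

    import org.apache.hadoop.mapred.JobConf;

    public class NotificationConfSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Hypothetical callback URL; $jobId and $jobStatus are substituted
        // by JobEndNotifier before the request is sent.
        conf.setJobEndNotificationURI(
            "http://example.com/jobend?jobid=$jobId&status=$jobStatus");
        conf.setInt("job.end.retry.attempts", 3);    // retries after the first attempt
        conf.setInt("job.end.retry.interval", 1000); // ms between retries
        // New knob from this change; overrides the 5000 ms default.
        conf.setInt("mapreduce.job.end-notification.timeout", 2000);
      }
    }
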
+ 2 - 7
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -3319,13 +3319,13 @@ public class JobInProgress {
         CleanupQueue.getInstance().addToQueue(
             new PathDeletionContext(tempDir, conf));
 
-        // delete the staging area for the job
+        // delete the staging area for the job and cancel delegation token
         String jobTempDir = conf.get("mapreduce.job.dir");
         if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
             !conf.getKeepFailedTaskFiles()) {
           Path jobTempDirPath = new Path(jobTempDir);
           CleanupQueue.getInstance().addToQueue(
-              new PathDeletionContext(jobTempDirPath, conf, userUGI));
+              new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
         }
 
       } catch (IOException e) {
@@ -3341,11 +3341,6 @@ public class JobInProgress {
       this.runningReduces = null;
     }
     
-    // remove jobs delegation tokens
-    if(conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
-      DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
-    } // else don't remove it.May be used by spawned tasks
-
     //close the user's FS
     try {
       fs.close();

+ 39 - 20
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -268,32 +268,51 @@ class LinuxTaskController extends TaskController {
 
   @Override
   public void deleteAsUser(String user, String subDir) throws IOException {
-    String[] command = 
-      new String[]{taskControllerExe, 
-                   user,
-                   localStorage.getDirsString(),
-                   Integer.toString(Commands.DELETE_AS_USER.getValue()),
-                   subDir};
-    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("deleteAsUser: " + Arrays.toString(command));
+    String[] command = new String[] { taskControllerExe, user,
+        localStorage.getDirsString(),
+        Integer.toString(Commands.DELETE_AS_USER.getValue()), subDir };
+    ShellCommandExecutor shExec = null;
+    try {
+      shExec = new ShellCommandExecutor(command);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteAsUser: " + Arrays.toString(command));
+      }
+      shExec.execute();
+    } catch (IOException e) {
+      if (shExec != null) {
+        int exitCode = shExec.getExitCode();
+        LOG.info("deleteAsUser: " + Arrays.toString(command));
+        LOG.warn("Exit code is : " + exitCode);
+        LOG.info("Output from deleteAsUser LinuxTaskController:");
+        logOutput(shExec.getOutput());
+      }
+      throw e;
     }
-    shExec.execute();
   }
 
   @Override
   public void deleteLogAsUser(String user, String subDir) throws IOException {
-    String[] command = 
-      new String[]{taskControllerExe, 
-                   user,
-                   localStorage.getDirsString(),
-                   Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
-                   subDir};
-    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+    String[] command = new String[] { taskControllerExe, user,
+        localStorage.getDirsString(),
+        Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()), subDir };
+    ShellCommandExecutor shExec = null;
+    try {
+      shExec = new ShellCommandExecutor(command);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+      }
+      shExec.execute();
+    } catch (IOException e) {
+      if (shExec != null) {
+        int exitCode = shExec.getExitCode();
+        LOG.info("deleteLogAsUser: " + Arrays.toString(command));
+        LOG.warn("Exit code is : " + exitCode);
+        LOG.info("Output from deleteLogAsUser LinuxTaskController:");
+        logOutput(shExec.getOutput());
+      }
+      throw e;
     }
-    shExec.execute();
   }
 
   @Override

+ 68 - 11
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -48,12 +48,11 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Deserializer;
@@ -62,13 +61,8 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -434,10 +428,14 @@ class MapTask extends Task {
     try {
       runner.run(in, new OldOutputCollector(collector, conf), reporter);
       collector.flush();
-    } finally {
-      //close
-      in.close();                               // close input
+      
+      in.close();
+      in = null;
       collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
 
@@ -764,7 +762,9 @@ class MapTask extends Task {
       input.initialize(split, mapperContext);
       mapper.run(mapperContext);
       input.close();
+      input = null;
       output.close(mapperContext);
+      output = null;
     } catch (NoSuchMethodException e) {
       throw new IOException("Can't find Context constructor", e);
     } catch (InstantiationException e) {
@@ -773,6 +773,9 @@ class MapTask extends Task {
       throw new IOException("Can't invoke Context constructor", e);
     } catch (IllegalAccessException e) {
       throw new IOException("Can't invoke Context constructor", e);
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
     }
   }
 
@@ -1739,5 +1742,59 @@ class MapTask extends Task {
       super(s);
     }
   }
+  
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 
 }

+ 27 - 15
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -488,14 +488,16 @@ class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
-        reduceOutputCounter, job, reporter, finalName);
+    RecordWriter<OUTKEY, OUTVALUE> out = 
+        new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+            reduceOutputCounter, job, reporter, finalName);
+    final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
     
     OutputCollector<OUTKEY,OUTVALUE> collector = 
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -528,18 +530,14 @@ class ReduceTask extends Task {
 
       //Clean up: repeated in catch block below
       reducer.close();
+      reducer = null;
+      
       out.close(reporter);
+      out = null;
       //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-        
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
-      
-      throw ioe;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
 
@@ -647,8 +645,11 @@ class ReduceTask extends Task {
                                                trackedRW, committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
   }
 
   private static enum CopyOutputErrorType {
@@ -3011,4 +3012,15 @@ class ReduceTask extends Task {
 	  return decompressedSize;
 	}
   }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
+  }
 }

+ 15 - 10
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -1458,7 +1458,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       new TreeMap<TaskAttemptID, TaskInProgress>();
     tasksToClose.putAll(tasks);
     for (TaskInProgress tip : tasksToClose.values()) {
-      tip.jobHasFinished(false);
+      tip.jobHasFinished(true, false);
     }
     
     this.running = false;
@@ -2239,7 +2239,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         rjob.distCacheMgr.release();
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
-          tip.jobHasFinished(false);
+          tip.jobHasFinished(false, false);
           Task t = tip.getTask();
           if (t.isMapTask()) {
             indexCache.removeMap(tip.getTask().getTaskID().toString());
@@ -2313,7 +2313,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       // Remove the task from running jobs, 
       // removing the job if it's the last task
       removeTaskFromJob(tip.getTask().getJobID(), tip);
-      tip.jobHasFinished(wasFailure);
+      tip.jobHasFinished(false, wasFailure);
       if (tip.getTask().isMapTask()) {
         indexCache.removeMap(tip.getTask().getTaskID().toString());
       }
@@ -2611,7 +2611,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
           tip.reportDiagnosticInfo(msg);
           try {
             tip.kill(true);
-            tip.cleanup(true);
+            tip.cleanup(false, true);
           } catch (IOException ie2) {
             LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
           } catch (InterruptedException ie2) {
@@ -3182,7 +3182,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         removeTaskFromJob(task.getJobID(), this);
       }
       try {
-        cleanup(needCleanup);
+        cleanup(false, needCleanup);
       } catch (IOException ie) {
       }
 
@@ -3270,11 +3270,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     /**
      * We no longer need anything from this task, as the job has
      * finished.  If the task is still running, kill it and clean up.
-     * 
+     *
+     * @param ttReInit is the TaskTracker executing re-initialization sequence?
      * @param wasFailure did the task fail, as opposed to was it killed by
      *                   the framework
      */
-    public void jobHasFinished(boolean wasFailure) throws IOException {
+    public void jobHasFinished(boolean ttReInit, boolean wasFailure) 
+        throws IOException {
       // Kill the task if it is still running
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
@@ -3291,7 +3293,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       }
       
       // Cleanup on the finished task
-      cleanup(true);
+      cleanup(ttReInit, true);
     }
 
     /**
@@ -3373,7 +3375,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
      * otherwise the current working directory of the task 
      * i.e. &lt;taskid&gt;/work is cleaned up.
      */
-    void cleanup(boolean needCleanup) throws IOException {
+    void cleanup(boolean ttReInit, boolean needCleanup) throws IOException {
       TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
 
@@ -3402,7 +3404,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
           return;
         }
         try {
-          removeTaskFiles(needCleanup);
+          // TT re-initialization sequence: no need to cleanup, TT will cleanup
+          if (!ttReInit) {
+            removeTaskFiles(needCleanup);
+          }
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: "
               + StringUtils.stringifyException(ie));

+ 6 - 3
src/mapred/org/apache/hadoop/mapreduce/Mapper.java

@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKeyValue()) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

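Per the MAPREDUCE-4737 note in CHANGES.txt, cleanup(context) is now invoked even when map() throws, which is an incompatible change; users who depend on the old behaviour can override run(). A minimal sketch of such an override (the mapper type parameters are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LegacyCleanupMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {
      @Override
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        // Old (pre-MAPREDUCE-4737) behaviour: cleanup() runs only if no record threw.
        cleanup(context);
      }
    }
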
+ 6 - 3
src/mapred/org/apache/hadoop/mapreduce/Reducer.java

@@ -172,9 +172,12 @@ public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKey()) {
-      reduce(context.getCurrentKey(), context.getValues(), context);
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

+ 263 - 0
src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java

@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+
+public class TestJobEndNotifier extends TestCase {
+  HttpServer server;
+  URL baseUrl;
+
+  @SuppressWarnings("serial")
+  public static class JobEndServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+    public static URI requestUri;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      InputStreamReader in = new InputStreamReader(request.getInputStream());
+      PrintStream out = new PrintStream(response.getOutputStream());
+
+      calledTimes++;
+      try {
+        requestUri = new URI(null, null,
+            request.getRequestURI(), request.getQueryString(), null);
+      } catch (URISyntaxException e) {
+      }
+
+      in.close();
+      out.close();
+    }
+  }
+
+  // Servlet that delays requests for a long time
+  @SuppressWarnings("serial")
+  public static class DelayServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      boolean timedOut = false;
+      calledTimes++;
+      try {
+        // Sleep for a long time
+        Thread.sleep(1000000);
+      } catch (InterruptedException e) {
+        timedOut = true;
+      }
+      assertTrue("DelayServlet should be interrupted", timedOut);
+    }
+  }
+
+  // Servlet that fails all requests into it
+  @SuppressWarnings("serial")
+  public static class FailServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      calledTimes++;
+      throw new IOException("I am failing!");
+    }
+  }
+
+  public void setUp() throws Exception {
+    new File(System.getProperty("build.webapps", "build/webapps") + "/test"
+        ).mkdirs();
+    server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.addServlet("delay", "/delay", DelayServlet.class);
+    server.addServlet("jobend", "/jobend", JobEndServlet.class);
+    server.addServlet("fail", "/fail", FailServlet.class);
+    server.start();
+    int port = server.getPort();
+    baseUrl = new URL("http://localhost:" + port + "/");
+
+    JobEndServlet.calledTimes = 0;
+    JobEndServlet.requestUri = null;
+    DelayServlet.calledTimes = 0;
+    FailServlet.calledTimes = 0;
+  }
+
+  public void tearDown() throws Exception {
+    server.stop();
+  }
+
+  /**
+   * Validate that $jobId and $jobStatus fields are properly substituted
+   * in the output URI
+   */
+  public void testUriSubstitution() throws InterruptedException {
+    try {
+      JobEndNotifier.startNotifier();
+
+      JobStatus jobStatus = createTestJobStatus(
+          "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+      JobConf jobConf = createTestJobConf(
+          new Configuration(), 0,
+          baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      int maxLoop = 100;
+      while (JobEndServlet.calledTimes != 1 && maxLoop-- > 0) {
+        Thread.sleep(100);
+      }
+
+      // Validate params
+      assertEquals(1, JobEndServlet.calledTimes);
+      assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
+          JobEndServlet.requestUri.getQuery());
+    } finally {
+      JobEndNotifier.stopNotifier();
+    }
+  }
+
+  /**
+   * Validate job.end.retry.attempts logic.
+   */
+  public void testRetryCount() throws InterruptedException {
+    try {
+      JobEndNotifier.startNotifier();
+
+      int retryAttempts = 3;
+      JobStatus jobStatus = createTestJobStatus(
+          "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+      JobConf jobConf = createTestJobConf(
+          new Configuration(), retryAttempts, baseUrl + "fail");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      int maxLoop = 100;
+      while (FailServlet.calledTimes != (retryAttempts + 1) && maxLoop-- > 0) {
+        Thread.sleep(100);
+      }
+
+      // Validate params
+      assertEquals(retryAttempts + 1, FailServlet.calledTimes);
+    } finally {
+      JobEndNotifier.stopNotifier();
+    }
+  }
+
+  /**
+   * Validate that the notification times out after reaching
+   * mapreduce.job.end-notification.timeout.
+   */
+  public void testNotificationTimeout() throws InterruptedException {
+    try {
+      Configuration conf = new Configuration();
+      // Reduce the timeout to 1 second
+      conf.setInt("mapreduce.job.end-notification.timeout", 1000);
+      JobEndNotifier.startNotifier();
+
+      // Submit one notification that will delay infinitely
+      JobStatus jobStatus = createTestJobStatus(
+          "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+      JobConf jobConf = createTestJobConf(
+          conf, 0, baseUrl + "delay");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      // Submit another notification that will return promptly
+      jobConf.setJobEndNotificationURI(baseUrl + "jobend");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      // Make sure the notification passed thru
+      int maxLoop = 100;
+      while (JobEndServlet.calledTimes != 1 && maxLoop-- > 0) {
+        Thread.sleep(100);
+      }
+      assertEquals("JobEnd notification should have been received by now",
+          1, JobEndServlet.calledTimes);
+      assertEquals(1, DelayServlet.calledTimes);
+      assertEquals("/jobend", JobEndServlet.requestUri.getPath());
+    } finally {
+      JobEndNotifier.stopNotifier();
+    }
+  }
+
+  /**
+   * Basic validation for localRunnerNotification.
+   */
+  public void testLocalJobRunnerUriSubstitution() throws InterruptedException {
+    JobStatus jobStatus = createTestJobStatus(
+        "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+    JobConf jobConf = createTestJobConf(
+        new Configuration(), 0,
+        baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+    JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
+
+    // No need to wait for the notification to go thru since calls are
+    // synchronous
+
+    // Validate params
+    assertEquals(1, JobEndServlet.calledTimes);
+    assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
+        JobEndServlet.requestUri.getQuery());
+  }
+
+  /**
+   * Validate job.end.retry.attempts for the localJobRunner.
+   */
+  public void testLocalJobRunnerRetryCount() throws InterruptedException {
+    int retryAttempts = 3;
+    JobStatus jobStatus = createTestJobStatus(
+        "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+    JobConf jobConf = createTestJobConf(
+        new Configuration(), retryAttempts, baseUrl + "fail");
+    JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
+
+    // Validate params
+    assertEquals(retryAttempts + 1, FailServlet.calledTimes);
+  }
+
+  private static JobStatus createTestJobStatus(String jobId, int state) {
+    return new JobStatus(
+        JobID.forName(jobId), 0.5f, 0.0f,
+        state);
+  }
+
+  private static JobConf createTestJobConf(
+      Configuration conf, int retryAttempts, String notificationUri) {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setInt("job.end.retry.attempts", retryAttempts);
+    jobConf.set("job.end.retry.interval", "0");
+    jobConf.setJobEndNotificationURI(notificationUri);
+    return jobConf;
+  }
+}

+ 12 - 4
src/test/org/apache/hadoop/mapred/TestSubmitJob.java

@@ -288,10 +288,18 @@ public class TestSubmitJob extends TestCase {
           reduceSignalFile.toString());
       // wait for job to be done
       UtilsForTests.waitTillDone(jClient);
-
-      // check if the staging area is cleaned up
-      LOG.info("Check if job submit dir is cleanup or not");
-      assertFalse(fs.exists(jobSubmitDirpath));
+      
+      // Check that the job submit directory is cleaned up
+      int maxChecks = 20;
+      int sleepMs = 100;
+      for (int i = 0; fs.exists(jobSubmitDirpath) && i < maxChecks; i++) {
+        try {
+          Thread.sleep(sleepMs);
+        } catch (InterruptedException ex) {}
+      }
+      
+      assertFalse("Job submit dir was not cleaned up after " +
+          maxChecks * sleepMs + " ms", fs.exists(jobSubmitDirpath));
     } finally {
       if (mr != null) {
         mr.shutdown();

+ 320 - 0
src/test/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java

@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.WordCount.IntSumReducer;
+import org.apache.hadoop.examples.WordCount.TokenizerMapper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMapperReducerCleanup {
+
+  static boolean mapCleanup = false;
+  static boolean reduceCleanup = false;
+  static boolean recordReaderCleanup = false;
+  static boolean recordWriterCleanup = false;
+  
+  static void reset() {
+    mapCleanup = false;
+    reduceCleanup = false; 
+    recordReaderCleanup = false;
+    recordWriterCleanup = false;
+  }
+  
+  private static class FailingMapper
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    /** Map method that always throws, to exercise cleanup on task failure. */
+    public void map(LongWritable key, Text val, Context c)
+        throws IOException, InterruptedException {
+      throw new IOException("TestMapperReducerCleanup");
+    }
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      mapCleanup = true;
+      super.cleanup(context);
+    }
+  }
+
+  private static class TrackingTokenizerMapper extends TokenizerMapper {
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
+        throws IOException, InterruptedException {
+      mapCleanup = true;
+      super.cleanup(context);
+    }
+    
+  }
+
+  private static class FailingReducer
+      extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
+
+    public void reduce(LongWritable key, Iterable<Text> vals, Context context)
+        throws IOException, InterruptedException {
+      throw new IOException("TestMapperReducerCleanup");
+    }
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      reduceCleanup = true;
+      super.cleanup(context);
+    }
+  }
+
+  private static class TrackingIntSumReducer extends IntSumReducer {
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      reduceCleanup = true;
+      super.cleanup(context);
+    }
+  }
+
+  public static class TrackingTextInputFormat extends TextInputFormat {
+
+    public static class TrackingRecordReader extends LineRecordReader {
+      @Override
+      public synchronized void close() throws IOException {
+        recordReaderCleanup = true;
+        super.close();
+      }
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(
+        InputSplit split, TaskAttemptContext context) {
+      return new TrackingRecordReader();
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static class TrackingTextOutputFormat extends TextOutputFormat {
+    
+    public static class TrackingRecordWriter extends LineRecordWriter {
+
+      public TrackingRecordWriter(DataOutputStream out) {
+        super(out);
+      }
+
+      @Override
+      public synchronized void close(TaskAttemptContext context)
+          throws IOException {
+        recordWriterCleanup = true;
+        super.close(context);
+      }
+
+    }
+    
+    @Override
+    public RecordWriter getRecordWriter(TaskAttemptContext job)
+        throws IOException, InterruptedException {
+      Configuration conf = job.getConfiguration();
+
+      Path file = getDefaultWorkFile(job, "");
+      FileSystem fs = file.getFileSystem(conf);
+      FSDataOutputStream fileOut = fs.create(file, false);
+      
+      return new TrackingRecordWriter(fileOut);
+    }
+  
+  }
+
+
+  /**
+   * Create a single input file in the input directory.
+   * @param dirPath the directory in which the file resides
+   * @param id the file id number
+   * @param numRecords how many records to write to each file.
+   */
+  private void createInputFile(Path dirPath, int id, int numRecords)
+      throws IOException {
+    final String MESSAGE = "This is a line in a file: ";
+
+    Path filePath = new Path(dirPath, "" + id);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+    for (int i = 0; i < numRecords; i++) {
+      w.write(MESSAGE + id + " " + i + "\n");
+    }
+
+    w.close();
+  }
+
+  private final String INPUT_DIR = "input";
+  private final String OUTPUT_DIR = "output";
+
+  private Path getInputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(INPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), INPUT_DIR);
+    }
+  }
+
+  private Path getOutputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(OUTPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), OUTPUT_DIR);
+    }
+  }
+
+  private Path createInput() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path inputPath = getInputPath();
+
+    // Clear the input directory if it exists, first.
+    if (fs.exists(inputPath)) {
+      fs.delete(inputPath, true);
+    }
+
+    // Create an input file
+    createInputFile(inputPath, 0, 10);
+
+    return inputPath;
+  }
+
+  @Test
+  public void testMapCleanup() throws Exception {
+    reset();
+    
+    Job job = new Job();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(FailingMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(0);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+
+  @Test
+  public void testReduceCleanup() throws Exception {
+    reset();
+    
+    Job job = new Job();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(TrackingTokenizerMapper.class);
+    job.setReducerClass(FailingReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(reduceCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+  
+  @Test
+  public void testJobSuccessCleanup() throws Exception {
+    reset();
+    
+    Job job = new Job();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(TrackingTokenizerMapper.class);
+    job.setReducerClass(TrackingIntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(reduceCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+
+}

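The three tests above assert that Mapper and Reducer cleanup, as well as the
record reader and record writer close methods, run whether a task fails or
succeeds. A rough sketch of the control flow those assertions imply, written
as an overridden run() method, is shown below; it only illustrates the
expected behavior and is not the framework's actual implementation.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CleanupAlwaysMapper
        extends Mapper<LongWritable, Text, LongWritable, Text> {

      @Override
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          // Runs even if map() threw, which is what the tests above verify.
          cleanup(context);
        }
      }
    }
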
+ 13 - 0
src/webapps/secondary/index.html

@@ -0,0 +1,13 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=status.jsp"/>
+<html>
+<head><title>Hadoop Administration</title></head>
+
+<body>
+<h1>Hadoop Administration</h1>
+
+<ul> 
+  <li><a href="status.jsp">Status</a></li> 
+</ul>
+
+</body> 
+</html>

+ 20 - 0
src/webapps/secondary/status.jsp

@@ -0,0 +1,20 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="org.apache.hadoop.util.*"
+%>
+
+<html>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+<title>Hadoop SecondaryNameNode</title>
+    
+<body>
+<h1>SecondaryNameNode</h1>
+<%= JspHelper.getVersionTable() %>
+<hr />
+<pre>
+<%= application.getAttribute("secondary.name.node").toString() %>
+</pre>
+
+<br />
+<b><a href="/logs/">Logs</a></b>
+<%= ServletUtil.htmlFooter() %>
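
status.jsp reads a "secondary.name.node" attribute from the servlet context,
so the SecondaryNameNode presumably registers itself on its embedded HTTP
server when the web UI starts. A hypothetical sketch of that registration is
below; the bind address, port, and constructor arguments are the usual
defaults for the secondary namenode web UI, not values taken from this patch.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.http.HttpServer;

    public class SecondaryWebUiSketch {
      // Hypothetical: publish the running SecondaryNameNode instance under the
      // attribute name that status.jsp reads.
      static HttpServer startInfoServer(Object secondaryNameNode,
          Configuration conf) throws IOException {
        HttpServer infoServer =
            new HttpServer("secondary", "0.0.0.0", 50090, true, conf);
        infoServer.setAttribute("secondary.name.node", secondaryNameNode);
        infoServer.start();
        return infoServer;
      }
    }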